Coverage Report

Created: 2025-06-15 06:31

/src/postgres/src/backend/replication/slot.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * slot.c
4
 *     Replication slot management.
5
 *
6
 *
7
 * Copyright (c) 2012-2025, PostgreSQL Global Development Group
8
 *
9
 *
10
 * IDENTIFICATION
11
 *    src/backend/replication/slot.c
12
 *
13
 * NOTES
14
 *
15
 * Replication slots are used to keep state about replication streams
16
 * originating from this cluster.  Their primary purpose is to prevent the
17
 * premature removal of WAL or of old tuple versions in a manner that would
18
 * interfere with replication; they are also useful for monitoring purposes.
19
 * Slots need to be permanent (to allow restarts), crash-safe, and allocatable
20
 * on standbys (to support cascading setups).  The requirement that slots be
21
 * usable on standbys precludes storing them in the system catalogs.
22
 *
23
 * Each replication slot gets its own directory inside the directory
24
 * $PGDATA / PG_REPLSLOT_DIR.  Inside that directory the state file will
25
 * contain the slot's own data.  Additional data can be stored alongside that
26
 * file if required.  While the server is running, the state data is also
27
 * cached in memory for efficiency.
28
 *
29
 * ReplicationSlotAllocationLock must be taken in exclusive mode to allocate
30
 * or free a slot. ReplicationSlotControlLock must be taken in shared mode
31
 * to iterate over the slots, and in exclusive mode to change the in_use flag
32
 * of a slot.  The remaining data in each slot is protected by its mutex.
33
 *
34
 *-------------------------------------------------------------------------
35
 */
36
37
#include "postgres.h"
38
39
#include <unistd.h>
40
#include <sys/stat.h>
41
42
#include "access/transam.h"
43
#include "access/xlog_internal.h"
44
#include "access/xlogrecovery.h"
45
#include "common/file_utils.h"
46
#include "common/string.h"
47
#include "miscadmin.h"
48
#include "pgstat.h"
49
#include "postmaster/interrupt.h"
50
#include "replication/slotsync.h"
51
#include "replication/slot.h"
52
#include "replication/walsender_private.h"
53
#include "storage/fd.h"
54
#include "storage/ipc.h"
55
#include "storage/proc.h"
56
#include "storage/procarray.h"
57
#include "utils/builtins.h"
58
#include "utils/guc_hooks.h"
59
#include "utils/injection_point.h"
60
#include "utils/varlena.h"
61
62
/*
63
 * Replication slot on-disk data structure.
64
 */
65
typedef struct ReplicationSlotOnDisk
66
{
67
  /* first part of this struct needs to be version independent */
68
69
  /* data not covered by checksum */
70
  uint32    magic;
71
  pg_crc32c checksum;
72
73
  /* data covered by checksum */
74
  uint32    version;
75
  uint32    length;
76
77
  /*
78
   * The actual data in the slot that follows can differ based on the above
79
   * 'version'.
80
   */
81
82
  ReplicationSlotPersistentData slotdata;
83
} ReplicationSlotOnDisk;
84
85
/*
86
 * Struct for the configuration of synchronized_standby_slots.
87
 *
88
 * Note: this must be a flat representation that can be held in a single chunk
89
 * of guc_malloc'd memory, so that it can be stored as the "extra" data for the
90
 * synchronized_standby_slots GUC.
91
 */
92
typedef struct
93
{
94
  /* Number of slot names in the slot_names[] */
95
  int     nslotnames;
96
97
  /*
98
   * slot_names contains 'nslotnames' consecutive null-terminated C strings.
99
   */
100
  char    slot_names[FLEXIBLE_ARRAY_MEMBER];
101
} SyncStandbySlotsConfigData;
102
103
/*
104
 * Lookup table for slot invalidation causes.
105
 */
106
typedef struct SlotInvalidationCauseMap
107
{
108
  ReplicationSlotInvalidationCause cause;
109
  const char *cause_name;
110
} SlotInvalidationCauseMap;
111
112
static const SlotInvalidationCauseMap SlotInvalidationCauses[] = {
113
  {RS_INVAL_NONE, "none"},
114
  {RS_INVAL_WAL_REMOVED, "wal_removed"},
115
  {RS_INVAL_HORIZON, "rows_removed"},
116
  {RS_INVAL_WAL_LEVEL, "wal_level_insufficient"},
117
  {RS_INVAL_IDLE_TIMEOUT, "idle_timeout"},
118
};
119
120
/*
121
 * Ensure that the lookup table is up-to-date with the enums defined in
122
 * ReplicationSlotInvalidationCause.
123
 */
124
StaticAssertDecl(lengthof(SlotInvalidationCauses) == (RS_INVAL_MAX_CAUSES + 1),
125
         "array length mismatch");
126
127
/* size of version independent data */
128
#define ReplicationSlotOnDiskConstantSize \
129
0
  offsetof(ReplicationSlotOnDisk, slotdata)
130
/* size of the part of the slot not covered by the checksum */
131
#define ReplicationSlotOnDiskNotChecksummedSize  \
132
  offsetof(ReplicationSlotOnDisk, version)
133
/* size of the part covered by the checksum */
134
#define ReplicationSlotOnDiskChecksummedSize \
135
  sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskNotChecksummedSize
136
/* size of the slot data that is version dependent */
137
#define ReplicationSlotOnDiskV2Size \
138
0
  sizeof(ReplicationSlotOnDisk) - ReplicationSlotOnDiskConstantSize
139
140
0
#define SLOT_MAGIC    0x1051CA1  /* format identifier */
141
0
#define SLOT_VERSION  5    /* version for new files */
142
143
/* Control array for replication slot management */
144
ReplicationSlotCtlData *ReplicationSlotCtl = NULL;
145
146
/* My backend's replication slot in the shared memory array */
147
ReplicationSlot *MyReplicationSlot = NULL;
148
149
/* GUC variables */
150
int     max_replication_slots = 10; /* the maximum number of replication
151
                     * slots */
152
153
/*
154
 * Invalidate replication slots that have remained idle longer than this
155
 * duration; '0' disables it.
156
 */
157
int     idle_replication_slot_timeout_mins = 0;
158
159
/*
160
 * This GUC lists streaming replication standby server slot names that
161
 * logical WAL sender processes will wait for.
162
 */
163
char     *synchronized_standby_slots;
164
165
/* This is the parsed and cached configuration for synchronized_standby_slots */
166
static SyncStandbySlotsConfigData *synchronized_standby_slots_config;
167
168
/*
169
 * Oldest LSN that has been confirmed to be flushed to the standbys
170
 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
171
 */
172
static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
173
174
static void ReplicationSlotShmemExit(int code, Datum arg);
175
static void ReplicationSlotDropPtr(ReplicationSlot *slot);
176
177
/* internal persistency functions */
178
static void RestoreSlotFromDisk(const char *name);
179
static void CreateSlotOnDisk(ReplicationSlot *slot);
180
static void SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel);
181
182
/*
183
 * Report shared-memory space needed by ReplicationSlotsShmemInit.
184
 */
185
Size
186
ReplicationSlotsShmemSize(void)
187
0
{
188
0
  Size    size = 0;
189
190
0
  if (max_replication_slots == 0)
191
0
    return size;
192
193
0
  size = offsetof(ReplicationSlotCtlData, replication_slots);
194
0
  size = add_size(size,
195
0
          mul_size(max_replication_slots, sizeof(ReplicationSlot)));
196
197
0
  return size;
198
0
}
199
200
/*
201
 * Allocate and initialize shared memory for replication slots.
202
 */
203
void
204
ReplicationSlotsShmemInit(void)
205
0
{
206
0
  bool    found;
207
208
0
  if (max_replication_slots == 0)
209
0
    return;
210
211
0
  ReplicationSlotCtl = (ReplicationSlotCtlData *)
212
0
    ShmemInitStruct("ReplicationSlot Ctl", ReplicationSlotsShmemSize(),
213
0
            &found);
214
215
0
  if (!found)
216
0
  {
217
0
    int     i;
218
219
    /* First time through, so initialize */
220
0
    MemSet(ReplicationSlotCtl, 0, ReplicationSlotsShmemSize());
221
222
0
    for (i = 0; i < max_replication_slots; i++)
223
0
    {
224
0
      ReplicationSlot *slot = &ReplicationSlotCtl->replication_slots[i];
225
226
      /* everything else is zeroed by the memset above */
227
0
      SpinLockInit(&slot->mutex);
228
0
      LWLockInitialize(&slot->io_in_progress_lock,
229
0
               LWTRANCHE_REPLICATION_SLOT_IO);
230
0
      ConditionVariableInit(&slot->active_cv);
231
0
    }
232
0
  }
233
0
}
234
235
/*
236
 * Register the callback for replication slot cleanup and releasing.
237
 */
238
void
239
ReplicationSlotInitialize(void)
240
0
{
241
0
  before_shmem_exit(ReplicationSlotShmemExit, 0);
242
0
}
243
244
/*
245
 * Release and cleanup replication slots.
246
 */
247
static void
248
ReplicationSlotShmemExit(int code, Datum arg)
249
0
{
250
  /* Make sure active replication slots are released */
251
0
  if (MyReplicationSlot != NULL)
252
0
    ReplicationSlotRelease();
253
254
  /* Also cleanup all the temporary slots. */
255
0
  ReplicationSlotCleanup(false);
256
0
}
257
258
/*
259
 * Check whether the passed slot name is valid and report errors at elevel.
260
 *
261
 * Slot names may consist of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
262
 * the name to be used as a directory name on every supported OS.
263
 *
264
 * Returns whether the directory name is valid or not if elevel < ERROR.
265
 */
266
bool
267
ReplicationSlotValidateName(const char *name, int elevel)
268
0
{
269
0
  const char *cp;
270
271
0
  if (strlen(name) == 0)
272
0
  {
273
0
    ereport(elevel,
274
0
        (errcode(ERRCODE_INVALID_NAME),
275
0
         errmsg("replication slot name \"%s\" is too short",
276
0
            name)));
277
0
    return false;
278
0
  }
279
280
0
  if (strlen(name) >= NAMEDATALEN)
281
0
  {
282
0
    ereport(elevel,
283
0
        (errcode(ERRCODE_NAME_TOO_LONG),
284
0
         errmsg("replication slot name \"%s\" is too long",
285
0
            name)));
286
0
    return false;
287
0
  }
288
289
0
  for (cp = name; *cp; cp++)
290
0
  {
291
0
    if (!((*cp >= 'a' && *cp <= 'z')
292
0
        || (*cp >= '0' && *cp <= '9')
293
0
        || (*cp == '_')))
294
0
    {
295
0
      ereport(elevel,
296
0
          (errcode(ERRCODE_INVALID_NAME),
297
0
           errmsg("replication slot name \"%s\" contains invalid character",
298
0
              name),
299
0
           errhint("Replication slot names may only contain lower case letters, numbers, and the underscore character.")));
300
0
      return false;
301
0
    }
302
0
  }
303
0
  return true;
304
0
}
305
306
/*
307
 * Create a new replication slot and mark it as used by this backend.
308
 *
309
 * name: Name of the slot
310
 * db_specific: logical decoding is db specific; if the slot is going to
311
 *     be used for that pass true, otherwise false.
312
 * two_phase: Allows decoding of prepared transactions. We allow this option
313
 *     to be enabled only at the slot creation time. If we allow this option
314
 *     to be changed during decoding then it is quite possible that we skip
315
 *     prepare first time because this option was not enabled. Now next time
316
 *     during getting changes, if the two_phase option is enabled it can skip
317
 *     prepare because by that time start decoding point has been moved. So the
318
 *     user will only get commit prepared.
319
 * failover: If enabled, allows the slot to be synced to standbys so
320
 *     that logical replication can be resumed after failover.
321
 * synced: True if the slot is synchronized from the primary server.
322
 */
323
void
324
ReplicationSlotCreate(const char *name, bool db_specific,
325
            ReplicationSlotPersistency persistency,
326
            bool two_phase, bool failover, bool synced)
327
0
{
328
0
  ReplicationSlot *slot = NULL;
329
0
  int     i;
330
331
0
  Assert(MyReplicationSlot == NULL);
332
333
0
  ReplicationSlotValidateName(name, ERROR);
334
335
0
  if (failover)
336
0
  {
337
    /*
338
     * Do not allow users to create the failover enabled slots on the
339
     * standby as we do not support sync to the cascading standby.
340
     *
341
     * However, failover enabled slots can be created during slot
342
     * synchronization because we need to retain the same values as the
343
     * remote slot.
344
     */
345
0
    if (RecoveryInProgress() && !IsSyncingReplicationSlots())
346
0
      ereport(ERROR,
347
0
          errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
348
0
          errmsg("cannot enable failover for a replication slot created on the standby"));
349
350
    /*
351
     * Do not allow users to create failover enabled temporary slots,
352
     * because temporary slots will not be synced to the standby.
353
     *
354
     * However, failover enabled temporary slots can be created during
355
     * slot synchronization. See the comments atop slotsync.c for details.
356
     */
357
0
    if (persistency == RS_TEMPORARY && !IsSyncingReplicationSlots())
358
0
      ereport(ERROR,
359
0
          errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
360
0
          errmsg("cannot enable failover for a temporary replication slot"));
361
0
  }
362
363
  /*
364
   * If some other backend ran this code concurrently with us, we'd likely
365
   * both allocate the same slot, and that would be bad.  We'd also be at
366
   * risk of missing a name collision.  Also, we don't want to try to create
367
   * a new slot while somebody's busy cleaning up an old one, because we
368
   * might both be monkeying with the same directory.
369
   */
370
0
  LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);
371
372
  /*
373
   * Check for name collision, and identify an allocatable slot.  We need to
374
   * hold ReplicationSlotControlLock in shared mode for this, so that nobody
375
   * else can change the in_use flags while we're looking at them.
376
   */
377
0
  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
378
0
  for (i = 0; i < max_replication_slots; i++)
379
0
  {
380
0
    ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
381
382
0
    if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
383
0
      ereport(ERROR,
384
0
          (errcode(ERRCODE_DUPLICATE_OBJECT),
385
0
           errmsg("replication slot \"%s\" already exists", name)));
386
0
    if (!s->in_use && slot == NULL)
387
0
      slot = s;
388
0
  }
389
0
  LWLockRelease(ReplicationSlotControlLock);
390
391
  /* If all slots are in use, we're out of luck. */
392
0
  if (slot == NULL)
393
0
    ereport(ERROR,
394
0
        (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
395
0
         errmsg("all replication slots are in use"),
396
0
         errhint("Free one or increase \"max_replication_slots\".")));
397
398
  /*
399
   * Since this slot is not in use, nobody should be looking at any part of
400
   * it other than the in_use field unless they're trying to allocate it.
401
   * And since we hold ReplicationSlotAllocationLock, nobody except us can
402
   * be doing that.  So it's safe to initialize the slot.
403
   */
404
0
  Assert(!slot->in_use);
405
0
  Assert(slot->active_pid == 0);
406
407
  /* first initialize persistent data */
408
0
  memset(&slot->data, 0, sizeof(ReplicationSlotPersistentData));
409
0
  namestrcpy(&slot->data.name, name);
410
0
  slot->data.database = db_specific ? MyDatabaseId : InvalidOid;
411
0
  slot->data.persistency = persistency;
412
0
  slot->data.two_phase = two_phase;
413
0
  slot->data.two_phase_at = InvalidXLogRecPtr;
414
0
  slot->data.failover = failover;
415
0
  slot->data.synced = synced;
416
417
  /* and then data only present in shared memory */
418
0
  slot->just_dirtied = false;
419
0
  slot->dirty = false;
420
0
  slot->effective_xmin = InvalidTransactionId;
421
0
  slot->effective_catalog_xmin = InvalidTransactionId;
422
0
  slot->candidate_catalog_xmin = InvalidTransactionId;
423
0
  slot->candidate_xmin_lsn = InvalidXLogRecPtr;
424
0
  slot->candidate_restart_valid = InvalidXLogRecPtr;
425
0
  slot->candidate_restart_lsn = InvalidXLogRecPtr;
426
0
  slot->last_saved_confirmed_flush = InvalidXLogRecPtr;
427
0
  slot->last_saved_restart_lsn = InvalidXLogRecPtr;
428
0
  slot->inactive_since = 0;
429
430
  /*
431
   * Create the slot on disk.  We haven't actually marked the slot allocated
432
   * yet, so no special cleanup is required if this errors out.
433
   */
434
0
  CreateSlotOnDisk(slot);
435
436
  /*
437
   * We need to briefly prevent any other backend from iterating over the
438
   * slots while we flip the in_use flag. We also need to set the active
439
   * flag while holding the ControlLock as otherwise a concurrent
440
   * ReplicationSlotAcquire() could acquire the slot as well.
441
   */
442
0
  LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
443
444
0
  slot->in_use = true;
445
446
  /* We can now mark the slot active, and that makes it our slot. */
447
0
  SpinLockAcquire(&slot->mutex);
448
0
  Assert(slot->active_pid == 0);
449
0
  slot->active_pid = MyProcPid;
450
0
  SpinLockRelease(&slot->mutex);
451
0
  MyReplicationSlot = slot;
452
453
0
  LWLockRelease(ReplicationSlotControlLock);
454
455
  /*
456
   * Create statistics entry for the new logical slot. We don't collect any
457
   * stats for physical slots, so no need to create an entry for the same.
458
   * See ReplicationSlotDropPtr for why we need to do this before releasing
459
   * ReplicationSlotAllocationLock.
460
   */
461
0
  if (SlotIsLogical(slot))
462
0
    pgstat_create_replslot(slot);
463
464
  /*
465
   * Now that the slot has been marked as in_use and active, it's safe to
466
   * let somebody else try to allocate a slot.
467
   */
468
0
  LWLockRelease(ReplicationSlotAllocationLock);
469
470
  /* Let everybody know we've modified this slot */
471
0
  ConditionVariableBroadcast(&slot->active_cv);
472
0
}
473
474
/*
475
 * Search for the named replication slot.
476
 *
477
 * Return the replication slot if found, otherwise NULL.
478
 */
479
ReplicationSlot *
480
SearchNamedReplicationSlot(const char *name, bool need_lock)
481
0
{
482
0
  int     i;
483
0
  ReplicationSlot *slot = NULL;
484
485
0
  if (need_lock)
486
0
    LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
487
488
0
  for (i = 0; i < max_replication_slots; i++)
489
0
  {
490
0
    ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
491
492
0
    if (s->in_use && strcmp(name, NameStr(s->data.name)) == 0)
493
0
    {
494
0
      slot = s;
495
0
      break;
496
0
    }
497
0
  }
498
499
0
  if (need_lock)
500
0
    LWLockRelease(ReplicationSlotControlLock);
501
502
0
  return slot;
503
0
}
504
505
/*
506
 * Return the index of the replication slot in
507
 * ReplicationSlotCtl->replication_slots.
508
 *
509
 * This is mainly useful to have an efficient key for storing replication slot
510
 * stats.
511
 */
512
int
513
ReplicationSlotIndex(ReplicationSlot *slot)
514
0
{
515
0
  Assert(slot >= ReplicationSlotCtl->replication_slots &&
516
0
       slot < ReplicationSlotCtl->replication_slots + max_replication_slots);
517
518
0
  return slot - ReplicationSlotCtl->replication_slots;
519
0
}
520
521
/*
522
 * If the slot at 'index' is unused, return false. Otherwise 'name' is set to
523
 * the slot's name and true is returned.
524
 *
525
 * This likely is only useful for pgstat_replslot.c during shutdown, in other
526
 * cases there are obvious TOCTOU issues.
527
 */
528
bool
529
ReplicationSlotName(int index, Name name)
530
0
{
531
0
  ReplicationSlot *slot;
532
0
  bool    found;
533
534
0
  slot = &ReplicationSlotCtl->replication_slots[index];
535
536
  /*
537
   * Ensure that the slot cannot be dropped while we copy the name. Don't
538
   * need the spinlock as the name of an existing slot cannot change.
539
   */
540
0
  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
541
0
  found = slot->in_use;
542
0
  if (slot->in_use)
543
0
    namestrcpy(name, NameStr(slot->data.name));
544
0
  LWLockRelease(ReplicationSlotControlLock);
545
546
0
  return found;
547
0
}
548
549
/*
550
 * Find a previously created slot and mark it as used by this process.
551
 *
552
 * An error is raised if nowait is true and the slot is currently in use. If
553
 * nowait is false, we sleep until the slot is released by the owning process.
554
 *
555
 * An error is raised if error_if_invalid is true and the slot is found to
556
 * be invalid. It should always be set to true, except when we are temporarily
557
 * acquiring the slot and don't intend to change it.
558
 */
559
void
560
ReplicationSlotAcquire(const char *name, bool nowait, bool error_if_invalid)
561
0
{
562
0
  ReplicationSlot *s;
563
0
  int     active_pid;
564
565
0
  Assert(name != NULL);
566
567
0
retry:
568
0
  Assert(MyReplicationSlot == NULL);
569
570
0
  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
571
572
  /* Check if the slot exists with the given name. */
573
0
  s = SearchNamedReplicationSlot(name, false);
574
0
  if (s == NULL || !s->in_use)
575
0
  {
576
0
    LWLockRelease(ReplicationSlotControlLock);
577
578
0
    ereport(ERROR,
579
0
        (errcode(ERRCODE_UNDEFINED_OBJECT),
580
0
         errmsg("replication slot \"%s\" does not exist",
581
0
            name)));
582
0
  }
583
584
  /*
585
   * This is the slot we want; check if it's active under some other
586
   * process.  In single user mode, we don't need this check.
587
   */
588
0
  if (IsUnderPostmaster)
589
0
  {
590
    /*
591
     * Get ready to sleep on the slot in case it is active.  (We may end
592
     * up not sleeping, but we don't want to do this while holding the
593
     * spinlock.)
594
     */
595
0
    if (!nowait)
596
0
      ConditionVariablePrepareToSleep(&s->active_cv);
597
598
    /*
599
     * It is important to reset the inactive_since under spinlock here to
600
     * avoid race conditions with slot invalidation. See comments related
601
     * to inactive_since in InvalidatePossiblyObsoleteSlot.
602
     */
603
0
    SpinLockAcquire(&s->mutex);
604
0
    if (s->active_pid == 0)
605
0
      s->active_pid = MyProcPid;
606
0
    active_pid = s->active_pid;
607
0
    ReplicationSlotSetInactiveSince(s, 0, false);
608
0
    SpinLockRelease(&s->mutex);
609
0
  }
610
0
  else
611
0
  {
612
0
    active_pid = MyProcPid;
613
0
    ReplicationSlotSetInactiveSince(s, 0, true);
614
0
  }
615
0
  LWLockRelease(ReplicationSlotControlLock);
616
617
  /*
618
   * If we found the slot but it's already active in another process, we
619
   * wait until the owning process signals us that it's been released, or
620
   * error out.
621
   */
622
0
  if (active_pid != MyProcPid)
623
0
  {
624
0
    if (!nowait)
625
0
    {
626
      /* Wait here until we get signaled, and then restart */
627
0
      ConditionVariableSleep(&s->active_cv,
628
0
                   WAIT_EVENT_REPLICATION_SLOT_DROP);
629
0
      ConditionVariableCancelSleep();
630
0
      goto retry;
631
0
    }
632
633
0
    ereport(ERROR,
634
0
        (errcode(ERRCODE_OBJECT_IN_USE),
635
0
         errmsg("replication slot \"%s\" is active for PID %d",
636
0
            NameStr(s->data.name), active_pid)));
637
0
  }
638
0
  else if (!nowait)
639
0
    ConditionVariableCancelSleep(); /* no sleep needed after all */
640
641
  /* We made this slot active, so it's ours now. */
642
0
  MyReplicationSlot = s;
643
644
  /*
645
   * We need to check for invalidation after making the slot ours to avoid
646
   * the possible race condition with the checkpointer that can otherwise
647
   * invalidate the slot immediately after the check.
648
   */
649
0
  if (error_if_invalid && s->data.invalidated != RS_INVAL_NONE)
650
0
    ereport(ERROR,
651
0
        errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
652
0
        errmsg("can no longer access replication slot \"%s\"",
653
0
             NameStr(s->data.name)),
654
0
        errdetail("This replication slot has been invalidated due to \"%s\".",
655
0
              GetSlotInvalidationCauseName(s->data.invalidated)));
656
657
  /* Let everybody know we've modified this slot */
658
0
  ConditionVariableBroadcast(&s->active_cv);
659
660
  /*
661
   * The call to pgstat_acquire_replslot() protects against stats for a
662
   * different slot, from before a restart or such, being present during
663
   * pgstat_report_replslot().
664
   */
665
0
  if (SlotIsLogical(s))
666
0
    pgstat_acquire_replslot(s);
667
668
669
0
  if (am_walsender)
670
0
  {
671
0
    ereport(log_replication_commands ? LOG : DEBUG1,
672
0
        SlotIsLogical(s)
673
0
        ? errmsg("acquired logical replication slot \"%s\"",
674
0
             NameStr(s->data.name))
675
0
        : errmsg("acquired physical replication slot \"%s\"",
676
0
             NameStr(s->data.name)));
677
0
  }
678
0
}
679
680
/*
681
 * Release the replication slot that this backend considers to own.
682
 *
683
 * This or another backend can re-acquire the slot later.
684
 * Resources this slot requires will be preserved.
685
 */
686
void
687
ReplicationSlotRelease(void)
688
0
{
689
0
  ReplicationSlot *slot = MyReplicationSlot;
690
0
  char     *slotname = NULL;  /* keep compiler quiet */
691
0
  bool    is_logical = false; /* keep compiler quiet */
692
0
  TimestampTz now = 0;
693
694
0
  Assert(slot != NULL && slot->active_pid != 0);
695
696
0
  if (am_walsender)
697
0
  {
698
0
    slotname = pstrdup(NameStr(slot->data.name));
699
0
    is_logical = SlotIsLogical(slot);
700
0
  }
701
702
0
  if (slot->data.persistency == RS_EPHEMERAL)
703
0
  {
704
    /*
705
     * Delete the slot. There is no !PANIC case where this is allowed to
706
     * fail, all that may happen is an incomplete cleanup of the on-disk
707
     * data.
708
     */
709
0
    ReplicationSlotDropAcquired();
710
0
  }
711
712
  /*
713
   * If slot needed to temporarily restrain both data and catalog xmin to
714
   * create the catalog snapshot, remove that temporary constraint.
715
   * Snapshots can only be exported while the initial snapshot is still
716
   * acquired.
717
   */
718
0
  if (!TransactionIdIsValid(slot->data.xmin) &&
719
0
    TransactionIdIsValid(slot->effective_xmin))
720
0
  {
721
0
    SpinLockAcquire(&slot->mutex);
722
0
    slot->effective_xmin = InvalidTransactionId;
723
0
    SpinLockRelease(&slot->mutex);
724
0
    ReplicationSlotsComputeRequiredXmin(false);
725
0
  }
726
727
  /*
728
   * Set the time since the slot has become inactive. We get the current
729
   * time beforehand to avoid system call while holding the spinlock.
730
   */
731
0
  now = GetCurrentTimestamp();
732
733
0
  if (slot->data.persistency == RS_PERSISTENT)
734
0
  {
735
    /*
736
     * Mark persistent slot inactive.  We're not freeing it, just
737
     * disconnecting, but wake up others that may be waiting for it.
738
     */
739
0
    SpinLockAcquire(&slot->mutex);
740
0
    slot->active_pid = 0;
741
0
    ReplicationSlotSetInactiveSince(slot, now, false);
742
0
    SpinLockRelease(&slot->mutex);
743
0
    ConditionVariableBroadcast(&slot->active_cv);
744
0
  }
745
0
  else
746
0
    ReplicationSlotSetInactiveSince(slot, now, true);
747
748
0
  MyReplicationSlot = NULL;
749
750
  /* might not have been set when we've been a plain slot */
751
0
  LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
752
0
  MyProc->statusFlags &= ~PROC_IN_LOGICAL_DECODING;
753
0
  ProcGlobal->statusFlags[MyProc->pgxactoff] = MyProc->statusFlags;
754
0
  LWLockRelease(ProcArrayLock);
755
756
0
  if (am_walsender)
757
0
  {
758
0
    ereport(log_replication_commands ? LOG : DEBUG1,
759
0
        is_logical
760
0
        ? errmsg("released logical replication slot \"%s\"",
761
0
             slotname)
762
0
        : errmsg("released physical replication slot \"%s\"",
763
0
             slotname));
764
765
0
    pfree(slotname);
766
0
  }
767
0
}
768
769
/*
770
 * Cleanup temporary slots created in current session.
771
 *
772
 * Cleanup only synced temporary slots if 'synced_only' is true, else
773
 * cleanup all temporary slots.
774
 */
775
void
776
ReplicationSlotCleanup(bool synced_only)
777
0
{
778
0
  int     i;
779
780
0
  Assert(MyReplicationSlot == NULL);
781
782
0
restart:
783
0
  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
784
0
  for (i = 0; i < max_replication_slots; i++)
785
0
  {
786
0
    ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
787
788
0
    if (!s->in_use)
789
0
      continue;
790
791
0
    SpinLockAcquire(&s->mutex);
792
0
    if ((s->active_pid == MyProcPid &&
793
0
       (!synced_only || s->data.synced)))
794
0
    {
795
0
      Assert(s->data.persistency == RS_TEMPORARY);
796
0
      SpinLockRelease(&s->mutex);
797
0
      LWLockRelease(ReplicationSlotControlLock);  /* avoid deadlock */
798
799
0
      ReplicationSlotDropPtr(s);
800
801
0
      ConditionVariableBroadcast(&s->active_cv);
802
0
      goto restart;
803
0
    }
804
0
    else
805
0
      SpinLockRelease(&s->mutex);
806
0
  }
807
808
0
  LWLockRelease(ReplicationSlotControlLock);
809
0
}
810
811
/*
812
 * Permanently drop replication slot identified by the passed in name.
813
 */
814
void
815
ReplicationSlotDrop(const char *name, bool nowait)
816
0
{
817
0
  Assert(MyReplicationSlot == NULL);
818
819
0
  ReplicationSlotAcquire(name, nowait, false);
820
821
  /*
822
   * Do not allow users to drop the slots which are currently being synced
823
   * from the primary to the standby.
824
   */
825
0
  if (RecoveryInProgress() && MyReplicationSlot->data.synced)
826
0
    ereport(ERROR,
827
0
        errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
828
0
        errmsg("cannot drop replication slot \"%s\"", name),
829
0
        errdetail("This replication slot is being synchronized from the primary server."));
830
831
0
  ReplicationSlotDropAcquired();
832
0
}
833
834
/*
835
 * Change the definition of the slot identified by the specified name.
836
 */
837
void
838
ReplicationSlotAlter(const char *name, const bool *failover,
839
           const bool *two_phase)
840
0
{
841
0
  bool    update_slot = false;
842
843
0
  Assert(MyReplicationSlot == NULL);
844
0
  Assert(failover || two_phase);
845
846
0
  ReplicationSlotAcquire(name, false, true);
847
848
0
  if (SlotIsPhysical(MyReplicationSlot))
849
0
    ereport(ERROR,
850
0
        errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
851
0
        errmsg("cannot use %s with a physical replication slot",
852
0
             "ALTER_REPLICATION_SLOT"));
853
854
0
  if (RecoveryInProgress())
855
0
  {
856
    /*
857
     * Do not allow users to alter the slots which are currently being
858
     * synced from the primary to the standby.
859
     */
860
0
    if (MyReplicationSlot->data.synced)
861
0
      ereport(ERROR,
862
0
          errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
863
0
          errmsg("cannot alter replication slot \"%s\"", name),
864
0
          errdetail("This replication slot is being synchronized from the primary server."));
865
866
    /*
867
     * Do not allow users to enable failover on the standby as we do not
868
     * support sync to the cascading standby.
869
     */
870
0
    if (failover && *failover)
871
0
      ereport(ERROR,
872
0
          errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
873
0
          errmsg("cannot enable failover for a replication slot"
874
0
               " on the standby"));
875
0
  }
876
877
0
  if (failover)
878
0
  {
879
    /*
880
     * Do not allow users to enable failover for temporary slots as we do
881
     * not support syncing temporary slots to the standby.
882
     */
883
0
    if (*failover && MyReplicationSlot->data.persistency == RS_TEMPORARY)
884
0
      ereport(ERROR,
885
0
          errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
886
0
          errmsg("cannot enable failover for a temporary replication slot"));
887
888
0
    if (MyReplicationSlot->data.failover != *failover)
889
0
    {
890
0
      SpinLockAcquire(&MyReplicationSlot->mutex);
891
0
      MyReplicationSlot->data.failover = *failover;
892
0
      SpinLockRelease(&MyReplicationSlot->mutex);
893
894
0
      update_slot = true;
895
0
    }
896
0
  }
897
898
0
  if (two_phase && MyReplicationSlot->data.two_phase != *two_phase)
899
0
  {
900
0
    SpinLockAcquire(&MyReplicationSlot->mutex);
901
0
    MyReplicationSlot->data.two_phase = *two_phase;
902
0
    SpinLockRelease(&MyReplicationSlot->mutex);
903
904
0
    update_slot = true;
905
0
  }
906
907
0
  if (update_slot)
908
0
  {
909
0
    ReplicationSlotMarkDirty();
910
0
    ReplicationSlotSave();
911
0
  }
912
913
0
  ReplicationSlotRelease();
914
0
}
915
916
/*
917
 * Permanently drop the currently acquired replication slot.
918
 */
919
void
920
ReplicationSlotDropAcquired(void)
921
0
{
922
0
  ReplicationSlot *slot = MyReplicationSlot;
923
924
0
  Assert(MyReplicationSlot != NULL);
925
926
  /* slot isn't acquired anymore */
927
0
  MyReplicationSlot = NULL;
928
929
0
  ReplicationSlotDropPtr(slot);
930
0
}
931
932
/*
 * Permanently drop the replication slot which will be released by the point
 * this function returns.
 *
 * Works by renaming the slot's directory to a .tmp name (making it invisible
 * to restarts) and then removing it; all filesystem state changes happen
 * while ReplicationSlotAllocationLock is held exclusively.
 */
static void
ReplicationSlotDropPtr(ReplicationSlot *slot)
{
	char		path[MAXPGPATH];
	char		tmppath[MAXPGPATH];

	/*
	 * If some other backend ran this code concurrently with us, we might try
	 * to delete a slot with a certain name while someone else was trying to
	 * create a slot with the same name.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_EXCLUSIVE);

	/* Generate pathnames.  (Slot names are NAMEDATALEN-bounded, so these fit.) */
	sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
	sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));

	/*
	 * Rename the slot directory on disk, so that we'll no longer recognize
	 * this as a valid slot.  Note that if this fails, we've got to mark the
	 * slot inactive before bailing out.  If we're dropping an ephemeral or a
	 * temporary slot, we better never fail hard as the caller won't expect
	 * the slot to survive and this might get called during error handling.
	 */
	if (rename(path, tmppath) == 0)
	{
		/*
		 * We need to fsync() the directory we just renamed and its parent to
		 * make sure that our changes are on disk in a crash-safe fashion.  If
		 * fsync() fails, we can't be sure whether the changes are on disk or
		 * not.  For now, we handle that by panicking;
		 * StartupReplicationSlots() will try to straighten it out after
		 * restart.
		 */
		START_CRIT_SECTION();
		fsync_fname(tmppath, true);
		fsync_fname(PG_REPLSLOT_DIR, true);
		END_CRIT_SECTION();
	}
	else
	{
		/* Ephemeral/temporary slots only warn; the caller can't handle ERROR. */
		bool		fail_softly = slot->data.persistency != RS_PERSISTENT;

		SpinLockAcquire(&slot->mutex);
		slot->active_pid = 0;
		SpinLockRelease(&slot->mutex);

		/* wake up anyone waiting on this slot */
		ConditionVariableBroadcast(&slot->active_cv);

		ereport(fail_softly ? WARNING : ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						path, tmppath)));
	}

	/*
	 * The slot is definitely gone.  Lock out concurrent scans of the array
	 * long enough to kill it.  It's OK to clear the active PID here without
	 * grabbing the mutex because nobody else can be scanning the array here,
	 * and nobody can be attached to this slot and thus access it without
	 * scanning the array.
	 *
	 * Also wake up processes waiting for it.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE);
	slot->active_pid = 0;
	slot->in_use = false;
	LWLockRelease(ReplicationSlotControlLock);
	ConditionVariableBroadcast(&slot->active_cv);

	/*
	 * Slot is dead and doesn't prevent resource removal anymore, recompute
	 * limits.
	 */
	ReplicationSlotsComputeRequiredXmin(false);
	ReplicationSlotsComputeRequiredLSN();

	/*
	 * If removing the directory fails, the worst thing that will happen is
	 * that the user won't be able to create a new slot with the same name
	 * until the next server restart.  We warn about it, but that's all.
	 */
	if (!rmtree(tmppath, true))
		ereport(WARNING,
				(errmsg("could not remove directory \"%s\"", tmppath)));

	/*
	 * Drop the statistics entry for the replication slot.  Do this while
	 * holding ReplicationSlotAllocationLock so that we don't drop a
	 * statistics entry for another slot with the same name just created in
	 * another session.
	 */
	if (SlotIsLogical(slot))
		pgstat_drop_replslot(slot);

	/*
	 * We release this at the very end, so that nobody starts trying to create
	 * a slot while we're still cleaning up the detritus of the old one.
	 */
	LWLockRelease(ReplicationSlotAllocationLock);
}
1038
1039
/*
1040
 * Serialize the currently acquired slot's state from memory to disk, thereby
1041
 * guaranteeing the current state will survive a crash.
1042
 */
1043
void
1044
ReplicationSlotSave(void)
1045
0
{
1046
0
  char    path[MAXPGPATH];
1047
1048
0
  Assert(MyReplicationSlot != NULL);
1049
1050
0
  sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(MyReplicationSlot->data.name));
1051
0
  SaveSlotToPath(MyReplicationSlot, path, ERROR);
1052
0
}
1053
1054
/*
1055
 * Signal that it would be useful if the currently acquired slot would be
1056
 * flushed out to disk.
1057
 *
1058
 * Note that the actual flush to disk can be delayed for a long time, if
1059
 * required for correctness explicitly do a ReplicationSlotSave().
1060
 */
1061
void
1062
ReplicationSlotMarkDirty(void)
1063
0
{
1064
0
  ReplicationSlot *slot = MyReplicationSlot;
1065
1066
0
  Assert(MyReplicationSlot != NULL);
1067
1068
0
  SpinLockAcquire(&slot->mutex);
1069
0
  MyReplicationSlot->just_dirtied = true;
1070
0
  MyReplicationSlot->dirty = true;
1071
0
  SpinLockRelease(&slot->mutex);
1072
0
}
1073
1074
/*
1075
 * Convert a slot that's marked as RS_EPHEMERAL or RS_TEMPORARY to a
1076
 * RS_PERSISTENT slot, guaranteeing it will be there after an eventual crash.
1077
 */
1078
void
1079
ReplicationSlotPersist(void)
1080
0
{
1081
0
  ReplicationSlot *slot = MyReplicationSlot;
1082
1083
0
  Assert(slot != NULL);
1084
0
  Assert(slot->data.persistency != RS_PERSISTENT);
1085
1086
0
  SpinLockAcquire(&slot->mutex);
1087
0
  slot->data.persistency = RS_PERSISTENT;
1088
0
  SpinLockRelease(&slot->mutex);
1089
1090
0
  ReplicationSlotMarkDirty();
1091
0
  ReplicationSlotSave();
1092
0
}
1093
1094
/*
1095
 * Compute the oldest xmin across all slots and store it in the ProcArray.
1096
 *
1097
 * If already_locked is true, ProcArrayLock has already been acquired
1098
 * exclusively.
1099
 */
1100
void
1101
ReplicationSlotsComputeRequiredXmin(bool already_locked)
1102
0
{
1103
0
  int     i;
1104
0
  TransactionId agg_xmin = InvalidTransactionId;
1105
0
  TransactionId agg_catalog_xmin = InvalidTransactionId;
1106
1107
0
  Assert(ReplicationSlotCtl != NULL);
1108
1109
0
  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1110
1111
0
  for (i = 0; i < max_replication_slots; i++)
1112
0
  {
1113
0
    ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
1114
0
    TransactionId effective_xmin;
1115
0
    TransactionId effective_catalog_xmin;
1116
0
    bool    invalidated;
1117
1118
0
    if (!s->in_use)
1119
0
      continue;
1120
1121
0
    SpinLockAcquire(&s->mutex);
1122
0
    effective_xmin = s->effective_xmin;
1123
0
    effective_catalog_xmin = s->effective_catalog_xmin;
1124
0
    invalidated = s->data.invalidated != RS_INVAL_NONE;
1125
0
    SpinLockRelease(&s->mutex);
1126
1127
    /* invalidated slots need not apply */
1128
0
    if (invalidated)
1129
0
      continue;
1130
1131
    /* check the data xmin */
1132
0
    if (TransactionIdIsValid(effective_xmin) &&
1133
0
      (!TransactionIdIsValid(agg_xmin) ||
1134
0
       TransactionIdPrecedes(effective_xmin, agg_xmin)))
1135
0
      agg_xmin = effective_xmin;
1136
1137
    /* check the catalog xmin */
1138
0
    if (TransactionIdIsValid(effective_catalog_xmin) &&
1139
0
      (!TransactionIdIsValid(agg_catalog_xmin) ||
1140
0
       TransactionIdPrecedes(effective_catalog_xmin, agg_catalog_xmin)))
1141
0
      agg_catalog_xmin = effective_catalog_xmin;
1142
0
  }
1143
1144
0
  LWLockRelease(ReplicationSlotControlLock);
1145
1146
0
  ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked);
1147
0
}
1148
1149
/*
 * Compute the oldest restart LSN across all slots and inform xlog module.
 *
 * Note: while max_slot_wal_keep_size is theoretically relevant for this
 * purpose, we don't try to account for that, because this module doesn't
 * know what to compare against.
 */
void
ReplicationSlotsComputeRequiredLSN(void)
{
	int			i;
	XLogRecPtr	min_required = InvalidXLogRecPtr;

	Assert(ReplicationSlotCtl != NULL);

	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
		XLogRecPtr	restart_lsn;
		XLogRecPtr	last_saved_restart_lsn;
		bool		invalidated;
		ReplicationSlotPersistency persistency;

		/* in_use can't change while ReplicationSlotControlLock is held */
		if (!s->in_use)
			continue;

		/* read all relevant fields under the slot's spinlock */
		SpinLockAcquire(&s->mutex);
		persistency = s->data.persistency;
		restart_lsn = s->data.restart_lsn;
		invalidated = s->data.invalidated != RS_INVAL_NONE;
		last_saved_restart_lsn = s->last_saved_restart_lsn;
		SpinLockRelease(&s->mutex);

		/* invalidated slots need not apply */
		if (invalidated)
			continue;

		/*
		 * For persistent slot use last_saved_restart_lsn to compute the
		 * oldest LSN for removal of WAL segments.  The segments between
		 * last_saved_restart_lsn and restart_lsn might be needed by a
		 * persistent slot in the case of database crash.  Non-persistent
		 * slots can't survive the database crash, so we don't care about
		 * last_saved_restart_lsn for them.
		 */
		if (persistency == RS_PERSISTENT)
		{
			/* clamp to the on-disk value, which may lag the in-memory one */
			if (last_saved_restart_lsn != InvalidXLogRecPtr &&
				restart_lsn > last_saved_restart_lsn)
			{
				restart_lsn = last_saved_restart_lsn;
			}
		}

		/* track the minimum across all valid contributing slots */
		if (restart_lsn != InvalidXLogRecPtr &&
			(min_required == InvalidXLogRecPtr ||
			 restart_lsn < min_required))
			min_required = restart_lsn;
	}
	LWLockRelease(ReplicationSlotControlLock);

	/* InvalidXLogRecPtr here means no slot constrains WAL removal */
	XLogSetReplicationSlotMinimumLSN(min_required);
}
1213
1214
/*
1215
 * Compute the oldest WAL LSN required by *logical* decoding slots..
1216
 *
1217
 * Returns InvalidXLogRecPtr if logical decoding is disabled or no logical
1218
 * slots exist.
1219
 *
1220
 * NB: this returns a value >= ReplicationSlotsComputeRequiredLSN(), since it
1221
 * ignores physical replication slots.
1222
 *
1223
 * The results aren't required frequently, so we don't maintain a precomputed
1224
 * value like we do for ComputeRequiredLSN() and ComputeRequiredXmin().
1225
 */
1226
XLogRecPtr
1227
ReplicationSlotsComputeLogicalRestartLSN(void)
1228
0
{
1229
0
  XLogRecPtr  result = InvalidXLogRecPtr;
1230
0
  int     i;
1231
1232
0
  if (max_replication_slots <= 0)
1233
0
    return InvalidXLogRecPtr;
1234
1235
0
  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1236
1237
0
  for (i = 0; i < max_replication_slots; i++)
1238
0
  {
1239
0
    ReplicationSlot *s;
1240
0
    XLogRecPtr  restart_lsn;
1241
0
    XLogRecPtr  last_saved_restart_lsn;
1242
0
    bool    invalidated;
1243
0
    ReplicationSlotPersistency persistency;
1244
1245
0
    s = &ReplicationSlotCtl->replication_slots[i];
1246
1247
    /* cannot change while ReplicationSlotCtlLock is held */
1248
0
    if (!s->in_use)
1249
0
      continue;
1250
1251
    /* we're only interested in logical slots */
1252
0
    if (!SlotIsLogical(s))
1253
0
      continue;
1254
1255
    /* read once, it's ok if it increases while we're checking */
1256
0
    SpinLockAcquire(&s->mutex);
1257
0
    persistency = s->data.persistency;
1258
0
    restart_lsn = s->data.restart_lsn;
1259
0
    invalidated = s->data.invalidated != RS_INVAL_NONE;
1260
0
    last_saved_restart_lsn = s->last_saved_restart_lsn;
1261
0
    SpinLockRelease(&s->mutex);
1262
1263
    /* invalidated slots need not apply */
1264
0
    if (invalidated)
1265
0
      continue;
1266
1267
    /*
1268
     * For persistent slot use last_saved_restart_lsn to compute the
1269
     * oldest LSN for removal of WAL segments.  The segments between
1270
     * last_saved_restart_lsn and restart_lsn might be needed by a
1271
     * persistent slot in the case of database crash.  Non-persistent
1272
     * slots can't survive the database crash, so we don't care about
1273
     * last_saved_restart_lsn for them.
1274
     */
1275
0
    if (persistency == RS_PERSISTENT)
1276
0
    {
1277
0
      if (last_saved_restart_lsn != InvalidXLogRecPtr &&
1278
0
        restart_lsn > last_saved_restart_lsn)
1279
0
      {
1280
0
        restart_lsn = last_saved_restart_lsn;
1281
0
      }
1282
0
    }
1283
1284
0
    if (restart_lsn == InvalidXLogRecPtr)
1285
0
      continue;
1286
1287
0
    if (result == InvalidXLogRecPtr ||
1288
0
      restart_lsn < result)
1289
0
      result = restart_lsn;
1290
0
  }
1291
1292
0
  LWLockRelease(ReplicationSlotControlLock);
1293
1294
0
  return result;
1295
0
}
1296
1297
/*
1298
 * ReplicationSlotsCountDBSlots -- count the number of slots that refer to the
1299
 * passed database oid.
1300
 *
1301
 * Returns true if there are any slots referencing the database. *nslots will
1302
 * be set to the absolute number of slots in the database, *nactive to ones
1303
 * currently active.
1304
 */
1305
bool
1306
ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive)
1307
0
{
1308
0
  int     i;
1309
1310
0
  *nslots = *nactive = 0;
1311
1312
0
  if (max_replication_slots <= 0)
1313
0
    return false;
1314
1315
0
  LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
1316
0
  for (i = 0; i < max_replication_slots; i++)
1317
0
  {
1318
0
    ReplicationSlot *s;
1319
1320
0
    s = &ReplicationSlotCtl->replication_slots[i];
1321
1322
    /* cannot change while ReplicationSlotCtlLock is held */
1323
0
    if (!s->in_use)
1324
0
      continue;
1325
1326
    /* only logical slots are database specific, skip */
1327
0
    if (!SlotIsLogical(s))
1328
0
      continue;
1329
1330
    /* not our database, skip */
1331
0
    if (s->data.database != dboid)
1332
0
      continue;
1333
1334
    /* NB: intentionally counting invalidated slots */
1335
1336
    /* count slots with spinlock held */
1337
0
    SpinLockAcquire(&s->mutex);
1338
0
    (*nslots)++;
1339
0
    if (s->active_pid != 0)
1340
0
      (*nactive)++;
1341
0
    SpinLockRelease(&s->mutex);
1342
0
  }
1343
0
  LWLockRelease(ReplicationSlotControlLock);
1344
1345
0
  if (*nslots > 0)
1346
0
    return true;
1347
0
  return false;
1348
0
}
1349
1350
/*
 * ReplicationSlotsDropDBSlots -- Drop all db-specific slots relating to the
 * passed database oid. The caller should hold an exclusive lock on the
 * pg_database oid for the database to prevent creation of new slots on the db
 * or replay from existing slots.
 *
 * Another session that concurrently acquires an existing slot on the target DB
 * (most likely to drop it) may cause this function to ERROR. If that happens
 * it may have dropped some but not all slots.
 *
 * This routine isn't as efficient as it could be - but we don't drop
 * databases often, especially databases with lots of slots.
 */
void
ReplicationSlotsDropDBSlots(Oid dboid)
{
	int			i;

	if (max_replication_slots <= 0)
		return;

restart:
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s;
		char	   *slotname;
		int			active_pid;

		s = &ReplicationSlotCtl->replication_slots[i];

		/* cannot change while ReplicationSlotCtlLock is held */
		if (!s->in_use)
			continue;

		/* only logical slots are database specific, skip */
		if (!SlotIsLogical(s))
			continue;

		/* not our database, skip */
		if (s->data.database != dboid)
			continue;

		/* NB: intentionally including invalidated slots */

		/* acquire slot, so ReplicationSlotDropAcquired can be reused  */
		SpinLockAcquire(&s->mutex);
		/* can't change while ReplicationSlotControlLock is held */
		slotname = NameStr(s->data.name);
		active_pid = s->active_pid;
		if (active_pid == 0)
		{
			/* claim the slot for ourselves, as ReplicationSlotAcquire would */
			MyReplicationSlot = s;
			s->active_pid = MyProcPid;
		}
		SpinLockRelease(&s->mutex);

		/*
		 * Even though we hold an exclusive lock on the database object a
		 * logical slot for that DB can still be active, e.g. if it's
		 * concurrently being dropped by a backend connected to another DB.
		 *
		 * That's fairly unlikely in practice, so we'll just bail out.
		 *
		 * The slot sync worker holds a shared lock on the database before
		 * operating on synced logical slots to avoid conflict with the drop
		 * happening here. The persistent synced slots are thus safe but there
		 * is a possibility that the slot sync worker has created a temporary
		 * slot (which stays active even on release) and we are trying to drop
		 * that here. In practice, the chances of hitting this scenario are
		 * less as during slot synchronization, the temporary slot is
		 * immediately converted to persistent and thus is safe due to the
		 * shared lock taken on the database. So, we'll just bail out in such
		 * a case.
		 *
		 * XXX: We can consider shutting down the slot sync worker before
		 * trying to drop synced temporary slots here.
		 */
		if (active_pid)
			ereport(ERROR,
					(errcode(ERRCODE_OBJECT_IN_USE),
					 errmsg("replication slot \"%s\" is active for PID %d",
							slotname, active_pid)));

		/*
		 * To avoid duplicating ReplicationSlotDropAcquired() and to avoid
		 * holding ReplicationSlotControlLock over filesystem operations,
		 * release ReplicationSlotControlLock and use
		 * ReplicationSlotDropAcquired.
		 *
		 * As that means the set of slots could change, restart scan from the
		 * beginning each time we release the lock.
		 */
		LWLockRelease(ReplicationSlotControlLock);
		ReplicationSlotDropAcquired();
		goto restart;
	}
	LWLockRelease(ReplicationSlotControlLock);
}
1449
1450
1451
/*
1452
 * Check whether the server's configuration supports using replication
1453
 * slots.
1454
 */
1455
void
1456
CheckSlotRequirements(void)
1457
0
{
1458
  /*
1459
   * NB: Adding a new requirement likely means that RestoreSlotFromDisk()
1460
   * needs the same check.
1461
   */
1462
1463
0
  if (max_replication_slots == 0)
1464
0
    ereport(ERROR,
1465
0
        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1466
0
         errmsg("replication slots can only be used if \"max_replication_slots\" > 0")));
1467
1468
0
  if (wal_level < WAL_LEVEL_REPLICA)
1469
0
    ereport(ERROR,
1470
0
        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1471
0
         errmsg("replication slots can only be used if \"wal_level\" >= \"replica\"")));
1472
0
}
1473
1474
/*
1475
 * Check whether the user has privilege to use replication slots.
1476
 */
1477
void
1478
CheckSlotPermissions(void)
1479
0
{
1480
0
  if (!has_rolreplication(GetUserId()))
1481
0
    ereport(ERROR,
1482
0
        (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
1483
0
         errmsg("permission denied to use replication slots"),
1484
0
         errdetail("Only roles with the %s attribute may use replication slots.",
1485
0
               "REPLICATION")));
1486
0
}
1487
1488
/*
 * Reserve WAL for the currently active slot.
 *
 * Compute and set restart_lsn in a manner that's appropriate for the type of
 * the slot and concurrency safe.
 */
void
ReplicationSlotReserveWal(void)
{
	ReplicationSlot *slot = MyReplicationSlot;

	Assert(slot != NULL);
	/* the slot must not have reserved any WAL yet */
	Assert(slot->data.restart_lsn == InvalidXLogRecPtr);
	Assert(slot->last_saved_restart_lsn == InvalidXLogRecPtr);

	/*
	 * The replication slot mechanism is used to prevent removal of required
	 * WAL. As there is no interlock between this routine and checkpoints, WAL
	 * segments could concurrently be removed when a now stale return value of
	 * ReplicationSlotsComputeRequiredLSN() is used. In the unlikely case that
	 * this happens we'll just retry.
	 */
	while (true)
	{
		XLogSegNo	segno;
		XLogRecPtr	restart_lsn;

		/*
		 * For logical slots log a standby snapshot and start logical decoding
		 * at exactly that position. That allows the slot to start up more
		 * quickly. But on a standby we cannot do WAL writes, so just use the
		 * replay pointer; effectively, an attempt to create a logical slot on
		 * standby will cause it to wait for an xl_running_xact record to be
		 * logged independently on the primary, so that a snapshot can be
		 * built using the record.
		 *
		 * None of this is needed (or indeed helpful) for physical slots as
		 * they'll start replay at the last logged checkpoint anyway. Instead
		 * return the location of the last redo LSN. While that slightly
		 * increases the chance that we have to retry, it's where a base
		 * backup has to start replay at.
		 */
		if (SlotIsPhysical(slot))
			restart_lsn = GetRedoRecPtr();
		else if (RecoveryInProgress())
			restart_lsn = GetXLogReplayRecPtr(NULL);
		else
			restart_lsn = GetXLogInsertRecPtr();

		/* publish the chosen position under the slot's spinlock */
		SpinLockAcquire(&slot->mutex);
		slot->data.restart_lsn = restart_lsn;
		SpinLockRelease(&slot->mutex);

		/* prevent WAL removal as fast as possible */
		ReplicationSlotsComputeRequiredLSN();

		/*
		 * If all required WAL is still there, great, otherwise retry. The
		 * slot should prevent further removal of WAL, unless there's a
		 * concurrent ReplicationSlotsComputeRequiredLSN() after we've written
		 * the new restart_lsn above, so normally we should never need to loop
		 * more than twice.
		 */
		XLByteToSeg(slot->data.restart_lsn, segno, wal_segment_size);
		if (XLogGetLastRemovedSegno() < segno)
			break;
	}

	if (!RecoveryInProgress() && SlotIsLogical(slot))
	{
		XLogRecPtr	flushptr;

		/* make sure we have enough information to start */
		flushptr = LogStandbySnapshot();

		/* and make sure it's fsynced to disk */
		XLogFlush(flushptr);
	}
}
1567
1568
/*
 * Report that replication slot needs to be invalidated
 *
 * cause: why the slot is (being) invalidated; must not be RS_INVAL_NONE.
 * terminating: true when we're killing the active process holding the slot,
 *		false when reporting the actual invalidation.
 * pid: PID being terminated (only used when terminating).
 * slotname / restart_lsn: identity and position of the affected slot.
 * oldestLSN: oldest WAL still available (used for RS_INVAL_WAL_REMOVED).
 * snapshotConflictHorizon: conflicting xid (used for RS_INVAL_HORIZON).
 * slot_idle_seconds: observed idle time (used for RS_INVAL_IDLE_TIMEOUT).
 */
static void
ReportSlotInvalidation(ReplicationSlotInvalidationCause cause,
					   bool terminating,
					   int pid,
					   NameData slotname,
					   XLogRecPtr restart_lsn,
					   XLogRecPtr oldestLSN,
					   TransactionId snapshotConflictHorizon,
					   long slot_idle_seconds)
{
	StringInfoData err_detail;
	StringInfoData err_hint;

	initStringInfo(&err_detail);
	initStringInfo(&err_hint);

	/* build cause-specific detail (and optionally hint) text */
	switch (cause)
	{
		case RS_INVAL_WAL_REMOVED:
			{
				uint64		ex = oldestLSN - restart_lsn;

				appendStringInfo(&err_detail,
								 ngettext("The slot's restart_lsn %X/%X exceeds the limit by %" PRIu64 " byte.",
										  "The slot's restart_lsn %X/%X exceeds the limit by %" PRIu64 " bytes.",
										  ex),
								 LSN_FORMAT_ARGS(restart_lsn),
								 ex);
				/* translator: %s is a GUC variable name */
				appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
								 "max_slot_wal_keep_size");
				break;
			}
		case RS_INVAL_HORIZON:
			appendStringInfo(&err_detail, _("The slot conflicted with xid horizon %u."),
							 snapshotConflictHorizon);
			break;

		case RS_INVAL_WAL_LEVEL:
			appendStringInfoString(&err_detail, _("Logical decoding on standby requires \"wal_level\" >= \"logical\" on the primary server."));
			break;

		case RS_INVAL_IDLE_TIMEOUT:
			{
				int			minutes = slot_idle_seconds / SECS_PER_MINUTE;
				int			secs = slot_idle_seconds % SECS_PER_MINUTE;

				/* translator: %s is a GUC variable name */
				appendStringInfo(&err_detail, _("The slot's idle time of %dmin %02ds exceeds the configured \"%s\" duration of %dmin."),
								 minutes, secs, "idle_replication_slot_timeout",
								 idle_replication_slot_timeout_mins);
				/* translator: %s is a GUC variable name */
				appendStringInfo(&err_hint, _("You might need to increase \"%s\"."),
								 "idle_replication_slot_timeout");
				break;
			}
		case RS_INVAL_NONE:
			pg_unreachable();
	}

	/* emit one LOG line; the hint is included only when one was built */
	ereport(LOG,
			terminating ?
			errmsg("terminating process %d to release replication slot \"%s\"",
				   pid, NameStr(slotname)) :
			errmsg("invalidating obsolete replication slot \"%s\"",
				   NameStr(slotname)),
			errdetail_internal("%s", err_detail.data),
			err_hint.len ? errhint("%s", err_hint.data) : 0);

	pfree(err_detail.data);
	pfree(err_hint.data);
}
1643
1644
/*
1645
 * Can we invalidate an idle replication slot?
1646
 *
1647
 * Idle timeout invalidation is allowed only when:
1648
 *
1649
 * 1. Idle timeout is set
1650
 * 2. Slot has reserved WAL
1651
 * 3. Slot is inactive
1652
 * 4. The slot is not being synced from the primary while the server is in
1653
 *    recovery. This is because synced slots are always considered to be
1654
 *    inactive because they don't perform logical decoding to produce changes.
1655
 */
1656
static inline bool
1657
CanInvalidateIdleSlot(ReplicationSlot *s)
1658
0
{
1659
0
  return (idle_replication_slot_timeout_mins != 0 &&
1660
0
      !XLogRecPtrIsInvalid(s->data.restart_lsn) &&
1661
0
      s->inactive_since > 0 &&
1662
0
      !(RecoveryInProgress() && s->data.synced));
1663
0
}
1664
1665
/*
 * DetermineSlotInvalidationCause - Determine the cause for which a slot
 * becomes invalid among the given possible causes.
 *
 * This function sequentially checks all possible invalidation causes and
 * returns the first one for which the slot is eligible for invalidation.
 *
 * possible_causes is a bitmask of RS_INVAL_* values; the check order below
 * fixes the priority among them.  The initial_* parameters carry values the
 * caller sampled earlier (presumably under the slot's spinlock -- confirm in
 * InvalidatePossiblyObsoleteSlot), so the decision is based on a consistent
 * snapshot rather than re-reads.  *inactive_since is an output, set only
 * when RS_INVAL_IDLE_TIMEOUT is returned.
 */
static ReplicationSlotInvalidationCause
DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
							   XLogRecPtr oldestLSN, Oid dboid,
							   TransactionId snapshotConflictHorizon,
							   TransactionId initial_effective_xmin,
							   TransactionId initial_catalog_effective_xmin,
							   XLogRecPtr initial_restart_lsn,
							   TimestampTz *inactive_since, TimestampTz now)
{
	Assert(possible_causes != RS_INVAL_NONE);

	/* highest priority: slot's WAL has already been removed */
	if (possible_causes & RS_INVAL_WAL_REMOVED)
	{
		if (initial_restart_lsn != InvalidXLogRecPtr &&
			initial_restart_lsn < oldestLSN)
			return RS_INVAL_WAL_REMOVED;
	}

	if (possible_causes & RS_INVAL_HORIZON)
	{
		/* invalid DB oid signals a shared relation */
		if (SlotIsLogical(s) &&
			(dboid == InvalidOid || dboid == s->data.database))
		{
			if (TransactionIdIsValid(initial_effective_xmin) &&
				TransactionIdPrecedesOrEquals(initial_effective_xmin,
											  snapshotConflictHorizon))
				return RS_INVAL_HORIZON;
			else if (TransactionIdIsValid(initial_catalog_effective_xmin) &&
					 TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin,
												   snapshotConflictHorizon))
				return RS_INVAL_HORIZON;
		}
	}

	/* wal_level conflicts only affect logical slots */
	if (possible_causes & RS_INVAL_WAL_LEVEL)
	{
		if (SlotIsLogical(s))
			return RS_INVAL_WAL_LEVEL;
	}

	if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
	{
		Assert(now > 0);

		if (CanInvalidateIdleSlot(s))
		{
			/*
			 * We simulate the invalidation due to idle_timeout as the minimum
			 * time idle time is one minute which makes tests take a long
			 * time.
			 */
#ifdef USE_INJECTION_POINTS
			if (IS_INJECTION_POINT_ATTACHED("slot-timeout-inval"))
			{
				*inactive_since = 0;	/* since the beginning of time */
				return RS_INVAL_IDLE_TIMEOUT;
			}
#endif

			/*
			 * Check if the slot needs to be invalidated due to
			 * idle_replication_slot_timeout GUC.
			 */
			if (TimestampDifferenceExceedsSeconds(s->inactive_since, now,
												  idle_replication_slot_timeout_mins * SECS_PER_MINUTE))
			{
				*inactive_since = s->inactive_since;
				return RS_INVAL_IDLE_TIMEOUT;
			}
		}
	}

	/* no eligible cause found */
	return RS_INVAL_NONE;
}
1747
1748
/*
 * Helper for InvalidateObsoleteReplicationSlots
 *
 * Acquires the given slot and marks it invalid, if necessary and possible.
 *
 * Returns whether ReplicationSlotControlLock was released in the interim (and
 * in that case we're not holding the lock at return, otherwise we are).
 *
 * Sets *invalidated true if the slot was invalidated. (Untouched otherwise.)
 *
 * This is inherently racy, because we release the LWLock
 * for syscalls, so caller must restart if we return true.
 */
static bool
InvalidatePossiblyObsoleteSlot(uint32 possible_causes,
							   ReplicationSlot *s,
							   XLogRecPtr oldestLSN,
							   Oid dboid, TransactionId snapshotConflictHorizon,
							   bool *invalidated)
{
	int			last_signaled_pid = 0;
	bool		released_lock = false;
	bool		terminated = false;
	TransactionId initial_effective_xmin = InvalidTransactionId;
	TransactionId initial_catalog_effective_xmin = InvalidTransactionId;
	XLogRecPtr	initial_restart_lsn = InvalidXLogRecPtr;
	ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
	TimestampTz inactive_since = 0;

	/*
	 * Loop until we either determine the slot needs no invalidation, or we
	 * have acquired and invalidated it ourselves.  Each iteration may signal
	 * the current owner and wait for it to release the slot.
	 */
	for (;;)
	{
		XLogRecPtr	restart_lsn;
		NameData	slotname;
		int			active_pid = 0;
		ReplicationSlotInvalidationCause invalidation_cause = RS_INVAL_NONE;
		TimestampTz now = 0;
		long		slot_idle_secs = 0;

		Assert(LWLockHeldByMeInMode(ReplicationSlotControlLock, LW_SHARED));

		if (!s->in_use)
		{
			if (released_lock)
				LWLockRelease(ReplicationSlotControlLock);
			break;
		}

		if (possible_causes & RS_INVAL_IDLE_TIMEOUT)
		{
			/*
			 * Assign the current time here to avoid system call overhead
			 * while holding the spinlock in subsequent code.
			 */
			now = GetCurrentTimestamp();
		}

		/*
		 * Check if the slot needs to be invalidated. If it needs to be
		 * invalidated, and is not currently acquired, acquire it and mark it
		 * as having been invalidated.  We do this with the spinlock held to
		 * avoid race conditions -- for example the restart_lsn could move
		 * forward, or the slot could be dropped.
		 */
		SpinLockAcquire(&s->mutex);

		Assert(s->data.restart_lsn >= s->last_saved_restart_lsn);

		restart_lsn = s->data.restart_lsn;

		/* we do nothing if the slot is already invalid */
		if (s->data.invalidated == RS_INVAL_NONE)
		{
			/*
			 * The slot's mutex will be released soon, and it is possible that
			 * those values change since the process holding the slot has been
			 * terminated (if any), so record them here to ensure that we
			 * would report the correct invalidation cause.
			 *
			 * Unlike other slot attributes, slot's inactive_since can't be
			 * changed until the acquired slot is released or the owning
			 * process is terminated. So, the inactive slot can only be
			 * invalidated immediately without being terminated.
			 */
			if (!terminated)
			{
				initial_restart_lsn = s->data.restart_lsn;
				initial_effective_xmin = s->effective_xmin;
				initial_catalog_effective_xmin = s->effective_catalog_xmin;
			}

			invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
																s, oldestLSN,
																dboid,
																snapshotConflictHorizon,
																initial_effective_xmin,
																initial_catalog_effective_xmin,
																initial_restart_lsn,
																&inactive_since,
																now);
		}

		/*
		 * The invalidation cause recorded previously should not change while
		 * the process owning the slot (if any) has been terminated.
		 */
		Assert(!(invalidation_cause_prev != RS_INVAL_NONE && terminated &&
				 invalidation_cause_prev != invalidation_cause));

		/* if there's no invalidation, we're done */
		if (invalidation_cause == RS_INVAL_NONE)
		{
			SpinLockRelease(&s->mutex);
			if (released_lock)
				LWLockRelease(ReplicationSlotControlLock);
			break;
		}

		slotname = s->data.name;
		active_pid = s->active_pid;

		/*
		 * If the slot can be acquired, do so and mark it invalidated
		 * immediately.  Otherwise we'll signal the owning process, below, and
		 * retry.
		 */
		if (active_pid == 0)
		{
			MyReplicationSlot = s;
			s->active_pid = MyProcPid;
			s->data.invalidated = invalidation_cause;

			/*
			 * XXX: We should consider not overwriting restart_lsn and instead
			 * just rely on .invalidated.
			 */
			if (invalidation_cause == RS_INVAL_WAL_REMOVED)
			{
				s->data.restart_lsn = InvalidXLogRecPtr;
				s->last_saved_restart_lsn = InvalidXLogRecPtr;
			}

			/* Let caller know */
			*invalidated = true;
		}

		SpinLockRelease(&s->mutex);

		/*
		 * The logical replication slots shouldn't be invalidated as GUC
		 * max_slot_wal_keep_size is set to -1 and
		 * idle_replication_slot_timeout is set to 0 during the binary
		 * upgrade. See check_old_cluster_for_valid_slots() where we ensure
		 * that no slots were invalidated before the upgrade.
		 */
		Assert(!(*invalidated && SlotIsLogical(s) && IsBinaryUpgrade));

		/*
		 * Calculate the idle time duration of the slot if slot is marked
		 * invalidated with RS_INVAL_IDLE_TIMEOUT.
		 */
		if (invalidation_cause == RS_INVAL_IDLE_TIMEOUT)
		{
			int			slot_idle_usecs;

			TimestampDifference(inactive_since, now, &slot_idle_secs,
								&slot_idle_usecs);
		}

		if (active_pid != 0)
		{
			/*
			 * Prepare the sleep on the slot's condition variable before
			 * releasing the lock, to close a possible race condition if the
			 * slot is released before the sleep below.
			 */
			ConditionVariablePrepareToSleep(&s->active_cv);

			LWLockRelease(ReplicationSlotControlLock);
			released_lock = true;

			/*
			 * Signal to terminate the process that owns the slot, if we
			 * haven't already signalled it.  (Avoidance of repeated
			 * signalling is the only reason for there to be a loop in this
			 * routine; otherwise we could rely on caller's restart loop.)
			 *
			 * There is the race condition that other process may own the slot
			 * after its current owner process is terminated and before this
			 * process owns it. To handle that, we signal only if the PID of
			 * the owning process has changed from the previous time. (This
			 * logic assumes that the same PID is not reused very quickly.)
			 */
			if (last_signaled_pid != active_pid)
			{
				ReportSlotInvalidation(invalidation_cause, true, active_pid,
									   slotname, restart_lsn,
									   oldestLSN, snapshotConflictHorizon,
									   slot_idle_secs);

				if (MyBackendType == B_STARTUP)
					(void) SendProcSignal(active_pid,
										  PROCSIG_RECOVERY_CONFLICT_LOGICALSLOT,
										  INVALID_PROC_NUMBER);
				else
					(void) kill(active_pid, SIGTERM);

				last_signaled_pid = active_pid;
				terminated = true;
				invalidation_cause_prev = invalidation_cause;
			}

			/* Wait until the slot is released. */
			ConditionVariableSleep(&s->active_cv,
								   WAIT_EVENT_REPLICATION_SLOT_DROP);

			/*
			 * Re-acquire lock and start over; we expect to invalidate the
			 * slot next time (unless another process acquires the slot in the
			 * meantime).
			 */
			LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
			continue;
		}
		else
		{
			/*
			 * We hold the slot now and have already invalidated it; flush it
			 * to ensure that state persists.
			 *
			 * Don't want to hold ReplicationSlotControlLock across file
			 * system operations, so release it now but be sure to tell caller
			 * to restart from scratch.
			 */
			LWLockRelease(ReplicationSlotControlLock);
			released_lock = true;

			/* Make sure the invalidated state persists across server restart */
			ReplicationSlotMarkDirty();
			ReplicationSlotSave();
			ReplicationSlotRelease();

			ReportSlotInvalidation(invalidation_cause, false, active_pid,
								   slotname, restart_lsn,
								   oldestLSN, snapshotConflictHorizon,
								   slot_idle_secs);

			/* done with this slot for now */
			break;
		}
	}

	Assert(released_lock == !LWLockHeldByMe(ReplicationSlotControlLock));

	return released_lock;
}
2003
2004
/*
 * Invalidate slots that require resources about to be removed.
 *
 * Returns true when any slot has been invalidated.
 *
 * Whether a slot needs to be invalidated depends on the invalidation cause.
 * A slot is invalidated if it:
 * - RS_INVAL_WAL_REMOVED: requires a LSN older than the given segment
 * - RS_INVAL_HORIZON: requires a snapshot <= the given horizon in the given
 *   db; dboid may be InvalidOid for shared relations
 * - RS_INVAL_WAL_LEVEL: is logical and wal_level is insufficient
 * - RS_INVAL_IDLE_TIMEOUT: has been idle longer than the configured
 *   "idle_replication_slot_timeout" duration.
 *
 * Note: This function attempts to invalidate the slot for multiple possible
 * causes in a single pass, minimizing redundant iterations. The "cause"
 * parameter can be a MASK representing one or more of the defined causes.
 *
 * NB - this runs as part of checkpoint, so avoid raising errors if possible.
 */
bool
InvalidateObsoleteReplicationSlots(uint32 possible_causes,
								   XLogSegNo oldestSegno, Oid dboid,
								   TransactionId snapshotConflictHorizon)
{
	XLogRecPtr	oldestLSN;
	bool		invalidated = false;

	/* Each cause requires its corresponding argument to be meaningful. */
	Assert(!(possible_causes & RS_INVAL_HORIZON) || TransactionIdIsValid(snapshotConflictHorizon));
	Assert(!(possible_causes & RS_INVAL_WAL_REMOVED) || oldestSegno > 0);
	Assert(possible_causes != RS_INVAL_NONE);

	if (max_replication_slots == 0)
		return invalidated;

	XLogSegNoOffsetToRecPtr(oldestSegno, 0, wal_segment_size, oldestLSN);

restart:
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
	for (int i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];

		if (!s->in_use)
			continue;

		if (InvalidatePossiblyObsoleteSlot(possible_causes, s, oldestLSN, dboid,
										   snapshotConflictHorizon,
										   &invalidated))
		{
			/* if the lock was released, start from scratch */
			goto restart;
		}
	}
	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * If any slots have been invalidated, recalculate the resource limits.
	 */
	if (invalidated)
	{
		ReplicationSlotsComputeRequiredXmin(false);
		ReplicationSlotsComputeRequiredLSN();
	}

	return invalidated;
}
2071
2072
/*
 * Flush all replication slots to disk.
 *
 * It is convenient to flush dirty replication slots at the time of checkpoint.
 * Additionally, in case of a shutdown checkpoint, we also identify the slots
 * for which the confirmed_flush LSN has been updated since the last time it
 * was saved and flush them.
 */
void
CheckPointReplicationSlots(bool is_shutdown)
{
	int			i;

	elog(DEBUG1, "performing replication slot checkpoint");

	/*
	 * Prevent any slot from being created/dropped while we're active. As we
	 * explicitly do *not* want to block iterating over replication_slots or
	 * acquiring a slot we cannot take the control lock - but that's OK,
	 * because holding ReplicationSlotAllocationLock is strictly stronger, and
	 * enough to guarantee that nobody can change the in_use bits on us.
	 */
	LWLockAcquire(ReplicationSlotAllocationLock, LW_SHARED);

	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *s = &ReplicationSlotCtl->replication_slots[i];
		char		path[MAXPGPATH];

		if (!s->in_use)
			continue;

		/* save the slot to disk, locking is handled in SaveSlotToPath() */
		sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(s->data.name));

		/*
		 * Slot's data is not flushed each time the confirmed_flush LSN is
		 * updated as that could lead to frequent writes.  However, we decide
		 * to force a flush of all logical slot's data at the time of shutdown
		 * if the confirmed_flush LSN is changed since we last flushed it to
		 * disk.  This helps in avoiding an unnecessary retreat of the
		 * confirmed_flush LSN after restart.
		 */
		if (is_shutdown && SlotIsLogical(s))
		{
			SpinLockAcquire(&s->mutex);

			if (s->data.invalidated == RS_INVAL_NONE &&
				s->data.confirmed_flush > s->last_saved_confirmed_flush)
			{
				s->just_dirtied = true;
				s->dirty = true;
			}
			SpinLockRelease(&s->mutex);
		}

		SaveSlotToPath(s, path, LOG);
	}
	LWLockRelease(ReplicationSlotAllocationLock);

	/*
	 * Recompute the required LSN as SaveSlotToPath() updated
	 * last_saved_restart_lsn for slots.
	 */
	ReplicationSlotsComputeRequiredLSN();
}
2138
2139
/*
 * Load all replication slots from disk into memory at server startup. This
 * needs to be run before we start crash recovery.
 */
void
StartupReplicationSlots(void)
{
	DIR		   *replication_dir;
	struct dirent *replication_de;

	elog(DEBUG1, "starting up replication slots");

	/* restore all slots by iterating over all on-disk entries */
	replication_dir = AllocateDir(PG_REPLSLOT_DIR);
	while ((replication_de = ReadDir(replication_dir, PG_REPLSLOT_DIR)) != NULL)
	{
		char		path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
		PGFileType	de_type;

		if (strcmp(replication_de->d_name, ".") == 0 ||
			strcmp(replication_de->d_name, "..") == 0)
			continue;

		snprintf(path, sizeof(path), "%s/%s", PG_REPLSLOT_DIR, replication_de->d_name);
		de_type = get_dirent_type(path, replication_de, false, DEBUG1);

		/* we're only creating directories here, skip if it's not ours */
		if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_DIR)
			continue;

		/* we crashed while a slot was being setup or deleted, clean up */
		if (pg_str_endswith(replication_de->d_name, ".tmp"))
		{
			if (!rmtree(path, true))
			{
				ereport(WARNING,
						(errmsg("could not remove directory \"%s\"",
								path)));
				continue;
			}
			fsync_fname(PG_REPLSLOT_DIR, true);
			continue;
		}

		/* looks like a slot in a normal state, restore */
		RestoreSlotFromDisk(replication_de->d_name);
	}
	FreeDir(replication_dir);

	/* currently no slots exist, we're done. */
	if (max_replication_slots <= 0)
		return;

	/* Now that we have recovered all the data, compute replication xmin */
	ReplicationSlotsComputeRequiredXmin(false);
	ReplicationSlotsComputeRequiredLSN();
}
2196
2197
/* ----
2198
 * Manipulation of on-disk state of replication slots
2199
 *
2200
 * NB: none of the routines below should take any notice whether a slot is the
2201
 * current one or not, that's all handled a layer above.
2202
 * ----
2203
 */
2204
static void
CreateSlotOnDisk(ReplicationSlot *slot)
{
	char		tmppath[MAXPGPATH];
	char		path[MAXPGPATH];
	struct stat st;

	/*
	 * No need to take out the io_in_progress_lock, nobody else can see this
	 * slot yet, so nobody else will write. We're reusing SaveSlotToPath which
	 * takes out the lock, if we'd take the lock here, we'd deadlock.
	 */

	sprintf(path, "%s/%s", PG_REPLSLOT_DIR, NameStr(slot->data.name));
	sprintf(tmppath, "%s/%s.tmp", PG_REPLSLOT_DIR, NameStr(slot->data.name));

	/*
	 * It's just barely possible that some previous effort to create or drop a
	 * slot with this name left a temp directory lying around. If that seems
	 * to be the case, try to remove it.  If the rmtree() fails, we'll error
	 * out at the MakePGDirectory() below, so we don't bother checking
	 * success.
	 */
	if (stat(tmppath, &st) == 0 && S_ISDIR(st.st_mode))
		rmtree(tmppath, true);

	/* Create and fsync the temporary slot directory. */
	if (MakePGDirectory(tmppath) < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not create directory \"%s\": %m",
						tmppath)));
	fsync_fname(tmppath, true);

	/* Write the actual state file. */
	slot->dirty = true;			/* signal that we really need to write */
	SaveSlotToPath(slot, tmppath, ERROR);

	/* Rename the directory into place. */
	if (rename(tmppath, path) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						tmppath, path)));

	/*
	 * If we'd now fail - really unlikely - we wouldn't know whether this slot
	 * would persist after an OS crash or not - so, force a restart. The
	 * restart would try to fsync this again till it works.
	 */
	START_CRIT_SECTION();

	fsync_fname(path, true);
	fsync_fname(PG_REPLSLOT_DIR, true);

	END_CRIT_SECTION();
}
2261
2262
/*
 * Shared functionality between saving and creating a replication slot.
 *
 * Writes the slot's persistent data to "<dir>/state" via a temporary file,
 * fsyncing at each step so the result is crash-safe.  Errors are reported at
 * "elevel"; on non-ERROR levels the function returns after cleanup instead of
 * throwing.
 */
static void
SaveSlotToPath(ReplicationSlot *slot, const char *dir, int elevel)
{
	char		tmppath[MAXPGPATH];
	char		path[MAXPGPATH];
	int			fd;
	ReplicationSlotOnDisk cp;
	bool		was_dirty;

	/* first check whether there's something to write out */
	SpinLockAcquire(&slot->mutex);
	was_dirty = slot->dirty;
	slot->just_dirtied = false;
	SpinLockRelease(&slot->mutex);

	/* and don't do anything if there's nothing to write */
	if (!was_dirty)
		return;

	LWLockAcquire(&slot->io_in_progress_lock, LW_EXCLUSIVE);

	/* silence valgrind :( */
	memset(&cp, 0, sizeof(ReplicationSlotOnDisk));

	sprintf(tmppath, "%s/state.tmp", dir);
	sprintf(path, "%s/state", dir);

	fd = OpenTransientFile(tmppath, O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
	if (fd < 0)
	{
		/*
		 * If not an ERROR, then release the lock before returning.  In case
		 * of an ERROR, the error recovery path automatically releases the
		 * lock, but no harm in explicitly releasing even in that case.  Note
		 * that LWLockRelease() could affect errno.
		 */
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not create file \"%s\": %m",
						tmppath)));
		return;
	}

	cp.magic = SLOT_MAGIC;
	INIT_CRC32C(cp.checksum);
	cp.version = SLOT_VERSION;
	cp.length = ReplicationSlotOnDiskV2Size;

	/* copy the persistent data under the spinlock for a consistent snapshot */
	SpinLockAcquire(&slot->mutex);

	memcpy(&cp.slotdata, &slot->data, sizeof(ReplicationSlotPersistentData));

	SpinLockRelease(&slot->mutex);

	COMP_CRC32C(cp.checksum,
				(char *) (&cp) + ReplicationSlotOnDiskNotChecksummedSize,
				ReplicationSlotOnDiskChecksummedSize);
	FIN_CRC32C(cp.checksum);

	errno = 0;
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_WRITE);
	if ((write(fd, &cp, sizeof(cp))) != sizeof(cp))
	{
		int			save_errno = errno;

		pgstat_report_wait_end();
		CloseTransientFile(fd);
		LWLockRelease(&slot->io_in_progress_lock);

		/* if write didn't set errno, assume problem is no disk space */
		errno = save_errno ? save_errno : ENOSPC;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m",
						tmppath)));
		return;
	}
	pgstat_report_wait_end();

	/* fsync the temporary file */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_SYNC);
	if (pg_fsync(fd) != 0)
	{
		int			save_errno = errno;

		pgstat_report_wait_end();
		CloseTransientFile(fd);
		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m",
						tmppath)));
		return;
	}
	pgstat_report_wait_end();

	if (CloseTransientFile(fd) != 0)
	{
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m",
						tmppath)));
		return;
	}

	/* rename to permanent file, fsync file and directory */
	if (rename(tmppath, path) != 0)
	{
		int			save_errno = errno;

		LWLockRelease(&slot->io_in_progress_lock);
		errno = save_errno;
		ereport(elevel,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						tmppath, path)));
		return;
	}

	/*
	 * Check CreateSlotOnDisk() for the reasoning of using a critical section.
	 */
	START_CRIT_SECTION();

	fsync_fname(path, false);
	fsync_fname(dir, true);
	fsync_fname(PG_REPLSLOT_DIR, true);

	END_CRIT_SECTION();

	/*
	 * Successfully wrote; unset the dirty bit unless somebody dirtied the
	 * slot again already, and remember the confirmed_flush LSN value.
	 */
	SpinLockAcquire(&slot->mutex);
	if (!slot->just_dirtied)
		slot->dirty = false;
	slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
	slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;
	SpinLockRelease(&slot->mutex);

	LWLockRelease(&slot->io_in_progress_lock);
}
2417
2418
/*
 * Load a single slot from disk into memory.
 *
 * "name" is the slot's directory name under PG_REPLSLOT_DIR.  Corruption or
 * I/O failure is reported at PANIC since this runs before crash recovery and
 * we cannot safely continue without the slot state.
 */
static void
RestoreSlotFromDisk(const char *name)
{
	ReplicationSlotOnDisk cp;
	int			i;
	char		slotdir[MAXPGPATH + sizeof(PG_REPLSLOT_DIR)];
	char		path[MAXPGPATH + sizeof(PG_REPLSLOT_DIR) + 10];
	int			fd;
	bool		restored = false;
	int			readBytes;
	pg_crc32c	checksum;
	TimestampTz now = 0;

	/* no need to lock here, no concurrent access allowed yet */

	/* delete temp file if it exists */
	sprintf(slotdir, "%s/%s", PG_REPLSLOT_DIR, name);
	sprintf(path, "%s/state.tmp", slotdir);
	if (unlink(path) < 0 && errno != ENOENT)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not remove file \"%s\": %m", path)));

	sprintf(path, "%s/state", slotdir);

	elog(DEBUG1, "restoring replication slot from \"%s\"", path);

	/* on some operating systems fsyncing a file requires O_RDWR */
	fd = OpenTransientFile(path, O_RDWR | PG_BINARY);

	/*
	 * We do not need to handle this as we are rename()ing the directory into
	 * place only after we fsync()ed the state file.
	 */
	if (fd < 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", path)));

	/*
	 * Sync state file before we're reading from it. We might have crashed
	 * while it wasn't synced yet and we shouldn't continue on that basis.
	 */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_RESTORE_SYNC);
	if (pg_fsync(fd) != 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m",
						path)));
	pgstat_report_wait_end();

	/* Also sync the parent directory */
	START_CRIT_SECTION();
	fsync_fname(slotdir, true);
	END_CRIT_SECTION();

	/* read part of statefile that's guaranteed to be version independent */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
	readBytes = read(fd, &cp, ReplicationSlotOnDiskConstantSize);
	pgstat_report_wait_end();
	if (readBytes != ReplicationSlotOnDiskConstantSize)
	{
		if (readBytes < 0)
			ereport(PANIC,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m", path)));
		else
			ereport(PANIC,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("could not read file \"%s\": read %d of %zu",
							path, readBytes,
							(Size) ReplicationSlotOnDiskConstantSize)));
	}

	/* verify magic */
	if (cp.magic != SLOT_MAGIC)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("replication slot file \"%s\" has wrong magic number: %u instead of %u",
						path, cp.magic, SLOT_MAGIC)));

	/* verify version */
	if (cp.version != SLOT_VERSION)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("replication slot file \"%s\" has unsupported version %u",
						path, cp.version)));

	/* boundary check on length */
	if (cp.length != ReplicationSlotOnDiskV2Size)
		ereport(PANIC,
				(errcode(ERRCODE_DATA_CORRUPTED),
				 errmsg("replication slot file \"%s\" has corrupted length %u",
						path, cp.length)));

	/* Now that we know the size, read the entire file */
	pgstat_report_wait_start(WAIT_EVENT_REPLICATION_SLOT_READ);
	readBytes = read(fd,
					 (char *) &cp + ReplicationSlotOnDiskConstantSize,
					 cp.length);
	pgstat_report_wait_end();
	if (readBytes != cp.length)
	{
		if (readBytes < 0)
			ereport(PANIC,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m", path)));
		else
			ereport(PANIC,
					(errcode(ERRCODE_DATA_CORRUPTED),
					 errmsg("could not read file \"%s\": read %d of %zu",
							path, readBytes, (Size) cp.length)));
	}

	if (CloseTransientFile(fd) != 0)
		ereport(PANIC,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m", path)));

	/* now verify the CRC */
	INIT_CRC32C(checksum);
	COMP_CRC32C(checksum,
				(char *) &cp + ReplicationSlotOnDiskNotChecksummedSize,
				ReplicationSlotOnDiskChecksummedSize);
	FIN_CRC32C(checksum);

	if (!EQ_CRC32C(checksum, cp.checksum))
		ereport(PANIC,
				(errmsg("checksum mismatch for replication slot file \"%s\": is %u, should be %u",
						path, checksum, cp.checksum)));

	/*
	 * If we crashed with an ephemeral slot active, don't restore but delete
	 * it.
	 */
	if (cp.slotdata.persistency != RS_PERSISTENT)
	{
		if (!rmtree(slotdir, true))
		{
			ereport(WARNING,
					(errmsg("could not remove directory \"%s\"",
							slotdir)));
		}
		fsync_fname(PG_REPLSLOT_DIR, true);
		return;
	}

	/*
	 * Verify that requirements for the specific slot type are met. That's
	 * important because if these aren't met we're not guaranteed to retain
	 * all the necessary resources for the slot.
	 *
	 * NB: We have to do so *after* the above checks for ephemeral slots,
	 * because otherwise a slot that shouldn't exist anymore could prevent
	 * restarts.
	 *
	 * NB: Changing the requirements here also requires adapting
	 * CheckSlotRequirements() and CheckLogicalDecodingRequirements().
	 */
	if (cp.slotdata.database != InvalidOid)
	{
		if (wal_level < WAL_LEVEL_LOGICAL)
			ereport(FATAL,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("logical replication slot \"%s\" exists, but \"wal_level\" < \"logical\"",
							NameStr(cp.slotdata.name)),
					 errhint("Change \"wal_level\" to be \"logical\" or higher.")));

		/*
		 * In standby mode, the hot standby must be enabled. This check is
		 * necessary to ensure logical slots are invalidated when they become
		 * incompatible due to insufficient wal_level. Otherwise, if the
		 * primary reduces wal_level < logical while hot standby is disabled,
		 * logical slots would remain valid even after promotion.
		 */
		if (StandbyMode && !EnableHotStandby)
			ereport(FATAL,
					(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					 errmsg("logical replication slot \"%s\" exists on the standby, but \"hot_standby\" = \"off\"",
							NameStr(cp.slotdata.name)),
					 errhint("Change \"hot_standby\" to be \"on\".")));
	}
	else if (wal_level < WAL_LEVEL_REPLICA)
		ereport(FATAL,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("physical replication slot \"%s\" exists, but \"wal_level\" < \"replica\"",
						NameStr(cp.slotdata.name)),
				 errhint("Change \"wal_level\" to be \"replica\" or higher.")));

	/* nothing can be active yet, don't lock anything */
	for (i = 0; i < max_replication_slots; i++)
	{
		ReplicationSlot *slot;

		slot = &ReplicationSlotCtl->replication_slots[i];

		if (slot->in_use)
			continue;

		/* restore the entire set of persistent data */
		memcpy(&slot->data, &cp.slotdata,
			   sizeof(ReplicationSlotPersistentData));

		/* initialize in memory state */
		slot->effective_xmin = cp.slotdata.xmin;
		slot->effective_catalog_xmin = cp.slotdata.catalog_xmin;
		slot->last_saved_confirmed_flush = cp.slotdata.confirmed_flush;
		slot->last_saved_restart_lsn = cp.slotdata.restart_lsn;

		slot->candidate_catalog_xmin = InvalidTransactionId;
		slot->candidate_xmin_lsn = InvalidXLogRecPtr;
		slot->candidate_restart_lsn = InvalidXLogRecPtr;
		slot->candidate_restart_valid = InvalidXLogRecPtr;

		slot->in_use = true;
		slot->active_pid = 0;

		/*
		 * Set the time since the slot has become inactive after loading the
		 * slot from the disk into memory. Whoever acquires the slot i.e.
		 * makes the slot active will reset it. Use the same inactive_since
		 * time for all the slots.
		 */
		if (now == 0)
			now = GetCurrentTimestamp();

		ReplicationSlotSetInactiveSince(slot, now, false);

		restored = true;
		break;
	}

	if (!restored)
		ereport(FATAL,
				(errmsg("too many replication slots active before shutdown"),
				 errhint("Increase \"max_replication_slots\" and try again.")));
}
2658
2659
/*
2660
 * Maps an invalidation reason for a replication slot to
2661
 * ReplicationSlotInvalidationCause.
2662
 */
2663
ReplicationSlotInvalidationCause
2664
GetSlotInvalidationCause(const char *cause_name)
2665
0
{
2666
0
  Assert(cause_name);
2667
2668
  /* Search lookup table for the cause having this name */
2669
0
  for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2670
0
  {
2671
0
    if (strcmp(SlotInvalidationCauses[i].cause_name, cause_name) == 0)
2672
0
      return SlotInvalidationCauses[i].cause;
2673
0
  }
2674
2675
0
  Assert(false);
2676
0
  return RS_INVAL_NONE;   /* to keep compiler quiet */
2677
0
}
2678
2679
/*
2680
 * Maps an ReplicationSlotInvalidationCause to the invalidation
2681
 * reason for a replication slot.
2682
 */
2683
const char *
2684
GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause)
2685
0
{
2686
  /* Search lookup table for the name of this cause */
2687
0
  for (int i = 0; i <= RS_INVAL_MAX_CAUSES; i++)
2688
0
  {
2689
0
    if (SlotInvalidationCauses[i].cause == cause)
2690
0
      return SlotInvalidationCauses[i].cause_name;
2691
0
  }
2692
2693
0
  Assert(false);
2694
0
  return "none";       /* to keep compiler quiet */
2695
0
}
2696
2697
/*
 * A helper function to validate slots specified in GUC synchronized_standby_slots.
 *
 * The rawname will be parsed, and the result will be saved into *elemlist.
 * Note that SplitIdentifierString scribbles on rawname, so the caller must
 * pass a modifiable copy of the GUC value.
 *
 * Returns true when the list syntax is valid and (if a PGPROC is available)
 * every named slot exists and is physical; otherwise reports the problem via
 * GUC_check_errdetail and returns false.
 */
static bool
validate_sync_standby_slots(char *rawname, List **elemlist)
{
	bool		ok;

	/* Verify syntax and parse string into a list of identifiers */
	ok = SplitIdentifierString(rawname, ',', elemlist);

	if (!ok)
	{
		GUC_check_errdetail("List syntax is invalid.");
	}
	else if (MyProc)
	{
		/*
		 * Check that each specified slot exist and is physical.
		 *
		 * Because we need an LWLock, we cannot do this on processes without a
		 * PGPROC, so we skip it there; but see comments in
		 * StandbySlotsHaveCaughtup() as to why that's not a problem.
		 */
		LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

		foreach_ptr(char, name, *elemlist)
		{
			ReplicationSlot *slot;

			/* Shared lock suffices: we only read slot existence/type here */
			slot = SearchNamedReplicationSlot(name, false);

			if (!slot)
			{
				GUC_check_errdetail("Replication slot \"%s\" does not exist.",
									name);
				ok = false;
				break;
			}

			/* Only physical slots may back a synchronized standby */
			if (!SlotIsPhysical(slot))
			{
				GUC_check_errdetail("\"%s\" is not a physical replication slot.",
									name);
				ok = false;
				break;
			}
		}

		LWLockRelease(ReplicationSlotControlLock);
	}

	return ok;
}
2753
2754
/*
2755
 * GUC check_hook for synchronized_standby_slots
2756
 */
2757
bool
2758
check_synchronized_standby_slots(char **newval, void **extra, GucSource source)
2759
0
{
2760
0
  char     *rawname;
2761
0
  char     *ptr;
2762
0
  List     *elemlist;
2763
0
  int     size;
2764
0
  bool    ok;
2765
0
  SyncStandbySlotsConfigData *config;
2766
2767
0
  if ((*newval)[0] == '\0')
2768
0
    return true;
2769
2770
  /* Need a modifiable copy of the GUC string */
2771
0
  rawname = pstrdup(*newval);
2772
2773
  /* Now verify if the specified slots exist and have correct type */
2774
0
  ok = validate_sync_standby_slots(rawname, &elemlist);
2775
2776
0
  if (!ok || elemlist == NIL)
2777
0
  {
2778
0
    pfree(rawname);
2779
0
    list_free(elemlist);
2780
0
    return ok;
2781
0
  }
2782
2783
  /* Compute the size required for the SyncStandbySlotsConfigData struct */
2784
0
  size = offsetof(SyncStandbySlotsConfigData, slot_names);
2785
0
  foreach_ptr(char, slot_name, elemlist)
2786
0
    size += strlen(slot_name) + 1;
2787
2788
  /* GUC extra value must be guc_malloc'd, not palloc'd */
2789
0
  config = (SyncStandbySlotsConfigData *) guc_malloc(LOG, size);
2790
0
  if (!config)
2791
0
    return false;
2792
2793
  /* Transform the data into SyncStandbySlotsConfigData */
2794
0
  config->nslotnames = list_length(elemlist);
2795
2796
0
  ptr = config->slot_names;
2797
0
  foreach_ptr(char, slot_name, elemlist)
2798
0
  {
2799
0
    strcpy(ptr, slot_name);
2800
0
    ptr += strlen(slot_name) + 1;
2801
0
  }
2802
2803
0
  *extra = config;
2804
2805
0
  pfree(rawname);
2806
0
  list_free(elemlist);
2807
0
  return true;
2808
0
}
2809
2810
/*
 * GUC assign_hook for synchronized_standby_slots
 *
 * 'extra' is the SyncStandbySlotsConfigData built by the check_hook, or NULL
 * when the GUC is empty; it is owned by the GUC machinery, so we just point
 * at it here.
 */
void
assign_synchronized_standby_slots(const char *newval, void *extra)
{
	/*
	 * The standby slots may have changed, so we must recompute the oldest
	 * LSN.
	 */
	ss_oldest_flush_lsn = InvalidXLogRecPtr;

	synchronized_standby_slots_config = (SyncStandbySlotsConfigData *) extra;
}
2824
2825
/*
2826
 * Check if the passed slot_name is specified in the synchronized_standby_slots GUC.
2827
 */
2828
bool
2829
SlotExistsInSyncStandbySlots(const char *slot_name)
2830
0
{
2831
0
  const char *standby_slot_name;
2832
2833
  /* Return false if there is no value in synchronized_standby_slots */
2834
0
  if (synchronized_standby_slots_config == NULL)
2835
0
    return false;
2836
2837
  /*
2838
   * XXX: We are not expecting this list to be long so a linear search
2839
   * shouldn't hurt but if that turns out not to be true then we can cache
2840
   * this information for each WalSender as well.
2841
   */
2842
0
  standby_slot_name = synchronized_standby_slots_config->slot_names;
2843
0
  for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
2844
0
  {
2845
0
    if (strcmp(standby_slot_name, slot_name) == 0)
2846
0
      return true;
2847
2848
0
    standby_slot_name += strlen(standby_slot_name) + 1;
2849
0
  }
2850
2851
0
  return false;
2852
0
}
2853
2854
/*
 * Return true if the slots specified in synchronized_standby_slots have caught up to
 * the given WAL location, false otherwise.
 *
 * The elevel parameter specifies the error level used for logging messages
 * related to slots that do not exist, are invalidated, or are inactive.
 */
bool
StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel)
{
	const char *name;
	int			caught_up_slot_num = 0;
	XLogRecPtr	min_restart_lsn = InvalidXLogRecPtr;

	/*
	 * Don't need to wait for the standbys to catch up if there is no value in
	 * synchronized_standby_slots.
	 */
	if (synchronized_standby_slots_config == NULL)
		return true;

	/*
	 * Don't need to wait for the standbys to catch up if we are on a standby
	 * server, since we do not support syncing slots to cascading standbys.
	 */
	if (RecoveryInProgress())
		return true;

	/*
	 * Don't need to wait for the standbys to catch up if they are already
	 * beyond the specified WAL location.  ss_oldest_flush_lsn caches the
	 * minimum restart_lsn computed by a previous call.
	 */
	if (!XLogRecPtrIsInvalid(ss_oldest_flush_lsn) &&
		ss_oldest_flush_lsn >= wait_for_lsn)
		return true;

	/*
	 * To prevent concurrent slot dropping and creation while filtering the
	 * slots, take the ReplicationSlotControlLock outside of the loop.
	 */
	LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);

	name = synchronized_standby_slots_config->slot_names;
	for (int i = 0; i < synchronized_standby_slots_config->nslotnames; i++)
	{
		XLogRecPtr	restart_lsn;
		bool		invalidated;
		bool		inactive;
		ReplicationSlot *slot;

		slot = SearchNamedReplicationSlot(name, false);

		/*
		 * If a slot name provided in synchronized_standby_slots does not
		 * exist, report a message and exit the loop.
		 *
		 * Though validate_sync_standby_slots (the GUC check_hook) tries to
		 * avoid this, it can nonetheless happen because the user can specify
		 * a nonexistent slot name before server startup. That function cannot
		 * validate such a slot during startup, as ReplicationSlotCtl is not
		 * initialized by then.  Also, the user might have dropped one slot.
		 */
		if (!slot)
		{
			ereport(elevel,
					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					errmsg("replication slot \"%s\" specified in parameter \"%s\" does not exist",
						   name, "synchronized_standby_slots"),
					errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
							  name),
					errhint("Create the replication slot \"%s\" or amend parameter \"%s\".",
							name, "synchronized_standby_slots"));
			break;
		}

		/* Same as above: if a slot is not physical, exit the loop. */
		if (SlotIsLogical(slot))
		{
			ereport(elevel,
					errcode(ERRCODE_INVALID_PARAMETER_VALUE),
					errmsg("cannot specify logical replication slot \"%s\" in parameter \"%s\"",
						   name, "synchronized_standby_slots"),
					errdetail("Logical replication is waiting for correction on replication slot \"%s\".",
							  name),
					errhint("Remove the logical replication slot \"%s\" from parameter \"%s\".",
							name, "synchronized_standby_slots"));
			break;
		}

		/* Snapshot the fields we need under the slot's spinlock */
		SpinLockAcquire(&slot->mutex);
		restart_lsn = slot->data.restart_lsn;
		invalidated = slot->data.invalidated != RS_INVAL_NONE;
		inactive = slot->active_pid == 0;
		SpinLockRelease(&slot->mutex);

		if (invalidated)
		{
			/* Specified physical slot has been invalidated */
			ereport(elevel,
					errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
					errmsg("physical replication slot \"%s\" specified in parameter \"%s\" has been invalidated",
						   name, "synchronized_standby_slots"),
					errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
							  name),
					errhint("Drop and recreate the replication slot \"%s\", or amend parameter \"%s\".",
							name, "synchronized_standby_slots"));
			break;
		}

		if (XLogRecPtrIsInvalid(restart_lsn) || restart_lsn < wait_for_lsn)
		{
			/* Log a message if no active_pid for this physical slot */
			if (inactive)
				ereport(elevel,
						errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
						errmsg("replication slot \"%s\" specified in parameter \"%s\" does not have active_pid",
							   name, "synchronized_standby_slots"),
						errdetail("Logical replication is waiting on the standby associated with replication slot \"%s\".",
								  name),
						errhint("Start the standby associated with the replication slot \"%s\", or amend parameter \"%s\".",
								name, "synchronized_standby_slots"));

			/*
			 * This slot hasn't caught up, so the overall answer is already
			 * "no"; no need to examine the remaining slots.
			 */
			break;
		}

		Assert(restart_lsn >= wait_for_lsn);

		/* Track the minimum restart_lsn across all caught-up slots */
		if (XLogRecPtrIsInvalid(min_restart_lsn) ||
			min_restart_lsn > restart_lsn)
			min_restart_lsn = restart_lsn;

		caught_up_slot_num++;

		/* Advance to the next packed NUL-terminated slot name */
		name += strlen(name) + 1;
	}

	LWLockRelease(ReplicationSlotControlLock);

	/*
	 * Return false if not all the standbys have caught up to the specified
	 * WAL location.
	 */
	if (caught_up_slot_num != synchronized_standby_slots_config->nslotnames)
		return false;

	/* The ss_oldest_flush_lsn must not retreat. */
	Assert(XLogRecPtrIsInvalid(ss_oldest_flush_lsn) ||
		   min_restart_lsn >= ss_oldest_flush_lsn);

	/* Cache the new minimum so future calls can fast-path */
	ss_oldest_flush_lsn = min_restart_lsn;

	return true;
}
3008
3009
/*
 * Wait for physical standbys to confirm receiving the given lsn.
 *
 * Used by logical decoding SQL functions. It waits for physical standbys
 * corresponding to the physical slots specified in the synchronized_standby_slots GUC.
 */
void
WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn)
{
	/*
	 * Don't need to wait for the standby to catch up if the current acquired
	 * slot is not a logical failover slot, or there is no value in
	 * synchronized_standby_slots.
	 */
	if (!MyReplicationSlot->data.failover || !synchronized_standby_slots_config)
		return;

	/* Register on the CV before checking, to avoid missing a wakeup */
	ConditionVariablePrepareToSleep(&WalSndCtl->wal_confirm_rcv_cv);

	for (;;)
	{
		CHECK_FOR_INTERRUPTS();

		/*
		 * Reload the configuration if requested, so a change to
		 * synchronized_standby_slots takes effect while we wait.
		 */
		if (ConfigReloadPending)
		{
			ConfigReloadPending = false;
			ProcessConfigFile(PGC_SIGHUP);
		}

		/* Exit if done waiting for every slot. */
		if (StandbySlotsHaveCaughtup(wait_for_lsn, WARNING))
			break;

		/*
		 * Wait for the slots in the synchronized_standby_slots to catch up,
		 * but use a timeout (1s) so we can also check if the
		 * synchronized_standby_slots has been changed.
		 */
		ConditionVariableTimedSleep(&WalSndCtl->wal_confirm_rcv_cv, 1000,
									WAIT_EVENT_WAIT_FOR_STANDBY_CONFIRMATION);
	}

	ConditionVariableCancelSleep();
}
3053
3054
/*
3055
 * GUC check_hook for idle_replication_slot_timeout
3056
 *
3057
 * The value of idle_replication_slot_timeout must be set to 0 during
3058
 * a binary upgrade. See start_postmaster() in pg_upgrade for more details.
3059
 */
3060
bool
3061
check_idle_replication_slot_timeout(int *newval, void **extra, GucSource source)
3062
0
{
3063
0
  if (IsBinaryUpgrade && *newval != 0)
3064
0
  {
3065
0
    GUC_check_errdetail("\"%s\" must be set to 0 during binary upgrade mode.",
3066
0
              "idle_replication_slot_timeout");
3067
0
    return false;
3068
0
  }
3069
3070
0
  return true;
3071
0
}