Coverage Report

Created: 2025-10-09 06:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/postgres/src/include/replication/slot.h
Line
Count
Source
1
/*-------------------------------------------------------------------------
2
 * slot.h
3
 *     Replication slot management.
4
 *
5
 * Copyright (c) 2012-2025, PostgreSQL Global Development Group
6
 *
7
 *-------------------------------------------------------------------------
8
 */
9
#ifndef SLOT_H
10
#define SLOT_H
11
12
#include "access/xlog.h"
13
#include "access/xlogreader.h"
14
#include "storage/condition_variable.h"
15
#include "storage/lwlock.h"
16
#include "storage/shmem.h"
17
#include "storage/spin.h"
18
#include "replication/walreceiver.h"
19
20
/* directory to store replication slot data in */
21
0
#define PG_REPLSLOT_DIR     "pg_replslot"
22
23
/*
24
 * The reserved name for a replication slot used to retain dead tuples for
25
 * conflict detection in logical replication. See
26
 * maybe_advance_nonremovable_xid() for detail.
27
 */
28
0
#define CONFLICT_DETECTION_SLOT "pg_conflict_detection"
29
30
/*
31
 * Behaviour of replication slots, upon release or crash.
32
 *
33
 * Slots marked as PERSISTENT are crash-safe and will not be dropped when
34
 * released. Slots marked as EPHEMERAL will be dropped when released or after
35
 * restarts.  Slots marked TEMPORARY will be dropped at the end of a session
36
 * or on error.
37
 *
38
 * EPHEMERAL is used as a not-quite-ready state when creating persistent
39
 * slots.  EPHEMERAL slots can be made PERSISTENT by calling
40
 * ReplicationSlotPersist().  For a slot that goes away at the end of a
41
 * session, TEMPORARY is the appropriate choice.
42
 */
43
typedef enum ReplicationSlotPersistency
44
{
45
  RS_PERSISTENT,
46
  RS_EPHEMERAL,
47
  RS_TEMPORARY,
48
} ReplicationSlotPersistency;
49
50
/*
51
 * Slots can be invalidated, e.g. due to max_slot_wal_keep_size. If so, the
52
 * 'invalidated' field is set to a value other than _NONE.
53
 *
54
 * When adding a new invalidation cause here, its value must be a power of 2
55
 * (e.g., 1, 2, 4...) for proper bitwise operations. Also, remember to update
56
 * RS_INVAL_MAX_CAUSES below, and SlotInvalidationCauses in slot.c.
57
 */
58
typedef enum ReplicationSlotInvalidationCause
59
{
60
  RS_INVAL_NONE = 0,
61
  /* required WAL has been removed */
62
  RS_INVAL_WAL_REMOVED = (1 << 0),
63
  /* required rows have been removed */
64
  RS_INVAL_HORIZON = (1 << 1),
65
  /* wal_level insufficient for slot */
66
  RS_INVAL_WAL_LEVEL = (1 << 2),
67
  /* idle slot timeout has occurred */
68
  RS_INVAL_IDLE_TIMEOUT = (1 << 3),
69
} ReplicationSlotInvalidationCause;
70
71
/* Maximum number of invalidation causes */
72
0
#define RS_INVAL_MAX_CAUSES 4
73
74
/*
75
 * On-Disk data of a replication slot, preserved across restarts.
76
 */
77
typedef struct ReplicationSlotPersistentData
78
{
79
  /* The slot's identifier */
80
  NameData  name;
81
82
  /* database the slot is active on */
83
  Oid     database;
84
85
  /*
86
   * The slot's behaviour when being dropped (or restored after a crash).
87
   */
88
  ReplicationSlotPersistency persistency;
89
90
  /*
91
   * xmin horizon for data
92
   *
93
   * NB: This may represent a value that hasn't been written to disk yet;
94
   * see notes for effective_xmin, below.
95
   */
96
  TransactionId xmin;
97
98
  /*
99
   * xmin horizon for catalog tuples
100
   *
101
   * NB: This may represent a value that hasn't been written to disk yet;
102
   * see notes for effective_xmin, below.
103
   */
104
  TransactionId catalog_xmin;
105
106
  /* oldest LSN that might be required by this replication slot */
107
  XLogRecPtr  restart_lsn;
108
109
  /* RS_INVAL_NONE if valid, or the reason for having been invalidated */
110
  ReplicationSlotInvalidationCause invalidated;
111
112
  /*
113
   * Oldest LSN that the client has acked receipt for.  This is used as the
114
   * start_lsn point in case the client doesn't specify one, and also as a
115
   * safety measure to jump forwards in case the client specifies a
116
   * start_lsn that's further in the past than this value.
117
   */
118
  XLogRecPtr  confirmed_flush;
119
120
  /*
121
   * LSN at which we enabled two_phase commit for this slot or LSN at which
122
   * we found a consistent point at the time of slot creation.
123
   */
124
  XLogRecPtr  two_phase_at;
125
126
  /*
127
   * Allow decoding of prepared transactions?
128
   */
129
  bool    two_phase;
130
131
  /* plugin name */
132
  NameData  plugin;
133
134
  /*
135
   * Was this slot synchronized from the primary server?
136
   */
137
  bool    synced;
138
139
  /*
140
   * Is this a failover slot (sync candidate for standbys)? Only relevant
141
   * for logical slots on the primary server.
142
   */
143
  bool    failover;
144
} ReplicationSlotPersistentData;
145
146
/*
147
 * Shared memory state of a single replication slot.
148
 *
149
 * The in-memory data of replication slots follows a locking model based
150
 * on two linked concepts:
151
 * - A replication slot's in_use flag is switched when added or discarded using
152
 * the LWLock ReplicationSlotControlLock, which needs to be held in exclusive
153
 * mode when updating the flag by the backend owning the slot and doing the
154
 * operation, while readers (concurrent backends not owning the slot) need
155
 * to hold it in shared mode when looking at replication slot data.
156
 * - Individual fields are protected by mutex where only the backend owning
157
 * the slot is authorized to update the fields from its own slot.  The
158
 * backend owning the slot does not need to take this lock when reading its
159
 * own fields, while concurrent backends not owning this slot should take the
160
 * lock when reading this slot's data.
161
 */
162
typedef struct ReplicationSlot
163
{
164
  /* lock, on same cacheline as effective_xmin */
165
  slock_t   mutex;
166
167
  /* is this slot defined */
168
  bool    in_use;
169
170
  /* Who is streaming out changes for this slot? 0 in unused slots. */
171
  pid_t   active_pid;
172
173
  /* any outstanding modifications? */
174
  bool    just_dirtied;
175
  bool    dirty;
176
177
  /*
178
   * For logical decoding, it's extremely important that we never remove any
179
   * data that's still needed for decoding purposes, even after a crash;
180
   * otherwise, decoding will produce wrong answers.  Ordinary streaming
181
   * replication also needs to prevent old row versions from being removed
182
   * too soon, but the worst consequence we might encounter there is
183
   * unwanted query cancellations on the standby.  Thus, for logical
184
   * decoding, this value represents the latest xmin that has actually been
185
   * written to disk, whereas for streaming replication, it's just the same
186
   * as the persistent value (data.xmin).
187
   */
188
  TransactionId effective_xmin;
189
  TransactionId effective_catalog_xmin;
190
191
  /* data surviving shutdowns and crashes */
192
  ReplicationSlotPersistentData data;
193
194
  /* is somebody performing io on this slot? */
195
  LWLock    io_in_progress_lock;
196
197
  /* Condition variable signaled when active_pid changes */
198
  ConditionVariable active_cv;
199
200
  /* all the remaining data is only used for logical slots */
201
202
  /*
203
   * When the client has confirmed flushes >= candidate_xmin_lsn we can
204
   * advance the catalog xmin.  When restart_valid has been passed,
205
   * restart_lsn can be increased.
206
   */
207
  TransactionId candidate_catalog_xmin;
208
  XLogRecPtr  candidate_xmin_lsn;
209
  XLogRecPtr  candidate_restart_valid;
210
  XLogRecPtr  candidate_restart_lsn;
211
212
  /*
213
   * This value tracks the last confirmed_flush LSN flushed which is used
214
   * during a shutdown checkpoint to decide if a logical slot's data should be
215
   * forcibly flushed or not.
216
   */
217
  XLogRecPtr  last_saved_confirmed_flush;
218
219
  /*
220
   * The time when the slot became inactive. For synced slots on a standby
221
   * server, it represents the time when slot synchronization was most
222
   * recently stopped.
223
   */
224
  TimestampTz inactive_since;
225
226
  /*
227
   * Latest restart_lsn that has been flushed to disk. For persistent slots
228
   * the flushed LSN should be taken into account when calculating the
229
   * oldest LSN for WAL segments removal.
230
   *
231
   * Do not assume that restart_lsn will always move forward, i.e., that the
232
   * previously flushed restart_lsn is always behind data.restart_lsn. In
233
   * streaming replication using a physical slot, the restart_lsn is updated
234
   * based on the flushed WAL position reported by the walreceiver.
235
   *
236
   * This replication mode allows duplicate WAL records to be received and
237
   * overwritten. If the walreceiver receives older WAL records and then
238
   * reports them as flushed to the walsender, the restart_lsn may appear to
239
   * move backward.
240
   *
241
   * This typically occurs at the beginning of replication. One reason is
242
   * that streaming replication starts at the beginning of a segment, so, if
243
   * restart_lsn is in the middle of a segment, it will be updated to an
244
   * earlier LSN, see RequestXLogStreaming. Another reason is that the
245
   * walreceiver chooses its startpoint based on the replayed LSN, so, if
246
   * some records have been received but not yet applied, they will be
247
   * received again, which leads to updating the restart_lsn to an earlier
248
   * position.
249
   */
250
  XLogRecPtr  last_saved_restart_lsn;
251
252
} ReplicationSlot;
253
254
0
#define SlotIsPhysical(slot) ((slot)->data.database == InvalidOid)
255
0
#define SlotIsLogical(slot) ((slot)->data.database != InvalidOid)
256
257
/*
258
 * Shared memory control area for all replication slots.
259
 */
260
typedef struct ReplicationSlotCtlData
261
{
262
  /*
263
   * This array should be declared [FLEXIBLE_ARRAY_MEMBER], but for some
264
   * reason you can't do that in an otherwise-empty struct.
265
   */
266
  ReplicationSlot replication_slots[1];
267
} ReplicationSlotCtlData;
268
269
/*
270
 * Set slot's inactive_since property unless it was previously invalidated.
271
 */
272
static inline void
273
ReplicationSlotSetInactiveSince(ReplicationSlot *s, TimestampTz ts,
274
                bool acquire_lock)
275
0
{
276
0
  if (acquire_lock)
277
0
    SpinLockAcquire(&s->mutex);
278
279
0
  if (s->data.invalidated == RS_INVAL_NONE)
280
0
    s->inactive_since = ts;
281
282
0
  if (acquire_lock)
283
0
    SpinLockRelease(&s->mutex);
284
0
}
Unexecuted instantiation: rewriteheap.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: rmgr.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: xact.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: xlog.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: xlogrecovery.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: dbcommands.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: subscriptioncmds.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: decode.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: launcher.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: logical.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: logicalfuncs.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: origin.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: reorderbuffer.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: slotsync.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: snapbuild.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: tablesync.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: worker.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: slot.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: slotfuncs.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: walsender.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: basebackup.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: ipci.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: standby.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: postgres.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: pgstat_replslot.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: genfile.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: pg_upgrade_support.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: postinit.c:ReplicationSlotSetInactiveSince
Unexecuted instantiation: guc_tables.c:ReplicationSlotSetInactiveSince
285
286
/*
287
 * Pointers to shared memory
288
 */
289
extern PGDLLIMPORT ReplicationSlotCtlData *ReplicationSlotCtl;
290
extern PGDLLIMPORT ReplicationSlot *MyReplicationSlot;
291
292
/* GUCs */
293
extern PGDLLIMPORT int max_replication_slots;
294
extern PGDLLIMPORT char *synchronized_standby_slots;
295
extern PGDLLIMPORT int idle_replication_slot_timeout_secs;
296
297
/* shmem initialization functions */
298
extern Size ReplicationSlotsShmemSize(void);
299
extern void ReplicationSlotsShmemInit(void);
300
301
/* management of individual slots */
302
extern void ReplicationSlotCreate(const char *name, bool db_specific,
303
                  ReplicationSlotPersistency persistency,
304
                  bool two_phase, bool failover,
305
                  bool synced);
306
extern void ReplicationSlotPersist(void);
307
extern void ReplicationSlotDrop(const char *name, bool nowait);
308
extern void ReplicationSlotDropAcquired(void);
309
extern void ReplicationSlotAlter(const char *name, const bool *failover,
310
                 const bool *two_phase);
311
312
extern void ReplicationSlotAcquire(const char *name, bool nowait,
313
                   bool error_if_invalid);
314
extern void ReplicationSlotRelease(void);
315
extern void ReplicationSlotCleanup(bool synced_only);
316
extern void ReplicationSlotSave(void);
317
extern void ReplicationSlotMarkDirty(void);
318
319
/* misc stuff */
320
extern void ReplicationSlotInitialize(void);
321
extern bool ReplicationSlotValidateName(const char *name,
322
                    bool allow_reserved_name,
323
                    int elevel);
324
extern void ReplicationSlotReserveWal(void);
325
extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
326
extern void ReplicationSlotsComputeRequiredLSN(void);
327
extern XLogRecPtr ReplicationSlotsComputeLogicalRestartLSN(void);
328
extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
329
extern void ReplicationSlotsDropDBSlots(Oid dboid);
330
extern bool InvalidateObsoleteReplicationSlots(uint32 possible_causes,
331
                         XLogSegNo oldestSegno,
332
                         Oid dboid,
333
                         TransactionId snapshotConflictHorizon);
334
extern ReplicationSlot *SearchNamedReplicationSlot(const char *name, bool need_lock);
335
extern int  ReplicationSlotIndex(ReplicationSlot *slot);
336
extern bool ReplicationSlotName(int index, Name name);
337
extern void ReplicationSlotNameForTablesync(Oid suboid, Oid relid, char *syncslotname, Size szslot);
338
extern void ReplicationSlotDropAtPubNode(WalReceiverConn *wrconn, char *slotname, bool missing_ok);
339
340
extern void StartupReplicationSlots(void);
341
extern void CheckPointReplicationSlots(bool is_shutdown);
342
343
extern void CheckSlotRequirements(void);
344
extern void CheckSlotPermissions(void);
345
extern ReplicationSlotInvalidationCause
346
      GetSlotInvalidationCause(const char *cause_name);
347
extern const char *GetSlotInvalidationCauseName(ReplicationSlotInvalidationCause cause);
348
349
extern bool SlotExistsInSyncStandbySlots(const char *slot_name);
350
extern bool StandbySlotsHaveCaughtup(XLogRecPtr wait_for_lsn, int elevel);
351
extern void WaitForStandbyConfirmation(XLogRecPtr wait_for_lsn);
352
353
#endif              /* SLOT_H */