Coverage Report

Created: 2025-06-15 06:31

/src/postgres/src/include/replication/walreceiver.h
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * walreceiver.h
4
 *    Exports from replication/walreceiver.c and replication/walreceiverfuncs.c.
5
 *
6
 * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group
7
 *
8
 * src/include/replication/walreceiver.h
9
 *
10
 *-------------------------------------------------------------------------
11
 */
12
#ifndef _WALRECEIVER_H
13
#define _WALRECEIVER_H
14
15
#include <netdb.h>
16
17
#include "access/xlog.h"
18
#include "access/xlogdefs.h"
19
#include "pgtime.h"
20
#include "port/atomics.h"
21
#include "replication/logicalproto.h"
22
#include "replication/walsender.h"
23
#include "storage/condition_variable.h"
24
#include "storage/spin.h"
25
#include "utils/tuplestore.h"
26
27
/* user-settable parameters */
28
extern PGDLLIMPORT int wal_receiver_status_interval;
29
extern PGDLLIMPORT int wal_receiver_timeout;
30
extern PGDLLIMPORT bool hot_standby_feedback;
31
32
/*
33
 * MAXCONNINFO: maximum size of a connection string.  It is the size of the
 * buffer that holds one in shared memory (WalRcvData.conninfo).
34
 *
35
 * XXX: Should this move to pg_config_manual.h?
36
 */
37
0
#define MAXCONNINFO   1024
38
39
/*
 * Can we allow the standby to accept replication connections from another
 * standby?  Evaluates to true when EnableHotStandby is set and
 * max_wal_senders is greater than zero.
 */
40
0
#define AllowCascadeReplication() (EnableHotStandby && max_wal_senders > 0)
41
42
/*
43
 * Values for WalRcv->walRcvState, i.e. the state of the walreceiver
 * process as recorded in the shared memory area (WalRcvData).
44
 */
45
typedef enum
46
{
47
  WALRCV_STOPPED,       /* stopped and mustn't start up again */
48
  WALRCV_STARTING,      /* launched, but the process hasn't
49
                 * initialized yet */
50
  WALRCV_STREAMING,     /* walreceiver is streaming */
51
  WALRCV_WAITING,       /* stopped streaming, waiting for orders */
52
  WALRCV_RESTARTING,      /* asked to restart streaming */
53
  WALRCV_STOPPING,      /* requested to stop, but still running */
54
} WalRcvState;
55
56
/*
 * Shared memory area for management of the walreceiver process.
 *
 * The fields declared before 'mutex' are the ones that spinlock protects.
 * 'writtenUpto' and 'force_reply' follow it and are accessed without the
 * spinlock; see the comments on those fields.
 */
56
typedef struct
57
{
58
  /*
59
   * Currently active walreceiver process's proc number and PID.
60
   *
61
   * The startup process uses the proc number to wake it up after telling it
62
   * where to start streaming (after setting receiveStart and
63
   * receiveStartTLI), and also to tell it to send apply feedback to the
64
   * primary whenever specially marked commit records are applied.
65
   */
66
  ProcNumber  procno;
67
  pid_t   pid;
68
69
  /*
   * Its current state.  NOTE(review): walRcvStoppedCV is presumably the
   * condition variable used to wait for the WALRCV_STOPPED state --
   * confirm against walreceiverfuncs.c.
   */
70
  WalRcvState walRcvState;
71
  ConditionVariable walRcvStoppedCV;
72
73
  /*
74
   * Its start time (actually, the time at which it was requested to be
75
   * started).
76
   */
77
  pg_time_t startTime;
78
79
  /*
80
   * receiveStart and receiveStartTLI indicate the first byte position and
81
   * timeline that will be received. When startup process starts the
82
   * walreceiver, it sets these to the point where it wants the streaming to
83
   * begin.
84
   */
85
  XLogRecPtr  receiveStart;
86
  TimeLineID  receiveStartTLI;
87
88
  /*
89
   * flushedUpto-1 is the last byte position that has already been received,
90
   * and receivedTLI is the timeline it came from.  At the first startup of
91
   * walreceiver, these are set to receiveStart and receiveStartTLI. After
92
   * that, walreceiver updates these whenever it flushes the received WAL to
93
   * disk.
94
   */
95
  XLogRecPtr  flushedUpto;
96
  TimeLineID  receivedTLI;
97
98
  /*
99
   * latestChunkStart is the starting byte position of the current "batch"
100
   * of received WAL.  It's actually the same as the previous value of
101
   * flushedUpto before the last flush to disk.  Startup process can use
102
   * this to detect whether it's keeping up or not.
103
   */
104
  XLogRecPtr  latestChunkStart;
105
106
  /*
107
   * Time of send and receive of any message received.
108
   */
109
  TimestampTz lastMsgSendTime;
110
  TimestampTz lastMsgReceiptTime;
111
112
  /*
113
   * Latest reported end of WAL on the sender, and the time associated
   * with that report.
114
   */
115
  XLogRecPtr  latestWalEnd;
116
  TimestampTz latestWalEndTime;
117
118
  /*
119
   * Connection string; initially set to connect to the primary, and later
120
   * clobbered to hide security-sensitive fields.
121
   */
122
  char    conninfo[MAXCONNINFO];
123
124
  /*
125
   * Host name (this can be a host name, an IP address, or a directory path)
126
   * and port number of the active replication connection.
127
   */
128
  char    sender_host[NI_MAXHOST];
129
  int     sender_port;
130
131
  /*
132
   * Replication slot name; is also used for walreceiver to connect with the
133
   * primary.
134
   */
135
  char    slotname[NAMEDATALEN];
136
137
  /*
138
   * If it's a temporary replication slot, it needs to be recreated when
139
   * connecting.
140
   */
141
  bool    is_temp_slot;
142
143
  /* set true once conninfo is ready to display (passwords obfuscated etc.) */
144
  bool    ready_to_display;
145
146
  slock_t   mutex;      /* locks shared variables shown above */
147
148
  /*
149
   * Like flushedUpto, but advanced after writing and before flushing,
150
   * without the need to acquire the spin lock.  Data can be read by another
151
   * process up to this point, but shouldn't be used for data integrity
152
   * purposes.
153
   */
154
  pg_atomic_uint64 writtenUpto;
155
156
  /*
157
   * Force walreceiver reply?  This doesn't need to be locked; memory
158
   * barriers for ordering are sufficient.  But we do need atomic fetch and
159
   * store semantics, so use sig_atomic_t.
160
   */
161
  sig_atomic_t force_reply; /* used as a bool */
162
} WalRcvData;
163
164
/* Pointer to the shared area described above. */
extern PGDLLIMPORT WalRcvData *WalRcv;
166
167
/*
 * Options describing the stream to start; passed by pointer to
 * walrcv_startstreaming().  The 'proto' union carries the settings that
 * exist only for physical or only for logical streaming.
 * NOTE(review): presumably the 'logical' flag tells which union member is
 * valid -- confirm against the implementing module.
 */
typedef struct
168
{
169
  bool    logical;    /* True if this is a logical replication stream,
170
                 * false if physical stream.  */
171
  char     *slotname;   /* Name of the replication slot or NULL. */
172
  XLogRecPtr  startpoint;   /* LSN of starting point. */
173
174
  union
175
  {
176
    struct
177
    {
178
      TimeLineID  startpointTLI;  /* Starting timeline */
179
    }     physical;
180
    struct
181
    {
182
      uint32    proto_version;  /* Logical protocol version */
183
      List     *publication_names;  /* String list of publications */
184
      bool    binary; /* Ask publisher to use binary */
185
      char     *streaming_str;  /* Streaming of large transactions */
186
      bool    twophase; /* Streaming of two-phase transactions at
187
                   * prepare time */
188
      char     *origin; /* Only publish data originating from the
189
                 * specified origin */
190
    }     logical;
191
  }     proto;
192
} WalRcvStreamOptions;
193
194
/*
 * Connection handle.  Only the type name is declared in this header, and
 * it is only ever passed around by pointer here; its contents are defined
 * by each WAL receiver module.
 */
struct WalReceiverConn;
195
typedef struct WalReceiverConn WalReceiverConn;
196
197
/*
198
 * Status of walreceiver query execution, as stored in
 * WalRcvExecResult.status.
199
 *
200
 * We only define statuses that are currently used.
201
 */
202
typedef enum
203
{
204
  WALRCV_ERROR,       /* There was an error when executing the query. */
205
  WALRCV_OK_COMMAND,      /* Query executed a utility or replication
206
                 * command. */
207
  WALRCV_OK_TUPLES,     /* Query returned tuples. */
208
  WALRCV_OK_COPY_IN,      /* Query started COPY FROM. */
209
  WALRCV_OK_COPY_OUT,     /* Query started COPY TO. */
210
  WALRCV_OK_COPY_BOTH,    /* Query started COPY BOTH replication
211
                 * protocol. */
212
} WalRcvExecStatus;
213
214
/*
215
 * Return value for walrcv_exec: the status of the execution and the
216
 * tuples, if any.
 *
 * Release it with walrcv_clear_result(), which frees 'err', 'tuplestore'
 * and 'tupledesc' when they are set, and then the struct itself.
217
 */
218
typedef struct WalRcvExecResult
219
{
220
  WalRcvExecStatus status;  /* outcome of the execution */
221
  int     sqlstate;   /* NOTE(review): presumably the SQLSTATE code
                 * reported with an error -- confirm */
222
  char     *err;      /* may be NULL; NOTE(review): presumably the
                 * error message text -- confirm */
223
  Tuplestorestate *tuplestore;  /* may be NULL; tuples, if any */
224
  TupleDesc tupledesc;  /* may be NULL; NOTE(review): presumably the
                 * row descriptor of 'tuplestore' -- confirm */
225
} WalRcvExecResult;
226
227
/* WAL receiver - libpqwalreceiver hooks */
228
229
/*
230
 * walrcv_connect_fn
231
 *
232
 * Establish connection to a cluster, using the connection string
 * 'conninfo'.  'replication' is true if the
233
 * connection is a replication connection, and false if it is a
234
 * regular connection.  If it is a replication connection, it could
235
 * be either logical or physical based on input argument 'logical'.
236
 * 'appname' is a name associated to the connection, to use for example
237
 * with fallback_application_name or application_name.  Returns the
238
 * details about the connection established, as defined by
239
 * WalReceiverConn for each WAL receiver module.  On error, NULL is
240
 * returned with 'err' including the error generated.
 *
 * NOTE(review): 'must_use_password' is not described here; presumably it
 * requires that the connection authenticate with a password -- confirm
 * against the implementing module.
241
 */
242
typedef WalReceiverConn *(*walrcv_connect_fn) (const char *conninfo,
243
                         bool replication,
244
                         bool logical,
245
                         bool must_use_password,
246
                         const char *appname,
247
                         char **err);
248
249
/*
250
 * walrcv_check_conninfo_fn
251
 *
252
 * Parse and validate the connection string given in 'conninfo'.
 *
 * NOTE(review): nothing is returned, so a validation failure is presumably
 * reported by raising an error -- confirm.  'must_use_password' is not
 * described here; presumably it has the same meaning as the argument of
 * that name in walrcv_connect_fn -- confirm.
253
 */
254
typedef void (*walrcv_check_conninfo_fn) (const char *conninfo,
255
                      bool must_use_password);
256
257
/*
258
 * walrcv_get_conninfo_fn
259
 *
260
 * Returns a user-displayable conninfo string for connection 'conn'.  Note
261
 * that any security-sensitive fields should be obfuscated.
 *
 * NOTE(review): who frees the returned string is not stated here --
 * confirm against the implementing module.
262
 */
263
typedef char *(*walrcv_get_conninfo_fn) (WalReceiverConn *conn);
264
265
/*
266
 * walrcv_get_senderinfo_fn
267
 *
268
 * Provide information about the WAL sender this WAL receiver is connected
269
 * to: the host of the sender is reported in 'sender_host' and its port in
270
 * 'sender_port'.
271
 */
272
typedef void (*walrcv_get_senderinfo_fn) (WalReceiverConn *conn,
273
                      char **sender_host,
274
                      int *sender_port);
275
276
/*
277
 * walrcv_identify_system_fn
278
 *
279
 * Run IDENTIFY_SYSTEM on the cluster connected to and validate the
280
 * identity of the cluster.  Returns the system ID of the cluster
281
 * connected to, as a string.  'primary_tli' is the timeline ID of the
 * sender.
 *
 * NOTE(review): who frees the returned string is not stated here --
 * confirm against the implementing module.
282
 */
283
typedef char *(*walrcv_identify_system_fn) (WalReceiverConn *conn,
284
                      TimeLineID *primary_tli);
285
286
/*
287
 * walrcv_get_dbname_from_conninfo_fn
288
 *
289
 * Returns the database name from the connection string 'conninfo'
 * (the primary_conninfo).
 *
 * NOTE(review): the result when 'conninfo' names no database is not stated
 * here -- confirm against the implementing module.
290
 */
291
typedef char *(*walrcv_get_dbname_from_conninfo_fn) (const char *conninfo);
292
293
/*
294
 * walrcv_server_version_fn
295
 *
296
 * Returns the version number of the cluster that 'conn' is connected to.
297
 */
298
typedef int (*walrcv_server_version_fn) (WalReceiverConn *conn);
299
300
/*
301
 * walrcv_readtimelinehistoryfile_fn
302
 *
303
 * Fetch from cluster the timeline history file for timeline 'tli'.
304
 * Returns the name of the timeline history file in 'filename', its
305
 * contents in 'content' and its 'size'.
306
 */
307
typedef void (*walrcv_readtimelinehistoryfile_fn) (WalReceiverConn *conn,
308
                           TimeLineID tli,
309
                           char **filename,
310
                           char **content,
311
                           int *size);
312
313
/*
314
 * walrcv_startstreaming_fn
315
 *
316
 * Start streaming WAL data using the streaming options given in
 * 'options'.  Returns true
317
 * if the connection has switched successfully to copy-both mode and false
318
 * if the server received the command and executed it successfully, but
319
 * didn't switch to copy-both mode.
320
 */
321
typedef bool (*walrcv_startstreaming_fn) (WalReceiverConn *conn,
322
                      const WalRcvStreamOptions *options);
323
324
/*
325
 * walrcv_endstreaming_fn
326
 *
327
 * Stop streaming of WAL data.  The next timeline ID of the cluster
328
 * connected to is returned in 'next_tli', or 0 if there was no report.
329
 */
330
typedef void (*walrcv_endstreaming_fn) (WalReceiverConn *conn,
331
                    TimeLineID *next_tli);
332
333
/*
334
 * walrcv_receive_fn
335
 *
336
 * Receive a message available from the WAL stream.  'buffer' is a pointer
337
 * to a buffer holding the message received.  Returns the length of the data,
338
 * 0 if no data is available yet ('wait_fd' is a socket descriptor which can
339
 * be waited on before a retry), and -1 if the cluster ended the COPY.
 *
 * NOTE(review): ownership and lifetime of the buffer returned through
 * 'buffer' are not stated here -- confirm before freeing or retaining it.
340
 */
341
typedef int (*walrcv_receive_fn) (WalReceiverConn *conn,
342
                  char **buffer,
343
                  pgsocket *wait_fd);
344
345
/*
346
 * walrcv_send_fn
347
 *
348
 * Send a message of 'nbytes' bytes to the WAL stream, taking its contents
349
 * from 'buffer'.
350
 */
351
typedef void (*walrcv_send_fn) (WalReceiverConn *conn,
352
                const char *buffer,
353
                int nbytes);
354
355
/*
356
 * walrcv_create_slot_fn
357
 *
358
 * Create a new replication slot named 'slotname'.  'temporary' defines
359
 * if the slot is temporary.  'snapshot_action' defines the behavior wanted
360
 * for an exported snapshot (see replication protocol for more details).
361
 * 'lsn' includes the LSN position at which the created slot became
362
 * consistent.  Returns the name of the exported snapshot for a logical
363
 * slot, or NULL for a physical slot.
 *
 * NOTE(review): 'two_phase' and 'failover' are not described here;
 * presumably they set the slot properties of the same names that
 * walrcv_alter_slot_fn can later change -- confirm.
364
 */
365
typedef char *(*walrcv_create_slot_fn) (WalReceiverConn *conn,
366
                    const char *slotname,
367
                    bool temporary,
368
                    bool two_phase,
369
                    bool failover,
370
                    CRSSnapshotAction snapshot_action,
371
                    XLogRecPtr *lsn);
372
373
/*
374
 * walrcv_alter_slot_fn
375
 *
376
 * Change the definition of the replication slot named 'slotname'.
 * Currently, it supports
377
 * changing the failover and two_phase properties of the slot.
 *
 * NOTE(review): 'failover' and 'two_phase' are passed by pointer;
 * presumably a NULL pointer means "leave that property unchanged" --
 * confirm against the implementing module.
378
 */
379
typedef void (*walrcv_alter_slot_fn) (WalReceiverConn *conn,
380
                    const char *slotname,
381
                    const bool *failover,
382
                    const bool *two_phase);
383
384
385
/*
386
 * walrcv_get_backend_pid_fn
387
 *
388
 * Returns the PID of the remote backend process for connection 'conn'.
389
 */
390
typedef pid_t (*walrcv_get_backend_pid_fn) (WalReceiverConn *conn);
391
392
/*
393
 * walrcv_exec_fn
394
 *
395
 * Send generic queries (and commands) to the remote cluster.  'query' is
 * the text to send.  'nRetTypes'
396
 * is the expected number of returned attributes, and 'retTypes' an array
397
 * including their type OIDs.  Returns the status of the execution and
398
 * tuples if any, as a WalRcvExecResult; that result can be released with
 * walrcv_clear_result().
399
 */
400
typedef WalRcvExecResult *(*walrcv_exec_fn) (WalReceiverConn *conn,
401
                       const char *query,
402
                       const int nRetTypes,
403
                       const Oid *retTypes);
404
405
/*
406
 * walrcv_disconnect_fn
407
 *
408
 * Disconnect from the cluster.
409
 */
410
typedef void (*walrcv_disconnect_fn) (WalReceiverConn *conn);
411
412
/*
 * Table of the WAL receiver hooks: one function pointer for each of the
 * walrcv_*_fn types declared in this header.  The walrcv_*() macros in this
 * header call through the WalReceiverFunctions pointer declared after the
 * struct.
 * NOTE(review): presumably the pointer is set by the WAL receiver module
 * (libpqwalreceiver) when it is loaded -- confirm.
 */
typedef struct WalReceiverFunctionsType
413
{
414
  walrcv_connect_fn walrcv_connect;
415
  walrcv_check_conninfo_fn walrcv_check_conninfo;
416
  walrcv_get_conninfo_fn walrcv_get_conninfo;
417
  walrcv_get_senderinfo_fn walrcv_get_senderinfo;
418
  walrcv_identify_system_fn walrcv_identify_system;
419
  walrcv_get_dbname_from_conninfo_fn walrcv_get_dbname_from_conninfo;
420
  walrcv_server_version_fn walrcv_server_version;
421
  walrcv_readtimelinehistoryfile_fn walrcv_readtimelinehistoryfile;
422
  walrcv_startstreaming_fn walrcv_startstreaming;
423
  walrcv_endstreaming_fn walrcv_endstreaming;
424
  walrcv_receive_fn walrcv_receive;
425
  walrcv_send_fn walrcv_send;
426
  walrcv_create_slot_fn walrcv_create_slot;
427
  walrcv_alter_slot_fn walrcv_alter_slot;
428
  walrcv_get_backend_pid_fn walrcv_get_backend_pid;
429
  walrcv_exec_fn walrcv_exec;
430
  walrcv_disconnect_fn walrcv_disconnect;
431
} WalReceiverFunctionsType;
432
433
extern PGDLLIMPORT WalReceiverFunctionsType *WalReceiverFunctions;
434
435
/*
 * Convenience wrappers that dispatch through the WalReceiverFunctions
 * table.  Each wrapper forwards its arguments unchanged, each evaluated
 * exactly once, to the function pointer of the same name; the contract of
 * every call is described with the matching walrcv_*_fn typedef.
 *
 * WalReceiverFunctions is dereferenced unconditionally, so it must point to
 * a populated table before any of these wrappers is used.
 */
#define walrcv_connect(conninfo, replication, logical, must_use_password, appname, err) \
  WalReceiverFunctions->walrcv_connect(conninfo, replication, logical, must_use_password, appname, err)
#define walrcv_check_conninfo(conninfo, must_use_password) \
  WalReceiverFunctions->walrcv_check_conninfo(conninfo, must_use_password)
#define walrcv_get_conninfo(conn) \
  WalReceiverFunctions->walrcv_get_conninfo(conn)
#define walrcv_get_senderinfo(conn, sender_host, sender_port) \
  WalReceiverFunctions->walrcv_get_senderinfo(conn, sender_host, sender_port)
#define walrcv_identify_system(conn, primary_tli) \
  WalReceiverFunctions->walrcv_identify_system(conn, primary_tli)
#define walrcv_get_dbname_from_conninfo(conninfo) \
  WalReceiverFunctions->walrcv_get_dbname_from_conninfo(conninfo)
#define walrcv_server_version(conn) \
  WalReceiverFunctions->walrcv_server_version(conn)
#define walrcv_readtimelinehistoryfile(conn, tli, filename, content, size) \
  WalReceiverFunctions->walrcv_readtimelinehistoryfile(conn, tli, filename, content, size)
#define walrcv_startstreaming(conn, options) \
  WalReceiverFunctions->walrcv_startstreaming(conn, options)
#define walrcv_endstreaming(conn, next_tli) \
  WalReceiverFunctions->walrcv_endstreaming(conn, next_tli)
#define walrcv_receive(conn, buffer, wait_fd) \
  WalReceiverFunctions->walrcv_receive(conn, buffer, wait_fd)
#define walrcv_send(conn, buffer, nbytes) \
  WalReceiverFunctions->walrcv_send(conn, buffer, nbytes)
#define walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn) \
  WalReceiverFunctions->walrcv_create_slot(conn, slotname, temporary, two_phase, failover, snapshot_action, lsn)
#define walrcv_alter_slot(conn, slotname, failover, two_phase) \
  WalReceiverFunctions->walrcv_alter_slot(conn, slotname, failover, two_phase)
#define walrcv_get_backend_pid(conn) \
  WalReceiverFunctions->walrcv_get_backend_pid(conn)
/* second parameter is named 'query' to match walrcv_exec_fn */
#define walrcv_exec(conn, query, nRetTypes, retTypes) \
  WalReceiverFunctions->walrcv_exec(conn, query, nRetTypes, retTypes)
#define walrcv_disconnect(conn) \
  WalReceiverFunctions->walrcv_disconnect(conn)
469
470
static inline void
471
walrcv_clear_result(WalRcvExecResult *walres)
472
0
{
473
0
  if (!walres)
474
0
    return;
475
476
0
  if (walres->err)
477
0
    pfree(walres->err);
478
479
0
  if (walres->tuplestore)
480
0
    tuplestore_end(walres->tuplestore);
481
482
0
  if (walres->tupledesc)
483
0
    FreeTupleDesc(walres->tupledesc);
484
485
0
  pfree(walres);
486
0
}
Unexecuted instantiation: rewriteheap.c:walrcv_clear_result
Unexecuted instantiation: rmgr.c:walrcv_clear_result
Unexecuted instantiation: xact.c:walrcv_clear_result
Unexecuted instantiation: xlog.c:walrcv_clear_result
Unexecuted instantiation: xlogfuncs.c:walrcv_clear_result
Unexecuted instantiation: xlogrecovery.c:walrcv_clear_result
Unexecuted instantiation: dbcommands.c:walrcv_clear_result
Unexecuted instantiation: subscriptioncmds.c:walrcv_clear_result
Unexecuted instantiation: launch_backend.c:walrcv_clear_result
Unexecuted instantiation: postmaster.c:walrcv_clear_result
Unexecuted instantiation: walsummarizer.c:walrcv_clear_result
Unexecuted instantiation: applyparallelworker.c:walrcv_clear_result
Unexecuted instantiation: conflict.c:walrcv_clear_result
Unexecuted instantiation: decode.c:walrcv_clear_result
Unexecuted instantiation: launcher.c:walrcv_clear_result
Unexecuted instantiation: logical.c:walrcv_clear_result
Unexecuted instantiation: logicalfuncs.c:walrcv_clear_result
Unexecuted instantiation: origin.c:walrcv_clear_result
Unexecuted instantiation: relation.c:walrcv_clear_result
Unexecuted instantiation: reorderbuffer.c:walrcv_clear_result
Unexecuted instantiation: slotsync.c:walrcv_clear_result
Unexecuted instantiation: snapbuild.c:walrcv_clear_result
Unexecuted instantiation: tablesync.c:walrcv_clear_result
Unexecuted instantiation: worker.c:walrcv_clear_result
Unexecuted instantiation: slot.c:walrcv_clear_result
Unexecuted instantiation: slotfuncs.c:walrcv_clear_result
Unexecuted instantiation: walreceiver.c:walrcv_clear_result
Unexecuted instantiation: walreceiverfuncs.c:walrcv_clear_result
Unexecuted instantiation: walsender.c:walrcv_clear_result
Unexecuted instantiation: basebackup.c:walrcv_clear_result
Unexecuted instantiation: ipci.c:walrcv_clear_result
Unexecuted instantiation: standby.c:walrcv_clear_result
Unexecuted instantiation: proc.c:walrcv_clear_result
Unexecuted instantiation: postgres.c:walrcv_clear_result
Unexecuted instantiation: pgstat_replslot.c:walrcv_clear_result
Unexecuted instantiation: genfile.c:walrcv_clear_result
Unexecuted instantiation: pg_upgrade_support.c:walrcv_clear_result
Unexecuted instantiation: miscinit.c:walrcv_clear_result
Unexecuted instantiation: postinit.c:walrcv_clear_result
Unexecuted instantiation: guc_tables.c:walrcv_clear_result
487
488
/*
 * Prototypes for functions in walreceiver.c.
 *
 * WalReceiverMain is declared pg_noreturn, i.e. it does not return to its
 * caller.
 */
489
pg_noreturn extern void WalReceiverMain(const void *startup_data, size_t startup_data_len);
490
extern void WalRcvForceReply(void);
491
492
/*
 * Prototypes for functions in walreceiverfuncs.c.
 *
 * NOTE(review): the behavior of these functions is not visible from this
 * header.  In particular, the unit of the values returned by
 * GetReplicationApplyDelay() and GetReplicationTransferLatency() is not
 * stated here (presumably milliseconds) -- confirm in walreceiverfuncs.c.
 */
493
extern Size WalRcvShmemSize(void);
494
extern void WalRcvShmemInit(void);
495
extern void ShutdownWalRcv(void);
496
extern bool WalRcvStreaming(void);
497
extern bool WalRcvRunning(void);
498
extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr,
499
                 const char *conninfo, const char *slotname,
500
                 bool create_temp_slot);
501
extern XLogRecPtr GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
502
extern XLogRecPtr GetWalRcvWriteRecPtr(void);
503
extern int  GetReplicationApplyDelay(void);
504
extern int  GetReplicationTransferLatency(void);
505
506
#endif              /* _WALRECEIVER_H */