Coverage Report

Created: 2025-06-15 06:31

/src/postgres/src/backend/replication/walreceiverfuncs.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * walreceiverfuncs.c
4
 *
5
 * This file contains functions used by the startup process to communicate
6
 * with the walreceiver process. Functions implementing walreceiver itself
7
 * are in walreceiver.c.
8
 *
9
 * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group
10
 *
11
 *
12
 * IDENTIFICATION
13
 *    src/backend/replication/walreceiverfuncs.c
14
 *
15
 *-------------------------------------------------------------------------
16
 */
17
#include "postgres.h"
18
19
#include <sys/stat.h>
20
#include <sys/time.h>
21
#include <time.h>
22
#include <unistd.h>
23
#include <signal.h>
24
25
#include "access/xlog_internal.h"
26
#include "access/xlogrecovery.h"
27
#include "pgstat.h"
28
#include "replication/walreceiver.h"
29
#include "storage/pmsignal.h"
30
#include "storage/proc.h"
31
#include "storage/shmem.h"
32
#include "utils/timestamp.h"
33
34
/* Pointer to the walreceiver control struct; attached in WalRcvShmemInit(). */
WalRcvData *WalRcv = NULL;

/*
 * Number of seconds a walreceiver may remain in WALRCV_STARTING after the
 * postmaster has been asked to launch it.  Past this limit, WalRcvRunning()
 * and WalRcvStreaming() force the state to WALRCV_STOPPED.
 */
enum
{
	WALRCV_STARTUP_TIMEOUT = 10
};
41
42
/* Report shared memory space needed by WalRcvShmemInit */
43
Size
44
WalRcvShmemSize(void)
45
0
{
46
0
  Size    size = 0;
47
48
0
  size = add_size(size, sizeof(WalRcvData));
49
50
0
  return size;
51
0
}
52
53
/* Allocate and initialize walreceiver-related shared memory */
54
void
55
WalRcvShmemInit(void)
56
0
{
57
0
  bool    found;
58
59
0
  WalRcv = (WalRcvData *)
60
0
    ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
61
62
0
  if (!found)
63
0
  {
64
    /* First time through, so initialize */
65
0
    MemSet(WalRcv, 0, WalRcvShmemSize());
66
0
    WalRcv->walRcvState = WALRCV_STOPPED;
67
0
    ConditionVariableInit(&WalRcv->walRcvStoppedCV);
68
0
    SpinLockInit(&WalRcv->mutex);
69
0
    pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
70
0
    WalRcv->procno = INVALID_PROC_NUMBER;
71
0
  }
72
0
}
73
74
/* Is walreceiver running (or starting up)? */
75
bool
76
WalRcvRunning(void)
77
0
{
78
0
  WalRcvData *walrcv = WalRcv;
79
0
  WalRcvState state;
80
0
  pg_time_t startTime;
81
82
0
  SpinLockAcquire(&walrcv->mutex);
83
84
0
  state = walrcv->walRcvState;
85
0
  startTime = walrcv->startTime;
86
87
0
  SpinLockRelease(&walrcv->mutex);
88
89
  /*
90
   * If it has taken too long for walreceiver to start up, give up. Setting
91
   * the state to STOPPED ensures that if walreceiver later does start up
92
   * after all, it will see that it's not supposed to be running and die
93
   * without doing anything.
94
   */
95
0
  if (state == WALRCV_STARTING)
96
0
  {
97
0
    pg_time_t now = (pg_time_t) time(NULL);
98
99
0
    if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
100
0
    {
101
0
      bool    stopped = false;
102
103
0
      SpinLockAcquire(&walrcv->mutex);
104
0
      if (walrcv->walRcvState == WALRCV_STARTING)
105
0
      {
106
0
        state = walrcv->walRcvState = WALRCV_STOPPED;
107
0
        stopped = true;
108
0
      }
109
0
      SpinLockRelease(&walrcv->mutex);
110
111
0
      if (stopped)
112
0
        ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
113
0
    }
114
0
  }
115
116
0
  if (state != WALRCV_STOPPED)
117
0
    return true;
118
0
  else
119
0
    return false;
120
0
}
121
122
/*
123
 * Is walreceiver running and streaming (or at least attempting to connect,
124
 * or starting up)?
125
 */
126
bool
127
WalRcvStreaming(void)
128
0
{
129
0
  WalRcvData *walrcv = WalRcv;
130
0
  WalRcvState state;
131
0
  pg_time_t startTime;
132
133
0
  SpinLockAcquire(&walrcv->mutex);
134
135
0
  state = walrcv->walRcvState;
136
0
  startTime = walrcv->startTime;
137
138
0
  SpinLockRelease(&walrcv->mutex);
139
140
  /*
141
   * If it has taken too long for walreceiver to start up, give up. Setting
142
   * the state to STOPPED ensures that if walreceiver later does start up
143
   * after all, it will see that it's not supposed to be running and die
144
   * without doing anything.
145
   */
146
0
  if (state == WALRCV_STARTING)
147
0
  {
148
0
    pg_time_t now = (pg_time_t) time(NULL);
149
150
0
    if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
151
0
    {
152
0
      bool    stopped = false;
153
154
0
      SpinLockAcquire(&walrcv->mutex);
155
0
      if (walrcv->walRcvState == WALRCV_STARTING)
156
0
      {
157
0
        state = walrcv->walRcvState = WALRCV_STOPPED;
158
0
        stopped = true;
159
0
      }
160
0
      SpinLockRelease(&walrcv->mutex);
161
162
0
      if (stopped)
163
0
        ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
164
0
    }
165
0
  }
166
167
0
  if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
168
0
    state == WALRCV_RESTARTING)
169
0
    return true;
170
0
  else
171
0
    return false;
172
0
}
173
174
/*
175
 * Stop walreceiver (if running) and wait for it to die.
176
 * Executed by the Startup process.
177
 */
178
void
179
ShutdownWalRcv(void)
180
0
{
181
0
  WalRcvData *walrcv = WalRcv;
182
0
  pid_t   walrcvpid = 0;
183
0
  bool    stopped = false;
184
185
  /*
186
   * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
187
   * mode once it's finished, and will also request postmaster to not
188
   * restart itself.
189
   */
190
0
  SpinLockAcquire(&walrcv->mutex);
191
0
  switch (walrcv->walRcvState)
192
0
  {
193
0
    case WALRCV_STOPPED:
194
0
      break;
195
0
    case WALRCV_STARTING:
196
0
      walrcv->walRcvState = WALRCV_STOPPED;
197
0
      stopped = true;
198
0
      break;
199
200
0
    case WALRCV_STREAMING:
201
0
    case WALRCV_WAITING:
202
0
    case WALRCV_RESTARTING:
203
0
      walrcv->walRcvState = WALRCV_STOPPING;
204
      /* fall through */
205
0
    case WALRCV_STOPPING:
206
0
      walrcvpid = walrcv->pid;
207
0
      break;
208
0
  }
209
0
  SpinLockRelease(&walrcv->mutex);
210
211
  /* Unnecessary but consistent. */
212
0
  if (stopped)
213
0
    ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
214
215
  /*
216
   * Signal walreceiver process if it was still running.
217
   */
218
0
  if (walrcvpid != 0)
219
0
    kill(walrcvpid, SIGTERM);
220
221
  /*
222
   * Wait for walreceiver to acknowledge its death by setting state to
223
   * WALRCV_STOPPED.
224
   */
225
0
  ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
226
0
  while (WalRcvRunning())
227
0
    ConditionVariableSleep(&walrcv->walRcvStoppedCV,
228
0
                 WAIT_EVENT_WAL_RECEIVER_EXIT);
229
0
  ConditionVariableCancelSleep();
230
0
}
231
232
/*
233
 * Request postmaster to start walreceiver.
234
 *
235
 * "recptr" indicates the position where streaming should begin.  "conninfo"
236
 * is a libpq connection string to use.  "slotname" is, optionally, the name
237
 * of a replication slot to acquire.  "create_temp_slot" indicates to create
238
 * a temporary slot when no "slotname" is given.
239
 *
240
 * WAL receivers do not directly load GUC parameters used for the connection
241
 * to the primary, and rely on the values passed down by the caller of this
242
 * routine instead.  Hence, the addition of any new parameters should happen
243
 * through this code path.
244
 */
245
void
246
RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
247
           const char *slotname, bool create_temp_slot)
248
0
{
249
0
  WalRcvData *walrcv = WalRcv;
250
0
  bool    launch = false;
251
0
  pg_time_t now = (pg_time_t) time(NULL);
252
0
  ProcNumber  walrcv_proc;
253
254
  /*
255
   * We always start at the beginning of the segment. That prevents a broken
256
   * segment (i.e., with no records in the first half of a segment) from
257
   * being created by XLOG streaming, which might cause trouble later on if
258
   * the segment is e.g archived.
259
   */
260
0
  if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
261
0
    recptr -= XLogSegmentOffset(recptr, wal_segment_size);
262
263
0
  SpinLockAcquire(&walrcv->mutex);
264
265
  /* It better be stopped if we try to restart it */
266
0
  Assert(walrcv->walRcvState == WALRCV_STOPPED ||
267
0
       walrcv->walRcvState == WALRCV_WAITING);
268
269
0
  if (conninfo != NULL)
270
0
    strlcpy(walrcv->conninfo, conninfo, MAXCONNINFO);
271
0
  else
272
0
    walrcv->conninfo[0] = '\0';
273
274
  /*
275
   * Use configured replication slot if present, and ignore the value of
276
   * create_temp_slot as the slot name should be persistent.  Otherwise, use
277
   * create_temp_slot to determine whether this WAL receiver should create a
278
   * temporary slot by itself and use it, or not.
279
   */
280
0
  if (slotname != NULL && slotname[0] != '\0')
281
0
  {
282
0
    strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
283
0
    walrcv->is_temp_slot = false;
284
0
  }
285
0
  else
286
0
  {
287
0
    walrcv->slotname[0] = '\0';
288
0
    walrcv->is_temp_slot = create_temp_slot;
289
0
  }
290
291
0
  if (walrcv->walRcvState == WALRCV_STOPPED)
292
0
  {
293
0
    launch = true;
294
0
    walrcv->walRcvState = WALRCV_STARTING;
295
0
  }
296
0
  else
297
0
    walrcv->walRcvState = WALRCV_RESTARTING;
298
0
  walrcv->startTime = now;
299
300
  /*
301
   * If this is the first startup of walreceiver (on this timeline),
302
   * initialize flushedUpto and latestChunkStart to the starting point.
303
   */
304
0
  if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
305
0
  {
306
0
    walrcv->flushedUpto = recptr;
307
0
    walrcv->receivedTLI = tli;
308
0
    walrcv->latestChunkStart = recptr;
309
0
  }
310
0
  walrcv->receiveStart = recptr;
311
0
  walrcv->receiveStartTLI = tli;
312
313
0
  walrcv_proc = walrcv->procno;
314
315
0
  SpinLockRelease(&walrcv->mutex);
316
317
0
  if (launch)
318
0
    SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
319
0
  else if (walrcv_proc != INVALID_PROC_NUMBER)
320
0
    SetLatch(&GetPGProcByNumber(walrcv_proc)->procLatch);
321
0
}
322
323
/*
324
 * Returns the last+1 byte position that walreceiver has flushed.
325
 *
326
 * Optionally, returns the previous chunk start, that is the first byte
327
 * written in the most recent walreceiver flush cycle.  Callers not
328
 * interested in that value may pass NULL for latestChunkStart. Same for
329
 * receiveTLI.
330
 */
331
XLogRecPtr
332
GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
333
0
{
334
0
  WalRcvData *walrcv = WalRcv;
335
0
  XLogRecPtr  recptr;
336
337
0
  SpinLockAcquire(&walrcv->mutex);
338
0
  recptr = walrcv->flushedUpto;
339
0
  if (latestChunkStart)
340
0
    *latestChunkStart = walrcv->latestChunkStart;
341
0
  if (receiveTLI)
342
0
    *receiveTLI = walrcv->receivedTLI;
343
0
  SpinLockRelease(&walrcv->mutex);
344
345
0
  return recptr;
346
0
}
347
348
/*
349
 * Returns the last+1 byte position that walreceiver has written.
350
 * This returns a recently written value without taking a lock.
351
 */
352
XLogRecPtr
353
GetWalRcvWriteRecPtr(void)
354
0
{
355
0
  WalRcvData *walrcv = WalRcv;
356
357
0
  return pg_atomic_read_u64(&walrcv->writtenUpto);
358
0
}
359
360
/*
361
 * Returns the replication apply delay in ms or -1
362
 * if the apply delay info is not available
363
 */
364
int
365
GetReplicationApplyDelay(void)
366
0
{
367
0
  WalRcvData *walrcv = WalRcv;
368
0
  XLogRecPtr  receivePtr;
369
0
  XLogRecPtr  replayPtr;
370
0
  TimestampTz chunkReplayStartTime;
371
372
0
  SpinLockAcquire(&walrcv->mutex);
373
0
  receivePtr = walrcv->flushedUpto;
374
0
  SpinLockRelease(&walrcv->mutex);
375
376
0
  replayPtr = GetXLogReplayRecPtr(NULL);
377
378
0
  if (receivePtr == replayPtr)
379
0
    return 0;
380
381
0
  chunkReplayStartTime = GetCurrentChunkReplayStartTime();
382
383
0
  if (chunkReplayStartTime == 0)
384
0
    return -1;
385
386
0
  return TimestampDifferenceMilliseconds(chunkReplayStartTime,
387
0
                       GetCurrentTimestamp());
388
0
}
389
390
/*
391
 * Returns the network latency in ms, note that this includes any
392
 * difference in clock settings between the servers, as well as timezone.
393
 */
394
int
395
GetReplicationTransferLatency(void)
396
0
{
397
0
  WalRcvData *walrcv = WalRcv;
398
0
  TimestampTz lastMsgSendTime;
399
0
  TimestampTz lastMsgReceiptTime;
400
401
0
  SpinLockAcquire(&walrcv->mutex);
402
0
  lastMsgSendTime = walrcv->lastMsgSendTime;
403
0
  lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
404
0
  SpinLockRelease(&walrcv->mutex);
405
406
0
  return TimestampDifferenceMilliseconds(lastMsgSendTime,
407
0
                       lastMsgReceiptTime);
408
0
}