/src/postgres/src/backend/replication/walreceiverfuncs.c
Line | Count | Source (jump to first uncovered line) |
1 | | /*------------------------------------------------------------------------- |
2 | | * |
3 | | * walreceiverfuncs.c |
4 | | * |
5 | | * This file contains functions used by the startup process to communicate |
6 | | * with the walreceiver process. Functions implementing walreceiver itself |
7 | | * are in walreceiver.c. |
8 | | * |
9 | | * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group |
10 | | * |
11 | | * |
12 | | * IDENTIFICATION |
13 | | * src/backend/replication/walreceiverfuncs.c |
14 | | * |
15 | | *------------------------------------------------------------------------- |
16 | | */ |
17 | | #include "postgres.h" |
18 | | |
19 | | #include <sys/stat.h> |
20 | | #include <sys/time.h> |
21 | | #include <time.h> |
22 | | #include <unistd.h> |
23 | | #include <signal.h> |
24 | | |
25 | | #include "access/xlog_internal.h" |
26 | | #include "access/xlogrecovery.h" |
27 | | #include "pgstat.h" |
28 | | #include "replication/walreceiver.h" |
29 | | #include "storage/pmsignal.h" |
30 | | #include "storage/proc.h" |
31 | | #include "storage/shmem.h" |
32 | | #include "utils/timestamp.h" |
33 | | |
/* Pointer to the walreceiver control struct; assigned by WalRcvShmemInit(). */
34 | | WalRcvData *WalRcv = NULL; |
35 | | |
36 | | /* |
37 | | * Maximum time, in seconds, that walreceiver may remain in the |
38 | | * WALRCV_STARTING state after postmaster has been asked to launch it. |
39 | | * WalRcvRunning() and WalRcvStreaming() compare the elapsed time since |
40 | | * startTime against this value and reset the state to WALRCV_STOPPED |
41 | | * once it is exceeded. |
39 | | */ |
40 | 0 | #define WALRCV_STARTUP_TIMEOUT 10 |
41 | | |
42 | | /* Report shared memory space needed by WalRcvShmemInit */ |
43 | | Size |
44 | | WalRcvShmemSize(void) |
45 | 0 | { |
46 | 0 | Size size = 0; |
47 | |
|
48 | 0 | size = add_size(size, sizeof(WalRcvData)); |
49 | |
|
50 | 0 | return size; |
51 | 0 | } |
52 | | |
53 | | /* Allocate and initialize walreceiver-related shared memory */ |
54 | | void |
55 | | WalRcvShmemInit(void) |
56 | 0 | { |
57 | 0 | bool found; |
58 | |
|
59 | 0 | WalRcv = (WalRcvData *) |
60 | 0 | ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found); |
61 | |
|
62 | 0 | if (!found) |
63 | 0 | { |
64 | | /* First time through, so initialize */ |
65 | 0 | MemSet(WalRcv, 0, WalRcvShmemSize()); |
66 | 0 | WalRcv->walRcvState = WALRCV_STOPPED; |
67 | 0 | ConditionVariableInit(&WalRcv->walRcvStoppedCV); |
68 | 0 | SpinLockInit(&WalRcv->mutex); |
69 | 0 | pg_atomic_init_u64(&WalRcv->writtenUpto, 0); |
70 | 0 | WalRcv->procno = INVALID_PROC_NUMBER; |
71 | 0 | } |
72 | 0 | } |
73 | | |
74 | | /* Is walreceiver running (or starting up)? */ |
75 | | bool |
76 | | WalRcvRunning(void) |
77 | 0 | { |
78 | 0 | WalRcvData *walrcv = WalRcv; |
79 | 0 | WalRcvState state; |
80 | 0 | pg_time_t startTime; |
81 | |
|
82 | 0 | SpinLockAcquire(&walrcv->mutex); |
83 | |
|
84 | 0 | state = walrcv->walRcvState; |
85 | 0 | startTime = walrcv->startTime; |
86 | |
|
87 | 0 | SpinLockRelease(&walrcv->mutex); |
88 | | |
89 | | /* |
90 | | * If it has taken too long for walreceiver to start up, give up. Setting |
91 | | * the state to STOPPED ensures that if walreceiver later does start up |
92 | | * after all, it will see that it's not supposed to be running and die |
93 | | * without doing anything. |
94 | | */ |
95 | 0 | if (state == WALRCV_STARTING) |
96 | 0 | { |
97 | 0 | pg_time_t now = (pg_time_t) time(NULL); |
98 | |
|
99 | 0 | if ((now - startTime) > WALRCV_STARTUP_TIMEOUT) |
100 | 0 | { |
101 | 0 | bool stopped = false; |
102 | |
|
103 | 0 | SpinLockAcquire(&walrcv->mutex); |
104 | 0 | if (walrcv->walRcvState == WALRCV_STARTING) |
105 | 0 | { |
106 | 0 | state = walrcv->walRcvState = WALRCV_STOPPED; |
107 | 0 | stopped = true; |
108 | 0 | } |
109 | 0 | SpinLockRelease(&walrcv->mutex); |
110 | |
|
111 | 0 | if (stopped) |
112 | 0 | ConditionVariableBroadcast(&walrcv->walRcvStoppedCV); |
113 | 0 | } |
114 | 0 | } |
115 | |
|
116 | 0 | if (state != WALRCV_STOPPED) |
117 | 0 | return true; |
118 | 0 | else |
119 | 0 | return false; |
120 | 0 | } |
121 | | |
122 | | /* |
123 | | * Is walreceiver running and streaming (or at least attempting to connect, |
124 | | * or starting up)? |
125 | | */ |
126 | | bool |
127 | | WalRcvStreaming(void) |
128 | 0 | { |
129 | 0 | WalRcvData *walrcv = WalRcv; |
130 | 0 | WalRcvState state; |
131 | 0 | pg_time_t startTime; |
132 | |
|
133 | 0 | SpinLockAcquire(&walrcv->mutex); |
134 | |
|
135 | 0 | state = walrcv->walRcvState; |
136 | 0 | startTime = walrcv->startTime; |
137 | |
|
138 | 0 | SpinLockRelease(&walrcv->mutex); |
139 | | |
140 | | /* |
141 | | * If it has taken too long for walreceiver to start up, give up. Setting |
142 | | * the state to STOPPED ensures that if walreceiver later does start up |
143 | | * after all, it will see that it's not supposed to be running and die |
144 | | * without doing anything. |
145 | | */ |
146 | 0 | if (state == WALRCV_STARTING) |
147 | 0 | { |
148 | 0 | pg_time_t now = (pg_time_t) time(NULL); |
149 | |
|
150 | 0 | if ((now - startTime) > WALRCV_STARTUP_TIMEOUT) |
151 | 0 | { |
152 | 0 | bool stopped = false; |
153 | |
|
154 | 0 | SpinLockAcquire(&walrcv->mutex); |
155 | 0 | if (walrcv->walRcvState == WALRCV_STARTING) |
156 | 0 | { |
157 | 0 | state = walrcv->walRcvState = WALRCV_STOPPED; |
158 | 0 | stopped = true; |
159 | 0 | } |
160 | 0 | SpinLockRelease(&walrcv->mutex); |
161 | |
|
162 | 0 | if (stopped) |
163 | 0 | ConditionVariableBroadcast(&walrcv->walRcvStoppedCV); |
164 | 0 | } |
165 | 0 | } |
166 | |
|
167 | 0 | if (state == WALRCV_STREAMING || state == WALRCV_STARTING || |
168 | 0 | state == WALRCV_RESTARTING) |
169 | 0 | return true; |
170 | 0 | else |
171 | 0 | return false; |
172 | 0 | } |
173 | | |
174 | | /* |
175 | | * Stop walreceiver (if running) and wait for it to die. |
176 | | * Executed by the Startup process. |
177 | | */ |
178 | | void |
179 | | ShutdownWalRcv(void) |
180 | 0 | { |
181 | 0 | WalRcvData *walrcv = WalRcv; |
182 | 0 | pid_t walrcvpid = 0; |
183 | 0 | bool stopped = false; |
184 | | |
185 | | /* |
186 | | * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED |
187 | | * mode once it's finished, and will also request postmaster to not |
188 | | * restart itself. |
189 | | */ |
190 | 0 | SpinLockAcquire(&walrcv->mutex); |
191 | 0 | switch (walrcv->walRcvState) |
192 | 0 | { |
193 | 0 | case WALRCV_STOPPED: |
194 | 0 | break; |
195 | 0 | case WALRCV_STARTING: |
196 | 0 | walrcv->walRcvState = WALRCV_STOPPED; |
197 | 0 | stopped = true; |
198 | 0 | break; |
199 | | |
200 | 0 | case WALRCV_STREAMING: |
201 | 0 | case WALRCV_WAITING: |
202 | 0 | case WALRCV_RESTARTING: |
203 | 0 | walrcv->walRcvState = WALRCV_STOPPING; |
204 | | /* fall through */ |
205 | 0 | case WALRCV_STOPPING: |
206 | 0 | walrcvpid = walrcv->pid; |
207 | 0 | break; |
208 | 0 | } |
209 | 0 | SpinLockRelease(&walrcv->mutex); |
210 | | |
211 | | /* Unnecessary but consistent. */ |
212 | 0 | if (stopped) |
213 | 0 | ConditionVariableBroadcast(&walrcv->walRcvStoppedCV); |
214 | | |
215 | | /* |
216 | | * Signal walreceiver process if it was still running. |
217 | | */ |
218 | 0 | if (walrcvpid != 0) |
219 | 0 | kill(walrcvpid, SIGTERM); |
220 | | |
221 | | /* |
222 | | * Wait for walreceiver to acknowledge its death by setting state to |
223 | | * WALRCV_STOPPED. |
224 | | */ |
225 | 0 | ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV); |
226 | 0 | while (WalRcvRunning()) |
227 | 0 | ConditionVariableSleep(&walrcv->walRcvStoppedCV, |
228 | 0 | WAIT_EVENT_WAL_RECEIVER_EXIT); |
229 | 0 | ConditionVariableCancelSleep(); |
230 | 0 | } |
231 | | |
232 | | /* |
233 | | * Request postmaster to start walreceiver. |
234 | | * |
235 | | * "recptr" indicates the position where streaming should begin. "conninfo" |
236 | | * is a libpq connection string to use. "slotname" is, optionally, the name |
237 | | * of a replication slot to acquire. "create_temp_slot" indicates to create |
238 | | * a temporary slot when no "slotname" is given. |
239 | | * |
240 | | * WAL receivers do not directly load GUC parameters used for the connection |
241 | | * to the primary, and rely on the values passed down by the caller of this |
242 | | * routine instead. Hence, the addition of any new parameters should happen |
243 | | * through this code path. |
244 | | */ |
245 | | void |
246 | | RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo, |
247 | | const char *slotname, bool create_temp_slot) |
248 | 0 | { |
249 | 0 | WalRcvData *walrcv = WalRcv; |
250 | 0 | bool launch = false; |
251 | 0 | pg_time_t now = (pg_time_t) time(NULL); |
252 | 0 | ProcNumber walrcv_proc; |
253 | | |
254 | | /* |
255 | | * We always start at the beginning of the segment. That prevents a broken |
256 | | * segment (i.e., with no records in the first half of a segment) from |
257 | | * being created by XLOG streaming, which might cause trouble later on if |
258 | | * the segment is e.g archived. |
259 | | */ |
260 | 0 | if (XLogSegmentOffset(recptr, wal_segment_size) != 0) |
261 | 0 | recptr -= XLogSegmentOffset(recptr, wal_segment_size); |
262 | |
|
263 | 0 | SpinLockAcquire(&walrcv->mutex); |
264 | | |
265 | | /* It better be stopped if we try to restart it */ |
266 | 0 | Assert(walrcv->walRcvState == WALRCV_STOPPED || |
267 | 0 | walrcv->walRcvState == WALRCV_WAITING); |
268 | |
|
269 | 0 | if (conninfo != NULL) |
270 | 0 | strlcpy(walrcv->conninfo, conninfo, MAXCONNINFO); |
271 | 0 | else |
272 | 0 | walrcv->conninfo[0] = '\0'; |
273 | | |
274 | | /* |
275 | | * Use configured replication slot if present, and ignore the value of |
276 | | * create_temp_slot as the slot name should be persistent. Otherwise, use |
277 | | * create_temp_slot to determine whether this WAL receiver should create a |
278 | | * temporary slot by itself and use it, or not. |
279 | | */ |
280 | 0 | if (slotname != NULL && slotname[0] != '\0') |
281 | 0 | { |
282 | 0 | strlcpy(walrcv->slotname, slotname, NAMEDATALEN); |
283 | 0 | walrcv->is_temp_slot = false; |
284 | 0 | } |
285 | 0 | else |
286 | 0 | { |
287 | 0 | walrcv->slotname[0] = '\0'; |
288 | 0 | walrcv->is_temp_slot = create_temp_slot; |
289 | 0 | } |
290 | |
|
291 | 0 | if (walrcv->walRcvState == WALRCV_STOPPED) |
292 | 0 | { |
293 | 0 | launch = true; |
294 | 0 | walrcv->walRcvState = WALRCV_STARTING; |
295 | 0 | } |
296 | 0 | else |
297 | 0 | walrcv->walRcvState = WALRCV_RESTARTING; |
298 | 0 | walrcv->startTime = now; |
299 | | |
300 | | /* |
301 | | * If this is the first startup of walreceiver (on this timeline), |
302 | | * initialize flushedUpto and latestChunkStart to the starting point. |
303 | | */ |
304 | 0 | if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli) |
305 | 0 | { |
306 | 0 | walrcv->flushedUpto = recptr; |
307 | 0 | walrcv->receivedTLI = tli; |
308 | 0 | walrcv->latestChunkStart = recptr; |
309 | 0 | } |
310 | 0 | walrcv->receiveStart = recptr; |
311 | 0 | walrcv->receiveStartTLI = tli; |
312 | |
|
313 | 0 | walrcv_proc = walrcv->procno; |
314 | |
|
315 | 0 | SpinLockRelease(&walrcv->mutex); |
316 | |
|
317 | 0 | if (launch) |
318 | 0 | SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER); |
319 | 0 | else if (walrcv_proc != INVALID_PROC_NUMBER) |
320 | 0 | SetLatch(&GetPGProcByNumber(walrcv_proc)->procLatch); |
321 | 0 | } |
322 | | |
323 | | /* |
324 | | * Returns the last+1 byte position that walreceiver has flushed. |
325 | | * |
326 | | * Optionally, returns the previous chunk start, that is the first byte |
327 | | * written in the most recent walreceiver flush cycle. Callers not |
328 | | * interested in that value may pass NULL for latestChunkStart. Same for |
329 | | * receiveTLI. |
330 | | */ |
331 | | XLogRecPtr |
332 | | GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI) |
333 | 0 | { |
334 | 0 | WalRcvData *walrcv = WalRcv; |
335 | 0 | XLogRecPtr recptr; |
336 | |
|
337 | 0 | SpinLockAcquire(&walrcv->mutex); |
338 | 0 | recptr = walrcv->flushedUpto; |
339 | 0 | if (latestChunkStart) |
340 | 0 | *latestChunkStart = walrcv->latestChunkStart; |
341 | 0 | if (receiveTLI) |
342 | 0 | *receiveTLI = walrcv->receivedTLI; |
343 | 0 | SpinLockRelease(&walrcv->mutex); |
344 | |
|
345 | 0 | return recptr; |
346 | 0 | } |
347 | | |
348 | | /* |
349 | | * Returns the last+1 byte position that walreceiver has written. |
350 | | * This returns a recently written value without taking a lock. |
351 | | */ |
352 | | XLogRecPtr |
353 | | GetWalRcvWriteRecPtr(void) |
354 | 0 | { |
355 | 0 | WalRcvData *walrcv = WalRcv; |
356 | |
|
357 | 0 | return pg_atomic_read_u64(&walrcv->writtenUpto); |
358 | 0 | } |
359 | | |
360 | | /* |
361 | | * Returns the replication apply delay in ms or -1 |
362 | | * if the apply delay info is not available |
363 | | */ |
364 | | int |
365 | | GetReplicationApplyDelay(void) |
366 | 0 | { |
367 | 0 | WalRcvData *walrcv = WalRcv; |
368 | 0 | XLogRecPtr receivePtr; |
369 | 0 | XLogRecPtr replayPtr; |
370 | 0 | TimestampTz chunkReplayStartTime; |
371 | |
|
372 | 0 | SpinLockAcquire(&walrcv->mutex); |
373 | 0 | receivePtr = walrcv->flushedUpto; |
374 | 0 | SpinLockRelease(&walrcv->mutex); |
375 | |
|
376 | 0 | replayPtr = GetXLogReplayRecPtr(NULL); |
377 | |
|
378 | 0 | if (receivePtr == replayPtr) |
379 | 0 | return 0; |
380 | | |
381 | 0 | chunkReplayStartTime = GetCurrentChunkReplayStartTime(); |
382 | |
|
383 | 0 | if (chunkReplayStartTime == 0) |
384 | 0 | return -1; |
385 | | |
386 | 0 | return TimestampDifferenceMilliseconds(chunkReplayStartTime, |
387 | 0 | GetCurrentTimestamp()); |
388 | 0 | } |
389 | | |
390 | | /* |
391 | | * Returns the network latency in ms, note that this includes any |
392 | | * difference in clock settings between the servers, as well as timezone. |
393 | | */ |
394 | | int |
395 | | GetReplicationTransferLatency(void) |
396 | 0 | { |
397 | 0 | WalRcvData *walrcv = WalRcv; |
398 | 0 | TimestampTz lastMsgSendTime; |
399 | 0 | TimestampTz lastMsgReceiptTime; |
400 | |
|
401 | 0 | SpinLockAcquire(&walrcv->mutex); |
402 | 0 | lastMsgSendTime = walrcv->lastMsgSendTime; |
403 | 0 | lastMsgReceiptTime = walrcv->lastMsgReceiptTime; |
404 | 0 | SpinLockRelease(&walrcv->mutex); |
405 | |
|
406 | 0 | return TimestampDifferenceMilliseconds(lastMsgSendTime, |
407 | 0 | lastMsgReceiptTime); |
408 | 0 | } |