/src/postgres/src/backend/replication/logical/origin.c
Line | Count | Source |
1 | | /*------------------------------------------------------------------------- |
2 | | * |
3 | | * origin.c |
4 | | * Logical replication progress tracking support. |
5 | | * |
6 | | * Copyright (c) 2013-2025, PostgreSQL Global Development Group |
7 | | * |
8 | | * IDENTIFICATION |
9 | | * src/backend/replication/logical/origin.c |
10 | | * |
11 | | * NOTES |
12 | | * |
13 |  | * This file provides the following: |
14 |  | * * An infrastructure to name nodes in a replication setup |
15 |  | * * A facility to store and persist replication progress in an efficient |
16 |  | * and durable manner. |
17 | | * |
18 |  | * Replication origins consist of a descriptive, user-defined external name |
19 |  | * and a short, thus space-efficient, internal 2-byte one. This split exists |
20 |  | * because replication origins have to be stored in WAL and in shared |
21 |  | * memory, and long descriptors would be inefficient there. For now we use |
22 |  | * only 2 bytes for the internal id of a replication origin, as it seems |
23 |  | * unlikely that there will soon be more than 65k nodes in one replication |
24 |  | * setup; using only two bytes allows us to be more space efficient. |
25 | | * |
26 |  | * Replication progress is tracked in a shared memory table |
27 |  | * (ReplicationState) that's dumped to disk every checkpoint. Entries |
28 |  | * ('slots') in this table are identified by the internal id, because that |
29 |  | * allows advancing replication progress during crash recovery. To allow |
30 |  | * doing so we store the original LSN (from the originating system) of a |
31 |  | * transaction in the commit record, which allows recovering the precise |
32 |  | * replayed state after crash recovery without requiring synchronous |
33 |  | * commits. Allowing logical replication to use asynchronous commit is |
34 |  | * generally good for performance, but especially important as it allows a |
35 |  | * single-threaded replay process to keep up with a source that has |
36 |  | * multiple backends generating changes concurrently. For efficiency and |
37 |  | * simplicity a backend can set up one replication origin that is then used |
38 |  | * as the source of the changes it produces, until reset again. |
39 | | * |
40 | | * This infrastructure is intended to be used in cooperation with logical |
41 | | * decoding. When replaying from a remote system the configured origin is |
42 | | * provided to output plugins, allowing prevention of replication loops and |
43 | | * other filtering. |
44 | | * |
45 | | * There are several levels of locking at work: |
46 | | * |
47 |  | * * To create and drop replication origins an exclusive lock on |
48 |  | * pg_replication_origin is required for the duration. That allows us to |
49 |  | * assign new origins safely and conflict-free using a dirty snapshot. |
50 |  | * |
51 |  | * * When creating an in-memory replication progress slot the ReplicationOrigin |
52 |  | * LWLock has to be held exclusively; when iterating over the replication |
53 |  | * progress a shared lock has to be held, the same when advancing the |
54 |  | * replication progress of an origin that the backend has not set up as |
55 |  | * the session's replication origin. |
56 | | * |
57 |  | * * When manipulating or looking at the remote_lsn and local_lsn fields of a |
58 |  | * replication progress slot that slot's lwlock has to be held. That's |
59 |  | * primarily because we do not assume that 8-byte writes (the LSNs) are |
60 |  | * atomic on all our platforms, but it also simplifies memory ordering |
61 |  | * concerns between the remote and local lsn. We use an lwlock instead of |
62 |  | * a spinlock so it's less harmful to hold the lock over a WAL write |
63 |  | * (cf. replorigin_advance). |
64 | | * |
65 | | * --------------------------------------------------------------------------- |
66 | | */ |
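The session-origin API summarized above is easiest to see end to end in a short sketch. The following is illustrative only and not code from this file: it assumes it runs inside a backend with an open transaction, and apply_one_remote_commit_sketch, my_origin_name, and the LSN/timestamp arguments are hypothetical names standing in for whatever an apply loop would supply.

/*
 * Illustrative sketch only (not part of origin.c): how an apply-style backend
 * might drive the session-origin API documented above.
 */
#include "postgres.h"
#include "replication/origin.h"

static void
apply_one_remote_commit_sketch(const char *my_origin_name,
                               XLogRecPtr remote_commit_lsn,
                               TimestampTz remote_commit_time,
                               XLogRecPtr local_commit_lsn)
{
    /* resolve the user-visible name to the 2-byte internal id */
    RepOriginId originid = replorigin_by_name(my_origin_name, false);

    /* bind the origin to this session; 0 = not shared with another process */
    replorigin_session_setup(originid, 0);

    /*
     * Advertise origin, remote LSN and timestamp so the local commit record
     * carries them; that is what makes crash recovery of progress possible.
     */
    replorigin_session_origin = originid;
    replorigin_session_origin_lsn = remote_commit_lsn;
    replorigin_session_origin_timestamp = remote_commit_time;

    /* ... replay the remote transaction and commit locally here ... */

    /*
     * Advance in-memory progress for this origin; shown explicitly here for
     * illustration, since the commit path can act on the session variables
     * set above.
     */
    replorigin_session_advance(remote_commit_lsn, local_commit_lsn);

    /* release the session origin when done */
    replorigin_session_reset();
}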
67 | | |
68 | | #include "postgres.h" |
69 | | |
70 | | #include <unistd.h> |
71 | | #include <sys/stat.h> |
72 | | |
73 | | #include "access/genam.h" |
74 | | #include "access/htup_details.h" |
75 | | #include "access/table.h" |
76 | | #include "access/xact.h" |
77 | | #include "access/xloginsert.h" |
78 | | #include "catalog/catalog.h" |
79 | | #include "catalog/indexing.h" |
80 | | #include "catalog/pg_subscription.h" |
81 | | #include "funcapi.h" |
82 | | #include "miscadmin.h" |
83 | | #include "nodes/execnodes.h" |
84 | | #include "pgstat.h" |
85 | | #include "replication/origin.h" |
86 | | #include "replication/slot.h" |
87 | | #include "storage/condition_variable.h" |
88 | | #include "storage/fd.h" |
89 | | #include "storage/ipc.h" |
90 | | #include "storage/lmgr.h" |
91 | | #include "utils/builtins.h" |
92 | | #include "utils/fmgroids.h" |
93 | | #include "utils/guc.h" |
94 | | #include "utils/pg_lsn.h" |
95 | | #include "utils/rel.h" |
96 | | #include "utils/snapmgr.h" |
97 | | #include "utils/syscache.h" |
98 | | |
99 | | /* paths for replication origin checkpoint files */ |
100 | 0 | #define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint" |
101 | 0 | #define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp" |
102 | | |
103 | | /* GUC variables */ |
104 | | int max_active_replication_origins = 10; |
105 | | |
106 | | /* |
107 | | * Replay progress of a single remote node. |
108 | | */ |
109 | | typedef struct ReplicationState |
110 | | { |
111 | | /* |
112 | | * Local identifier for the remote node. |
113 | | */ |
114 | | RepOriginId roident; |
115 | | |
116 | | /* |
117 | | * Location of the latest commit from the remote side. |
118 | | */ |
119 | | XLogRecPtr remote_lsn; |
120 | | |
121 | | /* |
122 | | * Remember the local lsn of the commit record so we can XLogFlush() to it |
123 | | * during a checkpoint so we know the commit record actually is safe on |
124 | | * disk. |
125 | | */ |
126 | | XLogRecPtr local_lsn; |
127 | | |
128 | | /* |
129 | | * PID of backend that's acquired slot, or 0 if none. |
130 | | */ |
131 | | int acquired_by; |
132 | | |
133 | | /* |
134 | | * Condition variable that's signaled when acquired_by changes. |
135 | | */ |
136 | | ConditionVariable origin_cv; |
137 | | |
138 | | /* |
139 | | * Lock protecting remote_lsn and local_lsn. |
140 | | */ |
141 | | LWLock lock; |
142 | | } ReplicationState; |
143 | | |
144 | | /* |
145 | | * On disk version of ReplicationState. |
146 | | */ |
147 | | typedef struct ReplicationStateOnDisk |
148 | | { |
149 | | RepOriginId roident; |
150 | | XLogRecPtr remote_lsn; |
151 | | } ReplicationStateOnDisk; |
152 | | |
153 | | |
154 | | typedef struct ReplicationStateCtl |
155 | | { |
156 | | /* Tranche to use for per-origin LWLocks */ |
157 | | int tranche_id; |
158 | | /* Array of length max_active_replication_origins */ |
159 | | ReplicationState states[FLEXIBLE_ARRAY_MEMBER]; |
160 | | } ReplicationStateCtl; |
161 | | |
162 | | /* external variables */ |
163 | | RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */ |
164 | | XLogRecPtr replorigin_session_origin_lsn = InvalidXLogRecPtr; |
165 | | TimestampTz replorigin_session_origin_timestamp = 0; |
166 | | |
167 | | /* |
168 | | * Base address into a shared memory array of replication states of size |
169 | | * max_active_replication_origins. |
170 | | */ |
171 | | static ReplicationState *replication_states; |
172 | | |
173 | | /* |
174 | | * Actual shared memory block (replication_states[] is now part of this). |
175 | | */ |
176 | | static ReplicationStateCtl *replication_states_ctl; |
177 | | |
178 | | /* |
179 | | * We keep a pointer to this backend's ReplicationState to avoid having to |
180 | | * search the replication_states array in replorigin_session_advance for each |
181 | | * remote commit. (Ownership of a backend's own entry can only be changed by |
182 | | * that backend.) |
183 | | */ |
184 | | static ReplicationState *session_replication_state = NULL; |
185 | | |
186 | | /* Magic for on disk files. */ |
187 | 0 | #define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE) |
188 | | |
189 | | static void |
190 | | replorigin_check_prerequisites(bool check_origins, bool recoveryOK) |
191 | 0 | { |
192 | 0 | if (check_origins && max_active_replication_origins == 0) |
193 | 0 | ereport(ERROR, |
194 | 0 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
195 | 0 | errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0"))); |
196 | | |
197 | 0 | if (!recoveryOK && RecoveryInProgress()) |
198 | 0 | ereport(ERROR, |
199 | 0 | (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION), |
200 | 0 | errmsg("cannot manipulate replication origins during recovery"))); |
201 | 0 | } |
202 | | |
203 | | |
204 | | /* |
205 | | * IsReservedOriginName |
206 | | * True iff name is either "none" or "any". |
207 | | */ |
208 | | static bool |
209 | | IsReservedOriginName(const char *name) |
210 | 0 | { |
211 | 0 | return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) || |
212 | 0 | (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0)); |
213 | 0 | } |
214 | | |
215 | | /* --------------------------------------------------------------------------- |
216 | | * Functions for working with replication origins themselves. |
217 | | * --------------------------------------------------------------------------- |
218 | | */ |
219 | | |
220 | | /* |
221 | | * Check for a persistent replication origin identified by name. |
222 | | * |
223 | | * Returns InvalidOid if the node isn't known yet and missing_ok is true. |
224 | | */ |
225 | | RepOriginId |
226 | | replorigin_by_name(const char *roname, bool missing_ok) |
227 | 0 | { |
228 | 0 | Form_pg_replication_origin ident; |
229 | 0 | Oid roident = InvalidOid; |
230 | 0 | HeapTuple tuple; |
231 | 0 | Datum roname_d; |
232 | |
|
233 | 0 | roname_d = CStringGetTextDatum(roname); |
234 | |
|
235 | 0 | tuple = SearchSysCache1(REPLORIGNAME, roname_d); |
236 | 0 | if (HeapTupleIsValid(tuple)) |
237 | 0 | { |
238 | 0 | ident = (Form_pg_replication_origin) GETSTRUCT(tuple); |
239 | 0 | roident = ident->roident; |
240 | 0 | ReleaseSysCache(tuple); |
241 | 0 | } |
242 | 0 | else if (!missing_ok) |
243 | 0 | ereport(ERROR, |
244 | 0 | (errcode(ERRCODE_UNDEFINED_OBJECT), |
245 | 0 | errmsg("replication origin \"%s\" does not exist", |
246 | 0 | roname))); |
247 | | |
248 | 0 | return roident; |
249 | 0 | } |
250 | | |
251 | | /* |
252 | | * Create a replication origin. |
253 | | * |
254 | | * Needs to be called in a transaction. |
255 | | */ |
256 | | RepOriginId |
257 | | replorigin_create(const char *roname) |
258 | 0 | { |
259 | 0 | Oid roident; |
260 | 0 | HeapTuple tuple = NULL; |
261 | 0 | Relation rel; |
262 | 0 | Datum roname_d; |
263 | 0 | SnapshotData SnapshotDirty; |
264 | 0 | SysScanDesc scan; |
265 | 0 | ScanKeyData key; |
266 | | |
267 | | /* |
268 | | * To avoid needing a TOAST table for pg_replication_origin, we limit |
269 | | * replication origin names to 512 bytes. This should be more than enough |
270 | | * for all practical use. |
271 | | */ |
272 | 0 | if (strlen(roname) > MAX_RONAME_LEN) |
273 | 0 | ereport(ERROR, |
274 | 0 | (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), |
275 | 0 | errmsg("replication origin name is too long"), |
276 | 0 | errdetail("Replication origin names must be no longer than %d bytes.", |
277 | 0 | MAX_RONAME_LEN))); |
278 | | |
279 | 0 | roname_d = CStringGetTextDatum(roname); |
280 | |
|
281 | 0 | Assert(IsTransactionState()); |
282 | | |
283 | | /* |
284 | | * We need the numeric replication origin to be 16bit wide, so we cannot |
285 | | * rely on the normal oid allocation. Instead we simply scan |
286 | | * pg_replication_origin for the first unused id. That's not particularly |
287 | | * efficient, but this should be a fairly infrequent operation - we can |
288 | | * easily spend a bit more code on this when it turns out it needs to be |
289 | | * faster. |
290 | | * |
291 | | * We handle concurrency by taking an exclusive lock (allowing reads!) |
292 | | * over the table for the duration of the search. Because we use a "dirty |
293 | | * snapshot" we can read rows that other in-progress sessions have |
294 | | * written, even though they would be invisible with normal snapshots. Due |
295 | | * to the exclusive lock there's no danger that new rows can appear while |
296 | | * we're checking. |
297 | | */ |
298 | 0 | InitDirtySnapshot(SnapshotDirty); |
299 | |
|
300 | 0 | rel = table_open(ReplicationOriginRelationId, ExclusiveLock); |
301 | | |
302 | | /* |
303 | | * We want to be able to access pg_replication_origin without setting up a |
304 | | * snapshot. To make that safe, it needs to not have a TOAST table, since |
305 | | * TOASTed data cannot be fetched without a snapshot. As of this writing, |
306 | | * its only varlena column is roname, which we limit to 512 bytes to avoid |
307 | | * needing out-of-line storage. If you add a TOAST table to this catalog, |
308 | | * be sure to set up a snapshot everywhere it might be needed. For more |
309 | | * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan. |
310 | | */ |
311 | 0 | Assert(!OidIsValid(rel->rd_rel->reltoastrelid)); |
312 | |
|
313 | 0 | for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++) |
314 | 0 | { |
315 | 0 | bool nulls[Natts_pg_replication_origin]; |
316 | 0 | Datum values[Natts_pg_replication_origin]; |
317 | 0 | bool collides; |
318 | |
|
319 | 0 | CHECK_FOR_INTERRUPTS(); |
320 | |
|
321 | 0 | ScanKeyInit(&key, |
322 | 0 | Anum_pg_replication_origin_roident, |
323 | 0 | BTEqualStrategyNumber, F_OIDEQ, |
324 | 0 | ObjectIdGetDatum(roident)); |
325 | |
|
326 | 0 | scan = systable_beginscan(rel, ReplicationOriginIdentIndex, |
327 | 0 | true /* indexOK */ , |
328 | 0 | &SnapshotDirty, |
329 | 0 | 1, &key); |
330 | |
|
331 | 0 | collides = HeapTupleIsValid(systable_getnext(scan)); |
332 | |
|
333 | 0 | systable_endscan(scan); |
334 | |
|
335 | 0 | if (!collides) |
336 | 0 | { |
337 | | /* |
338 | | * Ok, found an unused roident, insert the new row and do a CCI, |
339 | | * so our callers can look it up if they want to. |
340 | | */ |
341 | 0 | memset(&nulls, 0, sizeof(nulls)); |
342 | |
|
343 | 0 | values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident); |
344 | 0 | values[Anum_pg_replication_origin_roname - 1] = roname_d; |
345 | |
|
346 | 0 | tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls); |
347 | 0 | CatalogTupleInsert(rel, tuple); |
348 | 0 | CommandCounterIncrement(); |
349 | 0 | break; |
350 | 0 | } |
351 | 0 | } |
352 | | |
353 |  | /* now release the lock again */ |
354 | 0 | table_close(rel, ExclusiveLock); |
355 | |
|
356 | 0 | if (tuple == NULL) |
357 | 0 | ereport(ERROR, |
358 | 0 | (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), |
359 | 0 | errmsg("could not find free replication origin ID"))); |
360 | | |
361 | 0 | heap_freetuple(tuple); |
362 | 0 | return roident; |
363 | 0 | } |
364 | | |
365 | | /* |
366 | | * Helper function to drop a replication origin. |
367 | | */ |
368 | | static void |
369 | | replorigin_state_clear(RepOriginId roident, bool nowait) |
370 | 0 | { |
371 | 0 | int i; |
372 | | |
373 | | /* |
374 | | * Clean up the slot state info, if there is any matching slot. |
375 | | */ |
376 | 0 | restart: |
377 | 0 | LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); |
378 | |
|
379 | 0 | for (i = 0; i < max_active_replication_origins; i++) |
380 | 0 | { |
381 | 0 | ReplicationState *state = &replication_states[i]; |
382 | |
|
383 | 0 | if (state->roident == roident) |
384 | 0 | { |
385 | | /* found our slot, is it busy? */ |
386 | 0 | if (state->acquired_by != 0) |
387 | 0 | { |
388 | 0 | ConditionVariable *cv; |
389 | |
|
390 | 0 | if (nowait) |
391 | 0 | ereport(ERROR, |
392 | 0 | (errcode(ERRCODE_OBJECT_IN_USE), |
393 | 0 | errmsg("could not drop replication origin with ID %d, in use by PID %d", |
394 | 0 | state->roident, |
395 | 0 | state->acquired_by))); |
396 | | |
397 | | /* |
398 | | * We must wait and then retry. Since we don't know which CV |
399 | | * to wait on until here, we can't readily use |
400 | | * ConditionVariablePrepareToSleep (calling it here would be |
401 | | * wrong, since we could miss the signal if we did so); just |
402 | | * use ConditionVariableSleep directly. |
403 | | */ |
404 | 0 | cv = &state->origin_cv; |
405 | |
|
406 | 0 | LWLockRelease(ReplicationOriginLock); |
407 | |
|
408 | 0 | ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP); |
409 | 0 | goto restart; |
410 | 0 | } |
411 | | |
412 | | /* first make a WAL log entry */ |
413 | 0 | { |
414 | 0 | xl_replorigin_drop xlrec; |
415 | |
|
416 | 0 | xlrec.node_id = roident; |
417 | 0 | XLogBeginInsert(); |
418 | 0 | XLogRegisterData(&xlrec, sizeof(xlrec)); |
419 | 0 | XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP); |
420 | 0 | } |
421 | | |
422 | | /* then clear the in-memory slot */ |
423 | 0 | state->roident = InvalidRepOriginId; |
424 | 0 | state->remote_lsn = InvalidXLogRecPtr; |
425 | 0 | state->local_lsn = InvalidXLogRecPtr; |
426 | 0 | break; |
427 | 0 | } |
428 | 0 | } |
429 | 0 | LWLockRelease(ReplicationOriginLock); |
430 | 0 | ConditionVariableCancelSleep(); |
431 | 0 | } |
432 | | |
433 | | /* |
434 | | * Drop replication origin (by name). |
435 | | * |
436 | | * Needs to be called in a transaction. |
437 | | */ |
438 | | void |
439 | | replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait) |
440 | 0 | { |
441 | 0 | RepOriginId roident; |
442 | 0 | Relation rel; |
443 | 0 | HeapTuple tuple; |
444 | |
|
445 | 0 | Assert(IsTransactionState()); |
446 | |
|
447 | 0 | rel = table_open(ReplicationOriginRelationId, RowExclusiveLock); |
448 | |
|
449 | 0 | roident = replorigin_by_name(name, missing_ok); |
450 | | |
451 | | /* Lock the origin to prevent concurrent drops. */ |
452 | 0 | LockSharedObject(ReplicationOriginRelationId, roident, 0, |
453 | 0 | AccessExclusiveLock); |
454 | |
|
455 | 0 | tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident)); |
456 | 0 | if (!HeapTupleIsValid(tuple)) |
457 | 0 | { |
458 | 0 | if (!missing_ok) |
459 | 0 | elog(ERROR, "cache lookup failed for replication origin with ID %d", |
460 | 0 | roident); |
461 | | |
462 | | /* |
463 | | * We don't need to retain the locks if the origin is already dropped. |
464 | | */ |
465 | 0 | UnlockSharedObject(ReplicationOriginRelationId, roident, 0, |
466 | 0 | AccessExclusiveLock); |
467 | 0 | table_close(rel, RowExclusiveLock); |
468 | 0 | return; |
469 | 0 | } |
470 | | |
471 | 0 | replorigin_state_clear(roident, nowait); |
472 | | |
473 | | /* |
474 | | * Now, we can delete the catalog entry. |
475 | | */ |
476 | 0 | CatalogTupleDelete(rel, &tuple->t_self); |
477 | 0 | ReleaseSysCache(tuple); |
478 | |
|
479 | 0 | CommandCounterIncrement(); |
480 | | |
481 | | /* We keep the lock on pg_replication_origin until commit */ |
482 | 0 | table_close(rel, NoLock); |
483 | 0 | } |
484 | | |
485 | | /* |
486 | | * Lookup replication origin via its oid and return the name. |
487 | | * |
488 | | * The external name is palloc'd in the calling context. |
489 | | * |
490 | | * Returns true if the origin is known, false otherwise. |
491 | | */ |
492 | | bool |
493 | | replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname) |
494 | 0 | { |
495 | 0 | HeapTuple tuple; |
496 | 0 | Form_pg_replication_origin ric; |
497 | |
|
498 | 0 | Assert(OidIsValid((Oid) roident)); |
499 | 0 | Assert(roident != InvalidRepOriginId); |
500 | 0 | Assert(roident != DoNotReplicateId); |
501 | |
|
502 | 0 | tuple = SearchSysCache1(REPLORIGIDENT, |
503 | 0 | ObjectIdGetDatum((Oid) roident)); |
504 | |
|
505 | 0 | if (HeapTupleIsValid(tuple)) |
506 | 0 | { |
507 | 0 | ric = (Form_pg_replication_origin) GETSTRUCT(tuple); |
508 | 0 | *roname = text_to_cstring(&ric->roname); |
509 | 0 | ReleaseSysCache(tuple); |
510 | |
|
511 | 0 | return true; |
512 | 0 | } |
513 | 0 | else |
514 | 0 | { |
515 | 0 | *roname = NULL; |
516 | |
|
517 | 0 | if (!missing_ok) |
518 | 0 | ereport(ERROR, |
519 | 0 | (errcode(ERRCODE_UNDEFINED_OBJECT), |
520 | 0 | errmsg("replication origin with ID %d does not exist", |
521 | 0 | roident))); |
522 | | |
523 | 0 | return false; |
524 | 0 | } |
525 | 0 | } |
526 | | |
527 | | |
528 | | /* --------------------------------------------------------------------------- |
529 | | * Functions for handling replication progress. |
530 | | * --------------------------------------------------------------------------- |
531 | | */ |
532 | | |
533 | | Size |
534 | | ReplicationOriginShmemSize(void) |
535 | 0 | { |
536 | 0 | Size size = 0; |
537 | |
|
538 | 0 | if (max_active_replication_origins == 0) |
539 | 0 | return size; |
540 | | |
541 | 0 | size = add_size(size, offsetof(ReplicationStateCtl, states)); |
542 | |
|
543 | 0 | size = add_size(size, |
544 | 0 | mul_size(max_active_replication_origins, sizeof(ReplicationState))); |
545 | 0 | return size; |
546 | 0 | } |
547 | | |
548 | | void |
549 | | ReplicationOriginShmemInit(void) |
550 | 0 | { |
551 | 0 | bool found; |
552 | |
|
553 | 0 | if (max_active_replication_origins == 0) |
554 | 0 | return; |
555 | | |
556 | 0 | replication_states_ctl = (ReplicationStateCtl *) |
557 | 0 | ShmemInitStruct("ReplicationOriginState", |
558 | 0 | ReplicationOriginShmemSize(), |
559 | 0 | &found); |
560 | 0 | replication_states = replication_states_ctl->states; |
561 | |
|
562 | 0 | if (!found) |
563 | 0 | { |
564 | 0 | int i; |
565 | |
|
566 | 0 | MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize()); |
567 | |
|
568 | 0 | replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE; |
569 | |
|
570 | 0 | for (i = 0; i < max_active_replication_origins; i++) |
571 | 0 | { |
572 | 0 | LWLockInitialize(&replication_states[i].lock, |
573 | 0 | replication_states_ctl->tranche_id); |
574 | 0 | ConditionVariableInit(&replication_states[i].origin_cv); |
575 | 0 | } |
576 | 0 | } |
577 | 0 | } |
578 | | |
579 | | /* --------------------------------------------------------------------------- |
580 | | * Perform a checkpoint of each replication origin's progress with respect to |
581 | | * the replayed remote_lsn. Make sure that all transactions we refer to in the |
582 | | * checkpoint (local_lsn) are actually on-disk. This might not yet be the case |
583 | | * if the transactions were originally committed asynchronously. |
584 | | * |
585 | | * We store checkpoints in the following format: |
586 | | * +-------+------------------------+------------------+-----+--------+ |
587 | | * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF |
588 | | * +-------+------------------------+------------------+-----+--------+ |
589 | | * |
590 |  | * So it's just the magic, followed by the statically sized |
591 |  | * ReplicationStateOnDisk structs and a trailing CRC32C. The maximum number |
592 |  | * of ReplicationState entries is determined by max_active_replication_origins. |
593 | | * --------------------------------------------------------------------------- |
594 | | */ |
595 | | void |
596 | | CheckPointReplicationOrigin(void) |
597 | 0 | { |
598 | 0 | const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE; |
599 | 0 | const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME; |
600 | 0 | int tmpfd; |
601 | 0 | int i; |
602 | 0 | uint32 magic = REPLICATION_STATE_MAGIC; |
603 | 0 | pg_crc32c crc; |
604 | |
|
605 | 0 | if (max_active_replication_origins == 0) |
606 | 0 | return; |
607 | | |
608 | 0 | INIT_CRC32C(crc); |
609 | | |
610 | | /* make sure no old temp file is remaining */ |
611 | 0 | if (unlink(tmppath) < 0 && errno != ENOENT) |
612 | 0 | ereport(PANIC, |
613 | 0 | (errcode_for_file_access(), |
614 | 0 | errmsg("could not remove file \"%s\": %m", |
615 | 0 | tmppath))); |
616 | | |
617 | | /* |
618 | | * no other backend can perform this at the same time; only one checkpoint |
619 | | * can happen at a time. |
620 | | */ |
621 | 0 | tmpfd = OpenTransientFile(tmppath, |
622 | 0 | O_CREAT | O_EXCL | O_WRONLY | PG_BINARY); |
623 | 0 | if (tmpfd < 0) |
624 | 0 | ereport(PANIC, |
625 | 0 | (errcode_for_file_access(), |
626 | 0 | errmsg("could not create file \"%s\": %m", |
627 | 0 | tmppath))); |
628 | | |
629 | | /* write magic */ |
630 | 0 | errno = 0; |
631 | 0 | if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic)) |
632 | 0 | { |
633 | | /* if write didn't set errno, assume problem is no disk space */ |
634 | 0 | if (errno == 0) |
635 | 0 | errno = ENOSPC; |
636 | 0 | ereport(PANIC, |
637 | 0 | (errcode_for_file_access(), |
638 | 0 | errmsg("could not write to file \"%s\": %m", |
639 | 0 | tmppath))); |
640 | 0 | } |
641 | 0 | COMP_CRC32C(crc, &magic, sizeof(magic)); |
642 | | |
643 | | /* prevent concurrent creations/drops */ |
644 | 0 | LWLockAcquire(ReplicationOriginLock, LW_SHARED); |
645 | | |
646 | | /* write actual data */ |
647 | 0 | for (i = 0; i < max_active_replication_origins; i++) |
648 | 0 | { |
649 | 0 | ReplicationStateOnDisk disk_state; |
650 | 0 | ReplicationState *curstate = &replication_states[i]; |
651 | 0 | XLogRecPtr local_lsn; |
652 | |
|
653 | 0 | if (curstate->roident == InvalidRepOriginId) |
654 | 0 | continue; |
655 | | |
656 | | /* zero, to avoid uninitialized padding bytes */ |
657 | 0 | memset(&disk_state, 0, sizeof(disk_state)); |
658 | |
|
659 | 0 | LWLockAcquire(&curstate->lock, LW_SHARED); |
660 | |
|
661 | 0 | disk_state.roident = curstate->roident; |
662 | |
|
663 | 0 | disk_state.remote_lsn = curstate->remote_lsn; |
664 | 0 | local_lsn = curstate->local_lsn; |
665 | |
|
666 | 0 | LWLockRelease(&curstate->lock); |
667 | | |
668 | | /* make sure we only write out a commit that's persistent */ |
669 | 0 | XLogFlush(local_lsn); |
670 | |
|
671 | 0 | errno = 0; |
672 | 0 | if ((write(tmpfd, &disk_state, sizeof(disk_state))) != |
673 | 0 | sizeof(disk_state)) |
674 | 0 | { |
675 | | /* if write didn't set errno, assume problem is no disk space */ |
676 | 0 | if (errno == 0) |
677 | 0 | errno = ENOSPC; |
678 | 0 | ereport(PANIC, |
679 | 0 | (errcode_for_file_access(), |
680 | 0 | errmsg("could not write to file \"%s\": %m", |
681 | 0 | tmppath))); |
682 | 0 | } |
683 | | |
684 | 0 | COMP_CRC32C(crc, &disk_state, sizeof(disk_state)); |
685 | 0 | } |
686 | | |
687 | 0 | LWLockRelease(ReplicationOriginLock); |
688 | | |
689 | | /* write out the CRC */ |
690 | 0 | FIN_CRC32C(crc); |
691 | 0 | errno = 0; |
692 | 0 | if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc)) |
693 | 0 | { |
694 | | /* if write didn't set errno, assume problem is no disk space */ |
695 | 0 | if (errno == 0) |
696 | 0 | errno = ENOSPC; |
697 | 0 | ereport(PANIC, |
698 | 0 | (errcode_for_file_access(), |
699 | 0 | errmsg("could not write to file \"%s\": %m", |
700 | 0 | tmppath))); |
701 | 0 | } |
702 | | |
703 | 0 | if (CloseTransientFile(tmpfd) != 0) |
704 | 0 | ereport(PANIC, |
705 | 0 | (errcode_for_file_access(), |
706 | 0 | errmsg("could not close file \"%s\": %m", |
707 | 0 | tmppath))); |
708 | | |
709 | | /* fsync, rename to permanent file, fsync file and directory */ |
710 | 0 | durable_rename(tmppath, path, PANIC); |
711 | 0 | } |
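The checkpoint layout documented above CheckPointReplicationOrigin (magic word, fixed-size ReplicationStateOnDisk entries, trailing CRC32C) can also be inspected outside the server. The following standalone sketch is an illustration under stated assumptions, not a supported tool: it assumes PG_LOGICAL_DIR resolves to pg_logical under the data directory, a little-endian machine, 8-byte alignment of the XLogRecPtr inside the 16-byte on-disk record, and it skips CRC verification.

/*
 * Standalone sketch (not part of the backend): dump the contents of
 * pg_logical/replorigin_checkpoint.  CRC32C verification is omitted.
 */
#include <stdio.h>
#include <stdint.h>
#include <string.h>

#define REPLICATION_STATE_MAGIC 0x1257DADEU

int
main(int argc, char **argv)
{
    const char *path = argc > 1 ? argv[1] : "pg_logical/replorigin_checkpoint";
    FILE       *f = fopen(path, "rb");
    uint32_t    magic = 0;
    unsigned char rec[16];      /* uint16 roident, padding, uint64 remote_lsn */
    size_t      n;

    if (f == NULL)
    {
        perror(path);
        return 1;
    }
    if (fread(&magic, sizeof(magic), 1, f) != 1 || magic != REPLICATION_STATE_MAGIC)
    {
        fprintf(stderr, "%s: not a replication origin checkpoint file\n", path);
        fclose(f);
        return 1;
    }

    /* full 16-byte reads are records; the final 4-byte read is the CRC32C */
    while ((n = fread(rec, 1, sizeof(rec), f)) == sizeof(rec))
    {
        uint16_t    roident;
        uint64_t    remote_lsn;

        memcpy(&roident, rec, sizeof(roident));
        memcpy(&remote_lsn, rec + 8, sizeof(remote_lsn));
        printf("origin %u: remote_lsn %X/%08X\n",
               (unsigned) roident,
               (uint32_t) (remote_lsn >> 32), (uint32_t) remote_lsn);
    }

    fclose(f);
    return 0;
}

Run from the data directory, e.g. ./read_replorigin_checkpoint pg_logical/replorigin_checkpoint (the binary name is hypothetical).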
712 | | |
713 | | /* |
714 | | * Recover replication replay status from checkpoint data saved earlier by |
715 | | * CheckPointReplicationOrigin. |
716 | | * |
717 |  | * This only needs to be called at startup, *not* after every checkpoint read |
718 |  | * during recovery (e.g. in HS or PITR from a base backup). All state |
719 |  | * thereafter can be recovered by looking at commit records. |
720 | | */ |
721 | | void |
722 | | StartupReplicationOrigin(void) |
723 | 0 | { |
724 | 0 | const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME; |
725 | 0 | int fd; |
726 | 0 | int readBytes; |
727 | 0 | uint32 magic = REPLICATION_STATE_MAGIC; |
728 | 0 | int last_state = 0; |
729 | 0 | pg_crc32c file_crc; |
730 | 0 | pg_crc32c crc; |
731 | | |
732 | | /* don't want to overwrite already existing state */ |
733 | | #ifdef USE_ASSERT_CHECKING |
734 | | static bool already_started = false; |
735 | | |
736 | | Assert(!already_started); |
737 | | already_started = true; |
738 | | #endif |
739 | |
|
740 | 0 | if (max_active_replication_origins == 0) |
741 | 0 | return; |
742 | | |
743 | 0 | INIT_CRC32C(crc); |
744 | |
|
745 | 0 | elog(DEBUG2, "starting up replication origin progress state"); |
746 | | |
747 | 0 | fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); |
748 | | |
749 | | /* |
750 | | * might have had max_active_replication_origins == 0 last run, or we just |
751 | | * brought up a standby. |
752 | | */ |
753 | 0 | if (fd < 0 && errno == ENOENT) |
754 | 0 | return; |
755 | 0 | else if (fd < 0) |
756 | 0 | ereport(PANIC, |
757 | 0 | (errcode_for_file_access(), |
758 | 0 | errmsg("could not open file \"%s\": %m", |
759 | 0 | path))); |
760 | | |
761 |  | /* verify the magic, which is written even if nothing was active */ |
762 | 0 | readBytes = read(fd, &magic, sizeof(magic)); |
763 | 0 | if (readBytes != sizeof(magic)) |
764 | 0 | { |
765 | 0 | if (readBytes < 0) |
766 | 0 | ereport(PANIC, |
767 | 0 | (errcode_for_file_access(), |
768 | 0 | errmsg("could not read file \"%s\": %m", |
769 | 0 | path))); |
770 | 0 | else |
771 | 0 | ereport(PANIC, |
772 | 0 | (errcode(ERRCODE_DATA_CORRUPTED), |
773 | 0 | errmsg("could not read file \"%s\": read %d of %zu", |
774 | 0 | path, readBytes, sizeof(magic)))); |
775 | 0 | } |
776 | 0 | COMP_CRC32C(crc, &magic, sizeof(magic)); |
777 | |
|
778 | 0 | if (magic != REPLICATION_STATE_MAGIC) |
779 | 0 | ereport(PANIC, |
780 | 0 | (errmsg("replication checkpoint has wrong magic %u instead of %u", |
781 | 0 | magic, REPLICATION_STATE_MAGIC))); |
782 | | |
783 | | /* we can skip locking here, no other access is possible */ |
784 | | |
785 | | /* recover individual states, until there are no more to be found */ |
786 | 0 | while (true) |
787 | 0 | { |
788 | 0 | ReplicationStateOnDisk disk_state; |
789 | |
|
790 | 0 | readBytes = read(fd, &disk_state, sizeof(disk_state)); |
791 | | |
792 | | /* no further data */ |
793 | 0 | if (readBytes == sizeof(crc)) |
794 | 0 | { |
795 | | /* not pretty, but simple ... */ |
796 | 0 | file_crc = *(pg_crc32c *) &disk_state; |
797 | 0 | break; |
798 | 0 | } |
799 | | |
800 | 0 | if (readBytes < 0) |
801 | 0 | { |
802 | 0 | ereport(PANIC, |
803 | 0 | (errcode_for_file_access(), |
804 | 0 | errmsg("could not read file \"%s\": %m", |
805 | 0 | path))); |
806 | 0 | } |
807 | | |
808 | 0 | if (readBytes != sizeof(disk_state)) |
809 | 0 | { |
810 | 0 | ereport(PANIC, |
811 | 0 | (errcode_for_file_access(), |
812 | 0 | errmsg("could not read file \"%s\": read %d of %zu", |
813 | 0 | path, readBytes, sizeof(disk_state)))); |
814 | 0 | } |
815 | | |
816 | 0 | COMP_CRC32C(crc, &disk_state, sizeof(disk_state)); |
817 | |
|
818 | 0 | if (last_state == max_active_replication_origins) |
819 | 0 | ereport(PANIC, |
820 | 0 | (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), |
821 | 0 | errmsg("could not find free replication state, increase \"max_active_replication_origins\""))); |
822 | | |
823 | | /* copy data to shared memory */ |
824 | 0 | replication_states[last_state].roident = disk_state.roident; |
825 | 0 | replication_states[last_state].remote_lsn = disk_state.remote_lsn; |
826 | 0 | last_state++; |
827 | |
|
828 | 0 | ereport(LOG, |
829 | 0 | errmsg("recovered replication state of node %d to %X/%08X", |
830 | 0 | disk_state.roident, |
831 | 0 | LSN_FORMAT_ARGS(disk_state.remote_lsn))); |
832 | 0 | } |
833 | | |
834 | | /* now check checksum */ |
835 | 0 | FIN_CRC32C(crc); |
836 | 0 | if (file_crc != crc) |
837 | 0 | ereport(PANIC, |
838 | 0 | (errcode(ERRCODE_DATA_CORRUPTED), |
839 | 0 | errmsg("replication origin checkpoint has wrong checksum %u, expected %u", |
840 | 0 | crc, file_crc))); |
841 | | |
842 | 0 | if (CloseTransientFile(fd) != 0) |
843 | 0 | ereport(PANIC, |
844 | 0 | (errcode_for_file_access(), |
845 | 0 | errmsg("could not close file \"%s\": %m", |
846 | 0 | path))); |
847 | 0 | } |
848 | | |
849 | | void |
850 | | replorigin_redo(XLogReaderState *record) |
851 | 0 | { |
852 | 0 | uint8 info = XLogRecGetInfo(record) & ~XLR_INFO_MASK; |
853 | |
|
854 | 0 | switch (info) |
855 | 0 | { |
856 | 0 | case XLOG_REPLORIGIN_SET: |
857 | 0 | { |
858 | 0 | xl_replorigin_set *xlrec = |
859 | 0 | (xl_replorigin_set *) XLogRecGetData(record); |
860 | |
|
861 | 0 | replorigin_advance(xlrec->node_id, |
862 | 0 | xlrec->remote_lsn, record->EndRecPtr, |
863 | 0 | xlrec->force /* backward */ , |
864 | 0 | false /* WAL log */ ); |
865 | 0 | break; |
866 | 0 | } |
867 | 0 | case XLOG_REPLORIGIN_DROP: |
868 | 0 | { |
869 | 0 | xl_replorigin_drop *xlrec; |
870 | 0 | int i; |
871 | |
|
872 | 0 | xlrec = (xl_replorigin_drop *) XLogRecGetData(record); |
873 | |
|
874 | 0 | for (i = 0; i < max_active_replication_origins; i++) |
875 | 0 | { |
876 | 0 | ReplicationState *state = &replication_states[i]; |
877 | | |
878 | | /* found our slot */ |
879 | 0 | if (state->roident == xlrec->node_id) |
880 | 0 | { |
881 | | /* reset entry */ |
882 | 0 | state->roident = InvalidRepOriginId; |
883 | 0 | state->remote_lsn = InvalidXLogRecPtr; |
884 | 0 | state->local_lsn = InvalidXLogRecPtr; |
885 | 0 | break; |
886 | 0 | } |
887 | 0 | } |
888 | 0 | break; |
889 | 0 | } |
890 | 0 | default: |
891 | 0 | elog(PANIC, "replorigin_redo: unknown op code %u", info); |
892 | 0 | } |
893 | 0 | } |
894 | | |
895 | | |
896 | | /* |
897 | | * Tell the replication origin progress machinery that a commit from 'node' |
898 | | * that originated at the LSN remote_commit on the remote node was replayed |
899 | | * successfully and that we don't need to do so again. In combination with |
900 | | * setting up replorigin_session_origin_lsn and replorigin_session_origin |
901 | | * that ensures we won't lose knowledge about that after a crash if the |
902 | | * transaction had a persistent effect (think of asynchronous commits). |
903 | | * |
904 | | * local_commit needs to be a local LSN of the commit so that we can make sure |
905 | | * upon a checkpoint that enough WAL has been persisted to disk. |
906 | | * |
907 | | * Needs to be called with a RowExclusiveLock on pg_replication_origin, |
908 | | * unless running in recovery. |
909 | | */ |
910 | | void |
911 | | replorigin_advance(RepOriginId node, |
912 | | XLogRecPtr remote_commit, XLogRecPtr local_commit, |
913 | | bool go_backward, bool wal_log) |
914 | 0 | { |
915 | 0 | int i; |
916 | 0 | ReplicationState *replication_state = NULL; |
917 | 0 | ReplicationState *free_state = NULL; |
918 | |
|
919 | 0 | Assert(node != InvalidRepOriginId); |
920 | | |
921 | | /* we don't track DoNotReplicateId */ |
922 | 0 | if (node == DoNotReplicateId) |
923 | 0 | return; |
924 | | |
925 | | /* |
926 | | * XXX: For the case where this is called by WAL replay, it'd be more |
927 | | * efficient to restore into a backend local hashtable and only dump into |
928 | | * shmem after recovery is finished. Let's wait with implementing that |
929 | | * till it's shown to be a measurable expense |
930 | | */ |
931 | | |
932 | | /* Lock exclusively, as we may have to create a new table entry. */ |
933 | 0 | LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); |
934 | | |
935 | | /* |
936 | | * Search for either an existing slot for the origin, or a free one we can |
937 | | * use. |
938 | | */ |
939 | 0 | for (i = 0; i < max_active_replication_origins; i++) |
940 | 0 | { |
941 | 0 | ReplicationState *curstate = &replication_states[i]; |
942 | | |
943 | | /* remember where to insert if necessary */ |
944 | 0 | if (curstate->roident == InvalidRepOriginId && |
945 | 0 | free_state == NULL) |
946 | 0 | { |
947 | 0 | free_state = curstate; |
948 | 0 | continue; |
949 | 0 | } |
950 | | |
951 | | /* not our slot */ |
952 | 0 | if (curstate->roident != node) |
953 | 0 | { |
954 | 0 | continue; |
955 | 0 | } |
956 | | |
957 | | /* ok, found slot */ |
958 | 0 | replication_state = curstate; |
959 | |
|
960 | 0 | LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE); |
961 | | |
962 | | /* Make sure it's not used by somebody else */ |
963 | 0 | if (replication_state->acquired_by != 0) |
964 | 0 | { |
965 | 0 | ereport(ERROR, |
966 | 0 | (errcode(ERRCODE_OBJECT_IN_USE), |
967 | 0 | errmsg("replication origin with ID %d is already active for PID %d", |
968 | 0 | replication_state->roident, |
969 | 0 | replication_state->acquired_by))); |
970 | 0 | } |
971 | | |
972 | 0 | break; |
973 | 0 | } |
974 | | |
975 | 0 | if (replication_state == NULL && free_state == NULL) |
976 | 0 | ereport(ERROR, |
977 | 0 | (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), |
978 | 0 | errmsg("could not find free replication state slot for replication origin with ID %d", |
979 | 0 | node), |
980 | 0 | errhint("Increase \"max_active_replication_origins\" and try again."))); |
981 | | |
982 | 0 | if (replication_state == NULL) |
983 | 0 | { |
984 | | /* initialize new slot */ |
985 | 0 | LWLockAcquire(&free_state->lock, LW_EXCLUSIVE); |
986 | 0 | replication_state = free_state; |
987 | 0 | Assert(replication_state->remote_lsn == InvalidXLogRecPtr); |
988 | 0 | Assert(replication_state->local_lsn == InvalidXLogRecPtr); |
989 | 0 | replication_state->roident = node; |
990 | 0 | } |
991 | |
|
992 | 0 | Assert(replication_state->roident != InvalidRepOriginId); |
993 | | |
994 | | /* |
995 | | * If somebody "forcefully" sets this slot, WAL log it, so it's durable |
996 | | * and the standby gets the message. Primarily this will be called during |
997 | | * WAL replay (of commit records) where no WAL logging is necessary. |
998 | | */ |
999 | 0 | if (wal_log) |
1000 | 0 | { |
1001 | 0 | xl_replorigin_set xlrec; |
1002 | |
|
1003 | 0 | xlrec.remote_lsn = remote_commit; |
1004 | 0 | xlrec.node_id = node; |
1005 | 0 | xlrec.force = go_backward; |
1006 | |
|
1007 | 0 | XLogBeginInsert(); |
1008 | 0 | XLogRegisterData(&xlrec, sizeof(xlrec)); |
1009 | |
|
1010 | 0 | XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET); |
1011 | 0 | } |
1012 | | |
1013 | | /* |
1014 | | * Due to - harmless - race conditions during a checkpoint we could see |
1015 | | * values here that are older than the ones we already have in memory. We |
1016 | | * could also see older values for prepared transactions when the prepare |
1017 |  | * is sent at a later point in time along with commit prepared and there |
1018 | | * are other transactions commits between prepare and commit prepared. See |
1019 | | * ReorderBufferFinishPrepared. Don't overwrite those. |
1020 | | */ |
1021 | 0 | if (go_backward || replication_state->remote_lsn < remote_commit) |
1022 | 0 | replication_state->remote_lsn = remote_commit; |
1023 | 0 | if (local_commit != InvalidXLogRecPtr && |
1024 | 0 | (go_backward || replication_state->local_lsn < local_commit)) |
1025 | 0 | replication_state->local_lsn = local_commit; |
1026 | 0 | LWLockRelease(&replication_state->lock); |
1027 | | |
1028 | | /* |
1029 |  | * Release *after* changing the LSNs; the slot isn't acquired and thus |
1030 |  | * could otherwise be dropped anytime. |
1031 | | */ |
1032 | 0 | LWLockRelease(ReplicationOriginLock); |
1033 | 0 | } |
1034 | | |
1035 | | |
1036 | | XLogRecPtr |
1037 | | replorigin_get_progress(RepOriginId node, bool flush) |
1038 | 0 | { |
1039 | 0 | int i; |
1040 | 0 | XLogRecPtr local_lsn = InvalidXLogRecPtr; |
1041 | 0 | XLogRecPtr remote_lsn = InvalidXLogRecPtr; |
1042 | | |
1043 | | /* prevent slots from being concurrently dropped */ |
1044 | 0 | LWLockAcquire(ReplicationOriginLock, LW_SHARED); |
1045 | |
|
1046 | 0 | for (i = 0; i < max_active_replication_origins; i++) |
1047 | 0 | { |
1048 | 0 | ReplicationState *state; |
1049 | |
|
1050 | 0 | state = &replication_states[i]; |
1051 | |
|
1052 | 0 | if (state->roident == node) |
1053 | 0 | { |
1054 | 0 | LWLockAcquire(&state->lock, LW_SHARED); |
1055 | |
|
1056 | 0 | remote_lsn = state->remote_lsn; |
1057 | 0 | local_lsn = state->local_lsn; |
1058 | |
|
1059 | 0 | LWLockRelease(&state->lock); |
1060 | |
|
1061 | 0 | break; |
1062 | 0 | } |
1063 | 0 | } |
1064 | |
|
1065 | 0 | LWLockRelease(ReplicationOriginLock); |
1066 | |
|
1067 | 0 | if (flush && local_lsn != InvalidXLogRecPtr) |
1068 | 0 | XLogFlush(local_lsn); |
1069 | |
|
1070 | 0 | return remote_lsn; |
1071 | 0 | } |
1072 | | |
1073 | | /* |
1074 | | * Tear down a (possibly) configured session replication origin during process |
1075 | | * exit. |
1076 | | */ |
1077 | | static void |
1078 | | ReplicationOriginExitCleanup(int code, Datum arg) |
1079 | 0 | { |
1080 | 0 | ConditionVariable *cv = NULL; |
1081 | |
|
1082 | 0 | if (session_replication_state == NULL) |
1083 | 0 | return; |
1084 | | |
1085 | 0 | LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); |
1086 | |
|
1087 | 0 | if (session_replication_state->acquired_by == MyProcPid) |
1088 | 0 | { |
1089 | 0 | cv = &session_replication_state->origin_cv; |
1090 | |
|
1091 | 0 | session_replication_state->acquired_by = 0; |
1092 | 0 | session_replication_state = NULL; |
1093 | 0 | } |
1094 | |
|
1095 | 0 | LWLockRelease(ReplicationOriginLock); |
1096 | |
|
1097 | 0 | if (cv) |
1098 | 0 | ConditionVariableBroadcast(cv); |
1099 | 0 | } |
1100 | | |
1101 | | /* |
1102 |  | * Set up a replication origin in the shared memory struct if it doesn't |
1103 |  | * already exist and cache access to the specific ReplicationState so the |
1104 | | * array doesn't have to be searched when calling |
1105 | | * replorigin_session_advance(). |
1106 | | * |
1107 | | * Normally only one such cached origin can exist per process so the cached |
1108 | | * value can only be set again after the previous value is torn down with |
1109 | | * replorigin_session_reset(). For this normal case pass acquired_by = 0 |
1110 | | * (meaning the slot is not allowed to be already acquired by another process). |
1111 | | * |
1112 | | * However, sometimes multiple processes can safely re-use the same origin slot |
1113 | | * (for example, multiple parallel apply processes can safely use the same |
1114 | | * origin, provided they maintain commit order by allowing only one process to |
1115 | | * commit at a time). For this case the first process must pass acquired_by = |
1116 | | * 0, and then the other processes sharing that same origin can pass |
1117 | | * acquired_by = PID of the first process. |
1118 | | */ |
1119 | | void |
1120 | | replorigin_session_setup(RepOriginId node, int acquired_by) |
1121 | 0 | { |
1122 | 0 | static bool registered_cleanup; |
1123 | 0 | int i; |
1124 | 0 | int free_slot = -1; |
1125 | |
|
1126 | 0 | if (!registered_cleanup) |
1127 | 0 | { |
1128 | 0 | on_shmem_exit(ReplicationOriginExitCleanup, 0); |
1129 | 0 | registered_cleanup = true; |
1130 | 0 | } |
1131 | |
|
1132 | 0 | Assert(max_active_replication_origins > 0); |
1133 | |
|
1134 | 0 | if (session_replication_state != NULL) |
1135 | 0 | ereport(ERROR, |
1136 | 0 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
1137 | 0 | errmsg("cannot setup replication origin when one is already setup"))); |
1138 | | |
1139 | | /* Lock exclusively, as we may have to create a new table entry. */ |
1140 | 0 | LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); |
1141 | | |
1142 | | /* |
1143 | | * Search for either an existing slot for the origin, or a free one we can |
1144 | | * use. |
1145 | | */ |
1146 | 0 | for (i = 0; i < max_active_replication_origins; i++) |
1147 | 0 | { |
1148 | 0 | ReplicationState *curstate = &replication_states[i]; |
1149 | | |
1150 | | /* remember where to insert if necessary */ |
1151 | 0 | if (curstate->roident == InvalidRepOriginId && |
1152 | 0 | free_slot == -1) |
1153 | 0 | { |
1154 | 0 | free_slot = i; |
1155 | 0 | continue; |
1156 | 0 | } |
1157 | | |
1158 | | /* not our slot */ |
1159 | 0 | if (curstate->roident != node) |
1160 | 0 | continue; |
1161 | | |
1162 | 0 | else if (curstate->acquired_by != 0 && acquired_by == 0) |
1163 | 0 | { |
1164 | 0 | ereport(ERROR, |
1165 | 0 | (errcode(ERRCODE_OBJECT_IN_USE), |
1166 | 0 | errmsg("replication origin with ID %d is already active for PID %d", |
1167 | 0 | curstate->roident, curstate->acquired_by))); |
1168 | 0 | } |
1169 | | |
1170 | 0 | else if (curstate->acquired_by != acquired_by) |
1171 | 0 | { |
1172 | 0 | ereport(ERROR, |
1173 | 0 | (errcode(ERRCODE_OBJECT_IN_USE), |
1174 | 0 | errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d", |
1175 | 0 | node, acquired_by))); |
1176 | 0 | } |
1177 | | |
1178 | | /* ok, found slot */ |
1179 | 0 | session_replication_state = curstate; |
1180 | 0 | break; |
1181 | 0 | } |
1182 | | |
1183 | | |
1184 | 0 | if (session_replication_state == NULL && free_slot == -1) |
1185 | 0 | ereport(ERROR, |
1186 | 0 | (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED), |
1187 | 0 | errmsg("could not find free replication state slot for replication origin with ID %d", |
1188 | 0 | node), |
1189 | 0 | errhint("Increase \"max_active_replication_origins\" and try again."))); |
1190 | 0 | else if (session_replication_state == NULL) |
1191 | 0 | { |
1192 | 0 | if (acquired_by) |
1193 | 0 | ereport(ERROR, |
1194 | 0 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
1195 | 0 | errmsg("cannot use PID %d for inactive replication origin with ID %d", |
1196 | 0 | acquired_by, node))); |
1197 | | |
1198 | | /* initialize new slot */ |
1199 | 0 | session_replication_state = &replication_states[free_slot]; |
1200 | 0 | Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr); |
1201 | 0 | Assert(session_replication_state->local_lsn == InvalidXLogRecPtr); |
1202 | 0 | session_replication_state->roident = node; |
1203 | 0 | } |
1204 | | |
1205 | | |
1206 | 0 | Assert(session_replication_state->roident != InvalidRepOriginId); |
1207 | |
|
1208 | 0 | if (acquired_by == 0) |
1209 | 0 | session_replication_state->acquired_by = MyProcPid; |
1210 | 0 | else |
1211 | 0 | Assert(session_replication_state->acquired_by == acquired_by); |
1212 | |
|
1213 | 0 | LWLockRelease(ReplicationOriginLock); |
1214 | | |
1215 | | /* probably this one is pointless */ |
1216 | 0 | ConditionVariableBroadcast(&session_replication_state->origin_cv); |
1217 | 0 | } |
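To make the acquired_by convention described in the header comment of replorigin_session_setup() concrete, here is a minimal sketch; it assumes originid and leader_pid are supplied by the caller, the helper names are hypothetical, and the cooperating processes must still serialize their commits as the comment requires.

/*
 * Sketch only (not part of origin.c): sharing one origin slot between
 * cooperating processes, following the acquired_by convention above.
 */
#include "postgres.h"
#include "replication/origin.h"

/* In the leader (first) process: acquire the slot normally. */
static void
leader_setup_origin_sketch(RepOriginId originid)
{
    replorigin_session_setup(originid, 0);
}

/*
 * In a cooperating worker that reuses the same origin: pass the PID of the
 * process that already acquired the slot.
 */
static void
worker_setup_shared_origin_sketch(RepOriginId originid, int leader_pid)
{
    replorigin_session_setup(originid, leader_pid);
}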
1218 | | |
1219 | | /* |
1220 | | * Reset replay state previously setup in this session. |
1221 | | * |
1222 | | * This function may only be called if an origin was setup with |
1223 | | * replorigin_session_setup(). |
1224 | | */ |
1225 | | void |
1226 | | replorigin_session_reset(void) |
1227 | 0 | { |
1228 | 0 | ConditionVariable *cv; |
1229 | |
|
1230 | 0 | Assert(max_active_replication_origins != 0); |
1231 | |
|
1232 | 0 | if (session_replication_state == NULL) |
1233 | 0 | ereport(ERROR, |
1234 | 0 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
1235 | 0 | errmsg("no replication origin is configured"))); |
1236 | | |
1237 | 0 | LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE); |
1238 | |
|
1239 | 0 | session_replication_state->acquired_by = 0; |
1240 | 0 | cv = &session_replication_state->origin_cv; |
1241 | 0 | session_replication_state = NULL; |
1242 | |
|
1243 | 0 | LWLockRelease(ReplicationOriginLock); |
1244 | |
|
1245 | 0 | ConditionVariableBroadcast(cv); |
1246 | 0 | } |
1247 | | |
1248 | | /* |
1249 | | * Do the same work replorigin_advance() does, just on the session's |
1250 | | * configured origin. |
1251 | | * |
1252 | | * This is noticeably cheaper than using replorigin_advance(). |
1253 | | */ |
1254 | | void |
1255 | | replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit) |
1256 | 0 | { |
1257 | 0 | Assert(session_replication_state != NULL); |
1258 | 0 | Assert(session_replication_state->roident != InvalidRepOriginId); |
1259 | |
|
1260 | 0 | LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE); |
1261 | 0 | if (session_replication_state->local_lsn < local_commit) |
1262 | 0 | session_replication_state->local_lsn = local_commit; |
1263 | 0 | if (session_replication_state->remote_lsn < remote_commit) |
1264 | 0 | session_replication_state->remote_lsn = remote_commit; |
1265 | 0 | LWLockRelease(&session_replication_state->lock); |
1266 | 0 | } |
1267 | | |
1268 | | /* |
1269 | | * Ask the machinery about the point up to which we successfully replayed |
1270 | | * changes from an already setup replication origin. |
1271 | | */ |
1272 | | XLogRecPtr |
1273 | | replorigin_session_get_progress(bool flush) |
1274 | 0 | { |
1275 | 0 | XLogRecPtr remote_lsn; |
1276 | 0 | XLogRecPtr local_lsn; |
1277 | |
|
1278 | 0 | Assert(session_replication_state != NULL); |
1279 | |
|
1280 | 0 | LWLockAcquire(&session_replication_state->lock, LW_SHARED); |
1281 | 0 | remote_lsn = session_replication_state->remote_lsn; |
1282 | 0 | local_lsn = session_replication_state->local_lsn; |
1283 | 0 | LWLockRelease(&session_replication_state->lock); |
1284 | |
|
1285 | 0 | if (flush && local_lsn != InvalidXLogRecPtr) |
1286 | 0 | XLogFlush(local_lsn); |
1287 | |
|
1288 | 0 | return remote_lsn; |
1289 | 0 | } |
1290 | | |
1291 | | |
1292 | | |
1293 | | /* --------------------------------------------------------------------------- |
1294 | | * SQL functions for working with replication origin. |
1295 | | * |
1296 | | * These mostly should be fairly short wrappers around more generic functions. |
1297 | | * --------------------------------------------------------------------------- |
1298 | | */ |
1299 | | |
1300 | | /* |
1301 | | * Create replication origin for the passed in name, and return the assigned |
1302 | | * oid. |
1303 | | */ |
1304 | | Datum |
1305 | | pg_replication_origin_create(PG_FUNCTION_ARGS) |
1306 | 0 | { |
1307 | 0 | char *name; |
1308 | 0 | RepOriginId roident; |
1309 | |
|
1310 | 0 | replorigin_check_prerequisites(false, false); |
1311 | |
|
1312 | 0 | name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); |
1313 | | |
1314 | | /* |
1315 |  | * Replication origins "any" and "none" are reserved for system options. |
1316 | | * The origins "pg_xxx" are reserved for internal use. |
1317 | | */ |
1318 | 0 | if (IsReservedName(name) || IsReservedOriginName(name)) |
1319 | 0 | ereport(ERROR, |
1320 | 0 | (errcode(ERRCODE_RESERVED_NAME), |
1321 | 0 | errmsg("replication origin name \"%s\" is reserved", |
1322 | 0 | name), |
1323 | 0 | errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.", |
1324 | 0 | LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE))); |
1325 | | |
1326 | | /* |
1327 | | * If built with appropriate switch, whine when regression-testing |
1328 | | * conventions for replication origin names are violated. |
1329 | | */ |
1330 | | #ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS |
1331 | | if (strncmp(name, "regress_", 8) != 0) |
1332 | | elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\""); |
1333 | | #endif |
1334 | | |
1335 | 0 | roident = replorigin_create(name); |
1336 | |
|
1337 | 0 | pfree(name); |
1338 | |
|
1339 | 0 | PG_RETURN_OID(roident); |
1340 | 0 | } |
1341 | | |
1342 | | /* |
1343 | | * Drop replication origin. |
1344 | | */ |
1345 | | Datum |
1346 | | pg_replication_origin_drop(PG_FUNCTION_ARGS) |
1347 | 0 | { |
1348 | 0 | char *name; |
1349 | |
|
1350 | 0 | replorigin_check_prerequisites(false, false); |
1351 | |
|
1352 | 0 | name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); |
1353 | |
|
1354 | 0 | replorigin_drop_by_name(name, false, true); |
1355 | |
|
1356 | 0 | pfree(name); |
1357 | |
|
1358 | 0 | PG_RETURN_VOID(); |
1359 | 0 | } |
1360 | | |
1361 | | /* |
1362 | | * Return oid of a replication origin. |
1363 | | */ |
1364 | | Datum |
1365 | | pg_replication_origin_oid(PG_FUNCTION_ARGS) |
1366 | 0 | { |
1367 | 0 | char *name; |
1368 | 0 | RepOriginId roident; |
1369 | |
|
1370 | 0 | replorigin_check_prerequisites(false, false); |
1371 | |
|
1372 | 0 | name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); |
1373 | 0 | roident = replorigin_by_name(name, true); |
1374 | |
|
1375 | 0 | pfree(name); |
1376 | |
|
1377 | 0 | if (OidIsValid(roident)) |
1378 | 0 | PG_RETURN_OID(roident); |
1379 | 0 | PG_RETURN_NULL(); |
1380 | 0 | } |
1381 | | |
1382 | | /* |
1383 | | * Setup a replication origin for this session. |
1384 | | */ |
1385 | | Datum |
1386 | | pg_replication_origin_session_setup(PG_FUNCTION_ARGS) |
1387 | 0 | { |
1388 | 0 | char *name; |
1389 | 0 | RepOriginId origin; |
1390 | 0 | int pid; |
1391 | |
|
1392 | 0 | replorigin_check_prerequisites(true, false); |
1393 | |
|
1394 | 0 | name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); |
1395 | 0 | origin = replorigin_by_name(name, false); |
1396 | 0 | pid = PG_GETARG_INT32(1); |
1397 | 0 | replorigin_session_setup(origin, pid); |
1398 | |
|
1399 | 0 | replorigin_session_origin = origin; |
1400 | |
|
1401 | 0 | pfree(name); |
1402 | |
|
1403 | 0 | PG_RETURN_VOID(); |
1404 | 0 | } |
1405 | | |
1406 | | /* |
1407 | | * Reset previously setup origin in this session |
1408 | | */ |
1409 | | Datum |
1410 | | pg_replication_origin_session_reset(PG_FUNCTION_ARGS) |
1411 | 0 | { |
1412 | 0 | replorigin_check_prerequisites(true, false); |
1413 | |
|
1414 | 0 | replorigin_session_reset(); |
1415 | |
|
1416 | 0 | replorigin_session_origin = InvalidRepOriginId; |
1417 | 0 | replorigin_session_origin_lsn = InvalidXLogRecPtr; |
1418 | 0 | replorigin_session_origin_timestamp = 0; |
1419 | |
|
1420 | 0 | PG_RETURN_VOID(); |
1421 | 0 | } |
1422 | | |
1423 | | /* |
1424 | | * Has a replication origin been setup for this session. |
1425 | | */ |
1426 | | Datum |
1427 | | pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS) |
1428 | 0 | { |
1429 | 0 | replorigin_check_prerequisites(false, false); |
1430 | |
|
1431 | 0 | PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId); |
1432 | 0 | } |
1433 | | |
1434 | | |
1435 | | /* |
1436 | | * Return the replication progress for origin setup in the current session. |
1437 | | * |
1438 | | * If 'flush' is set to true it is ensured that the returned value corresponds |
1439 | | * to a local transaction that has been flushed. This is useful if asynchronous |
1440 | | * commits are used when replaying replicated transactions. |
1441 | | */ |
1442 | | Datum |
1443 | | pg_replication_origin_session_progress(PG_FUNCTION_ARGS) |
1444 | 0 | { |
1445 | 0 | XLogRecPtr remote_lsn = InvalidXLogRecPtr; |
1446 | 0 | bool flush = PG_GETARG_BOOL(0); |
1447 | |
|
1448 | 0 | replorigin_check_prerequisites(true, false); |
1449 | |
|
1450 | 0 | if (session_replication_state == NULL) |
1451 | 0 | ereport(ERROR, |
1452 | 0 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
1453 | 0 | errmsg("no replication origin is configured"))); |
1454 | | |
1455 | 0 | remote_lsn = replorigin_session_get_progress(flush); |
1456 | |
|
1457 | 0 | if (remote_lsn == InvalidXLogRecPtr) |
1458 | 0 | PG_RETURN_NULL(); |
1459 | | |
1460 | 0 | PG_RETURN_LSN(remote_lsn); |
1461 | 0 | } |
1462 | | |
1463 | | Datum |
1464 | | pg_replication_origin_xact_setup(PG_FUNCTION_ARGS) |
1465 | 0 | { |
1466 | 0 | XLogRecPtr location = PG_GETARG_LSN(0); |
1467 | |
|
1468 | 0 | replorigin_check_prerequisites(true, false); |
1469 | |
|
1470 | 0 | if (session_replication_state == NULL) |
1471 | 0 | ereport(ERROR, |
1472 | 0 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
1473 | 0 | errmsg("no replication origin is configured"))); |
1474 | | |
1475 | 0 | replorigin_session_origin_lsn = location; |
1476 | 0 | replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1); |
1477 | |
|
1478 | 0 | PG_RETURN_VOID(); |
1479 | 0 | } |
1480 | | |
1481 | | Datum |
1482 | | pg_replication_origin_xact_reset(PG_FUNCTION_ARGS) |
1483 | 0 | { |
1484 | 0 | replorigin_check_prerequisites(true, false); |
1485 | |
|
1486 | 0 | replorigin_session_origin_lsn = InvalidXLogRecPtr; |
1487 | 0 | replorigin_session_origin_timestamp = 0; |
1488 | |
|
1489 | 0 | PG_RETURN_VOID(); |
1490 | 0 | } |
1491 | | |
1492 | | |
1493 | | Datum |
1494 | | pg_replication_origin_advance(PG_FUNCTION_ARGS) |
1495 | 0 | { |
1496 | 0 | text *name = PG_GETARG_TEXT_PP(0); |
1497 | 0 | XLogRecPtr remote_commit = PG_GETARG_LSN(1); |
1498 | 0 | RepOriginId node; |
1499 | |
|
1500 | 0 | replorigin_check_prerequisites(true, false); |
1501 | | |
1502 | | /* lock to prevent the replication origin from vanishing */ |
1503 | 0 | LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); |
1504 | |
|
1505 | 0 | node = replorigin_by_name(text_to_cstring(name), false); |
1506 | | |
1507 | | /* |
1508 | | * Can't sensibly pass a local commit to be flushed at checkpoint - this |
1509 | | * xact hasn't committed yet. This is why this function should be used to |
1510 | | * set up the initial replication state, but not for replay. |
1511 | | */ |
1512 | 0 | replorigin_advance(node, remote_commit, InvalidXLogRecPtr, |
1513 | 0 | true /* go backward */ , true /* WAL log */ ); |
1514 | |
|
1515 | 0 | UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock); |
1516 | |
|
1517 | 0 | PG_RETURN_VOID(); |
1518 | 0 | } |
1519 | | |
1520 | | |
1521 | | /* |
1522 | | * Return the replication progress for an individual replication origin. |
1523 | | * |
1524 | | * If 'flush' is set to true it is ensured that the returned value corresponds |
1525 | | * to a local transaction that has been flushed. This is useful if asynchronous |
1526 | | * commits are used when replaying replicated transactions. |
1527 | | */ |
1528 | | Datum |
1529 | | pg_replication_origin_progress(PG_FUNCTION_ARGS) |
1530 | 0 | { |
1531 | 0 | char *name; |
1532 | 0 | bool flush; |
1533 | 0 | RepOriginId roident; |
1534 | 0 | XLogRecPtr remote_lsn = InvalidXLogRecPtr; |
1535 | |
|
1536 | 0 | replorigin_check_prerequisites(true, true); |
1537 | |
|
1538 | 0 | name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0))); |
1539 | 0 | flush = PG_GETARG_BOOL(1); |
1540 | |
|
1541 | 0 | roident = replorigin_by_name(name, false); |
1542 | 0 | Assert(OidIsValid(roident)); |
1543 | |
|
1544 | 0 | remote_lsn = replorigin_get_progress(roident, flush); |
1545 | |
|
1546 | 0 | if (remote_lsn == InvalidXLogRecPtr) |
1547 | 0 | PG_RETURN_NULL(); |
1548 | | |
1549 | 0 | PG_RETURN_LSN(remote_lsn); |
1550 | 0 | } |
1551 | | |
1552 | | |
1553 | | Datum |
1554 | | pg_show_replication_origin_status(PG_FUNCTION_ARGS) |
1555 | 0 | { |
1556 | 0 | ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; |
1557 | 0 | int i; |
1558 | 0 | #define REPLICATION_ORIGIN_PROGRESS_COLS 4 |
1559 | | |
1560 |  | /* we want to return 0 rows if max_active_replication_origins is zero */ |
1561 | 0 | replorigin_check_prerequisites(false, true); |
1562 | |
|
1563 | 0 | InitMaterializedSRF(fcinfo, 0); |
1564 | | |
1565 | | /* prevent slots from being concurrently dropped */ |
1566 | 0 | LWLockAcquire(ReplicationOriginLock, LW_SHARED); |
1567 | | |
1568 | | /* |
1569 |  | * Iterate through all possible replication_states and display the ones |
1570 |  | * that are filled. The LSNs are read while holding the per-state lock, so |
1571 |  | * we do not return torn values. |
1572 | | */ |
1573 | 0 | for (i = 0; i < max_active_replication_origins; i++) |
1574 | 0 | { |
1575 | 0 | ReplicationState *state; |
1576 | 0 | Datum values[REPLICATION_ORIGIN_PROGRESS_COLS]; |
1577 | 0 | bool nulls[REPLICATION_ORIGIN_PROGRESS_COLS]; |
1578 | 0 | char *roname; |
1579 | |
|
1580 | 0 | state = &replication_states[i]; |
1581 | | |
1582 | | /* unused slot, nothing to display */ |
1583 | 0 | if (state->roident == InvalidRepOriginId) |
1584 | 0 | continue; |
1585 | | |
1586 | 0 | memset(values, 0, sizeof(values)); |
1587 | 0 | memset(nulls, 1, sizeof(nulls)); |
1588 | |
|
1589 | 0 | values[0] = ObjectIdGetDatum(state->roident); |
1590 | 0 | nulls[0] = false; |
1591 | | |
1592 | | /* |
1593 |  | * We're not preventing the origin from being dropped concurrently, so |
1594 | | * silently accept that it might be gone. |
1595 | | */ |
1596 | 0 | if (replorigin_by_oid(state->roident, true, |
1597 | 0 | &roname)) |
1598 | 0 | { |
1599 | 0 | values[1] = CStringGetTextDatum(roname); |
1600 | 0 | nulls[1] = false; |
1601 | 0 | } |
1602 | |
|
1603 | 0 | LWLockAcquire(&state->lock, LW_SHARED); |
1604 | |
|
1605 | 0 | values[2] = LSNGetDatum(state->remote_lsn); |
1606 | 0 | nulls[2] = false; |
1607 | |
|
1608 | 0 | values[3] = LSNGetDatum(state->local_lsn); |
1609 | 0 | nulls[3] = false; |
1610 | |
|
1611 | 0 | LWLockRelease(&state->lock); |
1612 | |
|
1613 | 0 | tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, |
1614 | 0 | values, nulls); |
1615 | 0 | } |
1616 | |
|
1617 | 0 | LWLockRelease(ReplicationOriginLock); |
1618 | |
|
1619 | 0 | #undef REPLICATION_ORIGIN_PROGRESS_COLS |
1620 | |
|
1621 | 0 | return (Datum) 0; |
1622 | 0 | } |