Coverage Report

Created: 2025-09-27 06:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/postgres/src/backend/replication/logical/origin.c
Line
Count
Source
1
/*-------------------------------------------------------------------------
2
 *
3
 * origin.c
4
 *    Logical replication progress tracking support.
5
 *
6
 * Copyright (c) 2013-2025, PostgreSQL Global Development Group
7
 *
8
 * IDENTIFICATION
9
 *    src/backend/replication/logical/origin.c
10
 *
11
 * NOTES
12
 *
13
 * This file provides the following:
14
 * * An infrastructure to name nodes in a replication setup
15
 * * A facility to store and persist replication progress in an
16
 *   efficient and durable manner.
17
 *
18
 * A replication origin consists of a descriptive, user-defined, external
19
 * name and a short, thus space efficient, internal 2 byte one. This split
20
 * exists because replication origins have to be stored in WAL and shared
21
 * memory and long descriptors would be inefficient.  For now only use 2 bytes
22
 * for the internal id of a replication origin as it seems unlikely that there
23
 * soon will be more than 65k nodes in one replication setup; and using only
24
 * two bytes allows us to be more space efficient.
25
 *
26
 * Replication progress is tracked in a shared memory table
27
 * (ReplicationState) that's dumped to disk every checkpoint. Entries
28
 * ('slots') in this table are identified by the internal id. That's the case
29
 * because it allows to increase replication progress during crash
30
 * recovery. To allow doing so we store the original LSN (from the originating
31
 * system) of a transaction in the commit record. That allows to recover the
32
 * precise replayed state after crash recovery; without requiring synchronous
33
 * commits. Allowing logical replication to use asynchronous commit is
34
 * generally good for performance, but especially important as it allows a
35
 * single threaded replay process to keep up with a source that has multiple
36
 * backends generating changes concurrently.  For efficiency and simplicity
37
 * reasons a backend can setup one replication origin that's from then used as
38
 * the source of changes produced by the backend, until reset again.
39
 *
40
 * This infrastructure is intended to be used in cooperation with logical
41
 * decoding. When replaying from a remote system the configured origin is
42
 * provided to output plugins, allowing prevention of replication loops and
43
 * other filtering.
44
 *
45
 * There are several levels of locking at work:
46
 *
47
 * * To create and drop replication origins an exclusive lock on
48
 *   pg_replication_origin is required for the duration. That allows us to
49
 *   safely and conflict free assign new origins using a dirty snapshot.
50
 *
51
 * * When creating an in-memory replication progress slot the ReplicationOrigin
52
 *   LWLock has to be held exclusively; when iterating over the replication
53
 *   progress a shared lock has to be held, the same when advancing the
54
 *   replication progress of an individual backend that has not setup as the
55
 *   session's replication origin.
56
 *
57
 * * When manipulating or looking at the remote_lsn and local_lsn fields of a
58
 *   replication progress slot that slot's lwlock has to be held. That's
59
 *   primarily because we do not assume 8 byte writes (the LSN) is atomic on
60
 *   all our platforms, but it also simplifies memory ordering concerns
61
 *   between the remote and local lsn. We use a lwlock instead of a spinlock
62
 *   so it's less harmful to hold the lock over a WAL write
63
 *   (cf. replorigin_advance).
64
 *
65
 * ---------------------------------------------------------------------------
66
 */
67
68
#include "postgres.h"
69
70
#include <unistd.h>
71
#include <sys/stat.h>
72
73
#include "access/genam.h"
74
#include "access/htup_details.h"
75
#include "access/table.h"
76
#include "access/xact.h"
77
#include "access/xloginsert.h"
78
#include "catalog/catalog.h"
79
#include "catalog/indexing.h"
80
#include "catalog/pg_subscription.h"
81
#include "funcapi.h"
82
#include "miscadmin.h"
83
#include "nodes/execnodes.h"
84
#include "pgstat.h"
85
#include "replication/origin.h"
86
#include "replication/slot.h"
87
#include "storage/condition_variable.h"
88
#include "storage/fd.h"
89
#include "storage/ipc.h"
90
#include "storage/lmgr.h"
91
#include "utils/builtins.h"
92
#include "utils/fmgroids.h"
93
#include "utils/guc.h"
94
#include "utils/pg_lsn.h"
95
#include "utils/rel.h"
96
#include "utils/snapmgr.h"
97
#include "utils/syscache.h"
98
99
/* paths for replication origin checkpoint files */
100
0
/* Final location of the origin checkpoint file; read back at startup. */
#define PG_REPLORIGIN_CHECKPOINT_FILENAME PG_LOGICAL_DIR "/replorigin_checkpoint"
101
0
/* Scratch file the checkpoint is written to before being renamed into place. */
#define PG_REPLORIGIN_CHECKPOINT_TMPFILE PG_REPLORIGIN_CHECKPOINT_FILENAME ".tmp"
102
103
/* GUC variables */
104
/*
 * Number of entries in the shared replication_states array.  When set to 0
 * no shared memory is reserved and checkpoint/startup processing of origin
 * progress is skipped.
 */
int     max_active_replication_origins = 10;
105
106
/*
107
 * Replay progress of a single remote node.
108
 */
109
typedef struct ReplicationState
110
{
111
  /*
112
   * Local identifier for the remote node.
113
   */
114
  RepOriginId roident;
115
116
  /*
117
   * Location of the latest commit from the remote side.
118
   */
119
  XLogRecPtr  remote_lsn;
120
121
  /*
122
   * Remember the local lsn of the commit record so we can XLogFlush() to it
123
   * during a checkpoint so we know the commit record actually is safe on
124
   * disk.
125
   */
126
  XLogRecPtr  local_lsn;
127
128
  /*
129
   * PID of backend that's acquired slot, or 0 if none.
130
   */
131
  int     acquired_by;
132
133
  /*
134
   * Condition variable that's signaled when acquired_by changes.
135
   */
136
  ConditionVariable origin_cv;
137
138
  /*
139
   * Lock protecting remote_lsn and local_lsn.
140
   */
141
  LWLock    lock;
142
} ReplicationState;
143
144
/*
145
 * On disk version of ReplicationState.
146
 */
147
typedef struct ReplicationStateOnDisk
148
{
149
  RepOriginId roident;
150
  XLogRecPtr  remote_lsn;
151
} ReplicationStateOnDisk;
152
153
154
typedef struct ReplicationStateCtl
155
{
156
  /* Tranche to use for per-origin LWLocks */
157
  int     tranche_id;
158
  /* Array of length max_active_replication_origins */
159
  ReplicationState states[FLEXIBLE_ARRAY_MEMBER];
160
} ReplicationStateCtl;
161
162
/* external variables */
163
/*
 * Origin this backend is currently configured to act for; starts out as
 * InvalidRepOriginId.
 * NOTE(review): presumably attached to changes produced by this backend --
 * confirm against the callers that set it.
 */
RepOriginId replorigin_session_origin = InvalidRepOriginId; /* assumed identity */
164
/*
 * NOTE(review): presumably the originating system's LSN of the transaction
 * being replayed in this session -- confirm against callers.
 */
XLogRecPtr  replorigin_session_origin_lsn = InvalidXLogRecPtr;
165
/*
 * NOTE(review): presumably the remote commit timestamp that accompanies
 * replorigin_session_origin_lsn -- confirm against callers.
 */
TimestampTz replorigin_session_origin_timestamp = 0;
166
167
/*
168
 * Base address into a shared memory array of replication states of size
169
 * max_active_replication_origins.
170
 */
171
static ReplicationState *replication_states;
172
173
/*
174
 * Actual shared memory block (replication_states[] is now part of this).
175
 */
176
static ReplicationStateCtl *replication_states_ctl;
177
178
/*
179
 * We keep a pointer to this backend's ReplicationState to avoid having to
180
 * search the replication_states array in replorigin_session_advance for each
181
 * remote commit.  (Ownership of a backend's own entry can only be changed by
182
 * that backend.)
183
 */
184
static ReplicationState *session_replication_state = NULL;
185
186
/* Magic for on disk files. */
187
0
/* Written first into the origin checkpoint file and verified on startup. */
#define REPLICATION_STATE_MAGIC ((uint32) 0x1257DADE)
188
189
static void
190
replorigin_check_prerequisites(bool check_origins, bool recoveryOK)
191
0
{
192
0
  if (check_origins && max_active_replication_origins == 0)
193
0
    ereport(ERROR,
194
0
        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
195
0
         errmsg("cannot query or manipulate replication origin when \"max_active_replication_origins\" is 0")));
196
197
0
  if (!recoveryOK && RecoveryInProgress())
198
0
    ereport(ERROR,
199
0
        (errcode(ERRCODE_READ_ONLY_SQL_TRANSACTION),
200
0
         errmsg("cannot manipulate replication origins during recovery")));
201
0
}
202
203
204
/*
205
 * IsReservedOriginName
206
 *    True iff name is either "none" or "any".
207
 */
208
static bool
209
IsReservedOriginName(const char *name)
210
0
{
211
0
  return ((pg_strcasecmp(name, LOGICALREP_ORIGIN_NONE) == 0) ||
212
0
      (pg_strcasecmp(name, LOGICALREP_ORIGIN_ANY) == 0));
213
0
}
214
215
/* ---------------------------------------------------------------------------
216
 * Functions for working with replication origins themselves.
217
 * ---------------------------------------------------------------------------
218
 */
219
220
/*
221
 * Check for a persistent replication origin identified by name.
222
 *
223
 * Returns InvalidOid if the node isn't known yet and missing_ok is true.
224
 */
225
RepOriginId
226
replorigin_by_name(const char *roname, bool missing_ok)
227
0
{
228
0
  Form_pg_replication_origin ident;
229
0
  Oid     roident = InvalidOid;
230
0
  HeapTuple tuple;
231
0
  Datum   roname_d;
232
233
0
  roname_d = CStringGetTextDatum(roname);
234
235
0
  tuple = SearchSysCache1(REPLORIGNAME, roname_d);
236
0
  if (HeapTupleIsValid(tuple))
237
0
  {
238
0
    ident = (Form_pg_replication_origin) GETSTRUCT(tuple);
239
0
    roident = ident->roident;
240
0
    ReleaseSysCache(tuple);
241
0
  }
242
0
  else if (!missing_ok)
243
0
    ereport(ERROR,
244
0
        (errcode(ERRCODE_UNDEFINED_OBJECT),
245
0
         errmsg("replication origin \"%s\" does not exist",
246
0
            roname)));
247
248
0
  return roident;
249
0
}
250
251
/*
252
 * Create a replication origin.
253
 *
254
 * Needs to be called in a transaction.
255
 */
256
RepOriginId
257
replorigin_create(const char *roname)
258
0
{
259
0
  Oid     roident;
260
0
  HeapTuple tuple = NULL;
261
0
  Relation  rel;
262
0
  Datum   roname_d;
263
0
  SnapshotData SnapshotDirty;
264
0
  SysScanDesc scan;
265
0
  ScanKeyData key;
266
267
  /*
268
   * To avoid needing a TOAST table for pg_replication_origin, we limit
269
   * replication origin names to 512 bytes.  This should be more than enough
270
   * for all practical use.
271
   */
272
0
  if (strlen(roname) > MAX_RONAME_LEN)
273
0
    ereport(ERROR,
274
0
        (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
275
0
         errmsg("replication origin name is too long"),
276
0
         errdetail("Replication origin names must be no longer than %d bytes.",
277
0
               MAX_RONAME_LEN)));
278
279
0
  roname_d = CStringGetTextDatum(roname);
280
281
0
  Assert(IsTransactionState());
282
283
  /*
284
   * We need the numeric replication origin to be 16bit wide, so we cannot
285
   * rely on the normal oid allocation. Instead we simply scan
286
   * pg_replication_origin for the first unused id. That's not particularly
287
   * efficient, but this should be a fairly infrequent operation - we can
288
   * easily spend a bit more code on this when it turns out it needs to be
289
   * faster.
290
   *
291
   * We handle concurrency by taking an exclusive lock (allowing reads!)
292
   * over the table for the duration of the search. Because we use a "dirty
293
   * snapshot" we can read rows that other in-progress sessions have
294
   * written, even though they would be invisible with normal snapshots. Due
295
   * to the exclusive lock there's no danger that new rows can appear while
296
   * we're checking.
297
   */
298
0
  InitDirtySnapshot(SnapshotDirty);
299
300
0
  rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
301
302
  /*
303
   * We want to be able to access pg_replication_origin without setting up a
304
   * snapshot.  To make that safe, it needs to not have a TOAST table, since
305
   * TOASTed data cannot be fetched without a snapshot.  As of this writing,
306
   * its only varlena column is roname, which we limit to 512 bytes to avoid
307
   * needing out-of-line storage.  If you add a TOAST table to this catalog,
308
   * be sure to set up a snapshot everywhere it might be needed.  For more
309
   * information, see https://postgr.es/m/ZvMSUPOqUU-VNADN%40nathan.
310
   */
311
0
  Assert(!OidIsValid(rel->rd_rel->reltoastrelid));
312
313
0
  for (roident = InvalidOid + 1; roident < PG_UINT16_MAX; roident++)
314
0
  {
315
0
    bool    nulls[Natts_pg_replication_origin];
316
0
    Datum   values[Natts_pg_replication_origin];
317
0
    bool    collides;
318
319
0
    CHECK_FOR_INTERRUPTS();
320
321
0
    ScanKeyInit(&key,
322
0
          Anum_pg_replication_origin_roident,
323
0
          BTEqualStrategyNumber, F_OIDEQ,
324
0
          ObjectIdGetDatum(roident));
325
326
0
    scan = systable_beginscan(rel, ReplicationOriginIdentIndex,
327
0
                  true /* indexOK */ ,
328
0
                  &SnapshotDirty,
329
0
                  1, &key);
330
331
0
    collides = HeapTupleIsValid(systable_getnext(scan));
332
333
0
    systable_endscan(scan);
334
335
0
    if (!collides)
336
0
    {
337
      /*
338
       * Ok, found an unused roident, insert the new row and do a CCI,
339
       * so our callers can look it up if they want to.
340
       */
341
0
      memset(&nulls, 0, sizeof(nulls));
342
343
0
      values[Anum_pg_replication_origin_roident - 1] = ObjectIdGetDatum(roident);
344
0
      values[Anum_pg_replication_origin_roname - 1] = roname_d;
345
346
0
      tuple = heap_form_tuple(RelationGetDescr(rel), values, nulls);
347
0
      CatalogTupleInsert(rel, tuple);
348
0
      CommandCounterIncrement();
349
0
      break;
350
0
    }
351
0
  }
352
353
  /* now release lock again,  */
354
0
  table_close(rel, ExclusiveLock);
355
356
0
  if (tuple == NULL)
357
0
    ereport(ERROR,
358
0
        (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
359
0
         errmsg("could not find free replication origin ID")));
360
361
0
  heap_freetuple(tuple);
362
0
  return roident;
363
0
}
364
365
/*
366
 * Helper function to drop a replication origin.
367
 */
368
static void
369
replorigin_state_clear(RepOriginId roident, bool nowait)
370
0
{
371
0
  int     i;
372
373
  /*
374
   * Clean up the slot state info, if there is any matching slot.
375
   */
376
0
restart:
377
0
  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
378
379
0
  for (i = 0; i < max_active_replication_origins; i++)
380
0
  {
381
0
    ReplicationState *state = &replication_states[i];
382
383
0
    if (state->roident == roident)
384
0
    {
385
      /* found our slot, is it busy? */
386
0
      if (state->acquired_by != 0)
387
0
      {
388
0
        ConditionVariable *cv;
389
390
0
        if (nowait)
391
0
          ereport(ERROR,
392
0
              (errcode(ERRCODE_OBJECT_IN_USE),
393
0
               errmsg("could not drop replication origin with ID %d, in use by PID %d",
394
0
                  state->roident,
395
0
                  state->acquired_by)));
396
397
        /*
398
         * We must wait and then retry.  Since we don't know which CV
399
         * to wait on until here, we can't readily use
400
         * ConditionVariablePrepareToSleep (calling it here would be
401
         * wrong, since we could miss the signal if we did so); just
402
         * use ConditionVariableSleep directly.
403
         */
404
0
        cv = &state->origin_cv;
405
406
0
        LWLockRelease(ReplicationOriginLock);
407
408
0
        ConditionVariableSleep(cv, WAIT_EVENT_REPLICATION_ORIGIN_DROP);
409
0
        goto restart;
410
0
      }
411
412
      /* first make a WAL log entry */
413
0
      {
414
0
        xl_replorigin_drop xlrec;
415
416
0
        xlrec.node_id = roident;
417
0
        XLogBeginInsert();
418
0
        XLogRegisterData(&xlrec, sizeof(xlrec));
419
0
        XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_DROP);
420
0
      }
421
422
      /* then clear the in-memory slot */
423
0
      state->roident = InvalidRepOriginId;
424
0
      state->remote_lsn = InvalidXLogRecPtr;
425
0
      state->local_lsn = InvalidXLogRecPtr;
426
0
      break;
427
0
    }
428
0
  }
429
0
  LWLockRelease(ReplicationOriginLock);
430
0
  ConditionVariableCancelSleep();
431
0
}
432
433
/*
434
 * Drop replication origin (by name).
435
 *
436
 * Needs to be called in a transaction.
437
 */
438
void
439
replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
440
0
{
441
0
  RepOriginId roident;
442
0
  Relation  rel;
443
0
  HeapTuple tuple;
444
445
0
  Assert(IsTransactionState());
446
447
0
  rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
448
449
0
  roident = replorigin_by_name(name, missing_ok);
450
451
  /* Lock the origin to prevent concurrent drops. */
452
0
  LockSharedObject(ReplicationOriginRelationId, roident, 0,
453
0
           AccessExclusiveLock);
454
455
0
  tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
456
0
  if (!HeapTupleIsValid(tuple))
457
0
  {
458
0
    if (!missing_ok)
459
0
      elog(ERROR, "cache lookup failed for replication origin with ID %d",
460
0
         roident);
461
462
    /*
463
     * We don't need to retain the locks if the origin is already dropped.
464
     */
465
0
    UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
466
0
               AccessExclusiveLock);
467
0
    table_close(rel, RowExclusiveLock);
468
0
    return;
469
0
  }
470
471
0
  replorigin_state_clear(roident, nowait);
472
473
  /*
474
   * Now, we can delete the catalog entry.
475
   */
476
0
  CatalogTupleDelete(rel, &tuple->t_self);
477
0
  ReleaseSysCache(tuple);
478
479
0
  CommandCounterIncrement();
480
481
  /* We keep the lock on pg_replication_origin until commit */
482
0
  table_close(rel, NoLock);
483
0
}
484
485
/*
486
 * Lookup replication origin via its oid and return the name.
487
 *
488
 * The external name is palloc'd in the calling context.
489
 *
490
 * Returns true if the origin is known, false otherwise.
491
 */
492
bool
493
replorigin_by_oid(RepOriginId roident, bool missing_ok, char **roname)
494
0
{
495
0
  HeapTuple tuple;
496
0
  Form_pg_replication_origin ric;
497
498
0
  Assert(OidIsValid((Oid) roident));
499
0
  Assert(roident != InvalidRepOriginId);
500
0
  Assert(roident != DoNotReplicateId);
501
502
0
  tuple = SearchSysCache1(REPLORIGIDENT,
503
0
              ObjectIdGetDatum((Oid) roident));
504
505
0
  if (HeapTupleIsValid(tuple))
506
0
  {
507
0
    ric = (Form_pg_replication_origin) GETSTRUCT(tuple);
508
0
    *roname = text_to_cstring(&ric->roname);
509
0
    ReleaseSysCache(tuple);
510
511
0
    return true;
512
0
  }
513
0
  else
514
0
  {
515
0
    *roname = NULL;
516
517
0
    if (!missing_ok)
518
0
      ereport(ERROR,
519
0
          (errcode(ERRCODE_UNDEFINED_OBJECT),
520
0
           errmsg("replication origin with ID %d does not exist",
521
0
              roident)));
522
523
0
    return false;
524
0
  }
525
0
}
526
527
528
/* ---------------------------------------------------------------------------
529
 * Functions for handling replication progress.
530
 * ---------------------------------------------------------------------------
531
 */
532
533
Size
534
ReplicationOriginShmemSize(void)
535
0
{
536
0
  Size    size = 0;
537
538
0
  if (max_active_replication_origins == 0)
539
0
    return size;
540
541
0
  size = add_size(size, offsetof(ReplicationStateCtl, states));
542
543
0
  size = add_size(size,
544
0
          mul_size(max_active_replication_origins, sizeof(ReplicationState)));
545
0
  return size;
546
0
}
547
548
void
549
ReplicationOriginShmemInit(void)
550
0
{
551
0
  bool    found;
552
553
0
  if (max_active_replication_origins == 0)
554
0
    return;
555
556
0
  replication_states_ctl = (ReplicationStateCtl *)
557
0
    ShmemInitStruct("ReplicationOriginState",
558
0
            ReplicationOriginShmemSize(),
559
0
            &found);
560
0
  replication_states = replication_states_ctl->states;
561
562
0
  if (!found)
563
0
  {
564
0
    int     i;
565
566
0
    MemSet(replication_states_ctl, 0, ReplicationOriginShmemSize());
567
568
0
    replication_states_ctl->tranche_id = LWTRANCHE_REPLICATION_ORIGIN_STATE;
569
570
0
    for (i = 0; i < max_active_replication_origins; i++)
571
0
    {
572
0
      LWLockInitialize(&replication_states[i].lock,
573
0
               replication_states_ctl->tranche_id);
574
0
      ConditionVariableInit(&replication_states[i].origin_cv);
575
0
    }
576
0
  }
577
0
}
578
579
/* ---------------------------------------------------------------------------
580
 * Perform a checkpoint of each replication origin's progress with respect to
581
 * the replayed remote_lsn. Make sure that all transactions we refer to in the
582
 * checkpoint (local_lsn) are actually on-disk. This might not yet be the case
583
 * if the transactions were originally committed asynchronously.
584
 *
585
 * We store checkpoints in the following format:
586
 * +-------+------------------------+------------------+-----+--------+
587
 * | MAGIC | ReplicationStateOnDisk | struct Replic... | ... | CRC32C | EOF
588
 * +-------+------------------------+------------------+-----+--------+
589
 *
590
 * So its just the magic, followed by the statically sized
591
 * ReplicationStateOnDisk structs. Note that the maximum number of
592
 * ReplicationState is determined by max_active_replication_origins.
593
 * ---------------------------------------------------------------------------
594
 */
595
void
596
CheckPointReplicationOrigin(void)
597
0
{
598
0
  const char *tmppath = PG_REPLORIGIN_CHECKPOINT_TMPFILE;
599
0
  const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
600
0
  int     tmpfd;
601
0
  int     i;
602
0
  uint32    magic = REPLICATION_STATE_MAGIC;
603
0
  pg_crc32c crc;
604
605
0
  if (max_active_replication_origins == 0)
606
0
    return;
607
608
0
  INIT_CRC32C(crc);
609
610
  /* make sure no old temp file is remaining */
611
0
  if (unlink(tmppath) < 0 && errno != ENOENT)
612
0
    ereport(PANIC,
613
0
        (errcode_for_file_access(),
614
0
         errmsg("could not remove file \"%s\": %m",
615
0
            tmppath)));
616
617
  /*
618
   * no other backend can perform this at the same time; only one checkpoint
619
   * can happen at a time.
620
   */
621
0
  tmpfd = OpenTransientFile(tmppath,
622
0
                O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
623
0
  if (tmpfd < 0)
624
0
    ereport(PANIC,
625
0
        (errcode_for_file_access(),
626
0
         errmsg("could not create file \"%s\": %m",
627
0
            tmppath)));
628
629
  /* write magic */
630
0
  errno = 0;
631
0
  if ((write(tmpfd, &magic, sizeof(magic))) != sizeof(magic))
632
0
  {
633
    /* if write didn't set errno, assume problem is no disk space */
634
0
    if (errno == 0)
635
0
      errno = ENOSPC;
636
0
    ereport(PANIC,
637
0
        (errcode_for_file_access(),
638
0
         errmsg("could not write to file \"%s\": %m",
639
0
            tmppath)));
640
0
  }
641
0
  COMP_CRC32C(crc, &magic, sizeof(magic));
642
643
  /* prevent concurrent creations/drops */
644
0
  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
645
646
  /* write actual data */
647
0
  for (i = 0; i < max_active_replication_origins; i++)
648
0
  {
649
0
    ReplicationStateOnDisk disk_state;
650
0
    ReplicationState *curstate = &replication_states[i];
651
0
    XLogRecPtr  local_lsn;
652
653
0
    if (curstate->roident == InvalidRepOriginId)
654
0
      continue;
655
656
    /* zero, to avoid uninitialized padding bytes */
657
0
    memset(&disk_state, 0, sizeof(disk_state));
658
659
0
    LWLockAcquire(&curstate->lock, LW_SHARED);
660
661
0
    disk_state.roident = curstate->roident;
662
663
0
    disk_state.remote_lsn = curstate->remote_lsn;
664
0
    local_lsn = curstate->local_lsn;
665
666
0
    LWLockRelease(&curstate->lock);
667
668
    /* make sure we only write out a commit that's persistent */
669
0
    XLogFlush(local_lsn);
670
671
0
    errno = 0;
672
0
    if ((write(tmpfd, &disk_state, sizeof(disk_state))) !=
673
0
      sizeof(disk_state))
674
0
    {
675
      /* if write didn't set errno, assume problem is no disk space */
676
0
      if (errno == 0)
677
0
        errno = ENOSPC;
678
0
      ereport(PANIC,
679
0
          (errcode_for_file_access(),
680
0
           errmsg("could not write to file \"%s\": %m",
681
0
              tmppath)));
682
0
    }
683
684
0
    COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
685
0
  }
686
687
0
  LWLockRelease(ReplicationOriginLock);
688
689
  /* write out the CRC */
690
0
  FIN_CRC32C(crc);
691
0
  errno = 0;
692
0
  if ((write(tmpfd, &crc, sizeof(crc))) != sizeof(crc))
693
0
  {
694
    /* if write didn't set errno, assume problem is no disk space */
695
0
    if (errno == 0)
696
0
      errno = ENOSPC;
697
0
    ereport(PANIC,
698
0
        (errcode_for_file_access(),
699
0
         errmsg("could not write to file \"%s\": %m",
700
0
            tmppath)));
701
0
  }
702
703
0
  if (CloseTransientFile(tmpfd) != 0)
704
0
    ereport(PANIC,
705
0
        (errcode_for_file_access(),
706
0
         errmsg("could not close file \"%s\": %m",
707
0
            tmppath)));
708
709
  /* fsync, rename to permanent file, fsync file and directory */
710
0
  durable_rename(tmppath, path, PANIC);
711
0
}
712
713
/*
714
 * Recover replication replay status from checkpoint data saved earlier by
715
 * CheckPointReplicationOrigin.
716
 *
717
 * This only needs to be called at startup and *not* during every checkpoint
718
 * read during recovery (e.g. in HS or PITR from a base backup) afterwards. All
719
 * state thereafter can be recovered by looking at commit records.
720
 */
721
void
722
StartupReplicationOrigin(void)
723
0
{
724
0
  const char *path = PG_REPLORIGIN_CHECKPOINT_FILENAME;
725
0
  int     fd;
726
0
  int     readBytes;
727
0
  uint32    magic = REPLICATION_STATE_MAGIC;
728
0
  int     last_state = 0;
729
0
  pg_crc32c file_crc;
730
0
  pg_crc32c crc;
731
732
  /* don't want to overwrite already existing state */
733
#ifdef USE_ASSERT_CHECKING
734
  static bool already_started = false;
735
736
  Assert(!already_started);
737
  already_started = true;
738
#endif
739
740
0
  if (max_active_replication_origins == 0)
741
0
    return;
742
743
0
  INIT_CRC32C(crc);
744
745
0
  elog(DEBUG2, "starting up replication origin progress state");
746
747
0
  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
748
749
  /*
750
   * might have had max_active_replication_origins == 0 last run, or we just
751
   * brought up a standby.
752
   */
753
0
  if (fd < 0 && errno == ENOENT)
754
0
    return;
755
0
  else if (fd < 0)
756
0
    ereport(PANIC,
757
0
        (errcode_for_file_access(),
758
0
         errmsg("could not open file \"%s\": %m",
759
0
            path)));
760
761
  /* verify magic, that is written even if nothing was active */
762
0
  readBytes = read(fd, &magic, sizeof(magic));
763
0
  if (readBytes != sizeof(magic))
764
0
  {
765
0
    if (readBytes < 0)
766
0
      ereport(PANIC,
767
0
          (errcode_for_file_access(),
768
0
           errmsg("could not read file \"%s\": %m",
769
0
              path)));
770
0
    else
771
0
      ereport(PANIC,
772
0
          (errcode(ERRCODE_DATA_CORRUPTED),
773
0
           errmsg("could not read file \"%s\": read %d of %zu",
774
0
              path, readBytes, sizeof(magic))));
775
0
  }
776
0
  COMP_CRC32C(crc, &magic, sizeof(magic));
777
778
0
  if (magic != REPLICATION_STATE_MAGIC)
779
0
    ereport(PANIC,
780
0
        (errmsg("replication checkpoint has wrong magic %u instead of %u",
781
0
            magic, REPLICATION_STATE_MAGIC)));
782
783
  /* we can skip locking here, no other access is possible */
784
785
  /* recover individual states, until there are no more to be found */
786
0
  while (true)
787
0
  {
788
0
    ReplicationStateOnDisk disk_state;
789
790
0
    readBytes = read(fd, &disk_state, sizeof(disk_state));
791
792
    /* no further data */
793
0
    if (readBytes == sizeof(crc))
794
0
    {
795
      /* not pretty, but simple ... */
796
0
      file_crc = *(pg_crc32c *) &disk_state;
797
0
      break;
798
0
    }
799
800
0
    if (readBytes < 0)
801
0
    {
802
0
      ereport(PANIC,
803
0
          (errcode_for_file_access(),
804
0
           errmsg("could not read file \"%s\": %m",
805
0
              path)));
806
0
    }
807
808
0
    if (readBytes != sizeof(disk_state))
809
0
    {
810
0
      ereport(PANIC,
811
0
          (errcode_for_file_access(),
812
0
           errmsg("could not read file \"%s\": read %d of %zu",
813
0
              path, readBytes, sizeof(disk_state))));
814
0
    }
815
816
0
    COMP_CRC32C(crc, &disk_state, sizeof(disk_state));
817
818
0
    if (last_state == max_active_replication_origins)
819
0
      ereport(PANIC,
820
0
          (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
821
0
           errmsg("could not find free replication state, increase \"max_active_replication_origins\"")));
822
823
    /* copy data to shared memory */
824
0
    replication_states[last_state].roident = disk_state.roident;
825
0
    replication_states[last_state].remote_lsn = disk_state.remote_lsn;
826
0
    last_state++;
827
828
0
    ereport(LOG,
829
0
        errmsg("recovered replication state of node %d to %X/%08X",
830
0
             disk_state.roident,
831
0
             LSN_FORMAT_ARGS(disk_state.remote_lsn)));
832
0
  }
833
834
  /* now check checksum */
835
0
  FIN_CRC32C(crc);
836
0
  if (file_crc != crc)
837
0
    ereport(PANIC,
838
0
        (errcode(ERRCODE_DATA_CORRUPTED),
839
0
         errmsg("replication slot checkpoint has wrong checksum %u, expected %u",
840
0
            crc, file_crc)));
841
842
0
  if (CloseTransientFile(fd) != 0)
843
0
    ereport(PANIC,
844
0
        (errcode_for_file_access(),
845
0
         errmsg("could not close file \"%s\": %m",
846
0
            path)));
847
0
}
848
849
void
850
replorigin_redo(XLogReaderState *record)
851
0
{
852
0
  uint8   info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
853
854
0
  switch (info)
855
0
  {
856
0
    case XLOG_REPLORIGIN_SET:
857
0
      {
858
0
        xl_replorigin_set *xlrec =
859
0
          (xl_replorigin_set *) XLogRecGetData(record);
860
861
0
        replorigin_advance(xlrec->node_id,
862
0
                   xlrec->remote_lsn, record->EndRecPtr,
863
0
                   xlrec->force /* backward */ ,
864
0
                   false /* WAL log */ );
865
0
        break;
866
0
      }
867
0
    case XLOG_REPLORIGIN_DROP:
868
0
      {
869
0
        xl_replorigin_drop *xlrec;
870
0
        int     i;
871
872
0
        xlrec = (xl_replorigin_drop *) XLogRecGetData(record);
873
874
0
        for (i = 0; i < max_active_replication_origins; i++)
875
0
        {
876
0
          ReplicationState *state = &replication_states[i];
877
878
          /* found our slot */
879
0
          if (state->roident == xlrec->node_id)
880
0
          {
881
            /* reset entry */
882
0
            state->roident = InvalidRepOriginId;
883
0
            state->remote_lsn = InvalidXLogRecPtr;
884
0
            state->local_lsn = InvalidXLogRecPtr;
885
0
            break;
886
0
          }
887
0
        }
888
0
        break;
889
0
      }
890
0
    default:
891
0
      elog(PANIC, "replorigin_redo: unknown op code %u", info);
892
0
  }
893
0
}
894
895
896
/*
897
 * Tell the replication origin progress machinery that a commit from 'node'
898
 * that originated at the LSN remote_commit on the remote node was replayed
899
 * successfully and that we don't need to do so again. In combination with
900
 * setting up replorigin_session_origin_lsn and replorigin_session_origin
901
 * that ensures we won't lose knowledge about that after a crash if the
902
 * transaction had a persistent effect (think of asynchronous commits).
903
 *
904
 * local_commit needs to be a local LSN of the commit so that we can make sure
905
 * upon a checkpoint that enough WAL has been persisted to disk.
906
 *
907
 * Needs to be called with a RowExclusiveLock on pg_replication_origin,
908
 * unless running in recovery.
909
 */
910
void
911
replorigin_advance(RepOriginId node,
912
           XLogRecPtr remote_commit, XLogRecPtr local_commit,
913
           bool go_backward, bool wal_log)
914
0
{
915
0
  int     i;
916
0
  ReplicationState *replication_state = NULL;
917
0
  ReplicationState *free_state = NULL;
918
919
0
  Assert(node != InvalidRepOriginId);
920
921
  /* we don't track DoNotReplicateId */
922
0
  if (node == DoNotReplicateId)
923
0
    return;
924
925
  /*
926
   * XXX: For the case where this is called by WAL replay, it'd be more
927
   * efficient to restore into a backend local hashtable and only dump into
928
   * shmem after recovery is finished. Let's wait with implementing that
929
   * till it's shown to be a measurable expense
930
   */
931
932
  /* Lock exclusively, as we may have to create a new table entry. */
933
0
  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
934
935
  /*
936
   * Search for either an existing slot for the origin, or a free one we can
937
   * use.
938
   */
939
0
  for (i = 0; i < max_active_replication_origins; i++)
940
0
  {
941
0
    ReplicationState *curstate = &replication_states[i];
942
943
    /* remember where to insert if necessary */
944
0
    if (curstate->roident == InvalidRepOriginId &&
945
0
      free_state == NULL)
946
0
    {
947
0
      free_state = curstate;
948
0
      continue;
949
0
    }
950
951
    /* not our slot */
952
0
    if (curstate->roident != node)
953
0
    {
954
0
      continue;
955
0
    }
956
957
    /* ok, found slot */
958
0
    replication_state = curstate;
959
960
0
    LWLockAcquire(&replication_state->lock, LW_EXCLUSIVE);
961
962
    /* Make sure it's not used by somebody else */
963
0
    if (replication_state->acquired_by != 0)
964
0
    {
965
0
      ereport(ERROR,
966
0
          (errcode(ERRCODE_OBJECT_IN_USE),
967
0
           errmsg("replication origin with ID %d is already active for PID %d",
968
0
              replication_state->roident,
969
0
              replication_state->acquired_by)));
970
0
    }
971
972
0
    break;
973
0
  }
974
975
0
  if (replication_state == NULL && free_state == NULL)
976
0
    ereport(ERROR,
977
0
        (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
978
0
         errmsg("could not find free replication state slot for replication origin with ID %d",
979
0
            node),
980
0
         errhint("Increase \"max_active_replication_origins\" and try again.")));
981
982
0
  if (replication_state == NULL)
983
0
  {
984
    /* initialize new slot */
985
0
    LWLockAcquire(&free_state->lock, LW_EXCLUSIVE);
986
0
    replication_state = free_state;
987
0
    Assert(replication_state->remote_lsn == InvalidXLogRecPtr);
988
0
    Assert(replication_state->local_lsn == InvalidXLogRecPtr);
989
0
    replication_state->roident = node;
990
0
  }
991
992
0
  Assert(replication_state->roident != InvalidRepOriginId);
993
994
  /*
995
   * If somebody "forcefully" sets this slot, WAL log it, so it's durable
996
   * and the standby gets the message. Primarily this will be called during
997
   * WAL replay (of commit records) where no WAL logging is necessary.
998
   */
999
0
  if (wal_log)
1000
0
  {
1001
0
    xl_replorigin_set xlrec;
1002
1003
0
    xlrec.remote_lsn = remote_commit;
1004
0
    xlrec.node_id = node;
1005
0
    xlrec.force = go_backward;
1006
1007
0
    XLogBeginInsert();
1008
0
    XLogRegisterData(&xlrec, sizeof(xlrec));
1009
1010
0
    XLogInsert(RM_REPLORIGIN_ID, XLOG_REPLORIGIN_SET);
1011
0
  }
1012
1013
  /*
1014
   * Due to - harmless - race conditions during a checkpoint we could see
1015
   * values here that are older than the ones we already have in memory. We
1016
   * could also see older values for prepared transactions when the prepare
1017
   * is sent at a later point of time along with commit prepared and there
1018
   * are other transactions commits between prepare and commit prepared. See
1019
   * ReorderBufferFinishPrepared. Don't overwrite those.
1020
   */
1021
0
  if (go_backward || replication_state->remote_lsn < remote_commit)
1022
0
    replication_state->remote_lsn = remote_commit;
1023
0
  if (local_commit != InvalidXLogRecPtr &&
1024
0
    (go_backward || replication_state->local_lsn < local_commit))
1025
0
    replication_state->local_lsn = local_commit;
1026
0
  LWLockRelease(&replication_state->lock);
1027
1028
  /*
1029
   * Release *after* changing the LSNs, slot isn't acquired and thus could
1030
   * otherwise be dropped anytime.
1031
   */
1032
0
  LWLockRelease(ReplicationOriginLock);
1033
0
}
1034
1035
1036
XLogRecPtr
1037
replorigin_get_progress(RepOriginId node, bool flush)
1038
0
{
1039
0
  int     i;
1040
0
  XLogRecPtr  local_lsn = InvalidXLogRecPtr;
1041
0
  XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
1042
1043
  /* prevent slots from being concurrently dropped */
1044
0
  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1045
1046
0
  for (i = 0; i < max_active_replication_origins; i++)
1047
0
  {
1048
0
    ReplicationState *state;
1049
1050
0
    state = &replication_states[i];
1051
1052
0
    if (state->roident == node)
1053
0
    {
1054
0
      LWLockAcquire(&state->lock, LW_SHARED);
1055
1056
0
      remote_lsn = state->remote_lsn;
1057
0
      local_lsn = state->local_lsn;
1058
1059
0
      LWLockRelease(&state->lock);
1060
1061
0
      break;
1062
0
    }
1063
0
  }
1064
1065
0
  LWLockRelease(ReplicationOriginLock);
1066
1067
0
  if (flush && local_lsn != InvalidXLogRecPtr)
1068
0
    XLogFlush(local_lsn);
1069
1070
0
  return remote_lsn;
1071
0
}
1072
1073
/*
1074
 * Tear down a (possibly) configured session replication origin during process
1075
 * exit.
1076
 */
1077
static void
1078
ReplicationOriginExitCleanup(int code, Datum arg)
1079
0
{
1080
0
  ConditionVariable *cv = NULL;
1081
1082
0
  if (session_replication_state == NULL)
1083
0
    return;
1084
1085
0
  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1086
1087
0
  if (session_replication_state->acquired_by == MyProcPid)
1088
0
  {
1089
0
    cv = &session_replication_state->origin_cv;
1090
1091
0
    session_replication_state->acquired_by = 0;
1092
0
    session_replication_state = NULL;
1093
0
  }
1094
1095
0
  LWLockRelease(ReplicationOriginLock);
1096
1097
0
  if (cv)
1098
0
    ConditionVariableBroadcast(cv);
1099
0
}
1100
1101
/*
1102
 * Setup a replication origin in the shared memory struct if it doesn't
1103
 * already exist and cache access to the specific ReplicationSlot so the
1104
 * array doesn't have to be searched when calling
1105
 * replorigin_session_advance().
1106
 *
1107
 * Normally only one such cached origin can exist per process so the cached
1108
 * value can only be set again after the previous value is torn down with
1109
 * replorigin_session_reset(). For this normal case pass acquired_by = 0
1110
 * (meaning the slot is not allowed to be already acquired by another process).
1111
 *
1112
 * However, sometimes multiple processes can safely re-use the same origin slot
1113
 * (for example, multiple parallel apply processes can safely use the same
1114
 * origin, provided they maintain commit order by allowing only one process to
1115
 * commit at a time). For this case the first process must pass acquired_by =
1116
 * 0, and then the other processes sharing that same origin can pass
1117
 * acquired_by = PID of the first process.
1118
 */
1119
void
1120
replorigin_session_setup(RepOriginId node, int acquired_by)
1121
0
{
1122
0
  static bool registered_cleanup;
1123
0
  int     i;
1124
0
  int     free_slot = -1;
1125
1126
0
  if (!registered_cleanup)
1127
0
  {
1128
0
    on_shmem_exit(ReplicationOriginExitCleanup, 0);
1129
0
    registered_cleanup = true;
1130
0
  }
1131
1132
0
  Assert(max_active_replication_origins > 0);
1133
1134
0
  if (session_replication_state != NULL)
1135
0
    ereport(ERROR,
1136
0
        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1137
0
         errmsg("cannot setup replication origin when one is already setup")));
1138
1139
  /* Lock exclusively, as we may have to create a new table entry. */
1140
0
  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1141
1142
  /*
1143
   * Search for either an existing slot for the origin, or a free one we can
1144
   * use.
1145
   */
1146
0
  for (i = 0; i < max_active_replication_origins; i++)
1147
0
  {
1148
0
    ReplicationState *curstate = &replication_states[i];
1149
1150
    /* remember where to insert if necessary */
1151
0
    if (curstate->roident == InvalidRepOriginId &&
1152
0
      free_slot == -1)
1153
0
    {
1154
0
      free_slot = i;
1155
0
      continue;
1156
0
    }
1157
1158
    /* not our slot */
1159
0
    if (curstate->roident != node)
1160
0
      continue;
1161
1162
0
    else if (curstate->acquired_by != 0 && acquired_by == 0)
1163
0
    {
1164
0
      ereport(ERROR,
1165
0
          (errcode(ERRCODE_OBJECT_IN_USE),
1166
0
           errmsg("replication origin with ID %d is already active for PID %d",
1167
0
              curstate->roident, curstate->acquired_by)));
1168
0
    }
1169
1170
0
    else if (curstate->acquired_by != acquired_by)
1171
0
    {
1172
0
      ereport(ERROR,
1173
0
          (errcode(ERRCODE_OBJECT_IN_USE),
1174
0
           errmsg("could not find replication state slot for replication origin with OID %u which was acquired by %d",
1175
0
              node, acquired_by)));
1176
0
    }
1177
1178
    /* ok, found slot */
1179
0
    session_replication_state = curstate;
1180
0
    break;
1181
0
  }
1182
1183
1184
0
  if (session_replication_state == NULL && free_slot == -1)
1185
0
    ereport(ERROR,
1186
0
        (errcode(ERRCODE_CONFIGURATION_LIMIT_EXCEEDED),
1187
0
         errmsg("could not find free replication state slot for replication origin with ID %d",
1188
0
            node),
1189
0
         errhint("Increase \"max_active_replication_origins\" and try again.")));
1190
0
  else if (session_replication_state == NULL)
1191
0
  {
1192
0
    if (acquired_by)
1193
0
      ereport(ERROR,
1194
0
          (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1195
0
           errmsg("cannot use PID %d for inactive replication origin with ID %d",
1196
0
              acquired_by, node)));
1197
1198
    /* initialize new slot */
1199
0
    session_replication_state = &replication_states[free_slot];
1200
0
    Assert(session_replication_state->remote_lsn == InvalidXLogRecPtr);
1201
0
    Assert(session_replication_state->local_lsn == InvalidXLogRecPtr);
1202
0
    session_replication_state->roident = node;
1203
0
  }
1204
1205
1206
0
  Assert(session_replication_state->roident != InvalidRepOriginId);
1207
1208
0
  if (acquired_by == 0)
1209
0
    session_replication_state->acquired_by = MyProcPid;
1210
0
  else
1211
0
    Assert(session_replication_state->acquired_by == acquired_by);
1212
1213
0
  LWLockRelease(ReplicationOriginLock);
1214
1215
  /* probably this one is pointless */
1216
0
  ConditionVariableBroadcast(&session_replication_state->origin_cv);
1217
0
}
1218
1219
/*
1220
 * Reset replay state previously setup in this session.
1221
 *
1222
 * This function may only be called if an origin was setup with
1223
 * replorigin_session_setup().
1224
 */
1225
void
1226
replorigin_session_reset(void)
1227
0
{
1228
0
  ConditionVariable *cv;
1229
1230
0
  Assert(max_active_replication_origins != 0);
1231
1232
0
  if (session_replication_state == NULL)
1233
0
    ereport(ERROR,
1234
0
        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1235
0
         errmsg("no replication origin is configured")));
1236
1237
0
  LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
1238
1239
0
  session_replication_state->acquired_by = 0;
1240
0
  cv = &session_replication_state->origin_cv;
1241
0
  session_replication_state = NULL;
1242
1243
0
  LWLockRelease(ReplicationOriginLock);
1244
1245
0
  ConditionVariableBroadcast(cv);
1246
0
}
1247
1248
/*
1249
 * Do the same work replorigin_advance() does, just on the session's
1250
 * configured origin.
1251
 *
1252
 * This is noticeably cheaper than using replorigin_advance().
1253
 */
1254
void
1255
replorigin_session_advance(XLogRecPtr remote_commit, XLogRecPtr local_commit)
1256
0
{
1257
0
  Assert(session_replication_state != NULL);
1258
0
  Assert(session_replication_state->roident != InvalidRepOriginId);
1259
1260
0
  LWLockAcquire(&session_replication_state->lock, LW_EXCLUSIVE);
1261
0
  if (session_replication_state->local_lsn < local_commit)
1262
0
    session_replication_state->local_lsn = local_commit;
1263
0
  if (session_replication_state->remote_lsn < remote_commit)
1264
0
    session_replication_state->remote_lsn = remote_commit;
1265
0
  LWLockRelease(&session_replication_state->lock);
1266
0
}
1267
1268
/*
1269
 * Ask the machinery about the point up to which we successfully replayed
1270
 * changes from an already setup replication origin.
1271
 */
1272
XLogRecPtr
1273
replorigin_session_get_progress(bool flush)
1274
0
{
1275
0
  XLogRecPtr  remote_lsn;
1276
0
  XLogRecPtr  local_lsn;
1277
1278
0
  Assert(session_replication_state != NULL);
1279
1280
0
  LWLockAcquire(&session_replication_state->lock, LW_SHARED);
1281
0
  remote_lsn = session_replication_state->remote_lsn;
1282
0
  local_lsn = session_replication_state->local_lsn;
1283
0
  LWLockRelease(&session_replication_state->lock);
1284
1285
0
  if (flush && local_lsn != InvalidXLogRecPtr)
1286
0
    XLogFlush(local_lsn);
1287
1288
0
  return remote_lsn;
1289
0
}
1290
1291
1292
1293
/* ---------------------------------------------------------------------------
1294
 * SQL functions for working with replication origin.
1295
 *
1296
 * These mostly should be fairly short wrappers around more generic functions.
1297
 * ---------------------------------------------------------------------------
1298
 */
1299
1300
/*
1301
 * Create replication origin for the passed in name, and return the assigned
1302
 * oid.
1303
 */
1304
Datum
1305
pg_replication_origin_create(PG_FUNCTION_ARGS)
1306
0
{
1307
0
  char     *name;
1308
0
  RepOriginId roident;
1309
1310
0
  replorigin_check_prerequisites(false, false);
1311
1312
0
  name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1313
1314
  /*
1315
   * Replication origins "any and "none" are reserved for system options.
1316
   * The origins "pg_xxx" are reserved for internal use.
1317
   */
1318
0
  if (IsReservedName(name) || IsReservedOriginName(name))
1319
0
    ereport(ERROR,
1320
0
        (errcode(ERRCODE_RESERVED_NAME),
1321
0
         errmsg("replication origin name \"%s\" is reserved",
1322
0
            name),
1323
0
         errdetail("Origin names \"%s\", \"%s\", and names starting with \"pg_\" are reserved.",
1324
0
               LOGICALREP_ORIGIN_ANY, LOGICALREP_ORIGIN_NONE)));
1325
1326
  /*
1327
   * If built with appropriate switch, whine when regression-testing
1328
   * conventions for replication origin names are violated.
1329
   */
1330
#ifdef ENFORCE_REGRESSION_TEST_NAME_RESTRICTIONS
1331
  if (strncmp(name, "regress_", 8) != 0)
1332
    elog(WARNING, "replication origins created by regression test cases should have names starting with \"regress_\"");
1333
#endif
1334
1335
0
  roident = replorigin_create(name);
1336
1337
0
  pfree(name);
1338
1339
0
  PG_RETURN_OID(roident);
1340
0
}
1341
1342
/*
1343
 * Drop replication origin.
1344
 */
1345
Datum
1346
pg_replication_origin_drop(PG_FUNCTION_ARGS)
1347
0
{
1348
0
  char     *name;
1349
1350
0
  replorigin_check_prerequisites(false, false);
1351
1352
0
  name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1353
1354
0
  replorigin_drop_by_name(name, false, true);
1355
1356
0
  pfree(name);
1357
1358
0
  PG_RETURN_VOID();
1359
0
}
1360
1361
/*
1362
 * Return oid of a replication origin.
1363
 */
1364
Datum
1365
pg_replication_origin_oid(PG_FUNCTION_ARGS)
1366
0
{
1367
0
  char     *name;
1368
0
  RepOriginId roident;
1369
1370
0
  replorigin_check_prerequisites(false, false);
1371
1372
0
  name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1373
0
  roident = replorigin_by_name(name, true);
1374
1375
0
  pfree(name);
1376
1377
0
  if (OidIsValid(roident))
1378
0
    PG_RETURN_OID(roident);
1379
0
  PG_RETURN_NULL();
1380
0
}
1381
1382
/*
1383
 * Setup a replication origin for this session.
1384
 */
1385
Datum
1386
pg_replication_origin_session_setup(PG_FUNCTION_ARGS)
1387
0
{
1388
0
  char     *name;
1389
0
  RepOriginId origin;
1390
0
  int     pid;
1391
1392
0
  replorigin_check_prerequisites(true, false);
1393
1394
0
  name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1395
0
  origin = replorigin_by_name(name, false);
1396
0
  pid = PG_GETARG_INT32(1);
1397
0
  replorigin_session_setup(origin, pid);
1398
1399
0
  replorigin_session_origin = origin;
1400
1401
0
  pfree(name);
1402
1403
0
  PG_RETURN_VOID();
1404
0
}
1405
1406
/*
1407
 * Reset previously setup origin in this session
1408
 */
1409
Datum
1410
pg_replication_origin_session_reset(PG_FUNCTION_ARGS)
1411
0
{
1412
0
  replorigin_check_prerequisites(true, false);
1413
1414
0
  replorigin_session_reset();
1415
1416
0
  replorigin_session_origin = InvalidRepOriginId;
1417
0
  replorigin_session_origin_lsn = InvalidXLogRecPtr;
1418
0
  replorigin_session_origin_timestamp = 0;
1419
1420
0
  PG_RETURN_VOID();
1421
0
}
1422
1423
/*
1424
 * Has a replication origin been setup for this session.
1425
 */
1426
Datum
1427
pg_replication_origin_session_is_setup(PG_FUNCTION_ARGS)
1428
0
{
1429
0
  replorigin_check_prerequisites(false, false);
1430
1431
0
  PG_RETURN_BOOL(replorigin_session_origin != InvalidRepOriginId);
1432
0
}
1433
1434
1435
/*
1436
 * Return the replication progress for origin setup in the current session.
1437
 *
1438
 * If 'flush' is set to true it is ensured that the returned value corresponds
1439
 * to a local transaction that has been flushed. This is useful if asynchronous
1440
 * commits are used when replaying replicated transactions.
1441
 */
1442
Datum
1443
pg_replication_origin_session_progress(PG_FUNCTION_ARGS)
1444
0
{
1445
0
  XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
1446
0
  bool    flush = PG_GETARG_BOOL(0);
1447
1448
0
  replorigin_check_prerequisites(true, false);
1449
1450
0
  if (session_replication_state == NULL)
1451
0
    ereport(ERROR,
1452
0
        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1453
0
         errmsg("no replication origin is configured")));
1454
1455
0
  remote_lsn = replorigin_session_get_progress(flush);
1456
1457
0
  if (remote_lsn == InvalidXLogRecPtr)
1458
0
    PG_RETURN_NULL();
1459
1460
0
  PG_RETURN_LSN(remote_lsn);
1461
0
}
1462
1463
Datum
1464
pg_replication_origin_xact_setup(PG_FUNCTION_ARGS)
1465
0
{
1466
0
  XLogRecPtr  location = PG_GETARG_LSN(0);
1467
1468
0
  replorigin_check_prerequisites(true, false);
1469
1470
0
  if (session_replication_state == NULL)
1471
0
    ereport(ERROR,
1472
0
        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1473
0
         errmsg("no replication origin is configured")));
1474
1475
0
  replorigin_session_origin_lsn = location;
1476
0
  replorigin_session_origin_timestamp = PG_GETARG_TIMESTAMPTZ(1);
1477
1478
0
  PG_RETURN_VOID();
1479
0
}
1480
1481
Datum
1482
pg_replication_origin_xact_reset(PG_FUNCTION_ARGS)
1483
0
{
1484
0
  replorigin_check_prerequisites(true, false);
1485
1486
0
  replorigin_session_origin_lsn = InvalidXLogRecPtr;
1487
0
  replorigin_session_origin_timestamp = 0;
1488
1489
0
  PG_RETURN_VOID();
1490
0
}
1491
1492
1493
Datum
1494
pg_replication_origin_advance(PG_FUNCTION_ARGS)
1495
0
{
1496
0
  text     *name = PG_GETARG_TEXT_PP(0);
1497
0
  XLogRecPtr  remote_commit = PG_GETARG_LSN(1);
1498
0
  RepOriginId node;
1499
1500
0
  replorigin_check_prerequisites(true, false);
1501
1502
  /* lock to prevent the replication origin from vanishing */
1503
0
  LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1504
1505
0
  node = replorigin_by_name(text_to_cstring(name), false);
1506
1507
  /*
1508
   * Can't sensibly pass a local commit to be flushed at checkpoint - this
1509
   * xact hasn't committed yet. This is why this function should be used to
1510
   * set up the initial replication state, but not for replay.
1511
   */
1512
0
  replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
1513
0
             true /* go backward */ , true /* WAL log */ );
1514
1515
0
  UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
1516
1517
0
  PG_RETURN_VOID();
1518
0
}
1519
1520
1521
/*
1522
 * Return the replication progress for an individual replication origin.
1523
 *
1524
 * If 'flush' is set to true it is ensured that the returned value corresponds
1525
 * to a local transaction that has been flushed. This is useful if asynchronous
1526
 * commits are used when replaying replicated transactions.
1527
 */
1528
Datum
1529
pg_replication_origin_progress(PG_FUNCTION_ARGS)
1530
0
{
1531
0
  char     *name;
1532
0
  bool    flush;
1533
0
  RepOriginId roident;
1534
0
  XLogRecPtr  remote_lsn = InvalidXLogRecPtr;
1535
1536
0
  replorigin_check_prerequisites(true, true);
1537
1538
0
  name = text_to_cstring((text *) DatumGetPointer(PG_GETARG_DATUM(0)));
1539
0
  flush = PG_GETARG_BOOL(1);
1540
1541
0
  roident = replorigin_by_name(name, false);
1542
0
  Assert(OidIsValid(roident));
1543
1544
0
  remote_lsn = replorigin_get_progress(roident, flush);
1545
1546
0
  if (remote_lsn == InvalidXLogRecPtr)
1547
0
    PG_RETURN_NULL();
1548
1549
0
  PG_RETURN_LSN(remote_lsn);
1550
0
}
1551
1552
1553
Datum
1554
pg_show_replication_origin_status(PG_FUNCTION_ARGS)
1555
0
{
1556
0
  ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1557
0
  int     i;
1558
0
#define REPLICATION_ORIGIN_PROGRESS_COLS 4
1559
1560
  /* we want to return 0 rows if slot is set to zero */
1561
0
  replorigin_check_prerequisites(false, true);
1562
1563
0
  InitMaterializedSRF(fcinfo, 0);
1564
1565
  /* prevent slots from being concurrently dropped */
1566
0
  LWLockAcquire(ReplicationOriginLock, LW_SHARED);
1567
1568
  /*
1569
   * Iterate through all possible replication_states, display if they are
1570
   * filled. Note that we do not take any locks, so slightly corrupted/out
1571
   * of date values are a possibility.
1572
   */
1573
0
  for (i = 0; i < max_active_replication_origins; i++)
1574
0
  {
1575
0
    ReplicationState *state;
1576
0
    Datum   values[REPLICATION_ORIGIN_PROGRESS_COLS];
1577
0
    bool    nulls[REPLICATION_ORIGIN_PROGRESS_COLS];
1578
0
    char     *roname;
1579
1580
0
    state = &replication_states[i];
1581
1582
    /* unused slot, nothing to display */
1583
0
    if (state->roident == InvalidRepOriginId)
1584
0
      continue;
1585
1586
0
    memset(values, 0, sizeof(values));
1587
0
    memset(nulls, 1, sizeof(nulls));
1588
1589
0
    values[0] = ObjectIdGetDatum(state->roident);
1590
0
    nulls[0] = false;
1591
1592
    /*
1593
     * We're not preventing the origin to be dropped concurrently, so
1594
     * silently accept that it might be gone.
1595
     */
1596
0
    if (replorigin_by_oid(state->roident, true,
1597
0
                &roname))
1598
0
    {
1599
0
      values[1] = CStringGetTextDatum(roname);
1600
0
      nulls[1] = false;
1601
0
    }
1602
1603
0
    LWLockAcquire(&state->lock, LW_SHARED);
1604
1605
0
    values[2] = LSNGetDatum(state->remote_lsn);
1606
0
    nulls[2] = false;
1607
1608
0
    values[3] = LSNGetDatum(state->local_lsn);
1609
0
    nulls[3] = false;
1610
1611
0
    LWLockRelease(&state->lock);
1612
1613
0
    tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc,
1614
0
               values, nulls);
1615
0
  }
1616
1617
0
  LWLockRelease(ReplicationOriginLock);
1618
1619
0
#undef REPLICATION_ORIGIN_PROGRESS_COLS
1620
1621
0
  return (Datum) 0;
1622
0
}