Coverage Report

Created: 2025-08-12 06:43

/src/postgres/src/backend/replication/logical/snapbuild.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * snapbuild.c
4
 *
5
 *    Infrastructure for building historic catalog snapshots based on contents
6
 *    of the WAL, for the purpose of decoding heapam.c style values in the
7
 *    WAL.
8
 *
9
 * NOTES:
10
 *
11
 * We build snapshots which can *only* be used to read catalog contents and we
12
 * do so by reading and interpreting the WAL stream. The aim is to build a
13
 * snapshot that behaves the same as a freshly taken MVCC snapshot would have
14
 * at the time the XLogRecord was generated.
15
 *
16
 * To build the snapshots we reuse the infrastructure built for Hot
17
 * Standby. The in-memory snapshots we build look different than HS' because
18
 * we have different needs. To successfully decode data from the WAL we only
19
 * need to access catalog tables and (sys|rel|cat)cache, not the actual user
20
 * tables since the data we decode is wholly contained in the WAL
21
 * records. Also, our snapshots need to be different in comparison to normal
22
 * MVCC ones because in contrast to those we cannot fully rely on the clog and
23
 * pg_subtrans for information about committed transactions because they might
24
 * commit in the future from the POV of the WAL entry we're currently
25
 * decoding. This definition has the advantage that we only need to prevent
26
 * removal of catalog rows, while normal table's rows can still be
27
 * removed. This is achieved by using the replication slot mechanism.
28
 *
29
 * As the percentage of transactions modifying the catalog normally is fairly
30
 * small in comparison to ones only manipulating user data, we keep track of
31
 * the committed catalog modifying ones inside [xmin, xmax) instead of keeping
32
 * track of all running transactions like it's done in a normal snapshot. Note
33
 * that we're generally only looking at transactions that have acquired an
34
 * xid. That is we keep a list of transactions between snapshot->(xmin, xmax)
35
 * that we consider committed, everything else is considered aborted/in
36
 * progress. That also allows us not to care about subtransactions before they
37
 * have committed which means this module, in contrast to HS, doesn't have to
38
 * care about suboverflowed subtransactions and similar.
39
 *
40
 * One complexity of doing this is that to e.g. handle mixed DDL/DML
41
 * transactions we need Snapshots that see intermediate versions of the
42
 * catalog in a transaction. During normal operation this is achieved by using
43
 * CommandIds/cmin/cmax. The problem with that however is that for space
44
 * efficiency reasons, the cmin and cmax are not included in WAL records. We
45
 * cannot read the cmin/cmax from the tuple itself, either, because it is
46
 * reset on crash recovery. Even if we could, we could not decode combocids
47
 * which are only tracked in the original backend's memory. To work around
48
 * that, heapam writes an extra WAL record (XLOG_HEAP2_NEW_CID) every time a
49
 * catalog row is modified, which includes the cmin and cmax of the
50
 * tuple. During decoding, we insert the ctid->(cmin,cmax) mappings into the
51
 * reorder buffer, and use them at visibility checks instead of the cmin/cmax
52
 * on the tuple itself. Check the reorderbuffer.c's comment above
53
 * ResolveCminCmaxDuringDecoding() for details.
54
 *
55
 * To facilitate all this we need our own visibility routine, as the normal
56
 *    ones are optimized for different use cases.
57
 *
58
 * To replace the normal catalog snapshots with decoding ones use the
59
 * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions.
60
 *
61
 *
62
 *
63
 * The snapbuild machinery starts up in several stages, as illustrated
64
 * by the following graph describing the SnapBuild->state transitions:
65
 *
66
 *       +-------------------------+
67
 *    +----|     START       |-------------+
68
 *    |    +-------------------------+         |
69
 *    |         |              |
70
 *    |         |              |
71
 *    |      running_xacts #1            |
72
 *    |         |              |
73
 *    |         |              |
74
 *    |         v              |
75
 *    |    +-------------------------+         v
76
 *    |    |   BUILDING_SNAPSHOT   |------------>|
77
 *    |    +-------------------------+         |
78
 *    |         |              |
79
 *    |         |              |
80
 *    | running_xacts #2, xacts from #1 finished   |
81
 *    |         |              |
82
 *    |         |              |
83
 *    |         v              |
84
 *    |    +-------------------------+         v
85
 *    |    |     FULL_SNAPSHOT   |------------>|
86
 *    |    +-------------------------+         |
87
 *    |         |              |
88
 * running_xacts    |            saved snapshot
89
 * with zero xacts    |         at running_xacts's lsn
90
 *    |         |              |
91
 *    | running_xacts with xacts from #2 finished  |
92
 *    |         |              |
93
 *    |         v              |
94
 *    |    +-------------------------+         |
95
 *    +--->|SNAPBUILD_CONSISTENT   |<------------+
96
 *       +-------------------------+
97
 *
98
 * Initially the machinery is in the START stage. When an xl_running_xacts
99
 * record is read that is sufficiently new (above the safe xmin horizon),
100
 * there's a state transition. If there were no running xacts when the
101
 * xl_running_xacts record was generated, we'll directly go into CONSISTENT
102
 * state, otherwise we'll switch to the BUILDING_SNAPSHOT state, and from there
 * to FULL_SNAPSHOT once all xacts running at that time have finished. Having a full
103
 * snapshot means that all transactions that start henceforth can be decoded
104
 * in their entirety, but transactions that started previously can't. In
105
 * FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously
106
 * running transactions have committed or aborted.
107
 *
108
 * Only transactions that commit after CONSISTENT state has been reached will
109
 * be replayed, even though they might have started while still in
110
 * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous
111
 * changes have been exported, but all the following ones will be. That point
112
 * is a convenient point to initialize replication from, which is why we
113
 * export a snapshot at that point, which *can* be used to read normal data.
114
 *
115
 * Copyright (c) 2012-2025, PostgreSQL Global Development Group
116
 *
117
 * IDENTIFICATION
118
 *    src/backend/replication/logical/snapbuild.c
119
 *
120
 *-------------------------------------------------------------------------
121
 */
122
123
#include "postgres.h"
124
125
#include <sys/stat.h>
126
#include <unistd.h>
127
128
#include "access/heapam_xlog.h"
129
#include "access/transam.h"
130
#include "access/xact.h"
131
#include "common/file_utils.h"
132
#include "miscadmin.h"
133
#include "pgstat.h"
134
#include "replication/logical.h"
135
#include "replication/reorderbuffer.h"
136
#include "replication/snapbuild.h"
137
#include "replication/snapbuild_internal.h"
138
#include "storage/fd.h"
139
#include "storage/lmgr.h"
140
#include "storage/proc.h"
141
#include "storage/procarray.h"
142
#include "storage/standby.h"
143
#include "utils/builtins.h"
144
#include "utils/memutils.h"
145
#include "utils/snapmgr.h"
146
#include "utils/snapshot.h"
147
/*
148
 * Starting a transaction -- which we need to do while exporting a snapshot --
149
 * removes knowledge about the previously used resowner, so we save it here.
150
 */
151
static ResourceOwner SavedResourceOwnerDuringExport = NULL;
152
static bool ExportInProgress = false;
153
154
/* ->committed and ->catchange manipulation */
155
static void SnapBuildPurgeOlderTxn(SnapBuild *builder);
156
157
/* snapshot building/manipulation/distribution functions */
158
static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder);
159
160
static void SnapBuildFreeSnapshot(Snapshot snap);
161
162
static void SnapBuildSnapIncRefcount(Snapshot snap);
163
164
static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid);
165
166
static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
167
                         uint32 xinfo);
168
169
/* xlog reading helper functions for SnapBuildProcessRunningXacts */
170
static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running);
171
static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff);
172
173
/* serialization functions */
174
static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn);
175
static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn);
176
static void SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path);
177
178
/*
179
 * Allocate a new snapshot builder.
180
 *
181
 * xmin_horizon is the xid >= which we can be sure no catalog rows have been
182
 * removed, start_lsn is the LSN >= we want to replay commits.
183
 */
184
SnapBuild *
185
AllocateSnapshotBuilder(ReorderBuffer *reorder,
186
            TransactionId xmin_horizon,
187
            XLogRecPtr start_lsn,
188
            bool need_full_snapshot,
189
            bool in_slot_creation,
190
            XLogRecPtr two_phase_at)
191
0
{
192
0
  MemoryContext context;
193
0
  MemoryContext oldcontext;
194
0
  SnapBuild  *builder;
195
196
  /* allocate memory in own context, to have better accountability */
197
0
  context = AllocSetContextCreate(CurrentMemoryContext,
198
0
                  "snapshot builder context",
199
0
                  ALLOCSET_DEFAULT_SIZES);
200
0
  oldcontext = MemoryContextSwitchTo(context);
201
202
0
  builder = palloc0(sizeof(SnapBuild));
203
204
0
  builder->state = SNAPBUILD_START;
205
0
  builder->context = context;
206
0
  builder->reorder = reorder;
207
  /* Other struct members initialized by zeroing via palloc0 above */
208
209
0
  builder->committed.xcnt = 0;
210
0
  builder->committed.xcnt_space = 128;  /* arbitrary number */
211
0
  builder->committed.xip =
212
0
    palloc0(builder->committed.xcnt_space * sizeof(TransactionId));
213
0
  builder->committed.includes_all_transactions = true;
214
215
0
  builder->catchange.xcnt = 0;
216
0
  builder->catchange.xip = NULL;
217
218
0
  builder->initial_xmin_horizon = xmin_horizon;
219
0
  builder->start_decoding_at = start_lsn;
220
0
  builder->in_slot_creation = in_slot_creation;
221
0
  builder->building_full_snapshot = need_full_snapshot;
222
0
  builder->two_phase_at = two_phase_at;
223
224
0
  MemoryContextSwitchTo(oldcontext);
225
226
0
  return builder;
227
0
}
228
229
/*
230
 * Free a snapshot builder.
231
 */
232
void
233
FreeSnapshotBuilder(SnapBuild *builder)
234
0
{
235
0
  MemoryContext context = builder->context;
236
237
  /* free snapshot explicitly, that contains some error checking */
238
0
  if (builder->snapshot != NULL)
239
0
  {
240
0
    SnapBuildSnapDecRefcount(builder->snapshot);
241
0
    builder->snapshot = NULL;
242
0
  }
243
244
  /* other resources are deallocated via memory context reset */
245
0
  MemoryContextDelete(context);
246
0
}
247
248
/*
249
 * Free an unreferenced snapshot that has previously been built by us.
250
 */
251
static void
252
SnapBuildFreeSnapshot(Snapshot snap)
253
0
{
254
  /* make sure we don't get passed an external snapshot */
255
0
  Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
256
257
  /* make sure nobody modified our snapshot */
258
0
  Assert(snap->curcid == FirstCommandId);
259
0
  Assert(!snap->suboverflowed);
260
0
  Assert(!snap->takenDuringRecovery);
261
0
  Assert(snap->regd_count == 0);
262
263
  /* slightly more likely, so it's checked even without c-asserts */
264
0
  if (snap->copied)
265
0
    elog(ERROR, "cannot free a copied snapshot");
266
267
0
  if (snap->active_count)
268
0
    elog(ERROR, "cannot free an active snapshot");
269
270
0
  pfree(snap);
271
0
}
272
273
/*
274
 * In which state of snapshot building are we?
275
 */
276
SnapBuildState
277
SnapBuildCurrentState(SnapBuild *builder)
278
0
{
279
0
  return builder->state;
280
0
}
281
282
/*
283
 * Return the LSN at which the two-phase decoding was first enabled.
284
 */
285
XLogRecPtr
286
SnapBuildGetTwoPhaseAt(SnapBuild *builder)
287
0
{
288
0
  return builder->two_phase_at;
289
0
}
290
291
/*
292
 * Set the LSN at which two-phase decoding is enabled.
293
 */
294
void
295
SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr)
296
0
{
297
0
  builder->two_phase_at = ptr;
298
0
}
299
300
/*
301
 * Should the contents of transaction ending at 'ptr' be decoded?
302
 */
303
bool
304
SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr)
305
0
{
306
0
  return ptr < builder->start_decoding_at;
307
0
}
308
309
/*
310
 * Increase refcount of a snapshot.
311
 *
312
 * This is used when handing out a snapshot to some external resource or when
313
 * adding a Snapshot as builder->snapshot.
314
 */
315
static void
316
SnapBuildSnapIncRefcount(Snapshot snap)
317
0
{
318
0
  snap->active_count++;
319
0
}
320
321
/*
322
 * Decrease refcount of a snapshot and free if the refcount reaches zero.
323
 *
324
 * Externally visible, so that external resources that have been handed an
325
 * IncRef'ed Snapshot can adjust its refcount easily.
326
 */
327
void
328
SnapBuildSnapDecRefcount(Snapshot snap)
329
0
{
330
  /* make sure we don't get passed an external snapshot */
331
0
  Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC);
332
333
  /* make sure nobody modified our snapshot */
334
0
  Assert(snap->curcid == FirstCommandId);
335
0
  Assert(!snap->suboverflowed);
336
0
  Assert(!snap->takenDuringRecovery);
337
338
0
  Assert(snap->regd_count == 0);
339
340
0
  Assert(snap->active_count > 0);
341
342
  /* slightly more likely, so it's checked even without casserts */
343
0
  if (snap->copied)
344
0
    elog(ERROR, "cannot free a copied snapshot");
345
346
0
  snap->active_count--;
347
0
  if (snap->active_count == 0)
348
0
    SnapBuildFreeSnapshot(snap);
349
0
}
350
351
/*
352
 * Build a new snapshot, based on currently committed catalog-modifying
353
 * transactions.
354
 *
355
 * In-progress transactions with catalog access are *not* allowed to modify
356
 * these snapshots; they have to copy them and fill in appropriate ->curcid
357
 * and ->subxip/subxcnt values.
358
 */
359
static Snapshot
360
SnapBuildBuildSnapshot(SnapBuild *builder)
361
0
{
362
0
  Snapshot  snapshot;
363
0
  Size    ssize;
364
365
0
  Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT);
366
367
0
  ssize = sizeof(SnapshotData)
368
0
    + sizeof(TransactionId) * builder->committed.xcnt
369
0
    + sizeof(TransactionId) * 1 /* toplevel xid */ ;
370
371
0
  snapshot = MemoryContextAllocZero(builder->context, ssize);
372
373
0
  snapshot->snapshot_type = SNAPSHOT_HISTORIC_MVCC;
374
375
  /*
376
   * We misuse the original meaning of SnapshotData's xip and subxip fields
377
   * to make the more fitting for our needs.
378
   *
379
   * In the 'xip' array we store transactions that have to be treated as
380
   * committed. Since we will only ever look at tuples from transactions
381
   * that have modified the catalog it's more efficient to store those few
382
   * that exist between xmin and xmax (frequently there are none).
383
   *
384
   * Snapshots that are used in transactions that have modified the catalog
385
   * also use the 'subxip' array to store their toplevel xid and all the
386
   * subtransaction xids so we can recognize when we need to treat rows as
387
   * visible that are not in xip but still need to be visible. Subxip only
388
   * gets filled when the transaction is copied into the context of a
389
   * catalog modifying transaction since we otherwise share a snapshot
390
   * between transactions. As long as a txn hasn't modified the catalog it
391
   * doesn't need to treat any uncommitted rows as visible, so there is no
392
   * need for those xids.
393
   *
394
   * Both arrays are qsort'ed so that we can use bsearch() on them.
395
   */
396
0
  Assert(TransactionIdIsNormal(builder->xmin));
397
0
  Assert(TransactionIdIsNormal(builder->xmax));
398
399
0
  snapshot->xmin = builder->xmin;
400
0
  snapshot->xmax = builder->xmax;
401
402
  /* store all transactions to be treated as committed by this snapshot */
403
0
  snapshot->xip =
404
0
    (TransactionId *) ((char *) snapshot + sizeof(SnapshotData));
405
0
  snapshot->xcnt = builder->committed.xcnt;
406
0
  memcpy(snapshot->xip,
407
0
       builder->committed.xip,
408
0
       builder->committed.xcnt * sizeof(TransactionId));
409
410
  /* sort so we can bsearch() */
411
0
  qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator);
412
413
  /*
414
   * Initially, subxip is empty, i.e. it's a snapshot to be used by
415
   * transactions that don't modify the catalog. Will be filled by
416
   * ReorderBufferCopySnap() if necessary.
417
   */
418
0
  snapshot->subxcnt = 0;
419
0
  snapshot->subxip = NULL;
420
421
0
  snapshot->suboverflowed = false;
422
0
  snapshot->takenDuringRecovery = false;
423
0
  snapshot->copied = false;
424
0
  snapshot->curcid = FirstCommandId;
425
0
  snapshot->active_count = 0;
426
0
  snapshot->regd_count = 0;
427
0
  snapshot->snapXactCompletionCount = 0;
428
429
0
  return snapshot;
430
0
}
431
432
/*
433
 * Build the initial slot snapshot and convert it to a normal snapshot that
434
 * is understood by HeapTupleSatisfiesMVCC.
435
 *
436
 * The snapshot will be usable directly in current transaction or exported
437
 * for loading in different transaction.
438
 */
439
Snapshot
440
SnapBuildInitialSnapshot(SnapBuild *builder)
441
0
{
442
0
  Snapshot  snap;
443
0
  TransactionId xid;
444
0
  TransactionId safeXid;
445
0
  TransactionId *newxip;
446
0
  int     newxcnt = 0;
447
448
0
  Assert(XactIsoLevel == XACT_REPEATABLE_READ);
449
0
  Assert(builder->building_full_snapshot);
450
451
  /* don't allow older snapshots */
452
0
  InvalidateCatalogSnapshot();  /* about to overwrite MyProc->xmin */
453
0
  if (HaveRegisteredOrActiveSnapshot())
454
0
    elog(ERROR, "cannot build an initial slot snapshot when snapshots exist");
455
0
  Assert(!HistoricSnapshotActive());
456
457
0
  if (builder->state != SNAPBUILD_CONSISTENT)
458
0
    elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state");
459
460
0
  if (!builder->committed.includes_all_transactions)
461
0
    elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore");
462
463
  /* so we don't overwrite the existing value */
464
0
  if (TransactionIdIsValid(MyProc->xmin))
465
0
    elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid");
466
467
0
  snap = SnapBuildBuildSnapshot(builder);
468
469
  /*
470
   * We know that snap->xmin is alive, enforced by the logical xmin
471
   * mechanism. Due to that we can do this without locks, we're only
472
   * changing our own value.
473
   *
474
   * Building an initial snapshot is expensive and an unenforced xmin
475
   * horizon would have bad consequences, therefore always double-check that
476
   * the horizon is enforced.
477
   */
478
0
  LWLockAcquire(ProcArrayLock, LW_SHARED);
479
0
  safeXid = GetOldestSafeDecodingTransactionId(false);
480
0
  LWLockRelease(ProcArrayLock);
481
482
0
  if (TransactionIdFollows(safeXid, snap->xmin))
483
0
    elog(ERROR, "cannot build an initial slot snapshot as oldest safe xid %u follows snapshot's xmin %u",
484
0
       safeXid, snap->xmin);
485
486
0
  MyProc->xmin = snap->xmin;
487
488
  /* allocate in transaction context */
489
0
  newxip = (TransactionId *)
490
0
    palloc(sizeof(TransactionId) * GetMaxSnapshotXidCount());
491
492
  /*
493
   * snapbuild.c builds transactions in an "inverted" manner, which means it
494
   * stores committed transactions in ->xip, not ones in progress. Build a
495
   * classical snapshot by marking all non-committed transactions as
496
   * in-progress. This can be expensive.
497
   */
498
0
  for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);)
499
0
  {
500
0
    void     *test;
501
502
    /*
503
     * Check whether transaction committed using the decoding snapshot
504
     * meaning of ->xip.
505
     */
506
0
    test = bsearch(&xid, snap->xip, snap->xcnt,
507
0
             sizeof(TransactionId), xidComparator);
508
509
0
    if (test == NULL)
510
0
    {
511
0
      if (newxcnt >= GetMaxSnapshotXidCount())
512
0
        ereport(ERROR,
513
0
            (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE),
514
0
             errmsg("initial slot snapshot too large")));
515
516
0
      newxip[newxcnt++] = xid;
517
0
    }
518
519
0
    TransactionIdAdvance(xid);
520
0
  }
521
522
  /* adjust remaining snapshot fields as needed */
523
0
  snap->snapshot_type = SNAPSHOT_MVCC;
524
0
  snap->xcnt = newxcnt;
525
0
  snap->xip = newxip;
526
527
0
  return snap;
528
0
}
529
530
/*
531
 * Export a snapshot so it can be set in another session with SET TRANSACTION
532
 * SNAPSHOT.
533
 *
534
 * For that we need to start a transaction in the current backend as the
535
 * importing side checks whether the source transaction is still open to make
536
 * sure the xmin horizon hasn't advanced since then.
537
 */
538
const char *
539
SnapBuildExportSnapshot(SnapBuild *builder)
540
0
{
541
0
  Snapshot  snap;
542
0
  char     *snapname;
543
544
0
  if (IsTransactionOrTransactionBlock())
545
0
    elog(ERROR, "cannot export a snapshot from within a transaction");
546
547
0
  if (SavedResourceOwnerDuringExport)
548
0
    elog(ERROR, "can only export one snapshot at a time");
549
550
0
  SavedResourceOwnerDuringExport = CurrentResourceOwner;
551
0
  ExportInProgress = true;
552
553
0
  StartTransactionCommand();
554
555
  /* There doesn't seem to a nice API to set these */
556
0
  XactIsoLevel = XACT_REPEATABLE_READ;
557
0
  XactReadOnly = true;
558
559
0
  snap = SnapBuildInitialSnapshot(builder);
560
561
  /*
562
   * now that we've built a plain snapshot, make it active and use the
563
   * normal mechanisms for exporting it
564
   */
565
0
  snapname = ExportSnapshot(snap);
566
567
0
  ereport(LOG,
568
0
      (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID",
569
0
               "exported logical decoding snapshot: \"%s\" with %u transaction IDs",
570
0
               snap->xcnt,
571
0
               snapname, snap->xcnt)));
572
0
  return snapname;
573
0
}
574
575
/*
576
 * Ensure there is a snapshot and if not build one for current transaction.
577
 */
578
Snapshot
579
SnapBuildGetOrBuildSnapshot(SnapBuild *builder)
580
0
{
581
0
  Assert(builder->state == SNAPBUILD_CONSISTENT);
582
583
  /* only build a new snapshot if we don't have a prebuilt one */
584
0
  if (builder->snapshot == NULL)
585
0
  {
586
0
    builder->snapshot = SnapBuildBuildSnapshot(builder);
587
    /* increase refcount for the snapshot builder */
588
0
    SnapBuildSnapIncRefcount(builder->snapshot);
589
0
  }
590
591
0
  return builder->snapshot;
592
0
}
593
594
/*
595
 * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is
596
 * any. Aborts the previously started transaction and resets the resource
597
 * owner back to its original value.
598
 */
599
void
600
SnapBuildClearExportedSnapshot(void)
601
0
{
602
0
  ResourceOwner tmpResOwner;
603
604
  /* nothing exported, that is the usual case */
605
0
  if (!ExportInProgress)
606
0
    return;
607
608
0
  if (!IsTransactionState())
609
0
    elog(ERROR, "clearing exported snapshot in wrong transaction state");
610
611
  /*
612
   * AbortCurrentTransaction() takes care of resetting the snapshot state,
613
   * so remember SavedResourceOwnerDuringExport.
614
   */
615
0
  tmpResOwner = SavedResourceOwnerDuringExport;
616
617
  /* make sure nothing could have ever happened */
618
0
  AbortCurrentTransaction();
619
620
0
  CurrentResourceOwner = tmpResOwner;
621
0
}
622
623
/*
624
 * Clear snapshot export state during transaction abort.
625
 */
626
void
627
SnapBuildResetExportedSnapshotState(void)
628
0
{
629
0
  SavedResourceOwnerDuringExport = NULL;
630
0
  ExportInProgress = false;
631
0
}
632
633
/*
634
 * Handle the effects of a single heap change, appropriate to the current state
635
 * of the snapshot builder and returns whether changes made at (xid, lsn) can
636
 * be decoded.
637
 */
638
bool
639
SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn)
640
0
{
641
  /*
642
   * We can't handle data in transactions if we haven't built a snapshot
643
   * yet, so don't store them.
644
   */
645
0
  if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
646
0
    return false;
647
648
  /*
649
   * No point in keeping track of changes in transactions that we don't have
650
   * enough information about to decode. This means that they started before
651
   * we got into the SNAPBUILD_FULL_SNAPSHOT state.
652
   */
653
0
  if (builder->state < SNAPBUILD_CONSISTENT &&
654
0
    TransactionIdPrecedes(xid, builder->next_phase_at))
655
0
    return false;
656
657
  /*
658
   * If the reorderbuffer doesn't yet have a snapshot, add one now, it will
659
   * be needed to decode the change we're currently processing.
660
   */
661
0
  if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
662
0
  {
663
    /* only build a new snapshot if we don't have a prebuilt one */
664
0
    if (builder->snapshot == NULL)
665
0
    {
666
0
      builder->snapshot = SnapBuildBuildSnapshot(builder);
667
      /* increase refcount for the snapshot builder */
668
0
      SnapBuildSnapIncRefcount(builder->snapshot);
669
0
    }
670
671
    /*
672
     * Increase refcount for the transaction we're handing the snapshot
673
     * out to.
674
     */
675
0
    SnapBuildSnapIncRefcount(builder->snapshot);
676
0
    ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
677
0
                   builder->snapshot);
678
0
  }
679
680
0
  return true;
681
0
}
682
683
/*
684
 * Do CommandId/combo CID handling after reading an xl_heap_new_cid record.
685
 * This implies that a transaction has done some form of write to system
686
 * catalogs.
687
 */
688
void
689
SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid,
690
             XLogRecPtr lsn, xl_heap_new_cid *xlrec)
691
0
{
692
0
  CommandId cid;
693
694
  /*
695
   * we only log new_cid's if a catalog tuple was modified, so mark the
696
   * transaction as containing catalog modifications
697
   */
698
0
  ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn);
699
700
0
  ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn,
701
0
                 xlrec->target_locator, xlrec->target_tid,
702
0
                 xlrec->cmin, xlrec->cmax,
703
0
                 xlrec->combocid);
704
705
  /* figure out new command id */
706
0
  if (xlrec->cmin != InvalidCommandId &&
707
0
    xlrec->cmax != InvalidCommandId)
708
0
    cid = Max(xlrec->cmin, xlrec->cmax);
709
0
  else if (xlrec->cmax != InvalidCommandId)
710
0
    cid = xlrec->cmax;
711
0
  else if (xlrec->cmin != InvalidCommandId)
712
0
    cid = xlrec->cmin;
713
0
  else
714
0
  {
715
0
    cid = InvalidCommandId; /* silence compiler */
716
0
    elog(ERROR, "xl_heap_new_cid record without a valid CommandId");
717
0
  }
718
719
0
  ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1);
720
0
}
721
722
/*
723
 * Add a new Snapshot and invalidation messages to all transactions we're
724
 * decoding that currently are in-progress so they can see new catalog contents
725
 * made by the transaction that just committed. This is necessary because those
726
 * in-progress transactions will use the new catalog's contents from here on
727
 * (at the very least everything they do needs to be compatible with newer
728
 * catalog contents).
729
 */
730
static void
731
SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid)
732
0
{
733
0
  dlist_iter  txn_i;
734
0
  ReorderBufferTXN *txn;
735
736
  /*
737
   * Iterate through all toplevel transactions. This can include
738
   * subtransactions which we just don't yet know to be that, but that's
739
   * fine, they will just get an unnecessary snapshot and invalidations
740
   * queued.
741
   */
742
0
  dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn)
743
0
  {
744
0
    txn = dlist_container(ReorderBufferTXN, node, txn_i.cur);
745
746
0
    Assert(TransactionIdIsValid(txn->xid));
747
748
    /*
749
     * If we don't have a base snapshot yet, there are no changes in this
750
     * transaction which in turn implies we don't yet need a snapshot at
751
     * all. We'll add a snapshot when the first change gets queued.
752
     *
753
     * Similarly, we don't need to add invalidations to a transaction
754
     * whose base snapshot is not yet set. Once a base snapshot is built,
755
     * it will include the xids of committed transactions that have
756
     * modified the catalog, thus reflecting the new catalog contents. The
757
     * existing catalog cache will have already been invalidated after
758
     * processing the invalidations in the transaction that modified
759
     * catalogs, ensuring that a fresh cache is constructed during
760
     * decoding.
761
     *
762
     * NB: This works correctly even for subtransactions because
763
     * ReorderBufferAssignChild() takes care to transfer the base snapshot
764
     * to the top-level transaction, and while iterating the changequeue
765
     * we'll get the change from the subtxn.
766
     */
767
0
    if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid))
768
0
      continue;
769
770
    /*
771
     * We don't need to add snapshot or invalidations to prepared
772
     * transactions as they should not see the new catalog contents.
773
     */
774
0
    if (rbtxn_is_prepared(txn))
775
0
      continue;
776
777
0
    elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%08X",
778
0
       txn->xid, LSN_FORMAT_ARGS(lsn));
779
780
    /*
781
     * increase the snapshot's refcount for the transaction we are handing
782
     * it out to
783
     */
784
0
    SnapBuildSnapIncRefcount(builder->snapshot);
785
0
    ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn,
786
0
                 builder->snapshot);
787
788
    /*
789
     * Add invalidation messages to the reorder buffer of in-progress
790
     * transactions except the current committed transaction, for which we
791
     * will execute invalidations at the end.
792
     *
793
     * It is required, otherwise, we will end up using the stale catcache
794
     * contents built by the current transaction even after its decoding,
795
     * which should have been invalidated due to concurrent catalog
796
     * changing transaction.
797
     *
798
     * Distribute only the invalidation messages generated by the current
799
     * committed transaction. Invalidation messages received from other
800
     * transactions would have already been propagated to the relevant
801
     * in-progress transactions. This transaction would have processed
802
     * those invalidations, ensuring that subsequent transactions observe
803
     * a consistent cache state.
804
     */
805
0
    if (txn->xid != xid)
806
0
    {
807
0
      uint32    ninvalidations;
808
0
      SharedInvalidationMessage *msgs = NULL;
809
810
0
      ninvalidations = ReorderBufferGetInvalidations(builder->reorder,
811
0
                               xid, &msgs);
812
813
0
      if (ninvalidations > 0)
814
0
      {
815
0
        Assert(msgs != NULL);
816
817
0
        ReorderBufferAddDistributedInvalidations(builder->reorder,
818
0
                             txn->xid, lsn,
819
0
                             ninvalidations, msgs);
820
0
      }
821
0
    }
822
0
  }
823
0
}
824
825
/*
826
 * Keep track of a new catalog changing transaction that has committed.
827
 */
828
static void
829
SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid)
830
0
{
831
0
  Assert(TransactionIdIsValid(xid));
832
833
0
  if (builder->committed.xcnt == builder->committed.xcnt_space)
834
0
  {
835
0
    builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1;
836
837
0
    elog(DEBUG1, "increasing space for committed transactions to %u",
838
0
       (uint32) builder->committed.xcnt_space);
839
840
0
    builder->committed.xip = repalloc(builder->committed.xip,
841
0
                      builder->committed.xcnt_space * sizeof(TransactionId));
842
0
  }
843
844
  /*
845
   * TODO: It might make sense to keep the array sorted here instead of
846
   * doing it every time we build a new snapshot. On the other hand this
847
   * gets called repeatedly when a transaction with subtransactions commits.
848
   */
849
0
  builder->committed.xip[builder->committed.xcnt++] = xid;
850
0
}
851
852
/*
853
 * Remove knowledge about transactions we treat as committed or containing catalog
854
 * changes that are smaller than ->xmin. Those won't ever get checked via
855
 * the ->committed or ->catchange array, respectively. The committed xids will
856
 * get checked via the clog machinery.
857
 *
858
 * We can ideally remove the transaction from catchange array once it is
859
 * finished (committed/aborted) but that could be costly as we need to maintain
860
 * the xids order in the array.
861
 */
862
static void
863
SnapBuildPurgeOlderTxn(SnapBuild *builder)
864
0
{
865
0
  int     off;
866
0
  TransactionId *workspace;
867
0
  int     surviving_xids = 0;
868
869
  /* not ready yet */
870
0
  if (!TransactionIdIsNormal(builder->xmin))
871
0
    return;
872
873
  /* TODO: Neater algorithm than just copying and iterating? */
874
0
  workspace =
875
0
    MemoryContextAlloc(builder->context,
876
0
               builder->committed.xcnt * sizeof(TransactionId));
877
878
  /* copy xids that still are interesting to workspace */
879
0
  for (off = 0; off < builder->committed.xcnt; off++)
880
0
  {
881
0
    if (NormalTransactionIdPrecedes(builder->committed.xip[off],
882
0
                    builder->xmin))
883
0
      ;          /* remove */
884
0
    else
885
0
      workspace[surviving_xids++] = builder->committed.xip[off];
886
0
  }
887
888
  /* copy workspace back to persistent state */
889
0
  memcpy(builder->committed.xip, workspace,
890
0
       surviving_xids * sizeof(TransactionId));
891
892
0
  elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u",
893
0
     (uint32) builder->committed.xcnt, (uint32) surviving_xids,
894
0
     builder->xmin, builder->xmax);
895
0
  builder->committed.xcnt = surviving_xids;
896
897
0
  pfree(workspace);
898
899
  /*
900
   * Purge xids in ->catchange as well. The purged array must also be sorted
901
   * in xidComparator order.
902
   */
903
0
  if (builder->catchange.xcnt > 0)
904
0
  {
905
    /*
906
     * Since catchange.xip is sorted, we find the lower bound of xids that
907
     * are still interesting.
908
     */
909
0
    for (off = 0; off < builder->catchange.xcnt; off++)
910
0
    {
911
0
      if (TransactionIdFollowsOrEquals(builder->catchange.xip[off],
912
0
                       builder->xmin))
913
0
        break;
914
0
    }
915
916
0
    surviving_xids = builder->catchange.xcnt - off;
917
918
0
    if (surviving_xids > 0)
919
0
    {
920
0
      memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
921
0
          surviving_xids * sizeof(TransactionId));
922
0
    }
923
0
    else
924
0
    {
925
0
      pfree(builder->catchange.xip);
926
0
      builder->catchange.xip = NULL;
927
0
    }
928
929
0
    elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
930
0
       (uint32) builder->catchange.xcnt, (uint32) surviving_xids,
931
0
       builder->xmin, builder->xmax);
932
0
    builder->catchange.xcnt = surviving_xids;
933
0
  }
934
0
}
935
936
/*
937
 * Handle everything that needs to be done when a transaction commits
938
 */
939
void
940
SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid,
941
           int nsubxacts, TransactionId *subxacts, uint32 xinfo)
942
0
{
943
0
  int     nxact;
944
945
0
  bool    needs_snapshot = false;
946
0
  bool    needs_timetravel = false;
947
0
  bool    sub_needs_timetravel = false;
948
949
0
  TransactionId xmax = xid;
950
951
  /*
952
   * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor
953
   * will they be part of a snapshot.  So we don't need to record anything.
954
   */
955
0
  if (builder->state == SNAPBUILD_START ||
956
0
    (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
957
0
     TransactionIdPrecedes(xid, builder->next_phase_at)))
958
0
  {
959
    /* ensure that only commits after this are getting replayed */
960
0
    if (builder->start_decoding_at <= lsn)
961
0
      builder->start_decoding_at = lsn + 1;
962
0
    return;
963
0
  }
964
965
0
  if (builder->state < SNAPBUILD_CONSISTENT)
966
0
  {
967
    /* ensure that only commits after this are getting replayed */
968
0
    if (builder->start_decoding_at <= lsn)
969
0
      builder->start_decoding_at = lsn + 1;
970
971
    /*
972
     * If building an exportable snapshot, force xid to be tracked, even
973
     * if the transaction didn't modify the catalog.
974
     */
975
0
    if (builder->building_full_snapshot)
976
0
    {
977
0
      needs_timetravel = true;
978
0
    }
979
0
  }
980
981
0
  for (nxact = 0; nxact < nsubxacts; nxact++)
982
0
  {
983
0
    TransactionId subxid = subxacts[nxact];
984
985
    /*
986
     * Add subtransaction to base snapshot if catalog modifying, we don't
987
     * distinguish to toplevel transactions there.
988
     */
989
0
    if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo))
990
0
    {
991
0
      sub_needs_timetravel = true;
992
0
      needs_snapshot = true;
993
994
0
      elog(DEBUG1, "found subtransaction %u:%u with catalog changes",
995
0
         xid, subxid);
996
997
0
      SnapBuildAddCommittedTxn(builder, subxid);
998
999
0
      if (NormalTransactionIdFollows(subxid, xmax))
1000
0
        xmax = subxid;
1001
0
    }
1002
1003
    /*
1004
     * If we're forcing timetravel we also need visibility information
1005
     * about subtransaction, so keep track of subtransaction's state, even
1006
     * if not catalog modifying.  Don't need to distribute a snapshot in
1007
     * that case.
1008
     */
1009
0
    else if (needs_timetravel)
1010
0
    {
1011
0
      SnapBuildAddCommittedTxn(builder, subxid);
1012
0
      if (NormalTransactionIdFollows(subxid, xmax))
1013
0
        xmax = subxid;
1014
0
    }
1015
0
  }
1016
1017
  /* if top-level modified catalog, it'll need a snapshot */
1018
0
  if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo))
1019
0
  {
1020
0
    elog(DEBUG2, "found top level transaction %u, with catalog changes",
1021
0
       xid);
1022
0
    needs_snapshot = true;
1023
0
    needs_timetravel = true;
1024
0
    SnapBuildAddCommittedTxn(builder, xid);
1025
0
  }
1026
0
  else if (sub_needs_timetravel)
1027
0
  {
1028
    /* track toplevel txn as well, subxact alone isn't meaningful */
1029
0
    elog(DEBUG2, "forced transaction %u to do timetravel due to one of its subtransactions",
1030
0
       xid);
1031
0
    needs_timetravel = true;
1032
0
    SnapBuildAddCommittedTxn(builder, xid);
1033
0
  }
1034
0
  else if (needs_timetravel)
1035
0
  {
1036
0
    elog(DEBUG2, "forced transaction %u to do timetravel", xid);
1037
1038
0
    SnapBuildAddCommittedTxn(builder, xid);
1039
0
  }
1040
1041
0
  if (!needs_timetravel)
1042
0
  {
1043
    /* record that we cannot export a general snapshot anymore */
1044
0
    builder->committed.includes_all_transactions = false;
1045
0
  }
1046
1047
0
  Assert(!needs_snapshot || needs_timetravel);
1048
1049
  /*
1050
   * Adjust xmax of the snapshot builder, we only do that for committed,
1051
   * catalog modifying, transactions, everything else isn't interesting for
1052
   * us since we'll never look at the respective rows.
1053
   */
1054
0
  if (needs_timetravel &&
1055
0
    (!TransactionIdIsValid(builder->xmax) ||
1056
0
     TransactionIdFollowsOrEquals(xmax, builder->xmax)))
1057
0
  {
1058
0
    builder->xmax = xmax;
1059
0
    TransactionIdAdvance(builder->xmax);
1060
0
  }
1061
1062
  /* if there's any reason to build a historic snapshot, do so now */
1063
0
  if (needs_snapshot)
1064
0
  {
1065
    /*
1066
     * If we haven't built a complete snapshot yet there's no need to hand
1067
     * it out, it wouldn't (and couldn't) be used anyway.
1068
     */
1069
0
    if (builder->state < SNAPBUILD_FULL_SNAPSHOT)
1070
0
      return;
1071
1072
    /*
1073
     * Decrease the snapshot builder's refcount of the old snapshot, note
1074
     * that it still will be used if it has been handed out to the
1075
     * reorderbuffer earlier.
1076
     */
1077
0
    if (builder->snapshot)
1078
0
      SnapBuildSnapDecRefcount(builder->snapshot);
1079
1080
0
    builder->snapshot = SnapBuildBuildSnapshot(builder);
1081
1082
    /* we might need to execute invalidations, add snapshot */
1083
0
    if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid))
1084
0
    {
1085
0
      SnapBuildSnapIncRefcount(builder->snapshot);
1086
0
      ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn,
1087
0
                     builder->snapshot);
1088
0
    }
1089
1090
    /* refcount of the snapshot builder for the new snapshot */
1091
0
    SnapBuildSnapIncRefcount(builder->snapshot);
1092
1093
    /*
1094
     * Add a new catalog snapshot and invalidations messages to all
1095
     * currently running transactions.
1096
     */
1097
0
    SnapBuildDistributeSnapshotAndInval(builder, lsn, xid);
1098
0
  }
1099
0
}
1100
1101
/*
1102
 * Check the reorder buffer and the snapshot to see if the given transaction has
1103
 * modified catalogs.
1104
 */
1105
static inline bool
1106
SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid,
1107
                uint32 xinfo)
1108
0
{
1109
0
  if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid))
1110
0
    return true;
1111
1112
  /*
1113
   * The transactions that have changed catalogs must have invalidation
1114
   * info.
1115
   */
1116
0
  if (!(xinfo & XACT_XINFO_HAS_INVALS))
1117
0
    return false;
1118
1119
  /* Check the catchange XID array */
1120
0
  return ((builder->catchange.xcnt > 0) &&
1121
0
      (bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt,
1122
0
           sizeof(TransactionId), xidComparator) != NULL));
1123
0
}
1124
1125
/* -----------------------------------
1126
 * Snapshot building functions dealing with xlog records
1127
 * -----------------------------------
1128
 */
1129
1130
/*
1131
 * Process a running xacts record, and use its information to first build a
1132
 * historic snapshot and later to release resources that aren't needed
1133
 * anymore.
1134
 */
1135
void
1136
SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1137
0
{
1138
0
  ReorderBufferTXN *txn;
1139
0
  TransactionId xmin;
1140
1141
  /*
1142
   * If we're not consistent yet, inspect the record to see whether it
1143
   * allows to get closer to being consistent. If we are consistent, dump
1144
   * our snapshot so others or we, after a restart, can use it.
1145
   */
1146
0
  if (builder->state < SNAPBUILD_CONSISTENT)
1147
0
  {
1148
    /* returns false if there's no point in performing cleanup just yet */
1149
0
    if (!SnapBuildFindSnapshot(builder, lsn, running))
1150
0
      return;
1151
0
  }
1152
0
  else
1153
0
    SnapBuildSerialize(builder, lsn);
1154
1155
  /*
1156
   * Update range of interesting xids based on the running xacts
1157
   * information. We don't increase ->xmax using it, because once we are in
1158
   * a consistent state we can do that ourselves and much more efficiently
1159
   * so, because we only need to do it for catalog transactions since we
1160
   * only ever look at those.
1161
   *
1162
   * NB: We only increase xmax when a catalog modifying transaction commits
1163
   * (see SnapBuildCommitTxn).  Because of this, xmax can be lower than
1164
   * xmin, which looks odd but is correct and actually more efficient, since
1165
   * we hit fast paths in heapam_visibility.c.
1166
   */
1167
0
  builder->xmin = running->oldestRunningXid;
1168
1169
  /* Remove transactions we don't need to keep track off anymore */
1170
0
  SnapBuildPurgeOlderTxn(builder);
1171
1172
  /*
1173
   * Advance the xmin limit for the current replication slot, to allow
1174
   * vacuum to clean up the tuples this slot has been protecting.
1175
   *
1176
   * The reorderbuffer might have an xmin among the currently running
1177
   * snapshots; use it if so.  If not, we need only consider the snapshots
1178
   * we'll produce later, which can't be less than the oldest running xid in
1179
   * the record we're reading now.
1180
   */
1181
0
  xmin = ReorderBufferGetOldestXmin(builder->reorder);
1182
0
  if (xmin == InvalidTransactionId)
1183
0
    xmin = running->oldestRunningXid;
1184
0
  elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u",
1185
0
     builder->xmin, builder->xmax, running->oldestRunningXid, xmin);
1186
0
  LogicalIncreaseXminForSlot(lsn, xmin);
1187
1188
  /*
1189
   * Also tell the slot where we can restart decoding from. We don't want to
1190
   * do that after every commit because changing that implies an fsync of
1191
   * the logical slot's state file, so we only do it every time we see a
1192
   * running xacts record.
1193
   *
1194
   * Do so by looking for the oldest in progress transaction (determined by
1195
   * the first LSN of any of its relevant records). Every transaction
1196
   * remembers the last location we stored the snapshot to disk before its
1197
   * beginning. That point is where we can restart from.
1198
   */
1199
1200
  /*
1201
   * Can't know about a serialized snapshot's location if we're not
1202
   * consistent.
1203
   */
1204
0
  if (builder->state < SNAPBUILD_CONSISTENT)
1205
0
    return;
1206
1207
0
  txn = ReorderBufferGetOldestTXN(builder->reorder);
1208
1209
  /*
1210
   * oldest ongoing txn might have started when we didn't yet serialize
1211
   * anything because we hadn't reached a consistent state yet.
1212
   */
1213
0
  if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr)
1214
0
    LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn);
1215
1216
  /*
1217
   * No in-progress transaction, can reuse the last serialized snapshot if
1218
   * we have one.
1219
   */
1220
0
  else if (txn == NULL &&
1221
0
       builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr &&
1222
0
       builder->last_serialized_snapshot != InvalidXLogRecPtr)
1223
0
    LogicalIncreaseRestartDecodingForSlot(lsn,
1224
0
                        builder->last_serialized_snapshot);
1225
0
}
1226
1227
1228
/*
1229
 * Build the start of a snapshot that's capable of decoding the catalog.
1230
 *
1231
 * Helper function for SnapBuildProcessRunningXacts() while we're not yet
1232
 * consistent.
1233
 *
1234
 * Returns true if there is a point in performing internal maintenance/cleanup
1235
 * using the xl_running_xacts record.
1236
 */
1237
static bool
1238
SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running)
1239
0
{
1240
  /* ---
1241
   * Build catalog decoding snapshot incrementally using information about
1242
   * the currently running transactions. There are several ways to do that:
1243
   *
1244
   * a) There were no running transactions when the xl_running_xacts record
1245
   *    was inserted, jump to CONSISTENT immediately. We might find such a
1246
   *    state while waiting on c)'s sub-states.
1247
   *
1248
   * b) This (in a previous run) or another decoding slot serialized a
1249
   *    snapshot to disk that we can use. Can't use this method while finding
1250
   *    the start point for decoding changes as the restart LSN would be an
1251
   *    arbitrary LSN but we need to find the start point to extract changes
1252
   *    where we won't see the data for partial transactions. Also, we cannot
1253
   *    use this method when a slot needs a full snapshot for export or direct
1254
   *    use, as that snapshot will only contain catalog modifying transactions.
1255
   *
1256
   * c) First incrementally build a snapshot for catalog tuples
1257
   *    (BUILDING_SNAPSHOT), that requires all, already in-progress,
1258
   *    transactions to finish.  Every transaction starting after that
1259
   *    (FULL_SNAPSHOT state), has enough information to be decoded.  But
1260
   *    for older running transactions no viable snapshot exists yet, so
1261
   *    CONSISTENT will only be reached once all of those have finished.
1262
   * ---
1263
   */
1264
1265
  /*
1266
   * xl_running_xacts record is older than what we can use, we might not
1267
   * have all necessary catalog rows anymore.
1268
   */
1269
0
  if (TransactionIdIsNormal(builder->initial_xmin_horizon) &&
1270
0
    NormalTransactionIdPrecedes(running->oldestRunningXid,
1271
0
                  builder->initial_xmin_horizon))
1272
0
  {
1273
0
    ereport(DEBUG1,
1274
0
        errmsg_internal("skipping snapshot at %X/%08X while building logical decoding snapshot, xmin horizon too low",
1275
0
                LSN_FORMAT_ARGS(lsn)),
1276
0
        errdetail_internal("initial xmin horizon of %u vs the snapshot's %u",
1277
0
                   builder->initial_xmin_horizon, running->oldestRunningXid));
1278
1279
1280
0
    SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon);
1281
1282
0
    return true;
1283
0
  }
1284
1285
  /*
1286
   * a) No transaction were running, we can jump to consistent.
1287
   *
1288
   * This is not affected by races around xl_running_xacts, because we can
1289
   * miss transaction commits, but currently not transactions starting.
1290
   *
1291
   * NB: We might have already started to incrementally assemble a snapshot,
1292
   * so we need to be careful to deal with that.
1293
   */
1294
0
  if (running->oldestRunningXid == running->nextXid)
1295
0
  {
1296
0
    if (builder->start_decoding_at == InvalidXLogRecPtr ||
1297
0
      builder->start_decoding_at <= lsn)
1298
      /* can decode everything after this */
1299
0
      builder->start_decoding_at = lsn + 1;
1300
1301
    /* As no transactions were running xmin/xmax can be trivially set. */
1302
0
    builder->xmin = running->nextXid; /* < are finished */
1303
0
    builder->xmax = running->nextXid; /* >= are running */
1304
1305
    /* so we can safely use the faster comparisons */
1306
0
    Assert(TransactionIdIsNormal(builder->xmin));
1307
0
    Assert(TransactionIdIsNormal(builder->xmax));
1308
1309
0
    builder->state = SNAPBUILD_CONSISTENT;
1310
0
    builder->next_phase_at = InvalidTransactionId;
1311
1312
0
    ereport(LOG,
1313
0
        errmsg("logical decoding found consistent point at %X/%08X",
1314
0
             LSN_FORMAT_ARGS(lsn)),
1315
0
        errdetail("There are no running transactions."));
1316
1317
0
    return false;
1318
0
  }
1319
1320
  /*
1321
   * b) valid on disk state and while neither building full snapshot nor
1322
   * creating a slot.
1323
   */
1324
0
  else if (!builder->building_full_snapshot &&
1325
0
       !builder->in_slot_creation &&
1326
0
       SnapBuildRestore(builder, lsn))
1327
0
  {
1328
    /* there won't be any state to cleanup */
1329
0
    return false;
1330
0
  }
1331
1332
  /*
1333
   * c) transition from START to BUILDING_SNAPSHOT.
1334
   *
1335
   * In START state, and a xl_running_xacts record with running xacts is
1336
   * encountered.  In that case, switch to BUILDING_SNAPSHOT state, and
1337
   * record xl_running_xacts->nextXid.  Once all running xacts have finished
1338
   * (i.e. they're all >= nextXid), we have a complete catalog snapshot.  It
1339
   * might look that we could use xl_running_xacts's ->xids information to
1340
   * get there quicker, but that is problematic because transactions marked
1341
   * as running, might already have inserted their commit record - it's
1342
   * infeasible to change that with locking.
1343
   */
1344
0
  else if (builder->state == SNAPBUILD_START)
1345
0
  {
1346
0
    builder->state = SNAPBUILD_BUILDING_SNAPSHOT;
1347
0
    builder->next_phase_at = running->nextXid;
1348
1349
    /*
1350
     * Start with an xmin/xmax that's correct for future, when all the
1351
     * currently running transactions have finished. We'll update both
1352
     * while waiting for the pending transactions to finish.
1353
     */
1354
0
    builder->xmin = running->nextXid; /* < are finished */
1355
0
    builder->xmax = running->nextXid; /* >= are running */
1356
1357
    /* so we can safely use the faster comparisons */
1358
0
    Assert(TransactionIdIsNormal(builder->xmin));
1359
0
    Assert(TransactionIdIsNormal(builder->xmax));
1360
1361
0
    ereport(LOG,
1362
0
        errmsg("logical decoding found initial starting point at %X/%08X",
1363
0
             LSN_FORMAT_ARGS(lsn)),
1364
0
        errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1365
0
              running->xcnt, running->nextXid));
1366
1367
0
    SnapBuildWaitSnapshot(running, running->nextXid);
1368
0
  }
1369
1370
  /*
1371
   * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT.
1372
   *
1373
   * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid
1374
   * is >= than nextXid from when we switched to BUILDING_SNAPSHOT.  This
1375
   * means all transactions starting afterwards have enough information to
1376
   * be decoded.  Switch to FULL_SNAPSHOT.
1377
   */
1378
0
  else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT &&
1379
0
       TransactionIdPrecedesOrEquals(builder->next_phase_at,
1380
0
                       running->oldestRunningXid))
1381
0
  {
1382
0
    builder->state = SNAPBUILD_FULL_SNAPSHOT;
1383
0
    builder->next_phase_at = running->nextXid;
1384
1385
0
    ereport(LOG,
1386
0
        errmsg("logical decoding found initial consistent point at %X/%08X",
1387
0
             LSN_FORMAT_ARGS(lsn)),
1388
0
        errdetail("Waiting for transactions (approximately %d) older than %u to end.",
1389
0
              running->xcnt, running->nextXid));
1390
1391
0
    SnapBuildWaitSnapshot(running, running->nextXid);
1392
0
  }
1393
1394
  /*
1395
   * c) transition from FULL_SNAPSHOT to CONSISTENT.
1396
   *
1397
   * In FULL_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid is
1398
   * >= than nextXid from when we switched to FULL_SNAPSHOT.  This means all
1399
   * transactions that are currently in progress have a catalog snapshot,
1400
   * and all their changes have been collected.  Switch to CONSISTENT.
1401
   */
1402
0
  else if (builder->state == SNAPBUILD_FULL_SNAPSHOT &&
1403
0
       TransactionIdPrecedesOrEquals(builder->next_phase_at,
1404
0
                       running->oldestRunningXid))
1405
0
  {
1406
0
    builder->state = SNAPBUILD_CONSISTENT;
1407
0
    builder->next_phase_at = InvalidTransactionId;
1408
1409
0
    ereport(LOG,
1410
0
        errmsg("logical decoding found consistent point at %X/%08X",
1411
0
             LSN_FORMAT_ARGS(lsn)),
1412
0
        errdetail("There are no old transactions anymore."));
1413
0
  }
1414
1415
  /*
1416
   * We already started to track running xacts and need to wait for all
1417
   * in-progress ones to finish. We fall through to the normal processing of
1418
   * records so incremental cleanup can be performed.
1419
   */
1420
0
  return true;
1421
0
}
1422
1423
/* ---
1424
 * Iterate through xids in record, wait for all older than the cutoff to
1425
 * finish.  Then, if possible, log a new xl_running_xacts record.
1426
 *
1427
 * This isn't required for the correctness of decoding, but to:
1428
 * a) allow isolationtester to notice that we're currently waiting for
1429
 *    something.
1430
 * b) log a new xl_running_xacts record where it'd be helpful, without having
1431
 *    to wait for bgwriter or checkpointer.
1432
 * ---
1433
 */
1434
static void
1435
SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff)
1436
0
{
1437
0
  int     off;
1438
1439
0
  for (off = 0; off < running->xcnt; off++)
1440
0
  {
1441
0
    TransactionId xid = running->xids[off];
1442
1443
    /*
1444
     * Upper layers should prevent that we ever need to wait on ourselves.
1445
     * Check anyway, since failing to do so would either result in an
1446
     * endless wait or an Assert() failure.
1447
     */
1448
0
    if (TransactionIdIsCurrentTransactionId(xid))
1449
0
      elog(ERROR, "waiting for ourselves");
1450
1451
0
    if (TransactionIdFollows(xid, cutoff))
1452
0
      continue;
1453
1454
0
    XactLockTableWait(xid, NULL, NULL, XLTW_None);
1455
0
  }
1456
1457
  /*
1458
   * All transactions we needed to finish finished - try to ensure there is
1459
   * another xl_running_xacts record in a timely manner, without having to
1460
   * wait for bgwriter or checkpointer to log one.  During recovery we can't
1461
   * enforce that, so we'll have to wait.
1462
   */
1463
0
  if (!RecoveryInProgress())
1464
0
  {
1465
0
    LogStandbySnapshot();
1466
0
  }
1467
0
}
1468
1469
/* Size of the SnapBuildOnDisk fields that precede the embedded ->builder. */
#define SnapBuildOnDiskConstantSize \
	offsetof(SnapBuildOnDisk, builder)

/*
 * Size of the leading SnapBuildOnDisk fields that precede ->version.  As the
 * name says, these bytes are not covered by the checksum.
 */
#define SnapBuildOnDiskNotChecksummedSize \
	offsetof(SnapBuildOnDisk, version)

/* Magic number and format version stored in serialized snapshot files. */
#define SNAPBUILD_MAGIC 0x51A1E001
#define SNAPBUILD_VERSION 6
1476
1477
/*
1478
 * Store/Load a snapshot from disk, depending on the snapshot builder's state.
1479
 *
1480
 * Supposed to be used by external (i.e. not snapbuild.c) code that just read
1481
 * a record that's a potential location for a serialized snapshot.
1482
 */
1483
void
1484
SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn)
1485
0
{
1486
0
  if (builder->state < SNAPBUILD_CONSISTENT)
1487
0
    SnapBuildRestore(builder, lsn);
1488
0
  else
1489
0
    SnapBuildSerialize(builder, lsn);
1490
0
}
1491
1492
/*
1493
 * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
1494
 * been done by another decoding process.
1495
 */
1496
static void
1497
SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
1498
0
{
1499
0
  Size    needed_length;
1500
0
  SnapBuildOnDisk *ondisk = NULL;
1501
0
  TransactionId *catchange_xip = NULL;
1502
0
  MemoryContext old_ctx;
1503
0
  size_t    catchange_xcnt;
1504
0
  char     *ondisk_c;
1505
0
  int     fd;
1506
0
  char    tmppath[MAXPGPATH];
1507
0
  char    path[MAXPGPATH];
1508
0
  int     ret;
1509
0
  struct stat stat_buf;
1510
0
  Size    sz;
1511
1512
0
  Assert(lsn != InvalidXLogRecPtr);
1513
0
  Assert(builder->last_serialized_snapshot == InvalidXLogRecPtr ||
1514
0
       builder->last_serialized_snapshot <= lsn);
1515
1516
  /*
1517
   * no point in serializing if we cannot continue to work immediately after
1518
   * restoring the snapshot
1519
   */
1520
0
  if (builder->state < SNAPBUILD_CONSISTENT)
1521
0
    return;
1522
1523
  /* consistent snapshots have no next phase */
1524
0
  Assert(builder->next_phase_at == InvalidTransactionId);
1525
1526
  /*
1527
   * We identify snapshots by the LSN they are valid for. We don't need to
1528
   * include timelines in the name as each LSN maps to exactly one timeline
1529
   * unless the user used pg_resetwal or similar. If a user did so, there's
1530
   * no hope continuing to decode anyway.
1531
   */
1532
0
  sprintf(path, "%s/%X-%X.snap",
1533
0
      PG_LOGICAL_SNAPSHOTS_DIR,
1534
0
      LSN_FORMAT_ARGS(lsn));
1535
1536
  /*
1537
   * first check whether some other backend already has written the snapshot
1538
   * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
1539
   * as a valid state. Everything else is an unexpected error.
1540
   */
1541
0
  ret = stat(path, &stat_buf);
1542
1543
0
  if (ret != 0 && errno != ENOENT)
1544
0
    ereport(ERROR,
1545
0
        (errcode_for_file_access(),
1546
0
         errmsg("could not stat file \"%s\": %m", path)));
1547
1548
0
  else if (ret == 0)
1549
0
  {
1550
    /*
1551
     * somebody else has already serialized to this point, don't overwrite
1552
     * but remember location, so we don't need to read old data again.
1553
     *
1554
     * To be sure it has been synced to disk after the rename() from the
1555
     * tempfile filename to the real filename, we just repeat the fsync.
1556
     * That ought to be cheap because in most scenarios it should already
1557
     * be safely on disk.
1558
     */
1559
0
    fsync_fname(path, false);
1560
0
    fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1561
1562
0
    builder->last_serialized_snapshot = lsn;
1563
0
    goto out;
1564
0
  }
1565
1566
  /*
1567
   * there is an obvious race condition here between the time we stat(2) the
1568
   * file and us writing the file. But we rename the file into place
1569
   * atomically and all files created need to contain the same data anyway,
1570
   * so this is perfectly fine, although a bit of a resource waste. Locking
1571
   * seems like pointless complication.
1572
   */
1573
0
  elog(DEBUG1, "serializing snapshot to %s", path);
1574
1575
  /* to make sure only we will write to this tempfile, include pid */
1576
0
  sprintf(tmppath, "%s/%X-%X.snap.%d.tmp",
1577
0
      PG_LOGICAL_SNAPSHOTS_DIR,
1578
0
      LSN_FORMAT_ARGS(lsn), MyProcPid);
1579
1580
  /*
1581
   * Unlink temporary file if it already exists, needs to have been before a
1582
   * crash/error since we won't enter this function twice from within a
1583
   * single decoding slot/backend and the temporary file contains the pid of
1584
   * the current process.
1585
   */
1586
0
  if (unlink(tmppath) != 0 && errno != ENOENT)
1587
0
    ereport(ERROR,
1588
0
        (errcode_for_file_access(),
1589
0
         errmsg("could not remove file \"%s\": %m", tmppath)));
1590
1591
0
  old_ctx = MemoryContextSwitchTo(builder->context);
1592
1593
  /* Get the catalog modifying transactions that are yet not committed */
1594
0
  catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder);
1595
0
  catchange_xcnt = dclist_count(&builder->reorder->catchange_txns);
1596
1597
0
  needed_length = sizeof(SnapBuildOnDisk) +
1598
0
    sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);
1599
1600
0
  ondisk_c = palloc0(needed_length);
1601
0
  ondisk = (SnapBuildOnDisk *) ondisk_c;
1602
0
  ondisk->magic = SNAPBUILD_MAGIC;
1603
0
  ondisk->version = SNAPBUILD_VERSION;
1604
0
  ondisk->length = needed_length;
1605
0
  INIT_CRC32C(ondisk->checksum);
1606
0
  COMP_CRC32C(ondisk->checksum,
1607
0
        ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1608
0
        SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1609
0
  ondisk_c += sizeof(SnapBuildOnDisk);
1610
1611
0
  memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
1612
  /* NULL-ify memory-only data */
1613
0
  ondisk->builder.context = NULL;
1614
0
  ondisk->builder.snapshot = NULL;
1615
0
  ondisk->builder.reorder = NULL;
1616
0
  ondisk->builder.committed.xip = NULL;
1617
0
  ondisk->builder.catchange.xip = NULL;
1618
  /* update catchange only on disk data */
1619
0
  ondisk->builder.catchange.xcnt = catchange_xcnt;
1620
1621
0
  COMP_CRC32C(ondisk->checksum,
1622
0
        &ondisk->builder,
1623
0
        sizeof(SnapBuild));
1624
1625
  /* copy committed xacts */
1626
0
  if (builder->committed.xcnt > 0)
1627
0
  {
1628
0
    sz = sizeof(TransactionId) * builder->committed.xcnt;
1629
0
    memcpy(ondisk_c, builder->committed.xip, sz);
1630
0
    COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1631
0
    ondisk_c += sz;
1632
0
  }
1633
1634
  /* copy catalog modifying xacts */
1635
0
  if (catchange_xcnt > 0)
1636
0
  {
1637
0
    sz = sizeof(TransactionId) * catchange_xcnt;
1638
0
    memcpy(ondisk_c, catchange_xip, sz);
1639
0
    COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
1640
0
    ondisk_c += sz;
1641
0
  }
1642
1643
0
  FIN_CRC32C(ondisk->checksum);
1644
1645
  /* we have valid data now, open tempfile and write it there */
1646
0
  fd = OpenTransientFile(tmppath,
1647
0
               O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
1648
0
  if (fd < 0)
1649
0
    ereport(ERROR,
1650
0
        (errcode_for_file_access(),
1651
0
         errmsg("could not open file \"%s\": %m", tmppath)));
1652
1653
0
  errno = 0;
1654
0
  pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
1655
0
  if ((write(fd, ondisk, needed_length)) != needed_length)
1656
0
  {
1657
0
    int     save_errno = errno;
1658
1659
0
    CloseTransientFile(fd);
1660
1661
    /* if write didn't set errno, assume problem is no disk space */
1662
0
    errno = save_errno ? save_errno : ENOSPC;
1663
0
    ereport(ERROR,
1664
0
        (errcode_for_file_access(),
1665
0
         errmsg("could not write to file \"%s\": %m", tmppath)));
1666
0
  }
1667
0
  pgstat_report_wait_end();
1668
1669
  /*
1670
   * fsync the file before renaming so that even if we crash after this we
1671
   * have either a fully valid file or nothing.
1672
   *
1673
   * It's safe to just ERROR on fsync() here because we'll retry the whole
1674
   * operation including the writes.
1675
   *
1676
   * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
1677
   * some noticeable overhead since it's performed synchronously during
1678
   * decoding?
1679
   */
1680
0
  pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC);
1681
0
  if (pg_fsync(fd) != 0)
1682
0
  {
1683
0
    int     save_errno = errno;
1684
1685
0
    CloseTransientFile(fd);
1686
0
    errno = save_errno;
1687
0
    ereport(ERROR,
1688
0
        (errcode_for_file_access(),
1689
0
         errmsg("could not fsync file \"%s\": %m", tmppath)));
1690
0
  }
1691
0
  pgstat_report_wait_end();
1692
1693
0
  if (CloseTransientFile(fd) != 0)
1694
0
    ereport(ERROR,
1695
0
        (errcode_for_file_access(),
1696
0
         errmsg("could not close file \"%s\": %m", tmppath)));
1697
1698
0
  fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1699
1700
  /*
1701
   * We may overwrite the work from some other backend, but that's ok, our
1702
   * snapshot is valid as well, we'll just have done some superfluous work.
1703
   */
1704
0
  if (rename(tmppath, path) != 0)
1705
0
  {
1706
0
    ereport(ERROR,
1707
0
        (errcode_for_file_access(),
1708
0
         errmsg("could not rename file \"%s\" to \"%s\": %m",
1709
0
            tmppath, path)));
1710
0
  }
1711
1712
  /* make sure we persist */
1713
0
  fsync_fname(path, false);
1714
0
  fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1715
1716
  /*
1717
   * Now there's no way we can lose the dumped state anymore, remember this
1718
   * as a serialization point.
1719
   */
1720
0
  builder->last_serialized_snapshot = lsn;
1721
1722
0
  MemoryContextSwitchTo(old_ctx);
1723
1724
0
out:
1725
0
  ReorderBufferSetRestartPoint(builder->reorder,
1726
0
                 builder->last_serialized_snapshot);
1727
  /* be tidy */
1728
0
  if (ondisk)
1729
0
    pfree(ondisk);
1730
0
  if (catchange_xip)
1731
0
    pfree(catchange_xip);
1732
0
}
1733
1734
/*
1735
 * Restore the logical snapshot file contents to 'ondisk'.
1736
 *
1737
 * 'context' is the memory context where the catalog modifying/committed xid
1738
 * will live.
1739
 * If 'missing_ok' is true, will not throw an error if the file is not found.
1740
 */
1741
bool
1742
SnapBuildRestoreSnapshot(SnapBuildOnDisk *ondisk, XLogRecPtr lsn,
1743
             MemoryContext context, bool missing_ok)
1744
0
{
1745
0
  int     fd;
1746
0
  pg_crc32c checksum;
1747
0
  Size    sz;
1748
0
  char    path[MAXPGPATH];
1749
1750
0
  sprintf(path, "%s/%X-%X.snap",
1751
0
      PG_LOGICAL_SNAPSHOTS_DIR,
1752
0
      LSN_FORMAT_ARGS(lsn));
1753
1754
0
  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
1755
1756
0
  if (fd < 0)
1757
0
  {
1758
0
    if (missing_ok && errno == ENOENT)
1759
0
      return false;
1760
1761
0
    ereport(ERROR,
1762
0
        (errcode_for_file_access(),
1763
0
         errmsg("could not open file \"%s\": %m", path)));
1764
0
  }
1765
1766
  /* ----
1767
   * Make sure the snapshot had been stored safely to disk, that's normally
1768
   * cheap.
1769
   * Note that we do not need PANIC here, nobody will be able to use the
1770
   * slot without fsyncing, and saving it won't succeed without an fsync()
1771
   * either...
1772
   * ----
1773
   */
1774
0
  fsync_fname(path, false);
1775
0
  fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);
1776
1777
  /* read statically sized portion of snapshot */
1778
0
  SnapBuildRestoreContents(fd, ondisk, SnapBuildOnDiskConstantSize, path);
1779
1780
0
  if (ondisk->magic != SNAPBUILD_MAGIC)
1781
0
    ereport(ERROR,
1782
0
        (errcode(ERRCODE_DATA_CORRUPTED),
1783
0
         errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u",
1784
0
            path, ondisk->magic, SNAPBUILD_MAGIC)));
1785
1786
0
  if (ondisk->version != SNAPBUILD_VERSION)
1787
0
    ereport(ERROR,
1788
0
        (errcode(ERRCODE_DATA_CORRUPTED),
1789
0
         errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u",
1790
0
            path, ondisk->version, SNAPBUILD_VERSION)));
1791
1792
0
  INIT_CRC32C(checksum);
1793
0
  COMP_CRC32C(checksum,
1794
0
        ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
1795
0
        SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
1796
1797
  /* read SnapBuild */
1798
0
  SnapBuildRestoreContents(fd, &ondisk->builder, sizeof(SnapBuild), path);
1799
0
  COMP_CRC32C(checksum, &ondisk->builder, sizeof(SnapBuild));
1800
1801
  /* restore committed xacts information */
1802
0
  if (ondisk->builder.committed.xcnt > 0)
1803
0
  {
1804
0
    sz = sizeof(TransactionId) * ondisk->builder.committed.xcnt;
1805
0
    ondisk->builder.committed.xip = MemoryContextAllocZero(context, sz);
1806
0
    SnapBuildRestoreContents(fd, ondisk->builder.committed.xip, sz, path);
1807
0
    COMP_CRC32C(checksum, ondisk->builder.committed.xip, sz);
1808
0
  }
1809
1810
  /* restore catalog modifying xacts information */
1811
0
  if (ondisk->builder.catchange.xcnt > 0)
1812
0
  {
1813
0
    sz = sizeof(TransactionId) * ondisk->builder.catchange.xcnt;
1814
0
    ondisk->builder.catchange.xip = MemoryContextAllocZero(context, sz);
1815
0
    SnapBuildRestoreContents(fd, ondisk->builder.catchange.xip, sz, path);
1816
0
    COMP_CRC32C(checksum, ondisk->builder.catchange.xip, sz);
1817
0
  }
1818
1819
0
  if (CloseTransientFile(fd) != 0)
1820
0
    ereport(ERROR,
1821
0
        (errcode_for_file_access(),
1822
0
         errmsg("could not close file \"%s\": %m", path)));
1823
1824
0
  FIN_CRC32C(checksum);
1825
1826
  /* verify checksum of what we've read */
1827
0
  if (!EQ_CRC32C(checksum, ondisk->checksum))
1828
0
    ereport(ERROR,
1829
0
        (errcode(ERRCODE_DATA_CORRUPTED),
1830
0
         errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u",
1831
0
            path, checksum, ondisk->checksum)));
1832
1833
0
  return true;
1834
0
}
1835
1836
/*
1837
 * Restore a snapshot into 'builder' if previously one has been stored at the
1838
 * location indicated by 'lsn'. Returns true if successful, false otherwise.
1839
 */
1840
static bool
1841
SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
1842
0
{
1843
0
  SnapBuildOnDisk ondisk;
1844
1845
  /* no point in loading a snapshot if we're already there */
1846
0
  if (builder->state == SNAPBUILD_CONSISTENT)
1847
0
    return false;
1848
1849
  /* validate and restore the snapshot to 'ondisk' */
1850
0
  if (!SnapBuildRestoreSnapshot(&ondisk, lsn, builder->context, true))
1851
0
    return false;
1852
1853
  /*
1854
   * ok, we now have a sensible snapshot here, figure out if it has more
1855
   * information than we have.
1856
   */
1857
1858
  /*
1859
   * We are only interested in consistent snapshots for now, comparing
1860
   * whether one incomplete snapshot is more "advanced" seems to be
1861
   * unnecessarily complex.
1862
   */
1863
0
  if (ondisk.builder.state < SNAPBUILD_CONSISTENT)
1864
0
    goto snapshot_not_interesting;
1865
1866
  /*
1867
   * Don't use a snapshot that requires an xmin that we cannot guarantee to
1868
   * be available.
1869
   */
1870
0
  if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon))
1871
0
    goto snapshot_not_interesting;
1872
1873
  /*
1874
   * Consistent snapshots have no next phase. Reset next_phase_at as it is
1875
   * possible that an old value may remain.
1876
   */
1877
0
  Assert(ondisk.builder.next_phase_at == InvalidTransactionId);
1878
0
  builder->next_phase_at = InvalidTransactionId;
1879
1880
  /* ok, we think the snapshot is sensible, copy over everything important */
1881
0
  builder->xmin = ondisk.builder.xmin;
1882
0
  builder->xmax = ondisk.builder.xmax;
1883
0
  builder->state = ondisk.builder.state;
1884
1885
0
  builder->committed.xcnt = ondisk.builder.committed.xcnt;
1886
  /* We only allocated/stored xcnt, not xcnt_space xids ! */
1887
  /* don't overwrite preallocated xip, if we don't have anything here */
1888
0
  if (builder->committed.xcnt > 0)
1889
0
  {
1890
0
    pfree(builder->committed.xip);
1891
0
    builder->committed.xcnt_space = ondisk.builder.committed.xcnt;
1892
0
    builder->committed.xip = ondisk.builder.committed.xip;
1893
0
  }
1894
0
  ondisk.builder.committed.xip = NULL;
1895
1896
  /* set catalog modifying transactions */
1897
0
  if (builder->catchange.xip)
1898
0
    pfree(builder->catchange.xip);
1899
0
  builder->catchange.xcnt = ondisk.builder.catchange.xcnt;
1900
0
  builder->catchange.xip = ondisk.builder.catchange.xip;
1901
0
  ondisk.builder.catchange.xip = NULL;
1902
1903
  /* our snapshot is not interesting anymore, build a new one */
1904
0
  if (builder->snapshot != NULL)
1905
0
  {
1906
0
    SnapBuildSnapDecRefcount(builder->snapshot);
1907
0
  }
1908
0
  builder->snapshot = SnapBuildBuildSnapshot(builder);
1909
0
  SnapBuildSnapIncRefcount(builder->snapshot);
1910
1911
0
  ReorderBufferSetRestartPoint(builder->reorder, lsn);
1912
1913
0
  Assert(builder->state == SNAPBUILD_CONSISTENT);
1914
1915
0
  ereport(LOG,
1916
0
      errmsg("logical decoding found consistent point at %X/%08X",
1917
0
           LSN_FORMAT_ARGS(lsn)),
1918
0
      errdetail("Logical decoding will begin using saved snapshot."));
1919
0
  return true;
1920
1921
0
snapshot_not_interesting:
1922
0
  if (ondisk.builder.committed.xip != NULL)
1923
0
    pfree(ondisk.builder.committed.xip);
1924
0
  if (ondisk.builder.catchange.xip != NULL)
1925
0
    pfree(ondisk.builder.catchange.xip);
1926
0
  return false;
1927
0
}
1928
1929
/*
1930
 * Read the contents of the serialized snapshot to 'dest'.
1931
 */
1932
static void
1933
SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path)
1934
0
{
1935
0
  int     readBytes;
1936
1937
0
  pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ);
1938
0
  readBytes = read(fd, dest, size);
1939
0
  pgstat_report_wait_end();
1940
0
  if (readBytes != size)
1941
0
  {
1942
0
    int     save_errno = errno;
1943
1944
0
    CloseTransientFile(fd);
1945
1946
0
    if (readBytes < 0)
1947
0
    {
1948
0
      errno = save_errno;
1949
0
      ereport(ERROR,
1950
0
          (errcode_for_file_access(),
1951
0
           errmsg("could not read file \"%s\": %m", path)));
1952
0
    }
1953
0
    else
1954
0
      ereport(ERROR,
1955
0
          (errcode(ERRCODE_DATA_CORRUPTED),
1956
0
           errmsg("could not read file \"%s\": read %d of %zu",
1957
0
              path, readBytes, size)));
1958
0
  }
1959
0
}
1960
1961
/*
1962
 * Remove all serialized snapshots that are not required anymore because no
1963
 * slot can need them. This doesn't actually have to run during a checkpoint,
1964
 * but it's a convenient point to schedule this.
1965
 *
1966
 * NB: We run this during checkpoints even if logical decoding is disabled so
1967
 * we cleanup old slots at some point after it got disabled.
1968
 */
1969
void
1970
CheckPointSnapBuild(void)
1971
0
{
1972
0
  XLogRecPtr  cutoff;
1973
0
  XLogRecPtr  redo;
1974
0
  DIR      *snap_dir;
1975
0
  struct dirent *snap_de;
1976
0
  char    path[MAXPGPATH + sizeof(PG_LOGICAL_SNAPSHOTS_DIR)];
1977
1978
  /*
1979
   * We start off with a minimum of the last redo pointer. No new
1980
   * replication slot will start before that, so that's a safe upper bound
1981
   * for removal.
1982
   */
1983
0
  redo = GetRedoRecPtr();
1984
1985
  /* now check for the restart ptrs from existing slots */
1986
0
  cutoff = ReplicationSlotsComputeLogicalRestartLSN();
1987
1988
  /* don't start earlier than the restart lsn */
1989
0
  if (redo < cutoff)
1990
0
    cutoff = redo;
1991
1992
0
  snap_dir = AllocateDir(PG_LOGICAL_SNAPSHOTS_DIR);
1993
0
  while ((snap_de = ReadDir(snap_dir, PG_LOGICAL_SNAPSHOTS_DIR)) != NULL)
1994
0
  {
1995
0
    uint32    hi;
1996
0
    uint32    lo;
1997
0
    XLogRecPtr  lsn;
1998
0
    PGFileType  de_type;
1999
2000
0
    if (strcmp(snap_de->d_name, ".") == 0 ||
2001
0
      strcmp(snap_de->d_name, "..") == 0)
2002
0
      continue;
2003
2004
0
    snprintf(path, sizeof(path), "%s/%s", PG_LOGICAL_SNAPSHOTS_DIR, snap_de->d_name);
2005
0
    de_type = get_dirent_type(path, snap_de, false, DEBUG1);
2006
2007
0
    if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG)
2008
0
    {
2009
0
      elog(DEBUG1, "only regular files expected: %s", path);
2010
0
      continue;
2011
0
    }
2012
2013
    /*
2014
     * temporary filenames from SnapBuildSerialize() include the LSN and
2015
     * everything but are postfixed by .$pid.tmp. We can just remove them
2016
     * the same as other files because there can be none that are
2017
     * currently being written that are older than cutoff.
2018
     *
2019
     * We just log a message if a file doesn't fit the pattern, it's
2020
     * probably some editors lock/state file or similar...
2021
     */
2022
0
    if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2)
2023
0
    {
2024
0
      ereport(LOG,
2025
0
          (errmsg("could not parse file name \"%s\"", path)));
2026
0
      continue;
2027
0
    }
2028
2029
0
    lsn = ((uint64) hi) << 32 | lo;
2030
2031
    /* check whether we still need it */
2032
0
    if (lsn < cutoff || cutoff == InvalidXLogRecPtr)
2033
0
    {
2034
0
      elog(DEBUG1, "removing snapbuild snapshot %s", path);
2035
2036
      /*
2037
       * It's not particularly harmful, though strange, if we can't
2038
       * remove the file here. Don't prevent the checkpoint from
2039
       * completing, that'd be a cure worse than the disease.
2040
       */
2041
0
      if (unlink(path) < 0)
2042
0
      {
2043
0
        ereport(LOG,
2044
0
            (errcode_for_file_access(),
2045
0
             errmsg("could not remove file \"%s\": %m",
2046
0
                path)));
2047
0
        continue;
2048
0
      }
2049
0
    }
2050
0
  }
2051
0
  FreeDir(snap_dir);
2052
0
}
2053
2054
/*
2055
 * Check if a logical snapshot at the specified point has been serialized.
2056
 */
2057
bool
2058
SnapBuildSnapshotExists(XLogRecPtr lsn)
2059
0
{
2060
0
  char    path[MAXPGPATH];
2061
0
  int     ret;
2062
0
  struct stat stat_buf;
2063
2064
0
  sprintf(path, "%s/%08X-%08X.snap",
2065
0
      PG_LOGICAL_SNAPSHOTS_DIR,
2066
0
      LSN_FORMAT_ARGS(lsn));
2067
2068
0
  ret = stat(path, &stat_buf);
2069
2070
0
  if (ret != 0 && errno != ENOENT)
2071
0
    ereport(ERROR,
2072
0
        (errcode_for_file_access(),
2073
0
         errmsg("could not stat file \"%s\": %m", path)));
2074
2075
0
  return ret == 0;
2076
0
}