Coverage Report

Created: 2025-06-15 06:31

/src/postgres/src/backend/replication/logical/reorderbuffer.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * reorderbuffer.c
4
 *    PostgreSQL logical replay/reorder buffer management
5
 *
6
 *
7
 * Copyright (c) 2012-2025, PostgreSQL Global Development Group
8
 *
9
 *
10
 * IDENTIFICATION
11
 *    src/backend/replication/logical/reorderbuffer.c
12
 *
13
 * NOTES
14
 *    This module gets handed individual pieces of transactions in the order
15
 *    they are written to the WAL and is responsible to reassemble them into
16
 *    toplevel transaction sized pieces. When a transaction is completely
17
 *    reassembled - signaled by reading the transaction commit record - it
18
 *    will then call the output plugin (cf. ReorderBufferCommit()) with the
19
 *    individual changes. The output plugins rely on snapshots built by
20
 *    snapbuild.c which hands them to us.
21
 *
22
 *    Transactions and subtransactions/savepoints in postgres are not
23
 *    immediately linked to each other from outside the performing
24
 *    backend. Only at commit/abort (or special xact_assignment records) they
25
 *    are linked together. Which means that we will have to splice together a
26
 *    toplevel transaction from its subtransactions. To do that efficiently we
27
 *    build a binary heap indexed by the smallest current lsn of the individual
28
 *    subtransactions' changestreams. As the individual streams are inherently
29
 *    ordered by LSN - since that is where we build them from - the transaction
30
 *    can easily be reassembled by always using the subtransaction with the
31
 *    smallest current LSN from the heap.
32
 *
33
 *    In order to cope with large transactions - which can be several times as
34
 *    big as the available memory - this module supports spooling the contents
35
 *    of large transactions to disk. When the transaction is replayed the
36
 *    contents of individual (sub-)transactions will be read from disk in
37
 *    chunks.
38
 *
39
 *    This module also has to deal with reassembling toast records from the
40
 *    individual chunks stored in WAL. When a new (or initial) version of a
41
 *    tuple is stored in WAL it will always be preceded by the toast chunks
42
 *    emitted for the columns stored out of line. Within a single toplevel
43
 *    transaction there will be no other data carrying records between a row's
44
 *    toast chunks and the row data itself. See ReorderBufferToast* for
45
 *    details.
46
 *
47
 *    ReorderBuffer uses two special memory context types - SlabContext for
48
 *    allocations of fixed-length structures (changes and transactions), and
49
 *    GenerationContext for the variable-length transaction data (allocated
50
 *    and freed in groups with similar lifespans).
51
 *
52
 *    To limit the amount of memory used by decoded changes, we track memory
53
 *    used at the reorder buffer level (i.e. total amount of memory), and for
54
 *    each transaction. When the total amount of used memory exceeds the
55
 *    limit, the transaction consuming the most memory is then serialized to
56
 *    disk.
57
 *
58
 *    Only decoded changes are evicted from memory (spilled to disk), not the
59
 *    transaction records. The number of toplevel transactions is limited,
60
 *    but a transaction with many subtransactions may still consume significant
61
 *    amounts of memory. However, the transaction records are fairly small and
62
 *    are not included in the memory limit.
63
 *
64
 *    The current eviction algorithm is very simple - the transaction is
65
 *    picked merely by size, while it might be useful to also consider age
66
 *    (LSN) of the changes for example. With the new Generational memory
67
 *    allocator, evicting the oldest changes would make it more likely the
68
 *    memory gets actually freed.
69
 *
70
 *    We use a max-heap with transaction size as the key to efficiently find
71
 *    the largest transaction. We update the max-heap whenever the memory
72
 *    counter is updated; however transactions with size 0 are not stored in
73
 *    the heap, because they have no changes to evict.
74
 *
75
 *    We still rely on max_changes_in_memory when loading serialized changes
76
 *    back into memory. At that point we can't use the memory limit directly
77
 *    as we load the subxacts independently. One option to deal with this
78
 *    would be to count the subxacts, and allow each to allocate 1/N of the
79
 *    memory limit. That however does not seem very appealing, because with
80
 *    many subtransactions it may easily cause thrashing (short cycles of
81
 *    deserializing and applying very few changes). We probably should give
82
 *    a bit more memory to the oldest subtransactions, because it's likely
83
 *    they are the source for the next sequence of changes.
84
 *
85
 * -------------------------------------------------------------------------
86
 */
87
#include "postgres.h"
88
89
#include <unistd.h>
90
#include <sys/stat.h>
91
92
#include "access/detoast.h"
93
#include "access/heapam.h"
94
#include "access/rewriteheap.h"
95
#include "access/transam.h"
96
#include "access/xact.h"
97
#include "access/xlog_internal.h"
98
#include "catalog/catalog.h"
99
#include "common/int.h"
100
#include "lib/binaryheap.h"
101
#include "miscadmin.h"
102
#include "pgstat.h"
103
#include "replication/logical.h"
104
#include "replication/reorderbuffer.h"
105
#include "replication/slot.h"
106
#include "replication/snapbuild.h"  /* just for SnapBuildSnapDecRefcount */
107
#include "storage/bufmgr.h"
108
#include "storage/fd.h"
109
#include "storage/procarray.h"
110
#include "storage/sinval.h"
111
#include "utils/builtins.h"
112
#include "utils/memutils.h"
113
#include "utils/rel.h"
114
#include "utils/relfilenumbermap.h"
115
116
/* entry for a hash table we use to map from xid to our transaction state */
typedef struct ReorderBufferTXNByIdEnt
{
	TransactionId xid;			/* hash key */
	ReorderBufferTXN *txn;		/* transaction state for that xid */
} ReorderBufferTXNByIdEnt;

/* data structures for (relfilelocator, ctid) => (cmin, cmax) mapping */
typedef struct ReorderBufferTupleCidKey
{
	RelFileLocator rlocator;	/* relation the tuple lives in */
	ItemPointerData tid;		/* tuple's physical location */
} ReorderBufferTupleCidKey;

typedef struct ReorderBufferTupleCidEnt
{
	ReorderBufferTupleCidKey key;	/* hash key: (rlocator, tid) */
	CommandId	cmin;
	CommandId	cmax;
	CommandId	combocid;		/* just for debugging */
} ReorderBufferTupleCidEnt;

/* Virtual file descriptor with file offset tracking */
typedef struct TXNEntryFile
{
	File		vfd;			/* -1 when the file is closed */
	off_t		curOffset;		/* offset for next write or read. Reset to 0
								 * when vfd is opened. */
} TXNEntryFile;

/* k-way in-order change iteration support structures */
typedef struct ReorderBufferIterTXNEntry
{
	XLogRecPtr	lsn;			/* LSN of the current change */
	ReorderBufferChange *change;	/* current change of this stream */
	ReorderBufferTXN *txn;		/* (sub)transaction the stream belongs to */
	TXNEntryFile file;			/* spill file, if changes are on disk */
	XLogSegNo	segno;			/* current spill segment */
} ReorderBufferIterTXNEntry;

typedef struct ReorderBufferIterTXNState
{
	binaryheap *heap;			/* min-heap over entries, keyed by lsn */
	Size		nr_txns;		/* number of transactions being merged */
	dlist_head	old_change;		/* changes to be freed on next advance */
	ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER];
} ReorderBufferIterTXNState;

/* toast datastructures */
typedef struct ReorderBufferToastEnt
{
	Oid			chunk_id;		/* toast_table.chunk_id */
	int32		last_chunk_seq; /* toast_table.chunk_seq of the last chunk we
								 * have seen */
	Size		num_chunks;		/* number of chunks we've already seen */
	Size		size;			/* combined size of chunks seen */
	dlist_head	chunks;			/* linked list of chunks */
	struct varlena *reconstructed;	/* reconstructed varlena now pointed to in
									 * main tup */
} ReorderBufferToastEnt;

/* Disk serialization support datastructures */
typedef struct ReorderBufferDiskChange
{
	Size		size;			/* size of the serialized entry, including
								 * trailing data */
	ReorderBufferChange change;
	/* data follows */
} ReorderBufferDiskChange;
184
185
0
/* Is 'action' a speculative insertion? */
#define IsSpecInsert(action) \
( \
	((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \
)
/* Does 'action' resolve a pending speculative insertion (confirm or abort)? */
#define IsSpecConfirmOrAbort(action) \
( \
	(((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) || \
	((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)) \
)
/* Is 'action' an insert or update, including speculative inserts? */
#define IsInsertOrUpdate(action) \
( \
	(((action) == REORDER_BUFFER_CHANGE_INSERT) || \
	((action) == REORDER_BUFFER_CHANGE_UPDATE) || \
	((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \
)
200
201
/*
202
 * Maximum number of changes kept in memory, per transaction. After that,
203
 * changes are spooled to disk.
204
 *
205
 * The current value should be sufficient to decode the entire transaction
206
 * without hitting disk in OLTP workloads, while starting to spool to disk in
207
 * other workloads reasonably fast.
208
 *
209
 * At some point in the future it probably makes sense to have a more elaborate
210
 * resource management here, but it's not entirely clear what that would look
211
 * like.
212
 */
213
int     logical_decoding_work_mem;
214
static const Size max_changes_in_memory = 4096; /* XXX for restore only */
215
216
/* GUC variable */
217
int     debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED;
218
219
/* ---------------------------------------
220
 * primary reorderbuffer support routines
221
 * ---------------------------------------
222
 */
223
static ReorderBufferTXN *ReorderBufferAllocTXN(ReorderBuffer *rb);
224
static void ReorderBufferFreeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
225
static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb,
226
                         TransactionId xid, bool create, bool *is_new,
227
                         XLogRecPtr lsn, bool create_as_top);
228
static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
229
                        ReorderBufferTXN *subtxn);
230
231
static void AssertTXNLsnOrder(ReorderBuffer *rb);
232
233
/* ---------------------------------------
234
 * support functions for lsn-order iterating over the ->changes of a
235
 * transaction and its subtransactions
236
 *
237
 * used for iteration over the k-way heap merge of a transaction and its
238
 * subtransactions
239
 * ---------------------------------------
240
 */
241
static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
242
                   ReorderBufferIterTXNState *volatile *iter_state);
243
static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state);
244
static void ReorderBufferIterTXNFinish(ReorderBuffer *rb,
245
                     ReorderBufferIterTXNState *state);
246
static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs);
247
248
/*
249
 * ---------------------------------------
250
 * Disk serialization support functions
251
 * ---------------------------------------
252
 */
253
static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb);
254
static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
255
static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
256
                     int fd, ReorderBufferChange *change);
257
static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
258
                    TXNEntryFile *file, XLogSegNo *segno);
259
static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
260
                     char *data);
261
static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
262
static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
263
                   bool txn_prepared);
264
static void ReorderBufferMaybeMarkTXNStreamed(ReorderBuffer *rb, ReorderBufferTXN *txn);
265
static bool ReorderBufferCheckAndTruncateAbortedTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
266
static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
267
static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
268
                    TransactionId xid, XLogSegNo segno);
269
static int  ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg);
270
271
static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
272
static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
273
                    ReorderBufferTXN *txn, CommandId cid);
274
275
/*
276
 * ---------------------------------------
277
 * Streaming support functions
278
 * ---------------------------------------
279
 */
280
static inline bool ReorderBufferCanStream(ReorderBuffer *rb);
281
static inline bool ReorderBufferCanStartStreaming(ReorderBuffer *rb);
282
static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
283
static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn);
284
285
/* ---------------------------------------
286
 * toast reassembly support
287
 * ---------------------------------------
288
 */
289
static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn);
290
static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn);
291
static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
292
                    Relation relation, ReorderBufferChange *change);
293
static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
294
                      Relation relation, ReorderBufferChange *change);
295
296
/*
297
 * ---------------------------------------
298
 * memory accounting
299
 * ---------------------------------------
300
 */
301
static Size ReorderBufferChangeSize(ReorderBufferChange *change);
302
static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
303
                      ReorderBufferChange *change,
304
                      ReorderBufferTXN *txn,
305
                      bool addition, Size sz);
306
307
/*
 * Allocate a new ReorderBuffer and clean out any old serialized state from
 * prior ReorderBuffer instances for the same slot.
 *
 * Requires MyReplicationSlot to be set.  All of the buffer's memory lives in
 * a dedicated memory context, so ReorderBufferFree() can release everything
 * by deleting that context.
 */
ReorderBuffer *
ReorderBufferAllocate(void)
{
	ReorderBuffer *buffer;
	HASHCTL		hash_ctl;
	MemoryContext new_ctx;

	Assert(MyReplicationSlot != NULL);

	/* allocate memory in own context, to have better accountability */
	new_ctx = AllocSetContextCreate(CurrentMemoryContext,
									"ReorderBuffer",
									ALLOCSET_DEFAULT_SIZES);

	buffer =
		(ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer));

	memset(&hash_ctl, 0, sizeof(hash_ctl));

	buffer->context = new_ctx;

	/* Slab contexts for the fixed-size change and transaction structs. */
	buffer->change_context = SlabContextCreate(new_ctx,
											   "Change",
											   SLAB_DEFAULT_BLOCK_SIZE,
											   sizeof(ReorderBufferChange));

	buffer->txn_context = SlabContextCreate(new_ctx,
											"TXN",
											SLAB_DEFAULT_BLOCK_SIZE,
											sizeof(ReorderBufferTXN));

	/*
	 * To minimize memory fragmentation caused by long-running transactions
	 * with changes spanning multiple memory blocks, we use a single
	 * fixed-size memory block for decoded tuple storage. The performance
	 * testing showed that the default memory block size maintains logical
	 * decoding performance without causing fragmentation due to concurrent
	 * transactions. One might think that we can use the max size as
	 * SLAB_LARGE_BLOCK_SIZE but the test also showed it doesn't help resolve
	 * the memory fragmentation.
	 */
	buffer->tup_context = GenerationContextCreate(new_ctx,
												  "Tuples",
												  SLAB_DEFAULT_BLOCK_SIZE,
												  SLAB_DEFAULT_BLOCK_SIZE,
												  SLAB_DEFAULT_BLOCK_SIZE);

	/* xid -> ReorderBufferTXN lookup table */
	hash_ctl.keysize = sizeof(TransactionId);
	hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt);
	hash_ctl.hcxt = buffer->context;

	buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl,
								 HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

	/* one-entry cache in front of by_txn; starts empty */
	buffer->by_txn_last_xid = InvalidTransactionId;
	buffer->by_txn_last_txn = NULL;

	buffer->outbuf = NULL;
	buffer->outbufsize = 0;
	buffer->size = 0;

	/* txn_heap is ordered by transaction size */
	buffer->txn_heap = pairingheap_allocate(ReorderBufferTXNSizeCompare, NULL);

	/* statistics counters, all reset to zero */
	buffer->spillTxns = 0;
	buffer->spillCount = 0;
	buffer->spillBytes = 0;
	buffer->streamTxns = 0;
	buffer->streamCount = 0;
	buffer->streamBytes = 0;
	buffer->totalTxns = 0;
	buffer->totalBytes = 0;

	buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;

	dlist_init(&buffer->toplevel_by_lsn);
	dlist_init(&buffer->txns_by_base_snapshot_lsn);
	dclist_init(&buffer->catchange_txns);

	/*
	 * Ensure there's no stale data from prior uses of this slot, in case some
	 * prior exit avoided calling ReorderBufferFree. Failure to do this can
	 * produce duplicated txns, and it's very cheap if there's nothing there.
	 */
	ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));

	return buffer;
}
399
400
/*
401
 * Free a ReorderBuffer
402
 */
403
void
404
ReorderBufferFree(ReorderBuffer *rb)
405
0
{
406
0
  MemoryContext context = rb->context;
407
408
  /*
409
   * We free separately allocated data by entirely scrapping reorderbuffer's
410
   * memory context.
411
   */
412
0
  MemoryContextDelete(context);
413
414
  /* Free disk space used by unconsumed reorder buffers */
415
0
  ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name));
416
0
}
417
418
/*
419
 * Allocate a new ReorderBufferTXN.
420
 */
421
static ReorderBufferTXN *
422
ReorderBufferAllocTXN(ReorderBuffer *rb)
423
0
{
424
0
  ReorderBufferTXN *txn;
425
426
0
  txn = (ReorderBufferTXN *)
427
0
    MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN));
428
429
0
  memset(txn, 0, sizeof(ReorderBufferTXN));
430
431
0
  dlist_init(&txn->changes);
432
0
  dlist_init(&txn->tuplecids);
433
0
  dlist_init(&txn->subtxns);
434
435
  /* InvalidCommandId is not zero, so set it explicitly */
436
0
  txn->command_id = InvalidCommandId;
437
0
  txn->output_plugin_private = NULL;
438
439
0
  return txn;
440
0
}
441
442
/*
443
 * Free a ReorderBufferTXN.
444
 */
445
static void
446
ReorderBufferFreeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
447
0
{
448
  /* clean the lookup cache if we were cached (quite likely) */
449
0
  if (rb->by_txn_last_xid == txn->xid)
450
0
  {
451
0
    rb->by_txn_last_xid = InvalidTransactionId;
452
0
    rb->by_txn_last_txn = NULL;
453
0
  }
454
455
  /* free data that's contained */
456
457
0
  if (txn->gid != NULL)
458
0
  {
459
0
    pfree(txn->gid);
460
0
    txn->gid = NULL;
461
0
  }
462
463
0
  if (txn->tuplecid_hash != NULL)
464
0
  {
465
0
    hash_destroy(txn->tuplecid_hash);
466
0
    txn->tuplecid_hash = NULL;
467
0
  }
468
469
0
  if (txn->invalidations)
470
0
  {
471
0
    pfree(txn->invalidations);
472
0
    txn->invalidations = NULL;
473
0
  }
474
475
  /* Reset the toast hash */
476
0
  ReorderBufferToastReset(rb, txn);
477
478
  /* All changes must be deallocated */
479
0
  Assert(txn->size == 0);
480
481
0
  pfree(txn);
482
0
}
483
484
/*
485
 * Allocate a ReorderBufferChange.
486
 */
487
ReorderBufferChange *
488
ReorderBufferAllocChange(ReorderBuffer *rb)
489
0
{
490
0
  ReorderBufferChange *change;
491
492
0
  change = (ReorderBufferChange *)
493
0
    MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange));
494
495
0
  memset(change, 0, sizeof(ReorderBufferChange));
496
0
  return change;
497
0
}
498
499
/*
 * Free a ReorderBufferChange and update memory accounting, if requested.
 *
 * Releases any data hanging off the change - tuples, message payloads,
 * invalidation arrays, snapshots, relid arrays - before returning the change
 * struct itself to the slab context.  Pass upd_mem = false when the change
 * was never added to the memory accounting (e.g. never queued).
 */
void
ReorderBufferFreeChange(ReorderBuffer *rb, ReorderBufferChange *change,
						bool upd_mem)
{
	/* update memory accounting info */
	if (upd_mem)
		ReorderBufferChangeMemoryUpdate(rb, change, NULL, false,
										ReorderBufferChangeSize(change));

	/* free contained data, depending on which union member is in use */
	switch (change->action)
	{
		case REORDER_BUFFER_CHANGE_INSERT:
		case REORDER_BUFFER_CHANGE_UPDATE:
		case REORDER_BUFFER_CHANGE_DELETE:
		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
			if (change->data.tp.newtuple)
			{
				ReorderBufferFreeTupleBuf(change->data.tp.newtuple);
				change->data.tp.newtuple = NULL;
			}

			if (change->data.tp.oldtuple)
			{
				ReorderBufferFreeTupleBuf(change->data.tp.oldtuple);
				change->data.tp.oldtuple = NULL;
			}
			break;
		case REORDER_BUFFER_CHANGE_MESSAGE:
			if (change->data.msg.prefix != NULL)
				pfree(change->data.msg.prefix);
			change->data.msg.prefix = NULL;
			if (change->data.msg.message != NULL)
				pfree(change->data.msg.message);
			change->data.msg.message = NULL;
			break;
		case REORDER_BUFFER_CHANGE_INVALIDATION:
			if (change->data.inval.invalidations)
				pfree(change->data.inval.invalidations);
			change->data.inval.invalidations = NULL;
			break;
		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
			if (change->data.snapshot)
			{
				/* snapshots are refcounted; use the dedicated release path */
				ReorderBufferFreeSnap(rb, change->data.snapshot);
				change->data.snapshot = NULL;
			}
			break;
			/* no data in addition to the struct itself */
		case REORDER_BUFFER_CHANGE_TRUNCATE:
			if (change->data.truncate.relids != NULL)
			{
				ReorderBufferFreeRelids(rb, change->data.truncate.relids);
				change->data.truncate.relids = NULL;
			}
			break;
		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
			break;
	}

	pfree(change);
}
567
568
/*
569
 * Allocate a HeapTuple fitting a tuple of size tuple_len (excluding header
570
 * overhead).
571
 */
572
HeapTuple
573
ReorderBufferAllocTupleBuf(ReorderBuffer *rb, Size tuple_len)
574
0
{
575
0
  HeapTuple tuple;
576
0
  Size    alloc_len;
577
578
0
  alloc_len = tuple_len + SizeofHeapTupleHeader;
579
580
0
  tuple = (HeapTuple) MemoryContextAlloc(rb->tup_context,
581
0
                       HEAPTUPLESIZE + alloc_len);
582
0
  tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE);
583
584
0
  return tuple;
585
0
}
586
587
/*
 * Free a HeapTuple returned by ReorderBufferAllocTupleBuf().
 *
 * The struct and tuple data were allocated as one chunk, so a single pfree
 * releases both.
 */
void
ReorderBufferFreeTupleBuf(HeapTuple tuple)
{
	pfree(tuple);
}
595
596
/*
597
 * Allocate an array for relids of truncated relations.
598
 *
599
 * We use the global memory context (for the whole reorder buffer), because
600
 * none of the existing ones seems like a good match (some are SLAB, so we
601
 * can't use those, and tup_context is meant for tuple data, not relids). We
602
 * could add yet another context, but it seems like an overkill - TRUNCATE is
603
 * not particularly common operation, so it does not seem worth it.
604
 */
605
Oid *
606
ReorderBufferAllocRelids(ReorderBuffer *rb, int nrelids)
607
0
{
608
0
  Oid      *relids;
609
0
  Size    alloc_len;
610
611
0
  alloc_len = sizeof(Oid) * nrelids;
612
613
0
  relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len);
614
615
0
  return relids;
616
0
}
617
618
/*
 * Free an array of relids.
 *
 * Counterpart to ReorderBufferAllocRelids(); the rb argument is not used
 * here but kept for symmetry with the allocation function.
 */
void
ReorderBufferFreeRelids(ReorderBuffer *rb, Oid *relids)
{
	pfree(relids);
}
626
627
/*
 * Return the ReorderBufferTXN from the given buffer, specified by Xid.
 * If create is true, and a transaction doesn't already exist, create it
 * (with the given LSN, and as top transaction if that's specified);
 * when this happens, is_new is set to true.
 *
 * A one-entry cache (by_txn_last_xid/by_txn_last_txn) sits in front of the
 * hash table; it also caches negative lookups (txn == NULL).  The cache is
 * refreshed on every path that reaches the hash table.
 */
static ReorderBufferTXN *
ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
					  bool *is_new, XLogRecPtr lsn, bool create_as_top)
{
	ReorderBufferTXN *txn;
	ReorderBufferTXNByIdEnt *ent;
	bool		found;

	Assert(TransactionIdIsValid(xid));

	/*
	 * Check the one-entry lookup cache first
	 */
	if (TransactionIdIsValid(rb->by_txn_last_xid) &&
		rb->by_txn_last_xid == xid)
	{
		txn = rb->by_txn_last_txn;

		if (txn != NULL)
		{
			/* found it, and it's valid */
			if (is_new)
				*is_new = false;
			return txn;
		}

		/*
		 * cached as non-existent, and asked not to create? Then nothing else
		 * to do.
		 */
		if (!create)
			return NULL;
		/* otherwise fall through to create it */
	}

	/*
	 * If the cache wasn't hit or it yielded a "does-not-exist" and we want to
	 * create an entry.
	 */

	/* search the lookup table */
	ent = (ReorderBufferTXNByIdEnt *)
		hash_search(rb->by_txn,
					&xid,
					create ? HASH_ENTER : HASH_FIND,
					&found);
	if (found)
		txn = ent->txn;
	else if (create)
	{
		/* initialize the new entry, if creation was requested */
		Assert(ent != NULL);
		Assert(lsn != InvalidXLogRecPtr);

		ent->txn = ReorderBufferAllocTXN(rb);
		ent->txn->xid = xid;
		txn = ent->txn;
		txn->first_lsn = lsn;
		txn->restart_decoding_lsn = rb->current_restart_decoding_lsn;

		if (create_as_top)
		{
			/* new top-level txns are appended in LSN order; verify that */
			dlist_push_tail(&rb->toplevel_by_lsn, &txn->node);
			AssertTXNLsnOrder(rb);
		}
	}
	else
		txn = NULL;				/* not found and not asked to create */

	/* update cache */
	rb->by_txn_last_xid = xid;
	rb->by_txn_last_txn = txn;

	if (is_new)
		*is_new = !found;

	Assert(!create || txn != NULL);
	return txn;
}
712
713
/*
 * Record the partial change for the streaming of in-progress transactions.  We
 * can stream only complete changes so if we have a partial change like toast
 * table insert or speculative insert then we mark such a 'txn' so that it
 * can't be streamed.  We also ensure that if the changes in such a 'txn' can
 * be streamed and are above logical_decoding_work_mem threshold then we stream
 * them as soon as we have a complete change.
 *
 * The partial-change flag is always tracked on the top-level transaction,
 * even when 'change' belongs to a subtransaction.
 */
static void
ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
								  ReorderBufferChange *change,
								  bool toast_insert)
{
	ReorderBufferTXN *toptxn;

	/*
	 * The partial changes need to be processed only while streaming
	 * in-progress transactions.
	 */
	if (!ReorderBufferCanStream(rb))
		return;

	/* Get the top transaction. */
	toptxn = rbtxn_get_toptxn(txn);

	/*
	 * Indicate a partial change for toast inserts.  The change will be
	 * considered as complete once we get the insert or update on the main
	 * table and we are sure that the pending toast chunks are not required
	 * anymore.
	 *
	 * If we allow streaming when there are pending toast chunks then such
	 * chunks won't be released till the insert (multi_insert) is complete and
	 * we expect the txn to have streamed all changes after streaming.  This
	 * restriction is mainly to ensure the correctness of streamed
	 * transactions and it doesn't seem worth uplifting such a restriction
	 * just to allow this case because anyway we will stream the transaction
	 * once such an insert is complete.
	 */
	if (toast_insert)
		toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
	else if (rbtxn_has_partial_change(toptxn) &&
			 IsInsertOrUpdate(change->action) &&
			 change->data.tp.clear_toast_afterwards)
		toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;

	/*
	 * Indicate a partial change for speculative inserts.  The change will be
	 * considered as complete once we get the speculative confirm or abort
	 * token.
	 */
	if (IsSpecInsert(change->action))
		toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE;
	else if (rbtxn_has_partial_change(toptxn) &&
			 IsSpecConfirmOrAbort(change->action))
		toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE;

	/*
	 * Stream the transaction if it is serialized before and the changes are
	 * now complete in the top-level transaction.
	 *
	 * The reason for doing the streaming of such a transaction as soon as we
	 * get the complete change for it is that previously it would have reached
	 * the memory threshold and wouldn't get streamed because of incomplete
	 * changes.  Delaying such transactions would increase apply lag for them.
	 */
	if (ReorderBufferCanStartStreaming(rb) &&
		!(rbtxn_has_partial_change(toptxn)) &&
		rbtxn_is_serialized(txn) &&
		rbtxn_has_streamable_change(toptxn))
		ReorderBufferStreamTXN(rb, toptxn);
}
785
786
/*
 * Queue a change into a transaction so it can be replayed upon commit or will be
 * streamed when we reach logical_decoding_work_mem threshold.
 *
 * Takes ownership of 'change': it is either appended to the transaction's
 * change list (with memory accounting updated) or, for transactions already
 * known to be aborted, freed immediately.
 */
void
ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
						 ReorderBufferChange *change, bool toast_insert)
{
	ReorderBufferTXN *txn;

	txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);

	/*
	 * If we have detected that the transaction is aborted while streaming the
	 * previous changes or by checking its CLOG, there is no point in
	 * collecting further changes for it.
	 */
	if (rbtxn_is_aborted(txn))
	{
		/*
		 * We don't need to update memory accounting for this change as we
		 * have not added it to the queue yet.
		 */
		ReorderBufferFreeChange(rb, change, false);
		return;
	}

	/*
	 * The changes that are sent downstream are considered streamable.  We
	 * remember such transactions so that only those will later be considered
	 * for streaming.
	 */
	if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
		change->action == REORDER_BUFFER_CHANGE_UPDATE ||
		change->action == REORDER_BUFFER_CHANGE_DELETE ||
		change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT ||
		change->action == REORDER_BUFFER_CHANGE_TRUNCATE ||
		change->action == REORDER_BUFFER_CHANGE_MESSAGE)
	{
		ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);

		toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE;
	}

	change->lsn = lsn;
	change->txn = txn;

	Assert(InvalidXLogRecPtr != lsn);
	dlist_push_tail(&txn->changes, &change->node);
	txn->nentries++;
	txn->nentries_mem++;

	/* update memory accounting information */
	ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
									ReorderBufferChangeSize(change));

	/* process partial change */
	ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);

	/* check the memory limits and evict something if needed */
	ReorderBufferCheckMemoryLimit(rb);
}
848
849
/*
 * A transactional message is queued to be processed upon commit and a
 * non-transactional message gets processed immediately.
 *
 * Transactional messages require a valid xid and no snapshot (the snapshot
 * is derived later during apply); non-transactional messages require a valid
 * snapshot and invoke the output plugin's message callback right here, under
 * a historic snapshot for catalog access.
 */
void
ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
						  Snapshot snap, XLogRecPtr lsn,
						  bool transactional, const char *prefix,
						  Size message_size, const char *message)
{
	if (transactional)
	{
		MemoryContext oldcontext;
		ReorderBufferChange *change;

		Assert(xid != InvalidTransactionId);

		/*
		 * We don't expect snapshots for transactional changes - we'll use the
		 * snapshot derived later during apply (unless the change gets
		 * skipped).
		 */
		Assert(!snap);

		oldcontext = MemoryContextSwitchTo(rb->context);

		/* copy the prefix and payload so the change owns its own memory */
		change = ReorderBufferAllocChange(rb);
		change->action = REORDER_BUFFER_CHANGE_MESSAGE;
		change->data.msg.prefix = pstrdup(prefix);
		change->data.msg.message_size = message_size;
		change->data.msg.message = palloc(message_size);
		memcpy(change->data.msg.message, message, message_size);

		ReorderBufferQueueChange(rb, xid, lsn, change, false);

		MemoryContextSwitchTo(oldcontext);
	}
	else
	{
		ReorderBufferTXN *txn = NULL;
		volatile Snapshot snapshot_now = snap;

		/* Non-transactional changes require a valid snapshot. */
		Assert(snapshot_now);

		if (xid != InvalidTransactionId)
			txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);

		/* setup snapshot to allow catalog access */
		SetupHistoricSnapshot(snapshot_now, NULL);
		PG_TRY();
		{
			rb->message(rb, txn, lsn, false, prefix, message_size, message);

			TeardownHistoricSnapshot(false);
		}
		PG_CATCH();
		{
			/* teardown on error too, then propagate */
			TeardownHistoricSnapshot(true);
			PG_RE_THROW();
		}
		PG_END_TRY();
	}
}
913
914
/*
 * AssertTXNLsnOrder
 *    Verify LSN ordering of transaction lists in the reorderbuffer
 *
 * Other LSN-related invariants are checked too.
 *
 * No-op if assertions are not in use.
 */
static void
AssertTXNLsnOrder(ReorderBuffer *rb)
{
#ifdef USE_ASSERT_CHECKING
  LogicalDecodingContext *ctx = rb->private_data;
  dlist_iter  iter;
  XLogRecPtr  prev_first_lsn = InvalidXLogRecPtr;
  XLogRecPtr  prev_base_snap_lsn = InvalidXLogRecPtr;

  /*
   * Skip the verification if we don't reach the LSN at which we start
   * decoding the contents of transactions yet because until we reach the
   * LSN, we could have transactions that don't have the association between
   * the top-level transaction and subtransaction yet and consequently have
   * the same LSN.  We don't guarantee this association until we try to
   * decode the actual contents of transaction. The ordering of the records
   * prior to the start_decoding_at LSN should have been checked before the
   * restart.
   */
  if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, ctx->reader->EndRecPtr))
    return;

  /* Walk the toplevel transactions, which are kept ordered by first_lsn. */
  dlist_foreach(iter, &rb->toplevel_by_lsn)
  {
    ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node,
                          iter.cur);

    /* start LSN must be set */
    Assert(cur_txn->first_lsn != InvalidXLogRecPtr);

    /* If there is an end LSN, it must be higher than start LSN */
    if (cur_txn->end_lsn != InvalidXLogRecPtr)
      Assert(cur_txn->first_lsn <= cur_txn->end_lsn);

    /* Current initial LSN must be strictly higher than previous */
    if (prev_first_lsn != InvalidXLogRecPtr)
      Assert(prev_first_lsn < cur_txn->first_lsn);

    /* known-as-subtxn txns must not be listed */
    Assert(!rbtxn_is_known_subxact(cur_txn));

    prev_first_lsn = cur_txn->first_lsn;
  }

  /* Walk the transactions ordered by their base snapshot's LSN. */
  dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
  {
    ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN,
                          base_snapshot_node,
                          iter.cur);

    /* base snapshot (and its LSN) must be set */
    Assert(cur_txn->base_snapshot != NULL);
    Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr);

    /* current LSN must be strictly higher than previous */
    if (prev_base_snap_lsn != InvalidXLogRecPtr)
      Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn);

    /* known-as-subtxn txns must not be listed */
    Assert(!rbtxn_is_known_subxact(cur_txn));

    prev_base_snap_lsn = cur_txn->base_snapshot_lsn;
  }
#endif
}
987
988
/*
989
 * AssertChangeLsnOrder
990
 *
991
 * Check ordering of changes in the (sub)transaction.
992
 */
993
static void
994
AssertChangeLsnOrder(ReorderBufferTXN *txn)
995
0
{
996
#ifdef USE_ASSERT_CHECKING
997
  dlist_iter  iter;
998
  XLogRecPtr  prev_lsn = txn->first_lsn;
999
1000
  dlist_foreach(iter, &txn->changes)
1001
  {
1002
    ReorderBufferChange *cur_change;
1003
1004
    cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
1005
1006
    Assert(txn->first_lsn != InvalidXLogRecPtr);
1007
    Assert(cur_change->lsn != InvalidXLogRecPtr);
1008
    Assert(txn->first_lsn <= cur_change->lsn);
1009
1010
    if (txn->end_lsn != InvalidXLogRecPtr)
1011
      Assert(cur_change->lsn <= txn->end_lsn);
1012
1013
    Assert(prev_lsn <= cur_change->lsn);
1014
1015
    prev_lsn = cur_change->lsn;
1016
  }
1017
#endif
1018
0
}
1019
1020
/*
1021
 * ReorderBufferGetOldestTXN
1022
 *    Return oldest transaction in reorderbuffer
1023
 */
1024
ReorderBufferTXN *
1025
ReorderBufferGetOldestTXN(ReorderBuffer *rb)
1026
0
{
1027
0
  ReorderBufferTXN *txn;
1028
1029
0
  AssertTXNLsnOrder(rb);
1030
1031
0
  if (dlist_is_empty(&rb->toplevel_by_lsn))
1032
0
    return NULL;
1033
1034
0
  txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn);
1035
1036
0
  Assert(!rbtxn_is_known_subxact(txn));
1037
0
  Assert(txn->first_lsn != InvalidXLogRecPtr);
1038
0
  return txn;
1039
0
}
1040
1041
/*
1042
 * ReorderBufferGetOldestXmin
1043
 *    Return oldest Xmin in reorderbuffer
1044
 *
1045
 * Returns oldest possibly running Xid from the point of view of snapshots
1046
 * used in the transactions kept by reorderbuffer, or InvalidTransactionId if
1047
 * there are none.
1048
 *
1049
 * Since snapshots are assigned monotonically, this equals the Xmin of the
1050
 * base snapshot with minimal base_snapshot_lsn.
1051
 */
1052
TransactionId
1053
ReorderBufferGetOldestXmin(ReorderBuffer *rb)
1054
0
{
1055
0
  ReorderBufferTXN *txn;
1056
1057
0
  AssertTXNLsnOrder(rb);
1058
1059
0
  if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn))
1060
0
    return InvalidTransactionId;
1061
1062
0
  txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node,
1063
0
               &rb->txns_by_base_snapshot_lsn);
1064
0
  return txn->base_snapshot->xmin;
1065
0
}
1066
1067
/*
 * ReorderBufferSetRestartPoint
 *    Remember 'ptr' as the LSN decoding would have to restart from to
 *    reconstruct the reorderbuffer's current state.
 */
void
ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr)
{
  rb->current_restart_decoding_lsn = ptr;
}
1072
1073
/*
 * ReorderBufferAssignChild
 *
 * Make note that we know that subxid is a subtransaction of xid, seen as of
 * the given lsn.
 */
void
ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid,
             TransactionId subxid, XLogRecPtr lsn)
{
  ReorderBufferTXN *txn;
  ReorderBufferTXN *subtxn;
  bool    new_top;
  bool    new_sub;

  /* Look up (creating if needed) both the toplevel txn and the subxact. */
  txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true);
  subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false);

  if (!new_sub)
  {
    if (rbtxn_is_known_subxact(subtxn))
    {
      /* already associated, nothing to do */
      return;
    }
    else
    {
      /*
       * We already saw this transaction, but initially added it to the
       * list of top-level txns.  Now that we know it's not top-level,
       * remove it from there.
       */
      dlist_delete(&subtxn->node);
    }
  }

  /* Mark the subxact and record which toplevel txn it belongs to. */
  subtxn->txn_flags |= RBTXN_IS_SUBXACT;
  subtxn->toplevel_xid = xid;
  /* Nesting is flattened: a subxact must not itself have subxacts. */
  Assert(subtxn->nsubtxns == 0);

  /* set the reference to top-level transaction */
  subtxn->toptxn = txn;

  /* add to subtransaction list */
  dlist_push_tail(&txn->subtxns, &subtxn->node);
  txn->nsubtxns++;

  /* Possibly transfer the subtxn's snapshot to its top-level txn. */
  ReorderBufferTransferSnapToParent(txn, subtxn);

  /* Verify LSN-ordering invariant */
  AssertTXNLsnOrder(rb);
}
1126
1127
/*
 * ReorderBufferTransferSnapToParent
 *    Transfer base snapshot from subtxn to top-level txn, if needed
 *
 * This is done if the top-level txn doesn't have a base snapshot, or if the
 * subtxn's base snapshot has an earlier LSN than the top-level txn's base
 * snapshot's LSN.  This can happen if there are no changes in the toplevel
 * txn but there are some in the subtxn, or the first change in subtxn has
 * earlier LSN than first change in the top-level txn and we learned about
 * their kinship only now.
 *
 * The subtransaction's snapshot is cleared regardless of the transfer
 * happening, since it's not needed anymore in either case.
 *
 * We do this as soon as we become aware of their kinship, to avoid queueing
 * extra snapshots to txns known-as-subtxns -- only top-level txns will
 * receive further snapshots.
 */
static void
ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn,
                  ReorderBufferTXN *subtxn)
{
  /* Caller must already have linked subtxn to txn. */
  Assert(subtxn->toplevel_xid == txn->xid);

  if (subtxn->base_snapshot != NULL)
  {
    if (txn->base_snapshot == NULL ||
      subtxn->base_snapshot_lsn < txn->base_snapshot_lsn)
    {
      /*
       * If the toplevel transaction already has a base snapshot but
       * it's newer than the subxact's, purge it.
       */
      if (txn->base_snapshot != NULL)
      {
        SnapBuildSnapDecRefcount(txn->base_snapshot);
        dlist_delete(&txn->base_snapshot_node);
      }

      /*
       * The snapshot is now the top transaction's; transfer it, and
       * adjust the list position of the top transaction in the list by
       * moving it to where the subtransaction is.
       */
      txn->base_snapshot = subtxn->base_snapshot;
      txn->base_snapshot_lsn = subtxn->base_snapshot_lsn;
      /*
       * Insert txn before subtxn's list position first; only then is it
       * safe to unlink subtxn below.
       */
      dlist_insert_before(&subtxn->base_snapshot_node,
                &txn->base_snapshot_node);

      /*
       * The subtransaction doesn't have a snapshot anymore (so it
       * mustn't be in the list.)
       */
      subtxn->base_snapshot = NULL;
      subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
      dlist_delete(&subtxn->base_snapshot_node);
    }
    else
    {
      /* Base snap of toplevel is fine, so subxact's is not needed */
      SnapBuildSnapDecRefcount(subtxn->base_snapshot);
      dlist_delete(&subtxn->base_snapshot_node);
      subtxn->base_snapshot = NULL;
      subtxn->base_snapshot_lsn = InvalidXLogRecPtr;
    }
  }
}
1194
1195
/*
1196
 * Associate a subtransaction with its toplevel transaction at commit
1197
 * time. There may be no further changes added after this.
1198
 */
1199
void
1200
ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid,
1201
             TransactionId subxid, XLogRecPtr commit_lsn,
1202
             XLogRecPtr end_lsn)
1203
0
{
1204
0
  ReorderBufferTXN *subtxn;
1205
1206
0
  subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL,
1207
0
                   InvalidXLogRecPtr, false);
1208
1209
  /*
1210
   * No need to do anything if that subtxn didn't contain any changes
1211
   */
1212
0
  if (!subtxn)
1213
0
    return;
1214
1215
0
  subtxn->final_lsn = commit_lsn;
1216
0
  subtxn->end_lsn = end_lsn;
1217
1218
  /*
1219
   * Assign this subxact as a child of the toplevel xact (no-op if already
1220
   * done.)
1221
   */
1222
0
  ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr);
1223
0
}
1224
1225
1226
/*
1227
 * Support for efficiently iterating over a transaction's and its
1228
 * subtransactions' changes.
1229
 *
1230
 * We do this by doing a k-way merge between transactions/subtransactions. For that
1231
 * we model the current heads of the different transactions as a binary heap
1232
 * so we easily know which (sub-)transaction has the change with the smallest
1233
 * lsn next.
1234
 *
1235
 * We assume the changes in individual transactions are already sorted by LSN.
1236
 */
1237
1238
/*
1239
 * Binary heap comparison function.
1240
 */
1241
static int
1242
ReorderBufferIterCompare(Datum a, Datum b, void *arg)
1243
0
{
1244
0
  ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg;
1245
0
  XLogRecPtr  pos_a = state->entries[DatumGetInt32(a)].lsn;
1246
0
  XLogRecPtr  pos_b = state->entries[DatumGetInt32(b)].lsn;
1247
1248
0
  if (pos_a < pos_b)
1249
0
    return 1;
1250
0
  else if (pos_a == pos_b)
1251
0
    return 0;
1252
0
  return -1;
1253
0
}
1254
1255
/*
 * Allocate & initialize an iterator which iterates in lsn order over a
 * transaction and all its subtransactions.
 *
 * Note: The iterator state is returned through iter_state parameter rather
 * than the function's return value.  This is because the state gets cleaned up
 * in a PG_CATCH block in the caller, so we want to make sure the caller gets
 * back the state even if this function throws an exception.
 */
static void
ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
             ReorderBufferIterTXNState *volatile *iter_state)
{
  Size    nr_txns = 0;
  ReorderBufferIterTXNState *state;
  dlist_iter  cur_txn_i;
  int32   off;

  *iter_state = NULL;

  /* Check ordering of changes in the toplevel transaction. */
  AssertChangeLsnOrder(txn);

  /*
   * Calculate the size of our heap: one element for every transaction that
   * contains changes.  (Besides the transactions already in the reorder
   * buffer, we count the one we were directly passed.)
   */
  if (txn->nentries > 0)
    nr_txns++;

  dlist_foreach(cur_txn_i, &txn->subtxns)
  {
    ReorderBufferTXN *cur_txn;

    cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);

    /* Check ordering of changes in this subtransaction. */
    AssertChangeLsnOrder(cur_txn);

    if (cur_txn->nentries > 0)
      nr_txns++;
  }

  /* allocate iteration state (zeroed; entries array is a trailing FAM) */
  state = (ReorderBufferIterTXNState *)
    MemoryContextAllocZero(rb->context,
                 sizeof(ReorderBufferIterTXNState) +
                 sizeof(ReorderBufferIterTXNEntry) * nr_txns);

  state->nr_txns = nr_txns;
  dlist_init(&state->old_change);

  /* Mark all per-entry spill files as not open (-1 vfd). */
  for (off = 0; off < state->nr_txns; off++)
  {
    state->entries[off].file.vfd = -1;
    state->entries[off].segno = 0;
  }

  /* allocate heap */
  state->heap = binaryheap_allocate(state->nr_txns,
                    ReorderBufferIterCompare,
                    state);

  /* Now that the state fields are initialized, it is safe to return it. */
  *iter_state = state;

  /*
   * Now insert items into the binary heap, in an unordered fashion.  (We
   * will run a heap assembly step at the end; this is more efficient.)
   */

  off = 0;

  /* add toplevel transaction if it contains changes */
  if (txn->nentries > 0)
  {
    ReorderBufferChange *cur_change;

    if (rbtxn_is_serialized(txn))
    {
      /* serialize remaining changes */
      ReorderBufferSerializeTXN(rb, txn);
      /* ... then load the first batch back from disk */
      ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file,
                    &state->entries[off].segno);
    }

    /* Seed this entry with the txn's first (lowest-LSN) change. */
    cur_change = dlist_head_element(ReorderBufferChange, node,
                    &txn->changes);

    state->entries[off].lsn = cur_change->lsn;
    state->entries[off].change = cur_change;
    state->entries[off].txn = txn;

    binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
  }

  /* add subtransactions if they contain changes */
  dlist_foreach(cur_txn_i, &txn->subtxns)
  {
    ReorderBufferTXN *cur_txn;

    cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);

    if (cur_txn->nentries > 0)
    {
      ReorderBufferChange *cur_change;

      if (rbtxn_is_serialized(cur_txn))
      {
        /* serialize remaining changes */
        ReorderBufferSerializeTXN(rb, cur_txn);
        ReorderBufferRestoreChanges(rb, cur_txn,
                      &state->entries[off].file,
                      &state->entries[off].segno);
      }
      /* Seed this entry with the subtxn's first (lowest-LSN) change. */
      cur_change = dlist_head_element(ReorderBufferChange, node,
                      &cur_txn->changes);

      state->entries[off].lsn = cur_change->lsn;
      state->entries[off].change = cur_change;
      state->entries[off].txn = cur_txn;

      binaryheap_add_unordered(state->heap, Int32GetDatum(off++));
    }
  }

  /* assemble a valid binary heap */
  binaryheap_build(state->heap);
}
1385
1386
/*
 * Return the next change when iterating over a transaction and its
 * subtransactions.
 *
 * Returns NULL when no further changes exist.
 *
 * Note that the returned change may be freed on the *next* call (see the
 * old_change handling below), so callers must consume it before iterating
 * again.
 */
static ReorderBufferChange *
ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
{
  ReorderBufferChange *change;
  ReorderBufferIterTXNEntry *entry;
  int32   off;

  /* nothing there anymore */
  if (state->heap->bh_size == 0)
    return NULL;

  /* Heap top is the (sub)txn whose next change has the smallest LSN. */
  off = DatumGetInt32(binaryheap_first(state->heap));
  entry = &state->entries[off];

  /* free memory we might have "leaked" in the previous *Next call */
  if (!dlist_is_empty(&state->old_change))
  {
    change = dlist_container(ReorderBufferChange, node,
                 dlist_pop_head_node(&state->old_change));
    ReorderBufferFreeChange(rb, change, true);
    Assert(dlist_is_empty(&state->old_change));
  }

  change = entry->change;

  /*
   * update heap with information about which transaction has the next
   * relevant change in LSN order
   */

  /* there are in-memory changes */
  if (dlist_has_next(&entry->txn->changes, &entry->change->node))
  {
    dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node);
    ReorderBufferChange *next_change =
      dlist_container(ReorderBufferChange, node, next);

    /* txn stays the same */
    state->entries[off].lsn = next_change->lsn;
    state->entries[off].change = next_change;

    binaryheap_replace_first(state->heap, Int32GetDatum(off));
    return change;
  }

  /* try to load changes from disk */
  if (entry->txn->nentries != entry->txn->nentries_mem)
  {
    /*
     * Ugly: restoring changes will reuse *Change records, thus delete the
     * current one from the per-tx list and only free in the next call.
     */
    dlist_delete(&change->node);
    dlist_push_tail(&state->old_change, &change->node);

    /*
     * Update the total bytes processed by the txn for which we are
     * releasing the current set of changes and restoring the new set of
     * changes.
     */
    rb->totalBytes += entry->txn->size;
    if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file,
                    &state->entries[off].segno))
    {
      /* successfully restored changes from disk */
      ReorderBufferChange *next_change =
        dlist_head_element(ReorderBufferChange, node,
                   &entry->txn->changes);

      elog(DEBUG2, "restored %u/%u changes from disk",
         (uint32) entry->txn->nentries_mem,
         (uint32) entry->txn->nentries);

      Assert(entry->txn->nentries_mem);
      /* txn stays the same */
      state->entries[off].lsn = next_change->lsn;
      state->entries[off].change = next_change;
      binaryheap_replace_first(state->heap, Int32GetDatum(off));

      return change;
    }
  }

  /* ok, no changes there anymore, remove */
  binaryheap_remove_first(state->heap);

  return change;
}
1480
1481
/*
1482
 * Deallocate the iterator
1483
 */
1484
static void
1485
ReorderBufferIterTXNFinish(ReorderBuffer *rb,
1486
               ReorderBufferIterTXNState *state)
1487
0
{
1488
0
  int32   off;
1489
1490
0
  for (off = 0; off < state->nr_txns; off++)
1491
0
  {
1492
0
    if (state->entries[off].file.vfd != -1)
1493
0
      FileClose(state->entries[off].file.vfd);
1494
0
  }
1495
1496
  /* free memory we might have "leaked" in the last *Next call */
1497
0
  if (!dlist_is_empty(&state->old_change))
1498
0
  {
1499
0
    ReorderBufferChange *change;
1500
1501
0
    change = dlist_container(ReorderBufferChange, node,
1502
0
                 dlist_pop_head_node(&state->old_change));
1503
0
    ReorderBufferFreeChange(rb, change, true);
1504
0
    Assert(dlist_is_empty(&state->old_change));
1505
0
  }
1506
1507
0
  binaryheap_free(state->heap);
1508
0
  pfree(state);
1509
0
}
1510
1511
/*
 * Cleanup the contents of a transaction, usually after the transaction
 * committed or aborted.
 *
 * Recurses into subtransactions (one level deep only, see below), frees all
 * queued changes and tuplecids, releases the base snapshot, removes the txn
 * from the reorderbuffer's lists and hash table, and finally frees the txn
 * itself.
 */
static void
ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
  bool    found;
  dlist_mutable_iter iter;
  Size    mem_freed = 0;

  /* cleanup subtransactions & their changes */
  dlist_foreach_modify(iter, &txn->subtxns)
  {
    ReorderBufferTXN *subtxn;

    subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);

    /*
     * Subtransactions are always associated to the toplevel TXN, even if
     * they originally were happening inside another subtxn, so we won't
     * ever recurse more than one level deep here.
     */
    Assert(rbtxn_is_known_subxact(subtxn));
    Assert(subtxn->nsubtxns == 0);

    ReorderBufferCleanupTXN(rb, subtxn);
  }

  /* cleanup changes in the txn */
  dlist_foreach_modify(iter, &txn->changes)
  {
    ReorderBufferChange *change;

    change = dlist_container(ReorderBufferChange, node, iter.cur);

    /* Check we're not mixing changes from different transactions. */
    Assert(change->txn == txn);

    /*
     * Instead of updating the memory counter for individual changes, we
     * sum up the size of memory to free so we can update the memory
     * counter all together below. This saves costs of maintaining the
     * max-heap.
     */
    mem_freed += ReorderBufferChangeSize(change);

    ReorderBufferFreeChange(rb, change, false);
  }

  /* Update the memory counter */
  ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);

  /*
   * Cleanup the tuplecids we stored for decoding catalog snapshot access.
   * They are always stored in the toplevel transaction.
   */
  dlist_foreach_modify(iter, &txn->tuplecids)
  {
    ReorderBufferChange *change;

    change = dlist_container(ReorderBufferChange, node, iter.cur);

    /* Check we're not mixing changes from different transactions. */
    Assert(change->txn == txn);
    Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);

    ReorderBufferFreeChange(rb, change, true);
  }

  /*
   * Cleanup the base snapshot, if set.
   */
  if (txn->base_snapshot != NULL)
  {
    SnapBuildSnapDecRefcount(txn->base_snapshot);
    dlist_delete(&txn->base_snapshot_node);
  }

  /*
   * Cleanup the snapshot for the last streamed run.
   */
  if (txn->snapshot_now != NULL)
  {
    Assert(rbtxn_is_streamed(txn));
    ReorderBufferFreeSnap(rb, txn->snapshot_now);
  }

  /*
   * Remove TXN from its containing lists.
   *
   * Note: if txn is known as subxact, we are deleting the TXN from its
   * parent's list of known subxacts; this leaves the parent's nsubxacts
   * count too high, but we don't care.  Otherwise, we are deleting the TXN
   * from the LSN-ordered list of toplevel TXNs. We remove the TXN from the
   * list of catalog modifying transactions as well.
   */
  dlist_delete(&txn->node);
  if (rbtxn_has_catalog_changes(txn))
    dclist_delete_from(&rb->catchange_txns, &txn->catchange_node);

  /* now remove reference from buffer */
  hash_search(rb->by_txn, &txn->xid, HASH_REMOVE, &found);
  Assert(found);

  /* remove entries spilled to disk */
  if (rbtxn_is_serialized(txn))
    ReorderBufferRestoreCleanup(rb, txn);

  /* deallocate */
  ReorderBufferFreeTXN(rb, txn);
}
1623
1624
/*
 * Discard changes from a transaction (and subtransactions), either after
 * streaming, decoding them at PREPARE, or detecting the transaction abort.
 * Keep the remaining info - transactions, tuplecids, invalidations and
 * snapshots.
 *
 * We additionally remove tuplecids after decoding the transaction at prepare
 * time as we only need to perform invalidation at rollback or commit prepared.
 *
 * 'txn_prepared' indicates that we have decoded the transaction at prepare
 * time.
 */
static void
ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
{
  dlist_mutable_iter iter;
  Size    mem_freed = 0;

  /* cleanup subtransactions & their changes */
  dlist_foreach_modify(iter, &txn->subtxns)
  {
    ReorderBufferTXN *subtxn;

    subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);

    /*
     * Subtransactions are always associated to the toplevel TXN, even if
     * they originally were happening inside another subtxn, so we won't
     * ever recurse more than one level deep here.
     */
    Assert(rbtxn_is_known_subxact(subtxn));
    Assert(subtxn->nsubtxns == 0);

    ReorderBufferMaybeMarkTXNStreamed(rb, subtxn);
    ReorderBufferTruncateTXN(rb, subtxn, txn_prepared);
  }

  /* cleanup changes in the txn */
  dlist_foreach_modify(iter, &txn->changes)
  {
    ReorderBufferChange *change;

    change = dlist_container(ReorderBufferChange, node, iter.cur);

    /* Check we're not mixing changes from different transactions. */
    Assert(change->txn == txn);

    /* remove the change from its containing list */
    dlist_delete(&change->node);

    /*
     * Instead of updating the memory counter for individual changes, we
     * sum up the size of memory to free so we can update the memory
     * counter all together below. This saves costs of maintaining the
     * max-heap.
     */
    mem_freed += ReorderBufferChangeSize(change);

    ReorderBufferFreeChange(rb, change, false);
  }

  /* Update the memory counter */
  ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);

  if (txn_prepared)
  {
    /*
     * If this is a prepared txn, cleanup the tuplecids we stored for
     * decoding catalog snapshot access. They are always stored in the
     * toplevel transaction.
     */
    dlist_foreach_modify(iter, &txn->tuplecids)
    {
      ReorderBufferChange *change;

      change = dlist_container(ReorderBufferChange, node, iter.cur);

      /* Check we're not mixing changes from different transactions. */
      Assert(change->txn == txn);
      Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);

      /* Remove the change from its containing list. */
      dlist_delete(&change->node);

      ReorderBufferFreeChange(rb, change, true);
    }
  }

  /*
   * Destroy the (relfilelocator, ctid) hashtable, so that we don't leak any
   * memory. We could also keep the hash table and update it with new ctid
   * values, but this seems simpler and good enough for now.
   */
  if (txn->tuplecid_hash != NULL)
  {
    hash_destroy(txn->tuplecid_hash);
    txn->tuplecid_hash = NULL;
  }

  /* If this txn is serialized then clean the disk space. */
  if (rbtxn_is_serialized(txn))
  {
    ReorderBufferRestoreCleanup(rb, txn);
    txn->txn_flags &= ~RBTXN_IS_SERIALIZED;

    /*
     * We set this flag to indicate if the transaction is ever serialized.
     * We need this to accurately update the stats as otherwise the same
     * transaction can be counted as serialized multiple times.
     */
    txn->txn_flags |= RBTXN_IS_SERIALIZED_CLEAR;
  }

  /* also reset the number of entries in the transaction */
  txn->nentries_mem = 0;
  txn->nentries = 0;
}
1741
1742
/*
 * Check the transaction status by CLOG lookup and discard all changes if
 * the transaction is aborted. The transaction status is cached in
 * txn->txn_flags so we can skip future changes and avoid CLOG lookups on the
 * next call.
 *
 * Return true if the transaction is aborted, otherwise return false.
 *
 * When the 'debug_logical_replication_streaming' is set to "immediate", we
 * don't check the transaction status, meaning the caller will always process
 * this transaction.
 */
static bool
ReorderBufferCheckAndTruncateAbortedTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
  /* Quick return for regression tests */
  if (unlikely(debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE))
    return false;

  /*
   * Quick return if the transaction status is already known.
   */

  if (rbtxn_is_committed(txn))
    return false;
  if (rbtxn_is_aborted(txn))
  {
    /* Already-aborted transactions should not have any changes */
    Assert(txn->size == 0);

    return true;
  }

  /* Otherwise, check the transaction status using CLOG lookup */

  /* A still-running transaction is neither committed nor aborted yet. */
  if (TransactionIdIsInProgress(txn->xid))
    return false;

  if (TransactionIdDidCommit(txn->xid))
  {
    /*
     * Remember the transaction is committed so that we can skip CLOG
     * check next time, avoiding the pressure on CLOG lookup.
     */
    Assert(!rbtxn_is_aborted(txn));
    txn->txn_flags |= RBTXN_IS_COMMITTED;
    return false;
  }

  /*
   * The transaction aborted. We discard both the changes collected so far
   * and the toast reconstruction data. The full cleanup will happen as part
   * of decoding ABORT record of this transaction.
   */
  ReorderBufferTruncateTXN(rb, txn, rbtxn_is_prepared(txn));
  ReorderBufferToastReset(rb, txn);

  /* All changes should be discarded */
  Assert(txn->size == 0);

  /*
   * Mark the transaction as aborted so we can ignore future changes of this
   * transaction.
   */
  Assert(!rbtxn_is_committed(txn));
  txn->txn_flags |= RBTXN_IS_ABORTED;

  return true;
}
1811
1812
/*
 * Build a hash with a (relfilelocator, ctid) -> (cmin, cmax) mapping for use by
 * HeapTupleSatisfiesHistoricMVCC.
 */
static void
ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
  dlist_iter  iter;
  HASHCTL   hash_ctl;

  /* Only needed for catalog-modifying txns that actually logged tuplecids. */
  if (!rbtxn_has_catalog_changes(txn) || dlist_is_empty(&txn->tuplecids))
    return;

  hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey);
  hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt);
  hash_ctl.hcxt = rb->context;

  /*
   * create the hash with the exact number of to-be-stored tuplecids from
   * the start
   */
  txn->tuplecid_hash =
    hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl,
          HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);

  dlist_foreach(iter, &txn->tuplecids)
  {
    ReorderBufferTupleCidKey key;
    ReorderBufferTupleCidEnt *ent;
    bool    found;
    ReorderBufferChange *change;

    change = dlist_container(ReorderBufferChange, node, iter.cur);

    /* this list only ever contains INTERNAL_TUPLECID changes */
    Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);

    /* be careful about padding */
    memset(&key, 0, sizeof(ReorderBufferTupleCidKey));

    key.rlocator = change->data.tuplecid.locator;

    ItemPointerCopy(&change->data.tuplecid.tid,
            &key.tid);

    ent = (ReorderBufferTupleCidEnt *)
      hash_search(txn->tuplecid_hash, &key, HASH_ENTER, &found);
    if (!found)
    {
      /* first time we see this tuple: take cmin/cmax/combocid verbatim */
      ent->cmin = change->data.tuplecid.cmin;
      ent->cmax = change->data.tuplecid.cmax;
      ent->combocid = change->data.tuplecid.combocid;
    }
    else
    {
      /*
       * Maybe we already saw this tuple before in this transaction, but
       * if so it must have the same cmin.
       */
      Assert(ent->cmin == change->data.tuplecid.cmin);

      /*
       * cmax may be initially invalid, but once set it can only grow,
       * and never become invalid again.
       */
      Assert((ent->cmax == InvalidCommandId) ||
           ((change->data.tuplecid.cmax != InvalidCommandId) &&
          (change->data.tuplecid.cmax > ent->cmax)));
      ent->cmax = change->data.tuplecid.cmax;
    }
  }
}
1883
1884
/*
 * Copy a provided snapshot so we can modify it privately. This is needed so
 * that catalog modifying transactions can look into intermediate catalog
 * states.
 */
static Snapshot
ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
            ReorderBufferTXN *txn, CommandId cid)
{
  Snapshot  snap;
  dlist_iter  iter;
  int     i = 0;
  Size    size;

  /*
   * One allocation holds the SnapshotData struct followed by the xip array
   * and then the subxip array (toplevel xid + all subxids).
   */
  size = sizeof(SnapshotData) +
    sizeof(TransactionId) * orig_snap->xcnt +
    sizeof(TransactionId) * (txn->nsubtxns + 1);

  snap = MemoryContextAllocZero(rb->context, size);
  memcpy(snap, orig_snap, sizeof(SnapshotData));

  snap->copied = true;
  snap->active_count = 1;   /* mark as active so nobody frees it */
  snap->regd_count = 0;
  /* xip array lives directly after the struct in the same allocation */
  snap->xip = (TransactionId *) (snap + 1);

  memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt);

  /*
   * snap->subxip contains all txids that belong to our transaction which we
   * need to check via cmin/cmax. That's why we store the toplevel
   * transaction in there as well.
   */
  snap->subxip = snap->xip + snap->xcnt;
  snap->subxip[i++] = txn->xid;

  /*
   * txn->nsubtxns isn't decreased when subtransactions abort, so count
   * manually. Since it's an upper boundary it is safe to use it for the
   * allocation above.
   */
  snap->subxcnt = 1;

  dlist_foreach(iter, &txn->subtxns)
  {
    ReorderBufferTXN *sub_txn;

    sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur);
    snap->subxip[i++] = sub_txn->xid;
    snap->subxcnt++;
  }

  /* sort so we can bsearch() later */
  qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator);

  /* store the specified current CommandId */
  snap->curcid = cid;

  return snap;
}
1944
1945
/*
1946
 * Free a previously ReorderBufferCopySnap'ed snapshot
1947
 */
1948
static void
1949
ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
1950
0
{
1951
0
  if (snap->copied)
1952
0
    pfree(snap);
1953
0
  else
1954
0
    SnapBuildSnapDecRefcount(snap);
1955
0
}
1956
1957
/*
 * If the transaction was (partially) streamed, we need to prepare or commit
 * it in a 'streamed' way.  That is, we first stream the remaining part of the
 * transaction, and then invoke stream_prepare or stream_commit message as per
 * the case.
 */
static void
ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
  /* we should only call this for previously streamed transactions */
  Assert(rbtxn_is_streamed(txn));

  /* flush out whatever changes are still pending for this transaction */
  ReorderBufferStreamTXN(rb, txn);

  if (rbtxn_is_prepared(txn))
  {
    /*
     * Note, we send stream prepare even if a concurrent abort is
     * detected. See DecodePrepare for more information.
     */
    Assert(!rbtxn_sent_prepare(txn));
    rb->stream_prepare(rb, txn, txn->final_lsn);
    txn->txn_flags |= RBTXN_SENT_PREPARE;

    /*
     * This is a PREPARED transaction, part of a two-phase commit. The
     * full cleanup will happen as part of the COMMIT PREPAREDs, so now
     * just truncate txn by removing changes and tuplecids.
     */
    ReorderBufferTruncateTXN(rb, txn, true);
    /* Reset the CheckXidAlive */
    CheckXidAlive = InvalidTransactionId;
  }
  else
  {
    /* plain commit: emit stream_commit and release the transaction */
    rb->stream_commit(rb, txn, txn->final_lsn);
    ReorderBufferCleanupTXN(rb, txn);
  }
}
1996
1997
/*
1998
 * Set xid to detect concurrent aborts.
1999
 *
2000
 * While streaming an in-progress transaction or decoding a prepared
2001
 * transaction there is a possibility that the (sub)transaction might get
2002
 * aborted concurrently.  In such case if the (sub)transaction has catalog
2003
 * update then we might decode the tuple using wrong catalog version.  For
2004
 * example, suppose there is one catalog tuple with (xmin: 500, xmax: 0).  Now,
2005
 * the transaction 501 updates the catalog tuple and after that we will have
2006
 * two tuples (xmin: 500, xmax: 501) and (xmin: 501, xmax: 0).  Now, if 501 is
2007
 * aborted and some other transaction say 502 updates the same catalog tuple
2008
 * then the first tuple will be changed to (xmin: 500, xmax: 502).  So, the
2009
 * problem is that when we try to decode the tuple inserted/updated in 501
2010
 * after the catalog update, we will see the catalog tuple with (xmin: 500,
2011
 * xmax: 502) as visible because it will consider that the tuple is deleted by
2012
 * xid 502 which is not visible to our snapshot.  And when we will try to
2013
 * decode with that catalog tuple, it can lead to a wrong result or a crash.
2014
 * So, it is necessary to detect concurrent aborts to allow streaming of
2015
 * in-progress transactions or decoding of prepared transactions.
2016
 *
2017
 * For detecting the concurrent abort we set CheckXidAlive to the current
2018
 * (sub)transaction's xid for which this change belongs to.  And, during
2019
 * catalog scan we can check the status of the xid and if it is aborted we will
2020
 * report a specific error so that we can stop streaming current transaction
2021
 * and discard the already streamed changes on such an error.  We might have
2022
 * already streamed some of the changes for the aborted (sub)transaction, but
2023
 * that is fine because when we decode the abort we will stream abort message
2024
 * to truncate the changes in the subscriber. Similarly, for prepared
2025
 * transactions, we stop decoding if concurrent abort is detected and then
2026
 * rollback the changes when rollback prepared is encountered. See
2027
 * DecodePrepare.
2028
 */
2029
static inline void
2030
SetupCheckXidLive(TransactionId xid)
2031
0
{
2032
  /*
2033
   * If the input transaction id is already set as a CheckXidAlive then
2034
   * nothing to do.
2035
   */
2036
0
  if (TransactionIdEquals(CheckXidAlive, xid))
2037
0
    return;
2038
2039
  /*
2040
   * setup CheckXidAlive if it's not committed yet.  We don't check if the
2041
   * xid is aborted.  That will happen during catalog access.
2042
   */
2043
0
  if (!TransactionIdDidCommit(xid))
2044
0
    CheckXidAlive = xid;
2045
0
  else
2046
0
    CheckXidAlive = InvalidTransactionId;
2047
0
}
2048
2049
/*
2050
 * Helper function for ReorderBufferProcessTXN for applying change.
2051
 */
2052
static inline void
2053
ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
2054
             Relation relation, ReorderBufferChange *change,
2055
             bool streaming)
2056
0
{
2057
0
  if (streaming)
2058
0
    rb->stream_change(rb, txn, relation, change);
2059
0
  else
2060
0
    rb->apply_change(rb, txn, relation, change);
2061
0
}
2062
2063
/*
2064
 * Helper function for ReorderBufferProcessTXN for applying the truncate.
2065
 */
2066
static inline void
2067
ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn,
2068
               int nrelations, Relation *relations,
2069
               ReorderBufferChange *change, bool streaming)
2070
0
{
2071
0
  if (streaming)
2072
0
    rb->stream_truncate(rb, txn, nrelations, relations, change);
2073
0
  else
2074
0
    rb->apply_truncate(rb, txn, nrelations, relations, change);
2075
0
}
2076
2077
/*
2078
 * Helper function for ReorderBufferProcessTXN for applying the message.
2079
 */
2080
static inline void
2081
ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
2082
              ReorderBufferChange *change, bool streaming)
2083
0
{
2084
0
  if (streaming)
2085
0
    rb->stream_message(rb, txn, change->lsn, true,
2086
0
               change->data.msg.prefix,
2087
0
               change->data.msg.message_size,
2088
0
               change->data.msg.message);
2089
0
  else
2090
0
    rb->message(rb, txn, change->lsn, true,
2091
0
          change->data.msg.prefix,
2092
0
          change->data.msg.message_size,
2093
0
          change->data.msg.message);
2094
0
}
2095
2096
/*
2097
 * Function to store the command id and snapshot at the end of the current
2098
 * stream so that we can reuse the same while sending the next stream.
2099
 */
2100
static inline void
2101
ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn,
2102
               Snapshot snapshot_now, CommandId command_id)
2103
0
{
2104
0
  txn->command_id = command_id;
2105
2106
  /* Avoid copying if it's already copied. */
2107
0
  if (snapshot_now->copied)
2108
0
    txn->snapshot_now = snapshot_now;
2109
0
  else
2110
0
    txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2111
0
                          txn, command_id);
2112
0
}
2113
2114
/*
2115
 * Mark the given transaction as streamed if it's a top-level transaction
2116
 * or has changes.
2117
 */
2118
static void
2119
ReorderBufferMaybeMarkTXNStreamed(ReorderBuffer *rb, ReorderBufferTXN *txn)
2120
0
{
2121
  /*
2122
   * The top-level transaction, is marked as streamed always, even if it
2123
   * does not contain any changes (that is, when all the changes are in
2124
   * subtransactions).
2125
   *
2126
   * For subtransactions, we only mark them as streamed when there are
2127
   * changes in them.
2128
   *
2129
   * We do it this way because of aborts - we don't want to send aborts for
2130
   * XIDs the downstream is not aware of. And of course, it always knows
2131
   * about the top-level xact (we send the XID in all messages), but we
2132
   * never stream XIDs of empty subxacts.
2133
   */
2134
0
  if (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0))
2135
0
    txn->txn_flags |= RBTXN_IS_STREAMED;
2136
0
}
2137
2138
/*
 * Helper function for ReorderBufferProcessTXN to handle the concurrent
 * abort of the streaming transaction.  This resets the TXN such that it
 * can be used to stream the remaining data of transaction being processed.
 * This can happen when the subtransaction is aborted and we still want to
 * continue processing the main or other subtransactions data.
 */
static void
ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
            Snapshot snapshot_now,
            CommandId command_id,
            XLogRecPtr last_lsn,
            ReorderBufferChange *specinsert)
{
  /* Discard the changes that we just streamed */
  ReorderBufferTruncateTXN(rb, txn, rbtxn_is_prepared(txn));

  /* Free all resources allocated for toast reconstruction */
  ReorderBufferToastReset(rb, txn);

  /* Return the spec insert change if it is not NULL */
  if (specinsert != NULL)
  {
    ReorderBufferFreeChange(rb, specinsert, true);
    specinsert = NULL;
  }

  /*
   * For the streaming case, stop the stream and remember the command ID and
   * snapshot for the streaming run.
   */
  if (rbtxn_is_streamed(txn))
  {
    rb->stream_stop(rb, txn, last_lsn);
    ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
  }

  /* All changes must be deallocated */
  Assert(txn->size == 0);
}
2178
2179
/*
2180
 * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN.
2181
 *
2182
 * Send data of a transaction (and its subtransactions) to the
2183
 * output plugin. We iterate over the top and subtransactions (using a k-way
2184
 * merge) and replay the changes in lsn order.
2185
 *
2186
 * If streaming is true then data will be sent using stream API.
2187
 *
2188
 * Note: "volatile" markers on some parameters are to avoid trouble with
2189
 * PG_TRY inside the function.
2190
 */
2191
static void
2192
ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
2193
            XLogRecPtr commit_lsn,
2194
            volatile Snapshot snapshot_now,
2195
            volatile CommandId command_id,
2196
            bool streaming)
2197
0
{
2198
0
  bool    using_subtxn;
2199
0
  MemoryContext ccxt = CurrentMemoryContext;
2200
0
  ReorderBufferIterTXNState *volatile iterstate = NULL;
2201
0
  volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
2202
0
  ReorderBufferChange *volatile specinsert = NULL;
2203
0
  volatile bool stream_started = false;
2204
0
  ReorderBufferTXN *volatile curtxn = NULL;
2205
2206
  /* build data to be able to lookup the CommandIds of catalog tuples */
2207
0
  ReorderBufferBuildTupleCidHash(rb, txn);
2208
2209
  /* setup the initial snapshot */
2210
0
  SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2211
2212
  /*
2213
   * Decoding needs access to syscaches et al., which in turn use
2214
   * heavyweight locks and such. Thus we need to have enough state around to
2215
   * keep track of those.  The easiest way is to simply use a transaction
2216
   * internally.  That also allows us to easily enforce that nothing writes
2217
   * to the database by checking for xid assignments.
2218
   *
2219
   * When we're called via the SQL SRF there's already a transaction
2220
   * started, so start an explicit subtransaction there.
2221
   */
2222
0
  using_subtxn = IsTransactionOrTransactionBlock();
2223
2224
0
  PG_TRY();
2225
0
  {
2226
0
    ReorderBufferChange *change;
2227
0
    int     changes_count = 0;  /* used to accumulate the number of
2228
                     * changes */
2229
2230
0
    if (using_subtxn)
2231
0
      BeginInternalSubTransaction(streaming ? "stream" : "replay");
2232
0
    else
2233
0
      StartTransactionCommand();
2234
2235
    /*
2236
     * We only need to send begin/begin-prepare for non-streamed
2237
     * transactions.
2238
     */
2239
0
    if (!streaming)
2240
0
    {
2241
0
      if (rbtxn_is_prepared(txn))
2242
0
        rb->begin_prepare(rb, txn);
2243
0
      else
2244
0
        rb->begin(rb, txn);
2245
0
    }
2246
2247
0
    ReorderBufferIterTXNInit(rb, txn, &iterstate);
2248
0
    while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
2249
0
    {
2250
0
      Relation  relation = NULL;
2251
0
      Oid     reloid;
2252
2253
0
      CHECK_FOR_INTERRUPTS();
2254
2255
      /*
2256
       * We can't call start stream callback before processing first
2257
       * change.
2258
       */
2259
0
      if (prev_lsn == InvalidXLogRecPtr)
2260
0
      {
2261
0
        if (streaming)
2262
0
        {
2263
0
          txn->origin_id = change->origin_id;
2264
0
          rb->stream_start(rb, txn, change->lsn);
2265
0
          stream_started = true;
2266
0
        }
2267
0
      }
2268
2269
      /*
2270
       * Enforce correct ordering of changes, merged from multiple
2271
       * subtransactions. The changes may have the same LSN due to
2272
       * MULTI_INSERT xlog records.
2273
       */
2274
0
      Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
2275
2276
0
      prev_lsn = change->lsn;
2277
2278
      /*
2279
       * Set the current xid to detect concurrent aborts. This is
2280
       * required for the cases when we decode the changes before the
2281
       * COMMIT record is processed.
2282
       */
2283
0
      if (streaming || rbtxn_is_prepared(change->txn))
2284
0
      {
2285
0
        curtxn = change->txn;
2286
0
        SetupCheckXidLive(curtxn->xid);
2287
0
      }
2288
2289
0
      switch (change->action)
2290
0
      {
2291
0
        case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
2292
2293
          /*
2294
           * Confirmation for speculative insertion arrived. Simply
2295
           * use as a normal record. It'll be cleaned up at the end
2296
           * of INSERT processing.
2297
           */
2298
0
          if (specinsert == NULL)
2299
0
            elog(ERROR, "invalid ordering of speculative insertion changes");
2300
0
          Assert(specinsert->data.tp.oldtuple == NULL);
2301
0
          change = specinsert;
2302
0
          change->action = REORDER_BUFFER_CHANGE_INSERT;
2303
2304
          /* intentionally fall through */
2305
0
        case REORDER_BUFFER_CHANGE_INSERT:
2306
0
        case REORDER_BUFFER_CHANGE_UPDATE:
2307
0
        case REORDER_BUFFER_CHANGE_DELETE:
2308
0
          Assert(snapshot_now);
2309
2310
0
          reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid,
2311
0
                          change->data.tp.rlocator.relNumber);
2312
2313
          /*
2314
           * Mapped catalog tuple without data, emitted while
2315
           * catalog table was in the process of being rewritten. We
2316
           * can fail to look up the relfilenumber, because the
2317
           * relmapper has no "historic" view, in contrast to the
2318
           * normal catalog during decoding. Thus repeated rewrites
2319
           * can cause a lookup failure. That's OK because we do not
2320
           * decode catalog changes anyway. Normally such tuples
2321
           * would be skipped over below, but we can't identify
2322
           * whether the table should be logically logged without
2323
           * mapping the relfilenumber to the oid.
2324
           */
2325
0
          if (reloid == InvalidOid &&
2326
0
            change->data.tp.newtuple == NULL &&
2327
0
            change->data.tp.oldtuple == NULL)
2328
0
            goto change_done;
2329
0
          else if (reloid == InvalidOid)
2330
0
            elog(ERROR, "could not map filenumber \"%s\" to relation OID",
2331
0
               relpathperm(change->data.tp.rlocator,
2332
0
                     MAIN_FORKNUM).str);
2333
2334
0
          relation = RelationIdGetRelation(reloid);
2335
2336
0
          if (!RelationIsValid(relation))
2337
0
            elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")",
2338
0
               reloid,
2339
0
               relpathperm(change->data.tp.rlocator,
2340
0
                     MAIN_FORKNUM).str);
2341
2342
0
          if (!RelationIsLogicallyLogged(relation))
2343
0
            goto change_done;
2344
2345
          /*
2346
           * Ignore temporary heaps created during DDL unless the
2347
           * plugin has asked for them.
2348
           */
2349
0
          if (relation->rd_rel->relrewrite && !rb->output_rewrites)
2350
0
            goto change_done;
2351
2352
          /*
2353
           * For now ignore sequence changes entirely. Most of the
2354
           * time they don't log changes using records we
2355
           * understand, so it doesn't make sense to handle the few
2356
           * cases we do.
2357
           */
2358
0
          if (relation->rd_rel->relkind == RELKIND_SEQUENCE)
2359
0
            goto change_done;
2360
2361
          /* user-triggered change */
2362
0
          if (!IsToastRelation(relation))
2363
0
          {
2364
0
            ReorderBufferToastReplace(rb, txn, relation, change);
2365
0
            ReorderBufferApplyChange(rb, txn, relation, change,
2366
0
                         streaming);
2367
2368
            /*
2369
             * Only clear reassembled toast chunks if we're sure
2370
             * they're not required anymore. The creator of the
2371
             * tuple tells us.
2372
             */
2373
0
            if (change->data.tp.clear_toast_afterwards)
2374
0
              ReorderBufferToastReset(rb, txn);
2375
0
          }
2376
          /* we're not interested in toast deletions */
2377
0
          else if (change->action == REORDER_BUFFER_CHANGE_INSERT)
2378
0
          {
2379
            /*
2380
             * Need to reassemble the full toasted Datum in
2381
             * memory, to ensure the chunks don't get reused till
2382
             * we're done remove it from the list of this
2383
             * transaction's changes. Otherwise it will get
2384
             * freed/reused while restoring spooled data from
2385
             * disk.
2386
             */
2387
0
            Assert(change->data.tp.newtuple != NULL);
2388
2389
0
            dlist_delete(&change->node);
2390
0
            ReorderBufferToastAppendChunk(rb, txn, relation,
2391
0
                            change);
2392
0
          }
2393
2394
0
      change_done:
2395
2396
          /*
2397
           * If speculative insertion was confirmed, the record
2398
           * isn't needed anymore.
2399
           */
2400
0
          if (specinsert != NULL)
2401
0
          {
2402
0
            ReorderBufferFreeChange(rb, specinsert, true);
2403
0
            specinsert = NULL;
2404
0
          }
2405
2406
0
          if (RelationIsValid(relation))
2407
0
          {
2408
0
            RelationClose(relation);
2409
0
            relation = NULL;
2410
0
          }
2411
0
          break;
2412
2413
0
        case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
2414
2415
          /*
2416
           * Speculative insertions are dealt with by delaying the
2417
           * processing of the insert until the confirmation record
2418
           * arrives. For that we simply unlink the record from the
2419
           * chain, so it does not get freed/reused while restoring
2420
           * spooled data from disk.
2421
           *
2422
           * This is safe in the face of concurrent catalog changes
2423
           * because the relevant relation can't be changed between
2424
           * speculative insertion and confirmation due to
2425
           * CheckTableNotInUse() and locking.
2426
           */
2427
2428
          /* clear out a pending (and thus failed) speculation */
2429
0
          if (specinsert != NULL)
2430
0
          {
2431
0
            ReorderBufferFreeChange(rb, specinsert, true);
2432
0
            specinsert = NULL;
2433
0
          }
2434
2435
          /* and memorize the pending insertion */
2436
0
          dlist_delete(&change->node);
2437
0
          specinsert = change;
2438
0
          break;
2439
2440
0
        case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
2441
2442
          /*
2443
           * Abort for speculative insertion arrived. So cleanup the
2444
           * specinsert tuple and toast hash.
2445
           *
2446
           * Note that we get the spec abort change for each toast
2447
           * entry but we need to perform the cleanup only the first
2448
           * time we get it for the main table.
2449
           */
2450
0
          if (specinsert != NULL)
2451
0
          {
2452
            /*
2453
             * We must clean the toast hash before processing a
2454
             * completely new tuple to avoid confusion about the
2455
             * previous tuple's toast chunks.
2456
             */
2457
0
            Assert(change->data.tp.clear_toast_afterwards);
2458
0
            ReorderBufferToastReset(rb, txn);
2459
2460
            /* We don't need this record anymore. */
2461
0
            ReorderBufferFreeChange(rb, specinsert, true);
2462
0
            specinsert = NULL;
2463
0
          }
2464
0
          break;
2465
2466
0
        case REORDER_BUFFER_CHANGE_TRUNCATE:
2467
0
          {
2468
0
            int     i;
2469
0
            int     nrelids = change->data.truncate.nrelids;
2470
0
            int     nrelations = 0;
2471
0
            Relation   *relations;
2472
2473
0
            relations = palloc0(nrelids * sizeof(Relation));
2474
0
            for (i = 0; i < nrelids; i++)
2475
0
            {
2476
0
              Oid     relid = change->data.truncate.relids[i];
2477
0
              Relation  rel;
2478
2479
0
              rel = RelationIdGetRelation(relid);
2480
2481
0
              if (!RelationIsValid(rel))
2482
0
                elog(ERROR, "could not open relation with OID %u", relid);
2483
2484
0
              if (!RelationIsLogicallyLogged(rel))
2485
0
                continue;
2486
2487
0
              relations[nrelations++] = rel;
2488
0
            }
2489
2490
            /* Apply the truncate. */
2491
0
            ReorderBufferApplyTruncate(rb, txn, nrelations,
2492
0
                           relations, change,
2493
0
                           streaming);
2494
2495
0
            for (i = 0; i < nrelations; i++)
2496
0
              RelationClose(relations[i]);
2497
2498
0
            break;
2499
0
          }
2500
2501
0
        case REORDER_BUFFER_CHANGE_MESSAGE:
2502
0
          ReorderBufferApplyMessage(rb, txn, change, streaming);
2503
0
          break;
2504
2505
0
        case REORDER_BUFFER_CHANGE_INVALIDATION:
2506
          /* Execute the invalidation messages locally */
2507
0
          ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations,
2508
0
                            change->data.inval.invalidations);
2509
0
          break;
2510
2511
0
        case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
2512
          /* get rid of the old */
2513
0
          TeardownHistoricSnapshot(false);
2514
2515
0
          if (snapshot_now->copied)
2516
0
          {
2517
0
            ReorderBufferFreeSnap(rb, snapshot_now);
2518
0
            snapshot_now =
2519
0
              ReorderBufferCopySnap(rb, change->data.snapshot,
2520
0
                          txn, command_id);
2521
0
          }
2522
2523
          /*
2524
           * Restored from disk, need to be careful not to double
2525
           * free. We could introduce refcounting for that, but for
2526
           * now this seems infrequent enough not to care.
2527
           */
2528
0
          else if (change->data.snapshot->copied)
2529
0
          {
2530
0
            snapshot_now =
2531
0
              ReorderBufferCopySnap(rb, change->data.snapshot,
2532
0
                          txn, command_id);
2533
0
          }
2534
0
          else
2535
0
          {
2536
0
            snapshot_now = change->data.snapshot;
2537
0
          }
2538
2539
          /* and continue with the new one */
2540
0
          SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2541
0
          break;
2542
2543
0
        case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
2544
0
          Assert(change->data.command_id != InvalidCommandId);
2545
2546
0
          if (command_id < change->data.command_id)
2547
0
          {
2548
0
            command_id = change->data.command_id;
2549
2550
0
            if (!snapshot_now->copied)
2551
0
            {
2552
              /* we don't use the global one anymore */
2553
0
              snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
2554
0
                                 txn, command_id);
2555
0
            }
2556
2557
0
            snapshot_now->curcid = command_id;
2558
2559
0
            TeardownHistoricSnapshot(false);
2560
0
            SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
2561
0
          }
2562
2563
0
          break;
2564
2565
0
        case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
2566
0
          elog(ERROR, "tuplecid value in changequeue");
2567
0
          break;
2568
0
      }
2569
2570
      /*
2571
       * It is possible that the data is not sent to downstream for a
2572
       * long time either because the output plugin filtered it or there
2573
       * is a DDL that generates a lot of data that is not processed by
2574
       * the plugin. So, in such cases, the downstream can timeout. To
2575
       * avoid that we try to send a keepalive message if required.
2576
       * Trying to send a keepalive message after every change has some
2577
       * overhead, but testing showed there is no noticeable overhead if
2578
       * we do it after every ~100 changes.
2579
       */
2580
0
#define CHANGES_THRESHOLD 100
2581
2582
0
      if (++changes_count >= CHANGES_THRESHOLD)
2583
0
      {
2584
0
        rb->update_progress_txn(rb, txn, change->lsn);
2585
0
        changes_count = 0;
2586
0
      }
2587
0
    }
2588
2589
    /* speculative insertion record must be freed by now */
2590
0
    Assert(!specinsert);
2591
2592
    /* clean up the iterator */
2593
0
    ReorderBufferIterTXNFinish(rb, iterstate);
2594
0
    iterstate = NULL;
2595
2596
    /*
2597
     * Update total transaction count and total bytes processed by the
2598
     * transaction and its subtransactions. Ensure to not count the
2599
     * streamed transaction multiple times.
2600
     *
2601
     * Note that the statistics computation has to be done after
2602
     * ReorderBufferIterTXNFinish as it releases the serialized change
2603
     * which we have already accounted in ReorderBufferIterTXNNext.
2604
     */
2605
0
    if (!rbtxn_is_streamed(txn))
2606
0
      rb->totalTxns++;
2607
2608
0
    rb->totalBytes += txn->total_size;
2609
2610
    /*
2611
     * Done with current changes, send the last message for this set of
2612
     * changes depending upon streaming mode.
2613
     */
2614
0
    if (streaming)
2615
0
    {
2616
0
      if (stream_started)
2617
0
      {
2618
0
        rb->stream_stop(rb, txn, prev_lsn);
2619
0
        stream_started = false;
2620
0
      }
2621
0
    }
2622
0
    else
2623
0
    {
2624
      /*
2625
       * Call either PREPARE (for two-phase transactions) or COMMIT (for
2626
       * regular ones).
2627
       */
2628
0
      if (rbtxn_is_prepared(txn))
2629
0
      {
2630
0
        Assert(!rbtxn_sent_prepare(txn));
2631
0
        rb->prepare(rb, txn, commit_lsn);
2632
0
        txn->txn_flags |= RBTXN_SENT_PREPARE;
2633
0
      }
2634
0
      else
2635
0
        rb->commit(rb, txn, commit_lsn);
2636
0
    }
2637
2638
    /* this is just a sanity check against bad output plugin behaviour */
2639
0
    if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
2640
0
      elog(ERROR, "output plugin used XID %u",
2641
0
         GetCurrentTransactionId());
2642
2643
    /*
2644
     * Remember the command ID and snapshot for the next set of changes in
2645
     * streaming mode.
2646
     */
2647
0
    if (streaming)
2648
0
      ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
2649
0
    else if (snapshot_now->copied)
2650
0
      ReorderBufferFreeSnap(rb, snapshot_now);
2651
2652
    /* cleanup */
2653
0
    TeardownHistoricSnapshot(false);
2654
2655
    /*
2656
     * Aborting the current (sub-)transaction as a whole has the right
2657
     * semantics. We want all locks acquired in here to be released, not
2658
     * reassigned to the parent and we do not want any database access
2659
     * have persistent effects.
2660
     */
2661
0
    AbortCurrentTransaction();
2662
2663
    /* make sure there's no cache pollution */
2664
0
    ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations);
2665
2666
0
    if (using_subtxn)
2667
0
      RollbackAndReleaseCurrentSubTransaction();
2668
2669
    /*
2670
     * We are here due to one of the four reasons: 1. Decoding an
2671
     * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a
2672
     * prepared txn that was (partially) streamed. 4. Decoding a committed
2673
     * txn.
2674
     *
2675
     * For 1, we allow truncation of txn data by removing the changes
2676
     * already streamed but still keeping other things like invalidations,
2677
     * snapshot, and tuplecids. For 2 and 3, we indicate
2678
     * ReorderBufferTruncateTXN to do more elaborate truncation of txn
2679
     * data as the entire transaction has been decoded except for commit.
2680
     * For 4, as the entire txn has been decoded, we can fully clean up
2681
     * the TXN reorder buffer.
2682
     */
2683
0
    if (streaming || rbtxn_is_prepared(txn))
2684
0
    {
2685
0
      if (streaming)
2686
0
        ReorderBufferMaybeMarkTXNStreamed(rb, txn);
2687
2688
0
      ReorderBufferTruncateTXN(rb, txn, rbtxn_is_prepared(txn));
2689
      /* Reset the CheckXidAlive */
2690
0
      CheckXidAlive = InvalidTransactionId;
2691
0
    }
2692
0
    else
2693
0
      ReorderBufferCleanupTXN(rb, txn);
2694
0
  }
2695
0
  PG_CATCH();
2696
0
  {
2697
0
    MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
2698
0
    ErrorData  *errdata = CopyErrorData();
2699
2700
    /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
2701
0
    if (iterstate)
2702
0
      ReorderBufferIterTXNFinish(rb, iterstate);
2703
2704
0
    TeardownHistoricSnapshot(true);
2705
2706
    /*
2707
     * Force cache invalidation to happen outside of a valid transaction
2708
     * to prevent catalog access as we just caught an error.
2709
     */
2710
0
    AbortCurrentTransaction();
2711
2712
    /* make sure there's no cache pollution */
2713
0
    ReorderBufferExecuteInvalidations(txn->ninvalidations,
2714
0
                      txn->invalidations);
2715
2716
0
    if (using_subtxn)
2717
0
      RollbackAndReleaseCurrentSubTransaction();
2718
2719
    /*
2720
     * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
2721
     * abort of the (sub)transaction we are streaming or preparing. We
2722
     * need to do the cleanup and return gracefully on this error, see
2723
     * SetupCheckXidLive.
2724
     *
2725
     * This error code can be thrown by one of the callbacks we call
2726
     * during decoding so we need to ensure that we return gracefully only
2727
     * when we are sending the data in streaming mode and the streaming is
2728
     * not finished yet or when we are sending the data out on a PREPARE
2729
     * during a two-phase commit.
2730
     */
2731
0
    if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK &&
2732
0
      (stream_started || rbtxn_is_prepared(txn)))
2733
0
    {
2734
      /* curtxn must be set for streaming or prepared transactions */
2735
0
      Assert(curtxn);
2736
2737
      /* Cleanup the temporary error state. */
2738
0
      FlushErrorState();
2739
0
      FreeErrorData(errdata);
2740
0
      errdata = NULL;
2741
2742
      /* Remember the transaction is aborted. */
2743
0
      Assert(!rbtxn_is_committed(curtxn));
2744
0
      curtxn->txn_flags |= RBTXN_IS_ABORTED;
2745
2746
      /* Mark the transaction is streamed if appropriate */
2747
0
      if (stream_started)
2748
0
        ReorderBufferMaybeMarkTXNStreamed(rb, txn);
2749
2750
      /* Reset the TXN so that it is allowed to stream remaining data. */
2751
0
      ReorderBufferResetTXN(rb, txn, snapshot_now,
2752
0
                  command_id, prev_lsn,
2753
0
                  specinsert);
2754
0
    }
2755
0
    else
2756
0
    {
2757
0
      ReorderBufferCleanupTXN(rb, txn);
2758
0
      MemoryContextSwitchTo(ecxt);
2759
0
      PG_RE_THROW();
2760
0
    }
2761
0
  }
2762
0
  PG_END_TRY();
2763
0
}
2764
2765
/*
2766
 * Perform the replay of a transaction and its non-aborted subtransactions.
2767
 *
2768
 * Subtransactions previously have to be processed by
2769
 * ReorderBufferCommitChild(), even if previously assigned to the toplevel
2770
 * transaction with ReorderBufferAssignChild.
2771
 *
2772
 * This interface is called once a prepare or toplevel commit is read for both
2773
 * streamed as well as non-streamed transactions.
2774
 */
2775
static void
2776
ReorderBufferReplay(ReorderBufferTXN *txn,
2777
          ReorderBuffer *rb, TransactionId xid,
2778
          XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2779
          TimestampTz commit_time,
2780
          RepOriginId origin_id, XLogRecPtr origin_lsn)
2781
0
{
2782
0
  Snapshot  snapshot_now;
2783
0
  CommandId command_id = FirstCommandId;
2784
2785
0
  txn->final_lsn = commit_lsn;
2786
0
  txn->end_lsn = end_lsn;
2787
0
  txn->xact_time.commit_time = commit_time;
2788
0
  txn->origin_id = origin_id;
2789
0
  txn->origin_lsn = origin_lsn;
2790
2791
  /*
2792
   * If the transaction was (partially) streamed, we need to commit it in a
2793
   * 'streamed' way. That is, we first stream the remaining part of the
2794
   * transaction, and then invoke stream_commit message.
2795
   *
2796
   * Called after everything (origin ID, LSN, ...) is stored in the
2797
   * transaction to avoid passing that information directly.
2798
   */
2799
0
  if (rbtxn_is_streamed(txn))
2800
0
  {
2801
0
    ReorderBufferStreamCommit(rb, txn);
2802
0
    return;
2803
0
  }
2804
2805
  /*
2806
   * If this transaction has no snapshot, it didn't make any changes to the
2807
   * database, so there's nothing to decode.  Note that
2808
   * ReorderBufferCommitChild will have transferred any snapshots from
2809
   * subtransactions if there were any.
2810
   */
2811
0
  if (txn->base_snapshot == NULL)
2812
0
  {
2813
0
    Assert(txn->ninvalidations == 0);
2814
2815
    /*
2816
     * Removing this txn before a commit might result in the computation
2817
     * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts.
2818
     */
2819
0
    if (!rbtxn_is_prepared(txn))
2820
0
      ReorderBufferCleanupTXN(rb, txn);
2821
0
    return;
2822
0
  }
2823
2824
0
  snapshot_now = txn->base_snapshot;
2825
2826
  /* Process and send the changes to output plugin. */
2827
0
  ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
2828
0
              command_id, false);
2829
0
}
2830
2831
/*
2832
 * Commit a transaction.
2833
 *
2834
 * See comments for ReorderBufferReplay().
2835
 */
2836
void
2837
ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
2838
          XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2839
          TimestampTz commit_time,
2840
          RepOriginId origin_id, XLogRecPtr origin_lsn)
2841
0
{
2842
0
  ReorderBufferTXN *txn;
2843
2844
0
  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2845
0
                false);
2846
2847
  /* unknown transaction, nothing to replay */
2848
0
  if (txn == NULL)
2849
0
    return;
2850
2851
0
  ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time,
2852
0
            origin_id, origin_lsn);
2853
0
}
2854
2855
/*
2856
 * Record the prepare information for a transaction. Also, mark the transaction
2857
 * as a prepared transaction.
2858
 */
2859
bool
2860
ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid,
2861
                 XLogRecPtr prepare_lsn, XLogRecPtr end_lsn,
2862
                 TimestampTz prepare_time,
2863
                 RepOriginId origin_id, XLogRecPtr origin_lsn)
2864
0
{
2865
0
  ReorderBufferTXN *txn;
2866
2867
0
  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2868
2869
  /* unknown transaction, nothing to do */
2870
0
  if (txn == NULL)
2871
0
    return false;
2872
2873
  /*
2874
   * Remember the prepare information to be later used by commit prepared in
2875
   * case we skip doing prepare.
2876
   */
2877
0
  txn->final_lsn = prepare_lsn;
2878
0
  txn->end_lsn = end_lsn;
2879
0
  txn->xact_time.prepare_time = prepare_time;
2880
0
  txn->origin_id = origin_id;
2881
0
  txn->origin_lsn = origin_lsn;
2882
2883
  /* Mark this transaction as a prepared transaction */
2884
0
  Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == 0);
2885
0
  txn->txn_flags |= RBTXN_IS_PREPARED;
2886
2887
0
  return true;
2888
0
}
2889
2890
/* Remember that we have skipped prepare */
2891
void
2892
ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid)
2893
0
{
2894
0
  ReorderBufferTXN *txn;
2895
2896
0
  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false);
2897
2898
  /* unknown transaction, nothing to do */
2899
0
  if (txn == NULL)
2900
0
    return;
2901
2902
  /* txn must have been marked as a prepared transaction */
2903
0
  Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == RBTXN_IS_PREPARED);
2904
0
  txn->txn_flags |= RBTXN_SKIPPED_PREPARE;
2905
0
}
2906
2907
/*
2908
 * Prepare a two-phase transaction.
2909
 *
2910
 * See comments for ReorderBufferReplay().
2911
 */
2912
void
2913
ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid,
2914
           char *gid)
2915
0
{
2916
0
  ReorderBufferTXN *txn;
2917
2918
0
  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
2919
0
                false);
2920
2921
  /* unknown transaction, nothing to replay */
2922
0
  if (txn == NULL)
2923
0
    return;
2924
2925
  /*
2926
   * txn must have been marked as a prepared transaction and must have
2927
   * neither been skipped nor sent a prepare. Also, the prepare info must
2928
   * have been updated in it by now.
2929
   */
2930
0
  Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == RBTXN_IS_PREPARED);
2931
0
  Assert(txn->final_lsn != InvalidXLogRecPtr);
2932
2933
0
  txn->gid = pstrdup(gid);
2934
2935
0
  ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
2936
0
            txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
2937
2938
  /*
2939
   * Send a prepare if not already done so. This might occur if we have
2940
   * detected a concurrent abort while replaying the non-streaming
2941
   * transaction.
2942
   */
2943
0
  if (!rbtxn_sent_prepare(txn))
2944
0
  {
2945
0
    rb->prepare(rb, txn, txn->final_lsn);
2946
0
    txn->txn_flags |= RBTXN_SENT_PREPARE;
2947
0
  }
2948
0
}
2949
2950
/*
2951
 * This is used to handle COMMIT/ROLLBACK PREPARED.
2952
 */
2953
void
2954
ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid,
2955
              XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
2956
              XLogRecPtr two_phase_at,
2957
              TimestampTz commit_time, RepOriginId origin_id,
2958
              XLogRecPtr origin_lsn, char *gid, bool is_commit)
2959
0
{
2960
0
  ReorderBufferTXN *txn;
2961
0
  XLogRecPtr  prepare_end_lsn;
2962
0
  TimestampTz prepare_time;
2963
2964
0
  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false);
2965
2966
  /* unknown transaction, nothing to do */
2967
0
  if (txn == NULL)
2968
0
    return;
2969
2970
  /*
2971
   * By this time the txn has the prepare record information, remember it to
2972
   * be later used for rollback.
2973
   */
2974
0
  prepare_end_lsn = txn->end_lsn;
2975
0
  prepare_time = txn->xact_time.prepare_time;
2976
2977
  /* add the gid in the txn */
2978
0
  txn->gid = pstrdup(gid);
2979
2980
  /*
2981
   * It is possible that this transaction is not decoded at prepare time
2982
   * either because by that time we didn't have a consistent snapshot, or
2983
   * two_phase was not enabled, or it was decoded earlier but we have
2984
   * restarted. We only need to send the prepare if it was not decoded
2985
   * earlier. We don't need to decode the xact for aborts if it is not done
2986
   * already.
2987
   */
2988
0
  if ((txn->final_lsn < two_phase_at) && is_commit)
2989
0
  {
2990
    /*
2991
     * txn must have been marked as a prepared transaction and skipped but
2992
     * not sent a prepare. Also, the prepare info must have been updated
2993
     * in txn even if we skip prepare.
2994
     */
2995
0
    Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) ==
2996
0
         (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE));
2997
0
    Assert(txn->final_lsn != InvalidXLogRecPtr);
2998
2999
    /*
3000
     * By this time the txn has the prepare record information and it is
3001
     * important to use that so that downstream gets the accurate
3002
     * information. If instead, we have passed commit information here
3003
     * then downstream can behave as it has already replayed commit
3004
     * prepared after the restart.
3005
     */
3006
0
    ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn,
3007
0
              txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn);
3008
0
  }
3009
3010
0
  txn->final_lsn = commit_lsn;
3011
0
  txn->end_lsn = end_lsn;
3012
0
  txn->xact_time.commit_time = commit_time;
3013
0
  txn->origin_id = origin_id;
3014
0
  txn->origin_lsn = origin_lsn;
3015
3016
0
  if (is_commit)
3017
0
    rb->commit_prepared(rb, txn, commit_lsn);
3018
0
  else
3019
0
    rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time);
3020
3021
  /* cleanup: make sure there's no cache pollution */
3022
0
  ReorderBufferExecuteInvalidations(txn->ninvalidations,
3023
0
                    txn->invalidations);
3024
0
  ReorderBufferCleanupTXN(rb, txn);
3025
0
}
3026
3027
/*
3028
 * Abort a transaction that possibly has previous changes. Needs to be first
3029
 * called for subtransactions and then for the toplevel xid.
3030
 *
3031
 * NB: Transactions handled here have to have actively aborted (i.e. have
3032
 * produced an abort record). Implicitly aborted transactions are handled via
3033
 * ReorderBufferAbortOld(); transactions we're just not interested in, but
3034
 * which have committed are handled in ReorderBufferForget().
3035
 *
3036
 * This function purges this transaction and its contents from memory and
3037
 * disk.
3038
 */
3039
void
3040
ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
3041
           TimestampTz abort_time)
3042
0
{
3043
0
  ReorderBufferTXN *txn;
3044
3045
0
  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3046
0
                false);
3047
3048
  /* unknown, nothing to remove */
3049
0
  if (txn == NULL)
3050
0
    return;
3051
3052
0
  txn->xact_time.abort_time = abort_time;
3053
3054
  /* For streamed transactions notify the remote node about the abort. */
3055
0
  if (rbtxn_is_streamed(txn))
3056
0
  {
3057
0
    rb->stream_abort(rb, txn, lsn);
3058
3059
    /*
3060
     * We might have decoded changes for this transaction that could load
3061
     * the cache as per the current transaction's view (consider DDL's
3062
     * happened in this transaction). We don't want the decoding of future
3063
     * transactions to use those cache entries so execute invalidations.
3064
     */
3065
0
    if (txn->ninvalidations > 0)
3066
0
      ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
3067
0
                         txn->invalidations);
3068
0
  }
3069
3070
  /* cosmetic... */
3071
0
  txn->final_lsn = lsn;
3072
3073
  /* remove potential on-disk data, and deallocate */
3074
0
  ReorderBufferCleanupTXN(rb, txn);
3075
0
}
3076
3077
/*
3078
 * Abort all transactions that aren't actually running anymore because the
3079
 * server restarted.
3080
 *
3081
 * NB: These really have to be transactions that have aborted due to a server
3082
 * crash/immediate restart, as we don't deal with invalidations here.
3083
 */
3084
void
3085
ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid)
3086
0
{
3087
0
  dlist_mutable_iter it;
3088
3089
  /*
3090
   * Iterate through all (potential) toplevel TXNs and abort all that are
3091
   * older than what possibly can be running. Once we've found the first
3092
   * that is alive we stop, there might be some that acquired an xid earlier
3093
   * but started writing later, but it's unlikely and they will be cleaned
3094
   * up in a later call to this function.
3095
   */
3096
0
  dlist_foreach_modify(it, &rb->toplevel_by_lsn)
3097
0
  {
3098
0
    ReorderBufferTXN *txn;
3099
3100
0
    txn = dlist_container(ReorderBufferTXN, node, it.cur);
3101
3102
0
    if (TransactionIdPrecedes(txn->xid, oldestRunningXid))
3103
0
    {
3104
0
      elog(DEBUG2, "aborting old transaction %u", txn->xid);
3105
3106
      /* Notify the remote node about the crash/immediate restart. */
3107
0
      if (rbtxn_is_streamed(txn))
3108
0
        rb->stream_abort(rb, txn, InvalidXLogRecPtr);
3109
3110
      /* remove potential on-disk data, and deallocate this tx */
3111
0
      ReorderBufferCleanupTXN(rb, txn);
3112
0
    }
3113
0
    else
3114
0
      return;
3115
0
  }
3116
0
}
3117
3118
/*
3119
 * Forget the contents of a transaction if we aren't interested in its
3120
 * contents. Needs to be first called for subtransactions and then for the
3121
 * toplevel xid.
3122
 *
3123
 * This is significantly different to ReorderBufferAbort() because
3124
 * transactions that have committed need to be treated differently from aborted
3125
 * ones since they may have modified the catalog.
3126
 *
3127
 * Note that this is only allowed to be called in the moment a transaction
3128
 * commit has just been read, not earlier; otherwise later records referring
3129
 * to this xid might re-create the transaction incompletely.
3130
 */
3131
void
3132
ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
3133
0
{
3134
0
  ReorderBufferTXN *txn;
3135
3136
0
  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3137
0
                false);
3138
3139
  /* unknown, nothing to forget */
3140
0
  if (txn == NULL)
3141
0
    return;
3142
3143
  /* this transaction mustn't be streamed */
3144
0
  Assert(!rbtxn_is_streamed(txn));
3145
3146
  /* cosmetic... */
3147
0
  txn->final_lsn = lsn;
3148
3149
  /*
3150
   * Process cache invalidation messages if there are any. Even if we're not
3151
   * interested in the transaction's contents, it could have manipulated the
3152
   * catalog and we need to update the caches according to that.
3153
   */
3154
0
  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3155
0
    ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
3156
0
                       txn->invalidations);
3157
0
  else
3158
0
    Assert(txn->ninvalidations == 0);
3159
3160
  /* remove potential on-disk data, and deallocate */
3161
0
  ReorderBufferCleanupTXN(rb, txn);
3162
0
}
3163
3164
/*
3165
 * Invalidate cache for those transactions that need to be skipped just in case
3166
 * catalogs were manipulated as part of the transaction.
3167
 *
3168
 * Note that this is a special-purpose function for prepared transactions where
3169
 * we don't want to clean up the TXN even when we decide to skip it. See
3170
 * DecodePrepare.
3171
 */
3172
void
3173
ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
3174
0
{
3175
0
  ReorderBufferTXN *txn;
3176
3177
0
  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3178
0
                false);
3179
3180
  /* unknown, nothing to do */
3181
0
  if (txn == NULL)
3182
0
    return;
3183
3184
  /*
3185
   * Process cache invalidation messages if there are any. Even if we're not
3186
   * interested in the transaction's contents, it could have manipulated the
3187
   * catalog and we need to update the caches according to that.
3188
   */
3189
0
  if (txn->base_snapshot != NULL && txn->ninvalidations > 0)
3190
0
    ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
3191
0
                       txn->invalidations);
3192
0
  else
3193
0
    Assert(txn->ninvalidations == 0);
3194
0
}
3195
3196
3197
/*
3198
 * Execute invalidations happening outside the context of a decoded
3199
 * transaction. That currently happens either for xid-less commits
3200
 * (cf. RecordTransactionCommit()) or for invalidations in uninteresting
3201
 * transactions (via ReorderBufferForget()).
3202
 */
3203
void
3204
ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations,
3205
                   SharedInvalidationMessage *invalidations)
3206
0
{
3207
0
  bool    use_subtxn = IsTransactionOrTransactionBlock();
3208
0
  int     i;
3209
3210
0
  if (use_subtxn)
3211
0
    BeginInternalSubTransaction("replay");
3212
3213
  /*
3214
   * Force invalidations to happen outside of a valid transaction - that way
3215
   * entries will just be marked as invalid without accessing the catalog.
3216
   * That's advantageous because we don't need to setup the full state
3217
   * necessary for catalog access.
3218
   */
3219
0
  if (use_subtxn)
3220
0
    AbortCurrentTransaction();
3221
3222
0
  for (i = 0; i < ninvalidations; i++)
3223
0
    LocalExecuteInvalidationMessage(&invalidations[i]);
3224
3225
0
  if (use_subtxn)
3226
0
    RollbackAndReleaseCurrentSubTransaction();
3227
0
}
3228
3229
/*
3230
 * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at
3231
 * least once for every xid in XLogRecord->xl_xid (other places in records
3232
 * may, but do not have to be passed through here).
3233
 *
3234
 * Reorderbuffer keeps some data structures about transactions in LSN order,
3235
 * for efficiency. To do that it has to know about when transactions are seen
3236
 * first in the WAL. As many types of records are not actually interesting for
3237
 * logical decoding, they do not necessarily pass through here.
3238
 */
3239
void
3240
ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
3241
0
{
3242
  /* many records won't have an xid assigned, centralize check here */
3243
0
  if (xid != InvalidTransactionId)
3244
0
    ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3245
0
}
3246
3247
/*
3248
 * Add a new snapshot to this transaction that may only used after lsn 'lsn'
3249
 * because the previous snapshot doesn't describe the catalog correctly for
3250
 * following rows.
3251
 */
3252
void
3253
ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
3254
             XLogRecPtr lsn, Snapshot snap)
3255
0
{
3256
0
  ReorderBufferChange *change = ReorderBufferAllocChange(rb);
3257
3258
0
  change->data.snapshot = snap;
3259
0
  change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
3260
3261
0
  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3262
0
}
3263
3264
/*
3265
 * Set up the transaction's base snapshot.
3266
 *
3267
 * If we know that xid is a subtransaction, set the base snapshot on the
3268
 * top-level transaction instead.
3269
 */
3270
void
3271
ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid,
3272
               XLogRecPtr lsn, Snapshot snap)
3273
0
{
3274
0
  ReorderBufferTXN *txn;
3275
0
  bool    is_new;
3276
3277
0
  Assert(snap != NULL);
3278
3279
  /*
3280
   * Fetch the transaction to operate on.  If we know it's a subtransaction,
3281
   * operate on its top-level transaction instead.
3282
   */
3283
0
  txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true);
3284
0
  if (rbtxn_is_known_subxact(txn))
3285
0
    txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3286
0
                  NULL, InvalidXLogRecPtr, false);
3287
0
  Assert(txn->base_snapshot == NULL);
3288
3289
0
  txn->base_snapshot = snap;
3290
0
  txn->base_snapshot_lsn = lsn;
3291
0
  dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node);
3292
3293
0
  AssertTXNLsnOrder(rb);
3294
0
}
3295
3296
/*
3297
 * Access the catalog with this CommandId at this point in the changestream.
3298
 *
3299
 * May only be called for command ids > 1
3300
 */
3301
void
3302
ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
3303
               XLogRecPtr lsn, CommandId cid)
3304
0
{
3305
0
  ReorderBufferChange *change = ReorderBufferAllocChange(rb);
3306
3307
0
  change->data.command_id = cid;
3308
0
  change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
3309
3310
0
  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3311
0
}
3312
3313
/*
3314
 * Update memory counters to account for the new or removed change.
3315
 *
3316
 * We update two counters - in the reorder buffer, and in the transaction
3317
 * containing the change. The reorder buffer counter allows us to quickly
3318
 * decide if we reached the memory limit, the transaction counter allows
3319
 * us to quickly pick the largest transaction for eviction.
3320
 *
3321
 * Either txn or change must be non-NULL at least. We update the memory
3322
 * counter of txn if it's non-NULL, otherwise change->txn.
3323
 *
3324
 * When streaming is enabled, we need to update the toplevel transaction
3325
 * counters instead - we don't really care about subtransactions as we
3326
 * can't stream them individually anyway, and we only pick toplevel
3327
 * transactions for eviction. So only toplevel transactions matter.
3328
 */
3329
static void
3330
ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
3331
                ReorderBufferChange *change,
3332
                ReorderBufferTXN *txn,
3333
                bool addition, Size sz)
3334
0
{
3335
0
  ReorderBufferTXN *toptxn;
3336
3337
0
  Assert(txn || change);
3338
3339
  /*
3340
   * Ignore tuple CID changes, because those are not evicted when reaching
3341
   * memory limit. So we just don't count them, because it might easily
3342
   * trigger a pointless attempt to spill.
3343
   */
3344
0
  if (change && change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
3345
0
    return;
3346
3347
0
  if (sz == 0)
3348
0
    return;
3349
3350
0
  if (txn == NULL)
3351
0
    txn = change->txn;
3352
0
  Assert(txn != NULL);
3353
3354
  /*
3355
   * Update the total size in top level as well. This is later used to
3356
   * compute the decoding stats.
3357
   */
3358
0
  toptxn = rbtxn_get_toptxn(txn);
3359
3360
0
  if (addition)
3361
0
  {
3362
0
    Size    oldsize = txn->size;
3363
3364
0
    txn->size += sz;
3365
0
    rb->size += sz;
3366
3367
    /* Update the total size in the top transaction. */
3368
0
    toptxn->total_size += sz;
3369
3370
    /* Update the max-heap */
3371
0
    if (oldsize != 0)
3372
0
      pairingheap_remove(rb->txn_heap, &txn->txn_node);
3373
0
    pairingheap_add(rb->txn_heap, &txn->txn_node);
3374
0
  }
3375
0
  else
3376
0
  {
3377
0
    Assert((rb->size >= sz) && (txn->size >= sz));
3378
0
    txn->size -= sz;
3379
0
    rb->size -= sz;
3380
3381
    /* Update the total size in the top transaction. */
3382
0
    toptxn->total_size -= sz;
3383
3384
    /* Update the max-heap */
3385
0
    pairingheap_remove(rb->txn_heap, &txn->txn_node);
3386
0
    if (txn->size != 0)
3387
0
      pairingheap_add(rb->txn_heap, &txn->txn_node);
3388
0
  }
3389
3390
0
  Assert(txn->size <= rb->size);
3391
0
}
3392
3393
/*
3394
 * Add new (relfilelocator, tid) -> (cmin, cmax) mappings.
3395
 *
3396
 * We do not include this change type in memory accounting, because we
3397
 * keep CIDs in a separate list and do not evict them when reaching
3398
 * the memory limit.
3399
 */
3400
void
3401
ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid,
3402
               XLogRecPtr lsn, RelFileLocator locator,
3403
               ItemPointerData tid, CommandId cmin,
3404
               CommandId cmax, CommandId combocid)
3405
0
{
3406
0
  ReorderBufferChange *change = ReorderBufferAllocChange(rb);
3407
0
  ReorderBufferTXN *txn;
3408
3409
0
  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3410
3411
0
  change->data.tuplecid.locator = locator;
3412
0
  change->data.tuplecid.tid = tid;
3413
0
  change->data.tuplecid.cmin = cmin;
3414
0
  change->data.tuplecid.cmax = cmax;
3415
0
  change->data.tuplecid.combocid = combocid;
3416
0
  change->lsn = lsn;
3417
0
  change->txn = txn;
3418
0
  change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID;
3419
3420
0
  dlist_push_tail(&txn->tuplecids, &change->node);
3421
0
  txn->ntuplecids++;
3422
0
}
3423
3424
/*
3425
 * Accumulate the invalidations for executing them later.
3426
 *
3427
 * This needs to be called for each XLOG_XACT_INVALIDATIONS message and
3428
 * accumulates all the invalidation messages in the toplevel transaction, if
3429
 * available, otherwise in the current transaction, as well as in the form of
3430
 * change in reorder buffer.  We require to record it in form of the change
3431
 * so that we can execute only the required invalidations instead of executing
3432
 * all the invalidations on each CommandId increment.  We also need to
3433
 * accumulate these in the txn buffer because in some cases where we skip
3434
 * processing the transaction (see ReorderBufferForget), we need to execute
3435
 * all the invalidations together.
3436
 */
3437
void
3438
ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
3439
                XLogRecPtr lsn, Size nmsgs,
3440
                SharedInvalidationMessage *msgs)
3441
0
{
3442
0
  ReorderBufferTXN *txn;
3443
0
  MemoryContext oldcontext;
3444
0
  ReorderBufferChange *change;
3445
3446
0
  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3447
3448
0
  oldcontext = MemoryContextSwitchTo(rb->context);
3449
3450
  /*
3451
   * Collect all the invalidations under the top transaction, if available,
3452
   * so that we can execute them all together.  See comments atop this
3453
   * function.
3454
   */
3455
0
  txn = rbtxn_get_toptxn(txn);
3456
3457
0
  Assert(nmsgs > 0);
3458
3459
  /* Accumulate invalidations. */
3460
0
  if (txn->ninvalidations == 0)
3461
0
  {
3462
0
    txn->ninvalidations = nmsgs;
3463
0
    txn->invalidations = (SharedInvalidationMessage *)
3464
0
      palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3465
0
    memcpy(txn->invalidations, msgs,
3466
0
         sizeof(SharedInvalidationMessage) * nmsgs);
3467
0
  }
3468
0
  else
3469
0
  {
3470
0
    txn->invalidations = (SharedInvalidationMessage *)
3471
0
      repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) *
3472
0
           (txn->ninvalidations + nmsgs));
3473
3474
0
    memcpy(txn->invalidations + txn->ninvalidations, msgs,
3475
0
         nmsgs * sizeof(SharedInvalidationMessage));
3476
0
    txn->ninvalidations += nmsgs;
3477
0
  }
3478
3479
0
  change = ReorderBufferAllocChange(rb);
3480
0
  change->action = REORDER_BUFFER_CHANGE_INVALIDATION;
3481
0
  change->data.inval.ninvalidations = nmsgs;
3482
0
  change->data.inval.invalidations = (SharedInvalidationMessage *)
3483
0
    palloc(sizeof(SharedInvalidationMessage) * nmsgs);
3484
0
  memcpy(change->data.inval.invalidations, msgs,
3485
0
       sizeof(SharedInvalidationMessage) * nmsgs);
3486
3487
0
  ReorderBufferQueueChange(rb, xid, lsn, change, false);
3488
3489
0
  MemoryContextSwitchTo(oldcontext);
3490
0
}
3491
3492
/*
3493
 * Apply all invalidations we know. Possibly we only need parts at this point
3494
 * in the changestream but we don't know which those are.
3495
 */
3496
static void
3497
ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs)
3498
0
{
3499
0
  int     i;
3500
3501
0
  for (i = 0; i < nmsgs; i++)
3502
0
    LocalExecuteInvalidationMessage(&msgs[i]);
3503
0
}
3504
3505
/*
3506
 * Mark a transaction as containing catalog changes
3507
 */
3508
void
3509
ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
3510
                  XLogRecPtr lsn)
3511
0
{
3512
0
  ReorderBufferTXN *txn;
3513
3514
0
  txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
3515
3516
0
  if (!rbtxn_has_catalog_changes(txn))
3517
0
  {
3518
0
    txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3519
0
    dclist_push_tail(&rb->catchange_txns, &txn->catchange_node);
3520
0
  }
3521
3522
  /*
3523
   * Mark top-level transaction as having catalog changes too if one of its
3524
   * children has so that the ReorderBufferBuildTupleCidHash can
3525
   * conveniently check just top-level transaction and decide whether to
3526
   * build the hash table or not.
3527
   */
3528
0
  if (rbtxn_is_subtxn(txn))
3529
0
  {
3530
0
    ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
3531
3532
0
    if (!rbtxn_has_catalog_changes(toptxn))
3533
0
    {
3534
0
      toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
3535
0
      dclist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
3536
0
    }
3537
0
  }
3538
0
}
3539
3540
/*
3541
 * Return palloc'ed array of the transactions that have changed catalogs.
3542
 * The returned array is sorted in xidComparator order.
3543
 *
3544
 * The caller must free the returned array when done with it.
3545
 */
3546
TransactionId *
3547
ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb)
3548
0
{
3549
0
  dlist_iter  iter;
3550
0
  TransactionId *xids = NULL;
3551
0
  size_t    xcnt = 0;
3552
3553
  /* Quick return if the list is empty */
3554
0
  if (dclist_count(&rb->catchange_txns) == 0)
3555
0
    return NULL;
3556
3557
  /* Initialize XID array */
3558
0
  xids = (TransactionId *) palloc(sizeof(TransactionId) *
3559
0
                  dclist_count(&rb->catchange_txns));
3560
0
  dclist_foreach(iter, &rb->catchange_txns)
3561
0
  {
3562
0
    ReorderBufferTXN *txn = dclist_container(ReorderBufferTXN,
3563
0
                         catchange_node,
3564
0
                         iter.cur);
3565
3566
0
    Assert(rbtxn_has_catalog_changes(txn));
3567
3568
0
    xids[xcnt++] = txn->xid;
3569
0
  }
3570
3571
0
  qsort(xids, xcnt, sizeof(TransactionId), xidComparator);
3572
3573
0
  Assert(xcnt == dclist_count(&rb->catchange_txns));
3574
0
  return xids;
3575
0
}
3576
3577
/*
3578
 * Query whether a transaction is already *known* to contain catalog
3579
 * changes. This can be wrong until directly before the commit!
3580
 */
3581
bool
3582
ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid)
3583
0
{
3584
0
  ReorderBufferTXN *txn;
3585
3586
0
  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
3587
0
                false);
3588
0
  if (txn == NULL)
3589
0
    return false;
3590
3591
0
  return rbtxn_has_catalog_changes(txn);
3592
0
}
3593
3594
/*
3595
 * ReorderBufferXidHasBaseSnapshot
3596
 *    Have we already set the base snapshot for the given txn/subtxn?
3597
 */
3598
bool
3599
ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid)
3600
0
{
3601
0
  ReorderBufferTXN *txn;
3602
3603
0
  txn = ReorderBufferTXNByXid(rb, xid, false,
3604
0
                NULL, InvalidXLogRecPtr, false);
3605
3606
  /* transaction isn't known yet, ergo no snapshot */
3607
0
  if (txn == NULL)
3608
0
    return false;
3609
3610
  /* a known subtxn? operate on top-level txn instead */
3611
0
  if (rbtxn_is_known_subxact(txn))
3612
0
    txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false,
3613
0
                  NULL, InvalidXLogRecPtr, false);
3614
3615
0
  return txn->base_snapshot != NULL;
3616
0
}
3617
3618
3619
/*
3620
 * ---------------------------------------
3621
 * Disk serialization support
3622
 * ---------------------------------------
3623
 */
3624
3625
/*
3626
 * Ensure the IO buffer is >= sz.
3627
 */
3628
static void
3629
ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz)
3630
0
{
3631
0
  if (!rb->outbufsize)
3632
0
  {
3633
0
    rb->outbuf = MemoryContextAlloc(rb->context, sz);
3634
0
    rb->outbufsize = sz;
3635
0
  }
3636
0
  else if (rb->outbufsize < sz)
3637
0
  {
3638
0
    rb->outbuf = repalloc(rb->outbuf, sz);
3639
0
    rb->outbufsize = sz;
3640
0
  }
3641
0
}
3642
3643
3644
/* Compare two transactions by size */
3645
static int
3646
ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg)
3647
0
{
3648
0
  const ReorderBufferTXN *ta = pairingheap_const_container(ReorderBufferTXN, txn_node, a);
3649
0
  const ReorderBufferTXN *tb = pairingheap_const_container(ReorderBufferTXN, txn_node, b);
3650
3651
0
  if (ta->size < tb->size)
3652
0
    return -1;
3653
0
  if (ta->size > tb->size)
3654
0
    return 1;
3655
0
  return 0;
3656
0
}
3657
3658
/*
3659
 * Find the largest transaction (toplevel or subxact) to evict (spill to disk).
3660
 */
3661
static ReorderBufferTXN *
3662
ReorderBufferLargestTXN(ReorderBuffer *rb)
3663
0
{
3664
0
  ReorderBufferTXN *largest;
3665
3666
  /* Get the largest transaction from the max-heap */
3667
0
  largest = pairingheap_container(ReorderBufferTXN, txn_node,
3668
0
                  pairingheap_first(rb->txn_heap));
3669
3670
0
  Assert(largest);
3671
0
  Assert(largest->size > 0);
3672
0
  Assert(largest->size <= rb->size);
3673
3674
0
  return largest;
3675
0
}
3676
3677
/*
3678
 * Find the largest streamable (and non-aborted) toplevel transaction to evict
3679
 * (by streaming).
3680
 *
3681
 * This can be seen as an optimized version of ReorderBufferLargestTXN, which
3682
 * should give us the same transaction (because we don't update memory account
3683
 * for subtransaction with streaming, so it's always 0). But we can simply
3684
 * iterate over the limited number of toplevel transactions that have a base
3685
 * snapshot. There is no use of selecting a transaction that doesn't have base
3686
 * snapshot because we don't decode such transactions.  Also, we do not select
3687
 * the transaction which doesn't have any streamable change.
3688
 *
3689
 * Note that, we skip transactions that contain incomplete changes. There
3690
 * is a scope of optimization here such that we can select the largest
3691
 * transaction which has incomplete changes.  But that will make the code and
3692
 * design quite complex and that might not be worth the benefit.  If we plan to
3693
 * stream the transactions that contain incomplete changes then we need to
3694
 * find a way to partially stream/truncate the transaction changes in-memory
3695
 * and build a mechanism to partially truncate the spilled files.
3696
 * Additionally, whenever we partially stream the transaction we need to
3697
 * maintain the last streamed lsn and next time we need to restore from that
3698
 * segment and the offset in WAL.  As we stream the changes from the top
3699
 * transaction and restore them subtransaction wise, we need to even remember
3700
 * the subxact from where we streamed the last change.
3701
 */
3702
static ReorderBufferTXN *
3703
ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
3704
0
{
3705
0
  dlist_iter  iter;
3706
0
  Size    largest_size = 0;
3707
0
  ReorderBufferTXN *largest = NULL;
3708
3709
  /* Find the largest top-level transaction having a base snapshot. */
3710
0
  dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn)
3711
0
  {
3712
0
    ReorderBufferTXN *txn;
3713
3714
0
    txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur);
3715
3716
    /* must not be a subtxn */
3717
0
    Assert(!rbtxn_is_known_subxact(txn));
3718
    /* base_snapshot must be set */
3719
0
    Assert(txn->base_snapshot != NULL);
3720
3721
    /* Don't consider these kinds of transactions for eviction. */
3722
0
    if (rbtxn_has_partial_change(txn) ||
3723
0
      !rbtxn_has_streamable_change(txn) ||
3724
0
      rbtxn_is_aborted(txn))
3725
0
      continue;
3726
3727
    /* Find the largest of the eviction candidates. */
3728
0
    if ((largest == NULL || txn->total_size > largest_size) &&
3729
0
      (txn->total_size > 0))
3730
0
    {
3731
0
      largest = txn;
3732
0
      largest_size = txn->total_size;
3733
0
    }
3734
0
  }
3735
3736
0
  return largest;
3737
0
}
3738
3739
/*
3740
 * Check whether the logical_decoding_work_mem limit was reached, and if yes
3741
 * pick the largest (sub)transaction at-a-time to evict and spill its changes to
3742
 * disk or send to the output plugin until we reach under the memory limit.
3743
 *
3744
 * If debug_logical_replication_streaming is set to "immediate", stream or
3745
 * serialize the changes immediately.
3746
 *
3747
 * XXX At this point we select the transactions until we reach under the memory
3748
 * limit, but we might also adapt a more elaborate eviction strategy - for example
3749
 * evicting enough transactions to free certain fraction (e.g. 50%) of the memory
3750
 * limit.
3751
 */
3752
static void
3753
ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
3754
0
{
3755
0
  ReorderBufferTXN *txn;
3756
3757
  /*
3758
   * Bail out if debug_logical_replication_streaming is buffered and we
3759
   * haven't exceeded the memory limit.
3760
   */
3761
0
  if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_BUFFERED &&
3762
0
    rb->size < logical_decoding_work_mem * (Size) 1024)
3763
0
    return;
3764
3765
  /*
3766
   * If debug_logical_replication_streaming is immediate, loop until there's
3767
   * no change. Otherwise, loop until we reach under the memory limit. One
3768
   * might think that just by evicting the largest (sub)transaction we will
3769
   * come under the memory limit based on assumption that the selected
3770
   * transaction is at least as large as the most recent change (which
3771
   * caused us to go over the memory limit). However, that is not true
3772
   * because a user can reduce the logical_decoding_work_mem to a smaller
3773
   * value before the most recent change.
3774
   */
3775
0
  while (rb->size >= logical_decoding_work_mem * (Size) 1024 ||
3776
0
       (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE &&
3777
0
      rb->size > 0))
3778
0
  {
3779
    /*
3780
     * Pick the largest non-aborted transaction and evict it from memory
3781
     * by streaming, if possible.  Otherwise, spill to disk.
3782
     */
3783
0
    if (ReorderBufferCanStartStreaming(rb) &&
3784
0
      (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
3785
0
    {
3786
      /* we know there has to be one, because the size is not zero */
3787
0
      Assert(txn && rbtxn_is_toptxn(txn));
3788
0
      Assert(txn->total_size > 0);
3789
0
      Assert(rb->size >= txn->total_size);
3790
3791
      /* skip the transaction if aborted */
3792
0
      if (ReorderBufferCheckAndTruncateAbortedTXN(rb, txn))
3793
0
        continue;
3794
3795
0
      ReorderBufferStreamTXN(rb, txn);
3796
0
    }
3797
0
    else
3798
0
    {
3799
      /*
3800
       * Pick the largest transaction (or subtransaction) and evict it
3801
       * from memory by serializing it to disk.
3802
       */
3803
0
      txn = ReorderBufferLargestTXN(rb);
3804
3805
      /* we know there has to be one, because the size is not zero */
3806
0
      Assert(txn);
3807
0
      Assert(txn->size > 0);
3808
0
      Assert(rb->size >= txn->size);
3809
3810
      /* skip the transaction if aborted */
3811
0
      if (ReorderBufferCheckAndTruncateAbortedTXN(rb, txn))
3812
0
        continue;
3813
3814
0
      ReorderBufferSerializeTXN(rb, txn);
3815
0
    }
3816
3817
    /*
3818
     * After eviction, the transaction should have no entries in memory,
3819
     * and should use 0 bytes for changes.
3820
     */
3821
0
    Assert(txn->size == 0);
3822
0
    Assert(txn->nentries_mem == 0);
3823
0
  }
3824
3825
  /* We must be under the memory limit now. */
3826
0
  Assert(rb->size < logical_decoding_work_mem * (Size) 1024);
3827
0
}
3828
3829
/*
3830
 * Spill data of a large transaction (and its subtransactions) to disk.
3831
 */
3832
static void
3833
ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
3834
0
{
3835
0
  dlist_iter  subtxn_i;
3836
0
  dlist_mutable_iter change_i;
3837
0
  int     fd = -1;
3838
0
  XLogSegNo curOpenSegNo = 0;
3839
0
  Size    spilled = 0;
3840
0
  Size    size = txn->size;
3841
3842
0
  elog(DEBUG2, "spill %u changes in XID %u to disk",
3843
0
     (uint32) txn->nentries_mem, txn->xid);
3844
3845
  /* do the same to all child TXs */
3846
0
  dlist_foreach(subtxn_i, &txn->subtxns)
3847
0
  {
3848
0
    ReorderBufferTXN *subtxn;
3849
3850
0
    subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur);
3851
0
    ReorderBufferSerializeTXN(rb, subtxn);
3852
0
  }
3853
3854
  /* serialize changestream */
3855
0
  dlist_foreach_modify(change_i, &txn->changes)
3856
0
  {
3857
0
    ReorderBufferChange *change;
3858
3859
0
    change = dlist_container(ReorderBufferChange, node, change_i.cur);
3860
3861
    /*
3862
     * store in segment in which it belongs by start lsn, don't split over
3863
     * multiple segments tho
3864
     */
3865
0
    if (fd == -1 ||
3866
0
      !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size))
3867
0
    {
3868
0
      char    path[MAXPGPATH];
3869
3870
0
      if (fd != -1)
3871
0
        CloseTransientFile(fd);
3872
3873
0
      XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size);
3874
3875
      /*
3876
       * No need to care about TLIs here, only used during a single run,
3877
       * so each LSN only maps to a specific WAL record.
3878
       */
3879
0
      ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
3880
0
                    curOpenSegNo);
3881
3882
      /* open segment, create it if necessary */
3883
0
      fd = OpenTransientFile(path,
3884
0
                   O_CREAT | O_WRONLY | O_APPEND | PG_BINARY);
3885
3886
0
      if (fd < 0)
3887
0
        ereport(ERROR,
3888
0
            (errcode_for_file_access(),
3889
0
             errmsg("could not open file \"%s\": %m", path)));
3890
0
    }
3891
3892
0
    ReorderBufferSerializeChange(rb, txn, fd, change);
3893
0
    dlist_delete(&change->node);
3894
0
    ReorderBufferFreeChange(rb, change, false);
3895
3896
0
    spilled++;
3897
0
  }
3898
3899
  /* Update the memory counter */
3900
0
  ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, size);
3901
3902
  /* update the statistics iff we have spilled anything */
3903
0
  if (spilled)
3904
0
  {
3905
0
    rb->spillCount += 1;
3906
0
    rb->spillBytes += size;
3907
3908
    /* don't consider already serialized transactions */
3909
0
    rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
3910
3911
    /* update the decoding stats */
3912
0
    UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
3913
0
  }
3914
3915
0
  Assert(spilled == txn->nentries_mem);
3916
0
  Assert(dlist_is_empty(&txn->changes));
3917
0
  txn->nentries_mem = 0;
3918
0
  txn->txn_flags |= RBTXN_IS_SERIALIZED;
3919
3920
0
  if (fd != -1)
3921
0
    CloseTransientFile(fd);
3922
0
}
3923
3924
/*
3925
 * Serialize individual change to disk.
3926
 */
3927
static void
3928
ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
3929
               int fd, ReorderBufferChange *change)
3930
0
{
3931
0
  ReorderBufferDiskChange *ondisk;
3932
0
  Size    sz = sizeof(ReorderBufferDiskChange);
3933
3934
0
  ReorderBufferSerializeReserve(rb, sz);
3935
3936
0
  ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3937
0
  memcpy(&ondisk->change, change, sizeof(ReorderBufferChange));
3938
3939
0
  switch (change->action)
3940
0
  {
3941
      /* fall through these, they're all similar enough */
3942
0
    case REORDER_BUFFER_CHANGE_INSERT:
3943
0
    case REORDER_BUFFER_CHANGE_UPDATE:
3944
0
    case REORDER_BUFFER_CHANGE_DELETE:
3945
0
    case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
3946
0
      {
3947
0
        char     *data;
3948
0
        HeapTuple oldtup,
3949
0
              newtup;
3950
0
        Size    oldlen = 0;
3951
0
        Size    newlen = 0;
3952
3953
0
        oldtup = change->data.tp.oldtuple;
3954
0
        newtup = change->data.tp.newtuple;
3955
3956
0
        if (oldtup)
3957
0
        {
3958
0
          sz += sizeof(HeapTupleData);
3959
0
          oldlen = oldtup->t_len;
3960
0
          sz += oldlen;
3961
0
        }
3962
3963
0
        if (newtup)
3964
0
        {
3965
0
          sz += sizeof(HeapTupleData);
3966
0
          newlen = newtup->t_len;
3967
0
          sz += newlen;
3968
0
        }
3969
3970
        /* make sure we have enough space */
3971
0
        ReorderBufferSerializeReserve(rb, sz);
3972
3973
0
        data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
3974
        /* might have been reallocated above */
3975
0
        ondisk = (ReorderBufferDiskChange *) rb->outbuf;
3976
3977
0
        if (oldlen)
3978
0
        {
3979
0
          memcpy(data, oldtup, sizeof(HeapTupleData));
3980
0
          data += sizeof(HeapTupleData);
3981
3982
0
          memcpy(data, oldtup->t_data, oldlen);
3983
0
          data += oldlen;
3984
0
        }
3985
3986
0
        if (newlen)
3987
0
        {
3988
0
          memcpy(data, newtup, sizeof(HeapTupleData));
3989
0
          data += sizeof(HeapTupleData);
3990
3991
0
          memcpy(data, newtup->t_data, newlen);
3992
0
          data += newlen;
3993
0
        }
3994
0
        break;
3995
0
      }
3996
0
    case REORDER_BUFFER_CHANGE_MESSAGE:
3997
0
      {
3998
0
        char     *data;
3999
0
        Size    prefix_size = strlen(change->data.msg.prefix) + 1;
4000
4001
0
        sz += prefix_size + change->data.msg.message_size +
4002
0
          sizeof(Size) + sizeof(Size);
4003
0
        ReorderBufferSerializeReserve(rb, sz);
4004
4005
0
        data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
4006
4007
        /* might have been reallocated above */
4008
0
        ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4009
4010
        /* write the prefix including the size */
4011
0
        memcpy(data, &prefix_size, sizeof(Size));
4012
0
        data += sizeof(Size);
4013
0
        memcpy(data, change->data.msg.prefix,
4014
0
             prefix_size);
4015
0
        data += prefix_size;
4016
4017
        /* write the message including the size */
4018
0
        memcpy(data, &change->data.msg.message_size, sizeof(Size));
4019
0
        data += sizeof(Size);
4020
0
        memcpy(data, change->data.msg.message,
4021
0
             change->data.msg.message_size);
4022
0
        data += change->data.msg.message_size;
4023
4024
0
        break;
4025
0
      }
4026
0
    case REORDER_BUFFER_CHANGE_INVALIDATION:
4027
0
      {
4028
0
        char     *data;
4029
0
        Size    inval_size = sizeof(SharedInvalidationMessage) *
4030
0
          change->data.inval.ninvalidations;
4031
4032
0
        sz += inval_size;
4033
4034
0
        ReorderBufferSerializeReserve(rb, sz);
4035
0
        data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
4036
4037
        /* might have been reallocated above */
4038
0
        ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4039
0
        memcpy(data, change->data.inval.invalidations, inval_size);
4040
0
        data += inval_size;
4041
4042
0
        break;
4043
0
      }
4044
0
    case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
4045
0
      {
4046
0
        Snapshot  snap;
4047
0
        char     *data;
4048
4049
0
        snap = change->data.snapshot;
4050
4051
0
        sz += sizeof(SnapshotData) +
4052
0
          sizeof(TransactionId) * snap->xcnt +
4053
0
          sizeof(TransactionId) * snap->subxcnt;
4054
4055
        /* make sure we have enough space */
4056
0
        ReorderBufferSerializeReserve(rb, sz);
4057
0
        data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
4058
        /* might have been reallocated above */
4059
0
        ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4060
4061
0
        memcpy(data, snap, sizeof(SnapshotData));
4062
0
        data += sizeof(SnapshotData);
4063
4064
0
        if (snap->xcnt)
4065
0
        {
4066
0
          memcpy(data, snap->xip,
4067
0
               sizeof(TransactionId) * snap->xcnt);
4068
0
          data += sizeof(TransactionId) * snap->xcnt;
4069
0
        }
4070
4071
0
        if (snap->subxcnt)
4072
0
        {
4073
0
          memcpy(data, snap->subxip,
4074
0
               sizeof(TransactionId) * snap->subxcnt);
4075
0
          data += sizeof(TransactionId) * snap->subxcnt;
4076
0
        }
4077
0
        break;
4078
0
      }
4079
0
    case REORDER_BUFFER_CHANGE_TRUNCATE:
4080
0
      {
4081
0
        Size    size;
4082
0
        char     *data;
4083
4084
        /* account for the OIDs of truncated relations */
4085
0
        size = sizeof(Oid) * change->data.truncate.nrelids;
4086
0
        sz += size;
4087
4088
        /* make sure we have enough space */
4089
0
        ReorderBufferSerializeReserve(rb, sz);
4090
4091
0
        data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange);
4092
        /* might have been reallocated above */
4093
0
        ondisk = (ReorderBufferDiskChange *) rb->outbuf;
4094
4095
0
        memcpy(data, change->data.truncate.relids, size);
4096
0
        data += size;
4097
4098
0
        break;
4099
0
      }
4100
0
    case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
4101
0
    case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
4102
0
    case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
4103
0
    case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
4104
      /* ReorderBufferChange contains everything important */
4105
0
      break;
4106
0
  }
4107
4108
0
  ondisk->size = sz;
4109
4110
0
  errno = 0;
4111
0
  pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE);
4112
0
  if (write(fd, rb->outbuf, ondisk->size) != ondisk->size)
4113
0
  {
4114
0
    int     save_errno = errno;
4115
4116
0
    CloseTransientFile(fd);
4117
4118
    /* if write didn't set errno, assume problem is no disk space */
4119
0
    errno = save_errno ? save_errno : ENOSPC;
4120
0
    ereport(ERROR,
4121
0
        (errcode_for_file_access(),
4122
0
         errmsg("could not write to data file for XID %u: %m",
4123
0
            txn->xid)));
4124
0
  }
4125
0
  pgstat_report_wait_end();
4126
4127
  /*
4128
   * Keep the transaction's final_lsn up to date with each change we send to
4129
   * disk, so that ReorderBufferRestoreCleanup works correctly.  (We used to
4130
   * only do this on commit and abort records, but that doesn't work if a
4131
   * system crash leaves a transaction without its abort record).
4132
   *
4133
   * Make sure not to move it backwards.
4134
   */
4135
0
  if (txn->final_lsn < change->lsn)
4136
0
    txn->final_lsn = change->lsn;
4137
4138
0
  Assert(ondisk->change.action == change->action);
4139
0
}
4140
4141
/* Returns true, if the output plugin supports streaming, false, otherwise. */
4142
static inline bool
4143
ReorderBufferCanStream(ReorderBuffer *rb)
4144
0
{
4145
0
  LogicalDecodingContext *ctx = rb->private_data;
4146
4147
0
  return ctx->streaming;
4148
0
}
4149
4150
/* Returns true, if the streaming can be started now, false, otherwise. */
4151
static inline bool
4152
ReorderBufferCanStartStreaming(ReorderBuffer *rb)
4153
0
{
4154
0
  LogicalDecodingContext *ctx = rb->private_data;
4155
0
  SnapBuild  *builder = ctx->snapshot_builder;
4156
4157
  /* We can't start streaming unless a consistent state is reached. */
4158
0
  if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT)
4159
0
    return false;
4160
4161
  /*
4162
   * We can't start streaming immediately even if the streaming is enabled
4163
   * because we previously decoded this transaction and now just are
4164
   * restarting.
4165
   */
4166
0
  if (ReorderBufferCanStream(rb) &&
4167
0
    !SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
4168
0
    return true;
4169
4170
0
  return false;
4171
0
}
4172
4173
/*
4174
 * Send data of a large transaction (and its subtransactions) to the
4175
 * output plugin, but using the stream API.
4176
 */
4177
static void
4178
ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
4179
0
{
4180
0
  Snapshot  snapshot_now;
4181
0
  CommandId command_id;
4182
0
  Size    stream_bytes;
4183
0
  bool    txn_is_streamed;
4184
4185
  /* We can never reach here for a subtransaction. */
4186
0
  Assert(rbtxn_is_toptxn(txn));
4187
4188
  /*
4189
   * We can't make any assumptions about base snapshot here, similar to what
4190
   * ReorderBufferCommit() does. That relies on base_snapshot getting
4191
   * transferred from subxact in ReorderBufferCommitChild(), but that was
4192
   * not yet called as the transaction is in-progress.
4193
   *
4194
   * So just walk the subxacts and use the same logic here. But we only need
4195
   * to do that once, when the transaction is streamed for the first time.
4196
   * After that we need to reuse the snapshot from the previous run.
4197
   *
4198
   * Unlike DecodeCommit which adds xids of all the subtransactions in
4199
   * snapshot's xip array via SnapBuildCommitTxn, we can't do that here but
4200
   * we do add them to subxip array instead via ReorderBufferCopySnap. This
4201
   * allows the catalog changes made in subtransactions decoded till now to
4202
   * be visible.
4203
   */
4204
0
  if (txn->snapshot_now == NULL)
4205
0
  {
4206
0
    dlist_iter  subxact_i;
4207
4208
    /* make sure this transaction is streamed for the first time */
4209
0
    Assert(!rbtxn_is_streamed(txn));
4210
4211
    /* at the beginning we should have invalid command ID */
4212
0
    Assert(txn->command_id == InvalidCommandId);
4213
4214
0
    dlist_foreach(subxact_i, &txn->subtxns)
4215
0
    {
4216
0
      ReorderBufferTXN *subtxn;
4217
4218
0
      subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
4219
0
      ReorderBufferTransferSnapToParent(txn, subtxn);
4220
0
    }
4221
4222
    /*
4223
     * If this transaction has no snapshot, it didn't make any changes to
4224
     * the database till now, so there's nothing to decode.
4225
     */
4226
0
    if (txn->base_snapshot == NULL)
4227
0
    {
4228
0
      Assert(txn->ninvalidations == 0);
4229
0
      return;
4230
0
    }
4231
4232
0
    command_id = FirstCommandId;
4233
0
    snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
4234
0
                       txn, command_id);
4235
0
  }
4236
0
  else
4237
0
  {
4238
    /* the transaction must have been already streamed */
4239
0
    Assert(rbtxn_is_streamed(txn));
4240
4241
    /*
4242
     * Nah, we already have snapshot from the previous streaming run. We
4243
     * assume new subxacts can't move the LSN backwards, and so can't beat
4244
     * the LSN condition in the previous branch (so no need to walk
4245
     * through subxacts again). In fact, we must not do that as we may be
4246
     * using snapshot half-way through the subxact.
4247
     */
4248
0
    command_id = txn->command_id;
4249
4250
    /*
4251
     * We can't use txn->snapshot_now directly because after the last
4252
     * streaming run, we might have got some new sub-transactions. So we
4253
     * need to add them to the snapshot.
4254
     */
4255
0
    snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
4256
0
                       txn, command_id);
4257
4258
    /* Free the previously copied snapshot. */
4259
0
    Assert(txn->snapshot_now->copied);
4260
0
    ReorderBufferFreeSnap(rb, txn->snapshot_now);
4261
0
    txn->snapshot_now = NULL;
4262
0
  }
4263
4264
  /*
4265
   * Remember this information to be used later to update stats. We can't
4266
   * update the stats here as an error while processing the changes would
4267
   * lead to the accumulation of stats even though we haven't streamed all
4268
   * the changes.
4269
   */
4270
0
  txn_is_streamed = rbtxn_is_streamed(txn);
4271
0
  stream_bytes = txn->total_size;
4272
4273
  /* Process and send the changes to output plugin. */
4274
0
  ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
4275
0
              command_id, true);
4276
4277
0
  rb->streamCount += 1;
4278
0
  rb->streamBytes += stream_bytes;
4279
4280
  /* Don't consider already streamed transaction. */
4281
0
  rb->streamTxns += (txn_is_streamed) ? 0 : 1;
4282
4283
  /* update the decoding stats */
4284
0
  UpdateDecodingStats((LogicalDecodingContext *) rb->private_data);
4285
4286
0
  Assert(dlist_is_empty(&txn->changes));
4287
0
  Assert(txn->nentries == 0);
4288
0
  Assert(txn->nentries_mem == 0);
4289
0
}
4290
4291
/*
4292
 * Size of a change in memory.
4293
 */
4294
static Size
4295
ReorderBufferChangeSize(ReorderBufferChange *change)
4296
0
{
4297
0
  Size    sz = sizeof(ReorderBufferChange);
4298
4299
0
  switch (change->action)
4300
0
  {
4301
      /* fall through these, they're all similar enough */
4302
0
    case REORDER_BUFFER_CHANGE_INSERT:
4303
0
    case REORDER_BUFFER_CHANGE_UPDATE:
4304
0
    case REORDER_BUFFER_CHANGE_DELETE:
4305
0
    case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
4306
0
      {
4307
0
        HeapTuple oldtup,
4308
0
              newtup;
4309
0
        Size    oldlen = 0;
4310
0
        Size    newlen = 0;
4311
4312
0
        oldtup = change->data.tp.oldtuple;
4313
0
        newtup = change->data.tp.newtuple;
4314
4315
0
        if (oldtup)
4316
0
        {
4317
0
          sz += sizeof(HeapTupleData);
4318
0
          oldlen = oldtup->t_len;
4319
0
          sz += oldlen;
4320
0
        }
4321
4322
0
        if (newtup)
4323
0
        {
4324
0
          sz += sizeof(HeapTupleData);
4325
0
          newlen = newtup->t_len;
4326
0
          sz += newlen;
4327
0
        }
4328
4329
0
        break;
4330
0
      }
4331
0
    case REORDER_BUFFER_CHANGE_MESSAGE:
4332
0
      {
4333
0
        Size    prefix_size = strlen(change->data.msg.prefix) + 1;
4334
4335
0
        sz += prefix_size + change->data.msg.message_size +
4336
0
          sizeof(Size) + sizeof(Size);
4337
4338
0
        break;
4339
0
      }
4340
0
    case REORDER_BUFFER_CHANGE_INVALIDATION:
4341
0
      {
4342
0
        sz += sizeof(SharedInvalidationMessage) *
4343
0
          change->data.inval.ninvalidations;
4344
0
        break;
4345
0
      }
4346
0
    case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
4347
0
      {
4348
0
        Snapshot  snap;
4349
4350
0
        snap = change->data.snapshot;
4351
4352
0
        sz += sizeof(SnapshotData) +
4353
0
          sizeof(TransactionId) * snap->xcnt +
4354
0
          sizeof(TransactionId) * snap->subxcnt;
4355
4356
0
        break;
4357
0
      }
4358
0
    case REORDER_BUFFER_CHANGE_TRUNCATE:
4359
0
      {
4360
0
        sz += sizeof(Oid) * change->data.truncate.nrelids;
4361
4362
0
        break;
4363
0
      }
4364
0
    case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
4365
0
    case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
4366
0
    case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
4367
0
    case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
4368
      /* ReorderBufferChange contains everything important */
4369
0
      break;
4370
0
  }
4371
4372
0
  return sz;
4373
0
}
4374
4375
4376
/*
4377
 * Restore a number of changes spilled to disk back into memory.
4378
 */
4379
static Size
ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
							TXNEntryFile *file, XLogSegNo *segno)
{
	Size		restored = 0;
	XLogSegNo	last_segno;
	dlist_mutable_iter cleanup_iter;
	File	   *fd = &file->vfd;	/* -1 means no spill segment currently open */

	Assert(txn->first_lsn != InvalidXLogRecPtr);
	Assert(txn->final_lsn != InvalidXLogRecPtr);

	/* free current entries, so we have memory for more */
	dlist_foreach_modify(cleanup_iter, &txn->changes)
	{
		ReorderBufferChange *cleanup =
			dlist_container(ReorderBufferChange, node, cleanup_iter.cur);

		dlist_delete(&cleanup->node);
		ReorderBufferFreeChange(rb, cleanup, true);
	}
	txn->nentries_mem = 0;
	Assert(dlist_is_empty(&txn->changes));

	/* segment containing the txn's final LSN bounds the scan */
	XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size);

	/* read up to max_changes_in_memory changes, advancing *segno as needed */
	while (restored < max_changes_in_memory && *segno <= last_segno)
	{
		int			readBytes;
		ReorderBufferDiskChange *ondisk;

		CHECK_FOR_INTERRUPTS();

		if (*fd == -1)
		{
			char		path[MAXPGPATH];

			/* first time in */
			if (*segno == 0)
				XLByteToSeg(txn->first_lsn, *segno, wal_segment_size);

			Assert(*segno != 0 || dlist_is_empty(&txn->changes));

			/*
			 * No need to care about TLIs here, only used during a single run,
			 * so each LSN only maps to a specific WAL record.
			 */
			ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid,
										*segno);

			*fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY);

			/* No harm in resetting the offset even in case of failure */
			file->curOffset = 0;

			/* a missing segment just means nothing was spilled for it */
			if (*fd < 0 && errno == ENOENT)
			{
				*fd = -1;
				(*segno)++;
				continue;
			}
			else if (*fd < 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not open file \"%s\": %m",
								path)));
		}

		/*
		 * Read the statically sized part of a change which has information
		 * about the total size. If we couldn't read a record, we're at the
		 * end of this file.
		 */
		ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange));
		readBytes = FileRead(file->vfd, rb->outbuf,
							 sizeof(ReorderBufferDiskChange),
							 file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ);

		/* eof */
		if (readBytes == 0)
		{
			FileClose(*fd);
			*fd = -1;
			(*segno)++;
			continue;
		}
		else if (readBytes < 0)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from reorderbuffer spill file: %m")));
		else if (readBytes != sizeof(ReorderBufferDiskChange))
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
							readBytes,
							(uint32) sizeof(ReorderBufferDiskChange))));

		file->curOffset += readBytes;

		ondisk = (ReorderBufferDiskChange *) rb->outbuf;

		/*
		 * Enlarge the buffer for the variable-length part; this may
		 * repalloc rb->outbuf, so re-fetch the ondisk pointer afterwards.
		 */
		ReorderBufferSerializeReserve(rb,
									  sizeof(ReorderBufferDiskChange) + ondisk->size);
		ondisk = (ReorderBufferDiskChange *) rb->outbuf;

		readBytes = FileRead(file->vfd,
							 rb->outbuf + sizeof(ReorderBufferDiskChange),
							 ondisk->size - sizeof(ReorderBufferDiskChange),
							 file->curOffset,
							 WAIT_EVENT_REORDER_BUFFER_READ);

		if (readBytes < 0)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from reorderbuffer spill file: %m")));
		else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange))
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes",
							readBytes,
							(uint32) (ondisk->size - sizeof(ReorderBufferDiskChange)))));

		file->curOffset += readBytes;

		/*
		 * ok, read a full change from disk, now restore it into proper
		 * in-memory format
		 */
		ReorderBufferRestoreChange(rb, txn, rb->outbuf);
		restored++;
	}

	return restored;
}
4513
4514
/*
4515
 * Convert change from its on-disk format to in-memory format and queue it onto
4516
 * the TXN's ->changes list.
4517
 *
4518
 * Note: although "data" is declared char*, at entry it points to a
4519
 * maxalign'd buffer, making it safe in most of this function to assume
4520
 * that the pointed-to data is suitably aligned for direct access.
4521
 */
4522
static void
ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
						   char *data)
{
	ReorderBufferDiskChange *ondisk;
	ReorderBufferChange *change;

	ondisk = (ReorderBufferDiskChange *) data;

	change = ReorderBufferAllocChange(rb);

	/* copy static part */
	memcpy(change, &ondisk->change, sizeof(ReorderBufferChange));

	/* 'data' now walks over the variable-length payload that follows */
	data += sizeof(ReorderBufferDiskChange);

	/* restore individual stuff */
	switch (change->action)
	{
			/* fall through these, they're all similar enough */
		case REORDER_BUFFER_CHANGE_INSERT:
		case REORDER_BUFFER_CHANGE_UPDATE:
		case REORDER_BUFFER_CHANGE_DELETE:
		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT:
			if (change->data.tp.oldtuple)
			{
				/* 'data' is still maxaligned here, so direct access is OK */
				uint32		tuplelen = ((HeapTuple) data)->t_len;

				change->data.tp.oldtuple =
					ReorderBufferAllocTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);

				/* restore ->tuple */
				memcpy(change->data.tp.oldtuple, data,
					   sizeof(HeapTupleData));
				data += sizeof(HeapTupleData);

				/* reset t_data pointer into the new tuplebuf */
				change->data.tp.oldtuple->t_data =
					(HeapTupleHeader) ((char *) change->data.tp.oldtuple + HEAPTUPLESIZE);

				/* restore tuple data itself */
				memcpy(change->data.tp.oldtuple->t_data, data, tuplelen);
				data += tuplelen;
			}

			if (change->data.tp.newtuple)
			{
				/* here, data might not be suitably aligned! */
				uint32		tuplelen;

				/* fetch t_len via memcpy since direct access could trap */
				memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len),
					   sizeof(uint32));

				change->data.tp.newtuple =
					ReorderBufferAllocTupleBuf(rb, tuplelen - SizeofHeapTupleHeader);

				/* restore ->tuple */
				memcpy(change->data.tp.newtuple, data,
					   sizeof(HeapTupleData));
				data += sizeof(HeapTupleData);

				/* reset t_data pointer into the new tuplebuf */
				change->data.tp.newtuple->t_data =
					(HeapTupleHeader) ((char *) change->data.tp.newtuple + HEAPTUPLESIZE);

				/* restore tuple data itself */
				memcpy(change->data.tp.newtuple->t_data, data, tuplelen);
				data += tuplelen;
			}

			break;
		case REORDER_BUFFER_CHANGE_MESSAGE:
			{
				Size		prefix_size;

				/* read prefix */
				memcpy(&prefix_size, data, sizeof(Size));
				data += sizeof(Size);
				change->data.msg.prefix = MemoryContextAlloc(rb->context,
															 prefix_size);
				memcpy(change->data.msg.prefix, data, prefix_size);
				/* serialized prefix includes its NUL terminator */
				Assert(change->data.msg.prefix[prefix_size - 1] == '\0');
				data += prefix_size;

				/* read the message */
				memcpy(&change->data.msg.message_size, data, sizeof(Size));
				data += sizeof(Size);
				change->data.msg.message = MemoryContextAlloc(rb->context,
															  change->data.msg.message_size);
				memcpy(change->data.msg.message, data,
					   change->data.msg.message_size);
				data += change->data.msg.message_size;

				break;
			}
		case REORDER_BUFFER_CHANGE_INVALIDATION:
			{
				Size		inval_size = sizeof(SharedInvalidationMessage) *
					change->data.inval.ninvalidations;

				change->data.inval.invalidations =
					MemoryContextAlloc(rb->context, inval_size);

				/* read the message */
				memcpy(change->data.inval.invalidations, data, inval_size);

				break;
			}
		case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
			{
				Snapshot	oldsnap;
				Snapshot	newsnap;
				Size		size;

				oldsnap = (Snapshot) data;

				/* snapshot plus trailing xip and subxip arrays */
				size = sizeof(SnapshotData) +
					sizeof(TransactionId) * oldsnap->xcnt +
					sizeof(TransactionId) * (oldsnap->subxcnt + 0);

				change->data.snapshot = MemoryContextAllocZero(rb->context, size);

				newsnap = change->data.snapshot;

				memcpy(newsnap, data, size);
				/* re-point xip/subxip into the freshly allocated block */
				newsnap->xip = (TransactionId *)
					(((char *) newsnap) + sizeof(SnapshotData));
				newsnap->subxip = newsnap->xip + newsnap->xcnt;
				newsnap->copied = true;
				break;
			}
			/* the base struct contains all the data, easy peasy */
		case REORDER_BUFFER_CHANGE_TRUNCATE:
			{
				Oid		   *relids;

				relids = ReorderBufferAllocRelids(rb, change->data.truncate.nrelids);
				memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid));
				change->data.truncate.relids = relids;

				break;
			}
		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
		case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT:
		case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID:
		case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID:
			break;
	}

	dlist_push_tail(&txn->changes, &change->node);
	txn->nentries_mem++;

	/*
	 * Update memory accounting for the restored change.  We need to do this
	 * although we don't check the memory limit when restoring the changes in
	 * this branch (we only do that when initially queueing the changes after
	 * decoding), because we will release the changes later, and that will
	 * update the accounting too (subtracting the size from the counters). And
	 * we don't want to underflow there.
	 */
	ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
									ReorderBufferChangeSize(change));
}
4685
4686
/*
4687
 * Remove all on-disk data stored for the passed in transaction.
4688
 */
4689
static void
4690
ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn)
4691
0
{
4692
0
  XLogSegNo first;
4693
0
  XLogSegNo cur;
4694
0
  XLogSegNo last;
4695
4696
0
  Assert(txn->first_lsn != InvalidXLogRecPtr);
4697
0
  Assert(txn->final_lsn != InvalidXLogRecPtr);
4698
4699
0
  XLByteToSeg(txn->first_lsn, first, wal_segment_size);
4700
0
  XLByteToSeg(txn->final_lsn, last, wal_segment_size);
4701
4702
  /* iterate over all possible filenames, and delete them */
4703
0
  for (cur = first; cur <= last; cur++)
4704
0
  {
4705
0
    char    path[MAXPGPATH];
4706
4707
0
    ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur);
4708
0
    if (unlink(path) != 0 && errno != ENOENT)
4709
0
      ereport(ERROR,
4710
0
          (errcode_for_file_access(),
4711
0
           errmsg("could not remove file \"%s\": %m", path)));
4712
0
  }
4713
0
}
4714
4715
/*
4716
 * Remove any leftover serialized reorder buffers from a slot directory after a
4717
 * prior crash or decoding session exit.
4718
 */
4719
static void
ReorderBufferCleanupSerializedTXNs(const char *slotname)
{
	DIR		   *spill_dir;
	struct dirent *spill_de;
	struct stat statbuf;
	char		path[MAXPGPATH * 2 + sizeof(PG_REPLSLOT_DIR)];

	sprintf(path, "%s/%s", PG_REPLSLOT_DIR, slotname);

	/* we're only handling directories here, skip if it's not ours */
	if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode))
		return;

	spill_dir = AllocateDir(path);
	while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL)
	{
		/* only look at names that can be ours */
		if (strncmp(spill_de->d_name, "xid", 3) == 0)
		{
			/*
			 * NOTE: this reuses 'path' for the file being removed; the
			 * directory name passed to ReadDirExtended above is therefore
			 * clobbered from here on — presumably only used by it for error
			 * reporting, but confirm before relying on it.
			 */
			snprintf(path, sizeof(path),
					 "%s/%s/%s", PG_REPLSLOT_DIR, slotname,
					 spill_de->d_name);

			if (unlink(path) != 0)
				ereport(ERROR,
						(errcode_for_file_access(),
						 errmsg("could not remove file \"%s\" during removal of %s/%s/xid*: %m",
								path, PG_REPLSLOT_DIR, slotname)));
		}
	}
	FreeDir(spill_dir);
}
4752
4753
/*
4754
 * Given a replication slot, transaction ID and segment number, fill in the
4755
 * corresponding spill file into 'path', which is a caller-owned buffer of size
4756
 * at least MAXPGPATH.
4757
 */
4758
static void
4759
ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid,
4760
              XLogSegNo segno)
4761
0
{
4762
0
  XLogRecPtr  recptr;
4763
4764
0
  XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr);
4765
4766
0
  snprintf(path, MAXPGPATH, "%s/%s/xid-%u-lsn-%X-%X.spill",
4767
0
       PG_REPLSLOT_DIR,
4768
0
       NameStr(MyReplicationSlot->data.name),
4769
0
       xid, LSN_FORMAT_ARGS(recptr));
4770
0
}
4771
4772
/*
4773
 * Delete all data spilled to disk after we've restarted/crashed. It will be
4774
 * recreated when the respective slots are reused.
4775
 */
4776
void
4777
StartupReorderBuffer(void)
4778
0
{
4779
0
  DIR      *logical_dir;
4780
0
  struct dirent *logical_de;
4781
4782
0
  logical_dir = AllocateDir(PG_REPLSLOT_DIR);
4783
0
  while ((logical_de = ReadDir(logical_dir, PG_REPLSLOT_DIR)) != NULL)
4784
0
  {
4785
0
    if (strcmp(logical_de->d_name, ".") == 0 ||
4786
0
      strcmp(logical_de->d_name, "..") == 0)
4787
0
      continue;
4788
4789
    /* if it cannot be a slot, skip the directory */
4790
0
    if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
4791
0
      continue;
4792
4793
    /*
4794
     * ok, has to be a surviving logical slot, iterate and delete
4795
     * everything starting with xid-*
4796
     */
4797
0
    ReorderBufferCleanupSerializedTXNs(logical_de->d_name);
4798
0
  }
4799
0
  FreeDir(logical_dir);
4800
0
}
4801
4802
/* ---------------------------------------
4803
 * toast reassembly support
4804
 * ---------------------------------------
4805
 */
4806
4807
/*
4808
 * Initialize per tuple toast reconstruction support.
4809
 */
4810
static void
4811
ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn)
4812
0
{
4813
0
  HASHCTL   hash_ctl;
4814
4815
0
  Assert(txn->toast_hash == NULL);
4816
4817
0
  hash_ctl.keysize = sizeof(Oid);
4818
0
  hash_ctl.entrysize = sizeof(ReorderBufferToastEnt);
4819
0
  hash_ctl.hcxt = rb->context;
4820
0
  txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl,
4821
0
                  HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
4822
0
}
4823
4824
/*
4825
 * Per toast-chunk handling for toast reconstruction
4826
 *
4827
 * Appends a toast chunk so we can reconstruct it when the tuple "owning" the
4828
 * toasted Datum comes along.
4829
 */
4830
static void
ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn,
							  Relation relation, ReorderBufferChange *change)
{
	ReorderBufferToastEnt *ent;
	HeapTuple	newtup;
	bool		found;
	int32		chunksize;
	bool		isnull;
	Pointer		chunk;
	TupleDesc	desc = RelationGetDescr(relation);
	Oid			chunk_id;
	int32		chunk_seq;

	/* lazily create the per-transaction toast hash on first chunk */
	if (txn->toast_hash == NULL)
		ReorderBufferToastInitHash(rb, txn);

	Assert(IsToastRelation(relation));

	newtup = change->data.tp.newtuple;
	/* toast tuples are (chunk_id, chunk_seq, chunk_data) */
	chunk_id = DatumGetObjectId(fastgetattr(newtup, 1, desc, &isnull));
	Assert(!isnull);
	chunk_seq = DatumGetInt32(fastgetattr(newtup, 2, desc, &isnull));
	Assert(!isnull);

	ent = (ReorderBufferToastEnt *)
		hash_search(txn->toast_hash, &chunk_id, HASH_ENTER, &found);

	if (!found)
	{
		/* initialize a fresh entry; the first chunk must be seq 0 */
		Assert(ent->chunk_id == chunk_id);
		ent->num_chunks = 0;
		ent->last_chunk_seq = 0;
		ent->size = 0;
		ent->reconstructed = NULL;
		dlist_init(&ent->chunks);

		if (chunk_seq != 0)
			elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0",
				 chunk_seq, chunk_id);
	}
	else if (found && chunk_seq != ent->last_chunk_seq + 1)
		/* chunks must arrive strictly in sequence */
		elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d",
			 chunk_seq, chunk_id, ent->last_chunk_seq + 1);

	chunk = DatumGetPointer(fastgetattr(newtup, 3, desc, &isnull));
	Assert(!isnull);

	/* calculate size so we can allocate the right size at once later */
	if (!VARATT_IS_EXTENDED(chunk))
		chunksize = VARSIZE(chunk) - VARHDRSZ;
	else if (VARATT_IS_SHORT(chunk))
		/* could happen due to heap_form_tuple doing its thing */
		chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT;
	else
		elog(ERROR, "unexpected type of toast chunk");

	/* account for the chunk and queue the change for later stitching */
	ent->size += chunksize;
	ent->last_chunk_seq = chunk_seq;
	ent->num_chunks++;
	dlist_push_tail(&ent->chunks, &change->node);
}
4892
4893
/*
4894
 * Rejigger change->newtuple to point to in-memory toast tuples instead of
4895
 * on-disk toast tuples that may no longer exist (think DROP TABLE or VACUUM).
4896
 *
4897
 * We cannot replace unchanged toast tuples though, so those will still point
4898
 * to on-disk toast data.
4899
 *
4900
 * While updating the existing change with detoasted tuple data, we need to
4901
 * update the memory accounting info, because the change size will differ.
4902
 * Otherwise the accounting may get out of sync, triggering serialization
4903
 * at unexpected times.
4904
 *
4905
 * We simply subtract size of the change before rejiggering the tuple, and
4906
 * then add the new size. This makes it look like the change was removed
4907
 * and then added back, except it only tweaks the accounting info.
4908
 *
4909
 * In particular it can't trigger serialization, which would be pointless
4910
 * anyway as it happens during commit processing right before handing
4911
 * the change to the output plugin.
4912
 */
4913
static void
ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn,
						  Relation relation, ReorderBufferChange *change)
{
	TupleDesc	desc;
	int			natt;
	Datum	   *attrs;
	bool	   *isnull;
	bool	   *free;
	HeapTuple	tmphtup;
	Relation	toast_rel;
	TupleDesc	toast_desc;
	MemoryContext oldcontext;
	HeapTuple	newtup;
	Size		old_size;

	/* no toast tuples changed */
	if (txn->toast_hash == NULL)
		return;

	/*
	 * We're going to modify the size of the change. So, to make sure the
	 * accounting is correct we record the current change size and then after
	 * re-computing the change we'll subtract the recorded size and then
	 * re-add the new change size at the end. We don't immediately subtract
	 * the old size because if there is any error before we add the new size,
	 * we will release the changes and that will update the accounting info
	 * (subtracting the size from the counters). And we don't want to
	 * underflow there.
	 */
	old_size = ReorderBufferChangeSize(change);

	oldcontext = MemoryContextSwitchTo(rb->context);

	/* we should only have toast tuples in an INSERT or UPDATE */
	Assert(change->data.tp.newtuple);

	desc = RelationGetDescr(relation);

	toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid);
	if (!RelationIsValid(toast_rel))
		elog(ERROR, "could not open toast relation with OID %u (base relation \"%s\")",
			 relation->rd_rel->reltoastrelid, RelationGetRelationName(relation));

	toast_desc = RelationGetDescr(toast_rel);

	/* should we allocate from stack instead? */
	attrs = palloc0(sizeof(Datum) * desc->natts);
	isnull = palloc0(sizeof(bool) * desc->natts);
	/* free[natt] marks attrs we allocated here and must pfree below */
	free = palloc0(sizeof(bool) * desc->natts);

	newtup = change->data.tp.newtuple;

	heap_deform_tuple(newtup, desc, attrs, isnull);

	for (natt = 0; natt < desc->natts; natt++)
	{
		Form_pg_attribute attr = TupleDescAttr(desc, natt);
		ReorderBufferToastEnt *ent;
		struct varlena *varlena;

		/* va_rawsize is the size of the original datum -- including header */
		struct varatt_external toast_pointer;
		struct varatt_indirect redirect_pointer;
		struct varlena *new_datum = NULL;
		struct varlena *reconstructed;
		dlist_iter	it;
		Size		data_done = 0;

		/* system columns aren't toasted */
		if (attr->attnum < 0)
			continue;

		if (attr->attisdropped)
			continue;

		/* not a varlena datatype */
		if (attr->attlen != -1)
			continue;

		/* no data */
		if (isnull[natt])
			continue;

		/* ok, we know we have a toast datum */
		varlena = (struct varlena *) DatumGetPointer(attrs[natt]);

		/* no need to do anything if the tuple isn't external */
		if (!VARATT_IS_EXTERNAL(varlena))
			continue;

		VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena);

		/*
		 * Check whether the toast tuple changed, replace if so.
		 */
		ent = (ReorderBufferToastEnt *)
			hash_search(txn->toast_hash,
						&toast_pointer.va_valueid,
						HASH_FIND,
						NULL);
		if (ent == NULL)
			continue;

		/* an indirect pointer that will redirect to the rebuilt datum */
		new_datum =
			(struct varlena *) palloc0(INDIRECT_POINTER_SIZE);

		free[natt] = true;

		reconstructed = palloc0(toast_pointer.va_rawsize);

		/* remember so ReorderBufferToastReset() can free it later */
		ent->reconstructed = reconstructed;

		/* stitch toast tuple back together from its parts */
		dlist_foreach(it, &ent->chunks)
		{
			bool		cisnull;
			ReorderBufferChange *cchange;
			HeapTuple	ctup;
			Pointer		chunk;

			cchange = dlist_container(ReorderBufferChange, node, it.cur);
			ctup = cchange->data.tp.newtuple;
			chunk = DatumGetPointer(fastgetattr(ctup, 3, toast_desc, &cisnull));

			Assert(!cisnull);
			Assert(!VARATT_IS_EXTERNAL(chunk));
			Assert(!VARATT_IS_SHORT(chunk));

			memcpy(VARDATA(reconstructed) + data_done,
				   VARDATA(chunk),
				   VARSIZE(chunk) - VARHDRSZ);
			data_done += VARSIZE(chunk) - VARHDRSZ;
		}
		Assert(data_done == VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer));

		/* make sure its marked as compressed or not */
		if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer))
			SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ);
		else
			SET_VARSIZE(reconstructed, data_done + VARHDRSZ);

		memset(&redirect_pointer, 0, sizeof(redirect_pointer));
		redirect_pointer.pointer = reconstructed;

		SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT);
		memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer,
			   sizeof(redirect_pointer));

		attrs[natt] = PointerGetDatum(new_datum);
	}

	/*
	 * Build tuple in separate memory & copy tuple back into the tuplebuf
	 * passed to the output plugin. We can't directly heap_fill_tuple() into
	 * the tuplebuf because attrs[] will point back into the current content.
	 */
	tmphtup = heap_form_tuple(desc, attrs, isnull);
	Assert(newtup->t_len <= MaxHeapTupleSize);
	Assert(newtup->t_data == (HeapTupleHeader) ((char *) newtup + HEAPTUPLESIZE));

	memcpy(newtup->t_data, tmphtup->t_data, tmphtup->t_len);
	newtup->t_len = tmphtup->t_len;

	/*
	 * free resources we won't further need, more persistent stuff will be
	 * free'd in ReorderBufferToastReset().
	 */
	RelationClose(toast_rel);
	pfree(tmphtup);
	for (natt = 0; natt < desc->natts; natt++)
	{
		if (free[natt])
			pfree(DatumGetPointer(attrs[natt]));
	}
	pfree(attrs);
	pfree(free);
	pfree(isnull);

	MemoryContextSwitchTo(oldcontext);

	/* subtract the old change size */
	ReorderBufferChangeMemoryUpdate(rb, change, NULL, false, old_size);
	/* now add the change back, with the correct size */
	ReorderBufferChangeMemoryUpdate(rb, change, NULL, true,
									ReorderBufferChangeSize(change));
}
5100
5101
/*
5102
 * Free all resources allocated for toast reconstruction.
5103
 */
5104
static void
ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
{
	HASH_SEQ_STATUS hstat;
	ReorderBufferToastEnt *ent;

	/* nothing to do if no toast chunks were ever queued */
	if (txn->toast_hash == NULL)
		return;

	/* sequentially walk over the hash and free everything */
	hash_seq_init(&hstat, txn->toast_hash);
	while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL)
	{
		dlist_mutable_iter it;

		/* free the stitched-together datum, if one was built */
		if (ent->reconstructed != NULL)
			pfree(ent->reconstructed);

		/* release every queued chunk change */
		dlist_foreach_modify(it, &ent->chunks)
		{
			ReorderBufferChange *change =
				dlist_container(ReorderBufferChange, node, it.cur);

			dlist_delete(&change->node);
			ReorderBufferFreeChange(rb, change, true);
		}
	}

	hash_destroy(txn->toast_hash);
	txn->toast_hash = NULL;
}
5135
5136
5137
/* ---------------------------------------
5138
 * Visibility support for logical decoding
5139
 *
5140
 *
5141
 * Lookup actual cmin/cmax values when using decoding snapshot. We can't
5142
 * always rely on stored cmin/cmax values because of two scenarios:
5143
 *
5144
 * * A tuple got changed multiple times during a single transaction and thus
5145
 *   has got a combo CID. Combo CIDs are only valid for the duration of a
5146
 *   single transaction.
5147
 * * A tuple with a cmin but no cmax (and thus no combo CID) got
5148
 *   deleted/updated in another transaction than the one which created it
5149
 *   which we are looking at right now. As only one of cmin, cmax or combo CID
5150
 *   is actually stored in the heap we don't have access to the value we
5151
 *   need anymore.
5152
 *
5153
 * To resolve those problems we have a per-transaction hash of (cmin,
5154
 * cmax) tuples keyed by (relfilelocator, ctid) which contains the actual
5155
 * (cmin, cmax) values. That also takes care of combo CIDs by simply
5156
 * not caring about them at all. As we have the real cmin/cmax values
5157
 * combo CIDs aren't interesting.
5158
 *
5159
 * As we only care about catalog tuples here the overhead of this
5160
 * hashtable should be acceptable.
5161
 *
5162
 * Heap rewrites complicate this a bit, check rewriteheap.c for
5163
 * details.
5164
 * -------------------------------------------------------------------------
5165
 */
5166
5167
/* struct for sorting mapping files by LSN efficiently */
5168
/* struct for sorting mapping files by LSN efficiently */
typedef struct RewriteMappingFile
{
	XLogRecPtr	lsn;			/* LSN at which the mapping was written; sort key */
	char		fname[MAXPGPATH];	/* file name within the mappings directory */
} RewriteMappingFile;
5173
5174
#ifdef NOT_USED
5175
static void
5176
DisplayMapping(HTAB *tuplecid_data)
5177
{
5178
  HASH_SEQ_STATUS hstat;
5179
  ReorderBufferTupleCidEnt *ent;
5180
5181
  hash_seq_init(&hstat, tuplecid_data);
5182
  while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL)
5183
  {
5184
    elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u",
5185
       ent->key.rlocator.dbOid,
5186
       ent->key.rlocator.spcOid,
5187
       ent->key.rlocator.relNumber,
5188
       ItemPointerGetBlockNumber(&ent->key.tid),
5189
       ItemPointerGetOffsetNumber(&ent->key.tid),
5190
       ent->cmin,
5191
       ent->cmax
5192
      );
5193
  }
5194
}
5195
#endif
5196
5197
/*
5198
 * Apply a single mapping file to tuplecid_data.
5199
 *
5200
 * The mapping file has to have been verified to be a) committed b) for our
5201
 * transaction c) applied in LSN order.
5202
 */
5203
static void
ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname)
{
	char		path[MAXPGPATH];
	int			fd;
	int			readBytes;
	LogicalRewriteMappingData map;

	sprintf(path, "%s/%s", PG_LOGICAL_MAPPINGS_DIR, fname);
	fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
	if (fd < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", path)));

	while (true)
	{
		ReorderBufferTupleCidKey key;
		ReorderBufferTupleCidEnt *ent;
		ReorderBufferTupleCidEnt *new_ent;
		bool		found;

		/* be careful about padding */
		memset(&key, 0, sizeof(ReorderBufferTupleCidKey));

		/* read all mappings till the end of the file */
		pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ);
		readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData));
		pgstat_report_wait_end();

		if (readBytes < 0)
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read file \"%s\": %m",
							path)));
		else if (readBytes == 0)	/* EOF */
			break;
		else if (readBytes != sizeof(LogicalRewriteMappingData))
			ereport(ERROR,
					(errcode_for_file_access(),
					 errmsg("could not read from file \"%s\": read %d instead of %d bytes",
							path, readBytes,
							(int32) sizeof(LogicalRewriteMappingData))));

		/* look up the pre-rewrite (old locator, old tid) entry */
		key.rlocator = map.old_locator;
		ItemPointerCopy(&map.old_tid,
						&key.tid);


		ent = (ReorderBufferTupleCidEnt *)
			hash_search(tuplecid_data, &key, HASH_FIND, NULL);

		/* no existing mapping, no need to update */
		if (!ent)
			continue;

		/* re-enter the same cmin/cmax under the post-rewrite location */
		key.rlocator = map.new_locator;
		ItemPointerCopy(&map.new_tid,
						&key.tid);

		new_ent = (ReorderBufferTupleCidEnt *)
			hash_search(tuplecid_data, &key, HASH_ENTER, &found);

		if (found)
		{
			/*
			 * Make sure the existing mapping makes sense. We sometime update
			 * old records that did not yet have a cmax (e.g. pg_class' own
			 * entry while rewriting it) during rewrites, so allow that.
			 */
			Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin);
			Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax);
		}
		else
		{
			/* update mapping */
			new_ent->cmin = ent->cmin;
			new_ent->cmax = ent->cmax;
			new_ent->combocid = ent->combocid;
		}
	}

	if (CloseTransientFile(fd) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m", path)));
}
5290
5291
5292
/*
5293
 * Check whether the TransactionId 'xid' is in the pre-sorted array 'xip'.
5294
 */
5295
static bool
5296
TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num)
5297
0
{
5298
0
  return bsearch(&xid, xip, num,
5299
0
           sizeof(TransactionId), xidComparator) != NULL;
5300
0
}
5301
5302
/*
5303
 * list_sort() comparator for sorting RewriteMappingFiles in LSN order.
5304
 */
5305
static int
5306
file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p)
5307
0
{
5308
0
  RewriteMappingFile *a = (RewriteMappingFile *) lfirst(a_p);
5309
0
  RewriteMappingFile *b = (RewriteMappingFile *) lfirst(b_p);
5310
5311
0
  return pg_cmp_u64(a->lsn, b->lsn);
5312
0
}
5313
5314
/*
5315
 * Apply any existing logical remapping files if there are any targeted at our
5316
 * transaction for relid.
5317
 */
5318
static void
5319
UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot)
5320
0
{
5321
0
  DIR      *mapping_dir;
5322
0
  struct dirent *mapping_de;
5323
0
  List     *files = NIL;
5324
0
  ListCell   *file;
5325
0
  Oid     dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId;
5326
5327
0
  mapping_dir = AllocateDir(PG_LOGICAL_MAPPINGS_DIR);
5328
0
  while ((mapping_de = ReadDir(mapping_dir, PG_LOGICAL_MAPPINGS_DIR)) != NULL)
5329
0
  {
5330
0
    Oid     f_dboid;
5331
0
    Oid     f_relid;
5332
0
    TransactionId f_mapped_xid;
5333
0
    TransactionId f_create_xid;
5334
0
    XLogRecPtr  f_lsn;
5335
0
    uint32    f_hi,
5336
0
          f_lo;
5337
0
    RewriteMappingFile *f;
5338
5339
0
    if (strcmp(mapping_de->d_name, ".") == 0 ||
5340
0
      strcmp(mapping_de->d_name, "..") == 0)
5341
0
      continue;
5342
5343
    /* Ignore files that aren't ours */
5344
0
    if (strncmp(mapping_de->d_name, "map-", 4) != 0)
5345
0
      continue;
5346
5347
0
    if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT,
5348
0
           &f_dboid, &f_relid, &f_hi, &f_lo,
5349
0
           &f_mapped_xid, &f_create_xid) != 6)
5350
0
      elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name);
5351
5352
0
    f_lsn = ((uint64) f_hi) << 32 | f_lo;
5353
5354
    /* mapping for another database */
5355
0
    if (f_dboid != dboid)
5356
0
      continue;
5357
5358
    /* mapping for another relation */
5359
0
    if (f_relid != relid)
5360
0
      continue;
5361
5362
    /* did the creating transaction abort? */
5363
0
    if (!TransactionIdDidCommit(f_create_xid))
5364
0
      continue;
5365
5366
    /* not for our transaction */
5367
0
    if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt))
5368
0
      continue;
5369
5370
    /* ok, relevant, queue for apply */
5371
0
    f = palloc(sizeof(RewriteMappingFile));
5372
0
    f->lsn = f_lsn;
5373
0
    strcpy(f->fname, mapping_de->d_name);
5374
0
    files = lappend(files, f);
5375
0
  }
5376
0
  FreeDir(mapping_dir);
5377
5378
  /* sort files so we apply them in LSN order */
5379
0
  list_sort(files, file_sort_by_lsn);
5380
5381
0
  foreach(file, files)
5382
0
  {
5383
0
    RewriteMappingFile *f = (RewriteMappingFile *) lfirst(file);
5384
5385
0
    elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname,
5386
0
       snapshot->subxip[0]);
5387
0
    ApplyLogicalMappingFile(tuplecid_data, relid, f->fname);
5388
0
    pfree(f);
5389
0
  }
5390
0
}
5391
5392
/*
5393
 * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on
5394
 * combo CIDs.
5395
 */
5396
bool
5397
ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
5398
                Snapshot snapshot,
5399
                HeapTuple htup, Buffer buffer,
5400
                CommandId *cmin, CommandId *cmax)
5401
0
{
5402
0
  ReorderBufferTupleCidKey key;
5403
0
  ReorderBufferTupleCidEnt *ent;
5404
0
  ForkNumber  forkno;
5405
0
  BlockNumber blockno;
5406
0
  bool    updated_mapping = false;
5407
5408
  /*
5409
   * Return unresolved if tuplecid_data is not valid.  That's because when
5410
   * streaming in-progress transactions we may run into tuples with the CID
5411
   * before actually decoding them.  Think e.g. about INSERT followed by
5412
   * TRUNCATE, where the TRUNCATE may not be decoded yet when applying the
5413
   * INSERT.  So in such cases, we assume the CID is from the future
5414
   * command.
5415
   */
5416
0
  if (tuplecid_data == NULL)
5417
0
    return false;
5418
5419
  /* be careful about padding */
5420
0
  memset(&key, 0, sizeof(key));
5421
5422
0
  Assert(!BufferIsLocal(buffer));
5423
5424
  /*
5425
   * get relfilelocator from the buffer, no convenient way to access it
5426
   * other than that.
5427
   */
5428
0
  BufferGetTag(buffer, &key.rlocator, &forkno, &blockno);
5429
5430
  /* tuples can only be in the main fork */
5431
0
  Assert(forkno == MAIN_FORKNUM);
5432
0
  Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self));
5433
5434
0
  ItemPointerCopy(&htup->t_self,
5435
0
          &key.tid);
5436
5437
0
restart:
5438
0
  ent = (ReorderBufferTupleCidEnt *)
5439
0
    hash_search(tuplecid_data, &key, HASH_FIND, NULL);
5440
5441
  /*
5442
   * failed to find a mapping, check whether the table was rewritten and
5443
   * apply mapping if so, but only do that once - there can be no new
5444
   * mappings while we are in here since we have to hold a lock on the
5445
   * relation.
5446
   */
5447
0
  if (ent == NULL && !updated_mapping)
5448
0
  {
5449
0
    UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot);
5450
    /* now check but don't update for a mapping again */
5451
0
    updated_mapping = true;
5452
0
    goto restart;
5453
0
  }
5454
0
  else if (ent == NULL)
5455
0
    return false;
5456
5457
0
  if (cmin)
5458
0
    *cmin = ent->cmin;
5459
0
  if (cmax)
5460
0
    *cmax = ent->cmax;
5461
0
  return true;
5462
0
}
5463
5464
/*
5465
 * Count invalidation messages of specified transaction.
5466
 *
5467
 * Returns number of messages, and msgs is set to the pointer of the linked
5468
 * list for the messages.
5469
 */
5470
uint32
5471
ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid,
5472
                SharedInvalidationMessage **msgs)
5473
0
{
5474
0
  ReorderBufferTXN *txn;
5475
5476
0
  txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
5477
0
                false);
5478
5479
0
  if (txn == NULL)
5480
0
    return 0;
5481
5482
0
  *msgs = txn->invalidations;
5483
5484
0
  return txn->ninvalidations;
5485
0
}