/src/postgres/src/backend/replication/logical/reorderbuffer.c
Line | Count | Source (jump to first uncovered line) |
1 | | /*------------------------------------------------------------------------- |
2 | | * |
3 | | * reorderbuffer.c |
4 | | * PostgreSQL logical replay/reorder buffer management |
5 | | * |
6 | | * |
7 | | * Copyright (c) 2012-2025, PostgreSQL Global Development Group |
8 | | * |
9 | | * |
10 | | * IDENTIFICATION |
11 | | * src/backend/replication/logical/reorderbuffer.c |
12 | | * |
13 | | * NOTES |
14 | | * This module gets handed individual pieces of transactions in the order |
15 | | * they are written to the WAL and is responsible to reassemble them into |
16 | | * toplevel transaction sized pieces. When a transaction is completely |
17 | | * reassembled - signaled by reading the transaction commit record - it |
18 | | * will then call the output plugin (cf. ReorderBufferCommit()) with the |
19 | | * individual changes. The output plugins rely on snapshots built by |
20 | | * snapbuild.c which hands them to us. |
21 | | * |
22 | | * Transactions and subtransactions/savepoints in postgres are not |
23 | | * immediately linked to each other from outside the performing |
24 | | * backend. Only at commit/abort (or special xact_assignment records) they |
25 | | * are linked together. Which means that we will have to splice together a |
26 | | * toplevel transaction from its subtransactions. To do that efficiently we |
27 | | * build a binary heap indexed by the smallest current lsn of the individual |
28 | | * subtransactions' changestreams. As the individual streams are inherently |
29 | | * ordered by LSN - since that is where we build them from - the transaction |
30 | | * can easily be reassembled by always using the subtransaction with the |
31 | | * smallest current LSN from the heap. |
32 | | * |
33 | | * In order to cope with large transactions - which can be several times as |
34 | | * big as the available memory - this module supports spooling the contents |
35 | | * of large transactions to disk. When the transaction is replayed the |
36 | | * contents of individual (sub-)transactions will be read from disk in |
37 | | * chunks. |
38 | | * |
39 | | * This module also has to deal with reassembling toast records from the |
40 | | * individual chunks stored in WAL. When a new (or initial) version of a |
41 | | * tuple is stored in WAL it will always be preceded by the toast chunks |
42 | | * emitted for the columns stored out of line. Within a single toplevel |
43 | | * transaction there will be no other data carrying records between a row's |
44 | | * toast chunks and the row data itself. See ReorderBufferToast* for |
45 | | * details. |
46 | | * |
47 | | * ReorderBuffer uses two special memory context types - SlabContext for |
48 | | * allocations of fixed-length structures (changes and transactions), and |
49 | | * GenerationContext for the variable-length transaction data (allocated |
50 | | * and freed in groups with similar lifespans). |
51 | | * |
52 | | * To limit the amount of memory used by decoded changes, we track memory |
53 | | * used at the reorder buffer level (i.e. total amount of memory), and for |
54 | | * each transaction. When the total amount of used memory exceeds the |
55 | | * limit, the transaction consuming the most memory is then serialized to |
56 | | * disk. |
57 | | * |
58 | | * Only decoded changes are evicted from memory (spilled to disk), not the |
59 | | * transaction records. The number of toplevel transactions is limited, |
60 | | * but a transaction with many subtransactions may still consume significant |
61 | | * amounts of memory. However, the transaction records are fairly small and |
62 | | * are not included in the memory limit. |
63 | | * |
64 | | * The current eviction algorithm is very simple - the transaction is |
65 | | * picked merely by size, while it might be useful to also consider age |
66 | | * (LSN) of the changes for example. With the new Generational memory |
67 | | * allocator, evicting the oldest changes would make it more likely the |
68 | | * memory gets actually freed. |
69 | | * |
70 | | * We use a max-heap with transaction size as the key to efficiently find |
71 | | * the largest transaction. We update the max-heap whenever the memory |
72 | | * counter is updated; however transactions with size 0 are not stored in |
73 | | * the heap, because they have no changes to evict. |
74 | | * |
75 | | * We still rely on max_changes_in_memory when loading serialized changes |
76 | | * back into memory. At that point we can't use the memory limit directly |
77 | | * as we load the subxacts independently. One option to deal with this |
78 | | * would be to count the subxacts, and allow each to allocate 1/N of the |
79 | | * memory limit. That however does not seem very appealing, because with |
80 | | * many subtransactions it may easily cause thrashing (short cycles of |
81 | | * deserializing and applying very few changes). We probably should give |
82 | | * a bit more memory to the oldest subtransactions, because it's likely |
83 | | * they are the source for the next sequence of changes. |
84 | | * |
85 | | * ------------------------------------------------------------------------- |
86 | | */ |
87 | | #include "postgres.h" |
88 | | |
89 | | #include <unistd.h> |
90 | | #include <sys/stat.h> |
91 | | |
92 | | #include "access/detoast.h" |
93 | | #include "access/heapam.h" |
94 | | #include "access/rewriteheap.h" |
95 | | #include "access/transam.h" |
96 | | #include "access/xact.h" |
97 | | #include "access/xlog_internal.h" |
98 | | #include "catalog/catalog.h" |
99 | | #include "common/int.h" |
100 | | #include "lib/binaryheap.h" |
101 | | #include "miscadmin.h" |
102 | | #include "pgstat.h" |
103 | | #include "replication/logical.h" |
104 | | #include "replication/reorderbuffer.h" |
105 | | #include "replication/slot.h" |
106 | | #include "replication/snapbuild.h" /* just for SnapBuildSnapDecRefcount */ |
107 | | #include "storage/bufmgr.h" |
108 | | #include "storage/fd.h" |
109 | | #include "storage/procarray.h" |
110 | | #include "storage/sinval.h" |
111 | | #include "utils/builtins.h" |
112 | | #include "utils/memutils.h" |
113 | | #include "utils/rel.h" |
114 | | #include "utils/relfilenumbermap.h" |
115 | | |
116 | | /* entry for a hash table we use to map from xid to our transaction state */ |
117 | | typedef struct ReorderBufferTXNByIdEnt |
118 | | { |
119 | | TransactionId xid; |
120 | | ReorderBufferTXN *txn; |
121 | | } ReorderBufferTXNByIdEnt; |
122 | | |
123 | | /* data structures for (relfilelocator, ctid) => (cmin, cmax) mapping */ |
124 | | typedef struct ReorderBufferTupleCidKey |
125 | | { |
126 | | RelFileLocator rlocator; |
127 | | ItemPointerData tid; |
128 | | } ReorderBufferTupleCidKey; |
129 | | |
130 | | typedef struct ReorderBufferTupleCidEnt |
131 | | { |
132 | | ReorderBufferTupleCidKey key; |
133 | | CommandId cmin; |
134 | | CommandId cmax; |
135 | | CommandId combocid; /* just for debugging */ |
136 | | } ReorderBufferTupleCidEnt; |
137 | | |
138 | | /* Virtual file descriptor with file offset tracking */ |
139 | | typedef struct TXNEntryFile |
140 | | { |
141 | | File vfd; /* -1 when the file is closed */ |
142 | | off_t curOffset; /* offset for next write or read. Reset to 0 |
143 | | * when vfd is opened. */ |
144 | | } TXNEntryFile; |
145 | | |
146 | | /* k-way in-order change iteration support structures */ |
147 | | typedef struct ReorderBufferIterTXNEntry |
148 | | { |
149 | | XLogRecPtr lsn; |
150 | | ReorderBufferChange *change; |
151 | | ReorderBufferTXN *txn; |
152 | | TXNEntryFile file; |
153 | | XLogSegNo segno; |
154 | | } ReorderBufferIterTXNEntry; |
155 | | |
156 | | typedef struct ReorderBufferIterTXNState |
157 | | { |
158 | | binaryheap *heap; |
159 | | Size nr_txns; |
160 | | dlist_head old_change; |
161 | | ReorderBufferIterTXNEntry entries[FLEXIBLE_ARRAY_MEMBER]; |
162 | | } ReorderBufferIterTXNState; |
163 | | |
164 | | /* toast datastructures */ |
165 | | typedef struct ReorderBufferToastEnt |
166 | | { |
167 | | Oid chunk_id; /* toast_table.chunk_id */ |
168 | | int32 last_chunk_seq; /* toast_table.chunk_seq of the last chunk we |
169 | | * have seen */ |
170 | | Size num_chunks; /* number of chunks we've already seen */ |
171 | | Size size; /* combined size of chunks seen */ |
172 | | dlist_head chunks; /* linked list of chunks */ |
173 | | struct varlena *reconstructed; /* reconstructed varlena now pointed to in |
174 | | * main tup */ |
175 | | } ReorderBufferToastEnt; |
176 | | |
177 | | /* Disk serialization support datastructures */ |
178 | | typedef struct ReorderBufferDiskChange |
179 | | { |
180 | | Size size; |
181 | | ReorderBufferChange change; |
182 | | /* data follows */ |
183 | | } ReorderBufferDiskChange; |
184 | | |
185 | 0 | #define IsSpecInsert(action) \ |
186 | 0 | ( \ |
187 | 0 | ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \ |
188 | 0 | ) |
189 | 0 | #define IsSpecConfirmOrAbort(action) \ |
190 | 0 | ( \ |
191 | 0 | (((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) || \ |
192 | 0 | ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT)) \ |
193 | 0 | ) |
194 | 0 | #define IsInsertOrUpdate(action) \ |
195 | 0 | ( \ |
196 | 0 | (((action) == REORDER_BUFFER_CHANGE_INSERT) || \ |
197 | 0 | ((action) == REORDER_BUFFER_CHANGE_UPDATE) || \ |
198 | 0 | ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \ |
199 | 0 | ) |
200 | | |
201 | | /* |
202 | | * Maximum number of changes kept in memory, per transaction. After that, |
203 | | * changes are spooled to disk. |
204 | | * |
205 | | * The current value should be sufficient to decode the entire transaction |
206 | | * without hitting disk in OLTP workloads, while starting to spool to disk in |
207 | | * other workloads reasonably fast. |
208 | | * |
209 | | * At some point in the future it probably makes sense to have a more elaborate |
210 | | * resource management here, but it's not entirely clear what that would look |
211 | | * like. |
212 | | */ |
213 | | int logical_decoding_work_mem; |
214 | | static const Size max_changes_in_memory = 4096; /* XXX for restore only */ |
215 | | |
216 | | /* GUC variable */ |
217 | | int debug_logical_replication_streaming = DEBUG_LOGICAL_REP_STREAMING_BUFFERED; |
218 | | |
219 | | /* --------------------------------------- |
220 | | * primary reorderbuffer support routines |
221 | | * --------------------------------------- |
222 | | */ |
223 | | static ReorderBufferTXN *ReorderBufferAllocTXN(ReorderBuffer *rb); |
224 | | static void ReorderBufferFreeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); |
225 | | static ReorderBufferTXN *ReorderBufferTXNByXid(ReorderBuffer *rb, |
226 | | TransactionId xid, bool create, bool *is_new, |
227 | | XLogRecPtr lsn, bool create_as_top); |
228 | | static void ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, |
229 | | ReorderBufferTXN *subtxn); |
230 | | |
231 | | static void AssertTXNLsnOrder(ReorderBuffer *rb); |
232 | | |
233 | | /* --------------------------------------- |
234 | | * support functions for lsn-order iterating over the ->changes of a |
235 | | * transaction and its subtransactions |
236 | | * |
237 | | * used for iteration over the k-way heap merge of a transaction and its |
238 | | * subtransactions |
239 | | * --------------------------------------- |
240 | | */ |
241 | | static void ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, |
242 | | ReorderBufferIterTXNState *volatile *iter_state); |
243 | | static ReorderBufferChange *ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state); |
244 | | static void ReorderBufferIterTXNFinish(ReorderBuffer *rb, |
245 | | ReorderBufferIterTXNState *state); |
246 | | static void ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs); |
247 | | |
248 | | /* |
249 | | * --------------------------------------- |
250 | | * Disk serialization support functions |
251 | | * --------------------------------------- |
252 | | */ |
253 | | static void ReorderBufferCheckMemoryLimit(ReorderBuffer *rb); |
254 | | static void ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); |
255 | | static void ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, |
256 | | int fd, ReorderBufferChange *change); |
257 | | static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, |
258 | | TXNEntryFile *file, XLogSegNo *segno); |
259 | | static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, |
260 | | char *data); |
261 | | static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn); |
262 | | static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, |
263 | | bool txn_prepared); |
264 | | static void ReorderBufferMaybeMarkTXNStreamed(ReorderBuffer *rb, ReorderBufferTXN *txn); |
265 | | static bool ReorderBufferCheckAndTruncateAbortedTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); |
266 | | static void ReorderBufferCleanupSerializedTXNs(const char *slotname); |
267 | | static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, |
268 | | TransactionId xid, XLogSegNo segno); |
269 | | static int ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg); |
270 | | |
271 | | static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap); |
272 | | static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, |
273 | | ReorderBufferTXN *txn, CommandId cid); |
274 | | |
275 | | /* |
276 | | * --------------------------------------- |
277 | | * Streaming support functions |
278 | | * --------------------------------------- |
279 | | */ |
280 | | static inline bool ReorderBufferCanStream(ReorderBuffer *rb); |
281 | | static inline bool ReorderBufferCanStartStreaming(ReorderBuffer *rb); |
282 | | static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn); |
283 | | static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn); |
284 | | |
285 | | /* --------------------------------------- |
286 | | * toast reassembly support |
287 | | * --------------------------------------- |
288 | | */ |
289 | | static void ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn); |
290 | | static void ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn); |
291 | | static void ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, |
292 | | Relation relation, ReorderBufferChange *change); |
293 | | static void ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, |
294 | | Relation relation, ReorderBufferChange *change); |
295 | | |
296 | | /* |
297 | | * --------------------------------------- |
298 | | * memory accounting |
299 | | * --------------------------------------- |
300 | | */ |
301 | | static Size ReorderBufferChangeSize(ReorderBufferChange *change); |
302 | | static void ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, |
303 | | ReorderBufferChange *change, |
304 | | ReorderBufferTXN *txn, |
305 | | bool addition, Size sz); |
306 | | |
307 | | /* |
308 | | * Allocate a new ReorderBuffer and clean out any old serialized state from |
309 | | * prior ReorderBuffer instances for the same slot. |
310 | | */ |
311 | | ReorderBuffer * |
312 | | ReorderBufferAllocate(void) |
313 | 0 | { |
314 | 0 | ReorderBuffer *buffer; |
315 | 0 | HASHCTL hash_ctl; |
316 | 0 | MemoryContext new_ctx; |
317 | |
|
318 | 0 | Assert(MyReplicationSlot != NULL); |
319 | | |
320 | | /* allocate memory in own context, to have better accountability */ |
321 | 0 | new_ctx = AllocSetContextCreate(CurrentMemoryContext, |
322 | 0 | "ReorderBuffer", |
323 | 0 | ALLOCSET_DEFAULT_SIZES); |
324 | |
|
325 | 0 | buffer = |
326 | 0 | (ReorderBuffer *) MemoryContextAlloc(new_ctx, sizeof(ReorderBuffer)); |
327 | |
|
328 | 0 | memset(&hash_ctl, 0, sizeof(hash_ctl)); |
329 | |
|
330 | 0 | buffer->context = new_ctx; |
331 | |
|
332 | 0 | buffer->change_context = SlabContextCreate(new_ctx, |
333 | 0 | "Change", |
334 | 0 | SLAB_DEFAULT_BLOCK_SIZE, |
335 | 0 | sizeof(ReorderBufferChange)); |
336 | |
|
337 | 0 | buffer->txn_context = SlabContextCreate(new_ctx, |
338 | 0 | "TXN", |
339 | 0 | SLAB_DEFAULT_BLOCK_SIZE, |
340 | 0 | sizeof(ReorderBufferTXN)); |
341 | | |
342 | | /* |
343 | | * To minimize memory fragmentation caused by long-running transactions |
344 | | * with changes spanning multiple memory blocks, we use a single |
345 | | * fixed-size memory block for decoded tuple storage. The performance |
346 | | * testing showed that the default memory block size maintains logical |
347 | | * decoding performance without causing fragmentation due to concurrent |
348 | | * transactions. One might think that we can use the max size as |
349 | | * SLAB_LARGE_BLOCK_SIZE but the test also showed it doesn't help resolve |
350 | | * the memory fragmentation. |
351 | | */ |
352 | 0 | buffer->tup_context = GenerationContextCreate(new_ctx, |
353 | 0 | "Tuples", |
354 | 0 | SLAB_DEFAULT_BLOCK_SIZE, |
355 | 0 | SLAB_DEFAULT_BLOCK_SIZE, |
356 | 0 | SLAB_DEFAULT_BLOCK_SIZE); |
357 | |
|
358 | 0 | hash_ctl.keysize = sizeof(TransactionId); |
359 | 0 | hash_ctl.entrysize = sizeof(ReorderBufferTXNByIdEnt); |
360 | 0 | hash_ctl.hcxt = buffer->context; |
361 | |
|
362 | 0 | buffer->by_txn = hash_create("ReorderBufferByXid", 1000, &hash_ctl, |
363 | 0 | HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); |
364 | |
|
365 | 0 | buffer->by_txn_last_xid = InvalidTransactionId; |
366 | 0 | buffer->by_txn_last_txn = NULL; |
367 | |
|
368 | 0 | buffer->outbuf = NULL; |
369 | 0 | buffer->outbufsize = 0; |
370 | 0 | buffer->size = 0; |
371 | | |
372 | | /* txn_heap is ordered by transaction size */ |
373 | 0 | buffer->txn_heap = pairingheap_allocate(ReorderBufferTXNSizeCompare, NULL); |
374 | |
|
375 | 0 | buffer->spillTxns = 0; |
376 | 0 | buffer->spillCount = 0; |
377 | 0 | buffer->spillBytes = 0; |
378 | 0 | buffer->streamTxns = 0; |
379 | 0 | buffer->streamCount = 0; |
380 | 0 | buffer->streamBytes = 0; |
381 | 0 | buffer->totalTxns = 0; |
382 | 0 | buffer->totalBytes = 0; |
383 | |
|
384 | 0 | buffer->current_restart_decoding_lsn = InvalidXLogRecPtr; |
385 | |
|
386 | 0 | dlist_init(&buffer->toplevel_by_lsn); |
387 | 0 | dlist_init(&buffer->txns_by_base_snapshot_lsn); |
388 | 0 | dclist_init(&buffer->catchange_txns); |
389 | | |
390 | | /* |
391 | | * Ensure there's no stale data from prior uses of this slot, in case some |
392 | | * prior exit avoided calling ReorderBufferFree. Failure to do this can |
393 | | * produce duplicated txns, and it's very cheap if there's nothing there. |
394 | | */ |
395 | 0 | ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name)); |
396 | |
|
397 | 0 | return buffer; |
398 | 0 | } |
399 | | |
400 | | /* |
401 | | * Free a ReorderBuffer |
402 | | */ |
403 | | void |
404 | | ReorderBufferFree(ReorderBuffer *rb) |
405 | 0 | { |
406 | 0 | MemoryContext context = rb->context; |
407 | | |
408 | | /* |
409 | | * We free separately allocated data by entirely scrapping reorderbuffer's |
410 | | * memory context. |
411 | | */ |
412 | 0 | MemoryContextDelete(context); |
413 | | |
414 | | /* Free disk space used by unconsumed reorder buffers */ |
415 | 0 | ReorderBufferCleanupSerializedTXNs(NameStr(MyReplicationSlot->data.name)); |
416 | 0 | } |
417 | | |
418 | | /* |
419 | | * Allocate a new ReorderBufferTXN. |
420 | | */ |
421 | | static ReorderBufferTXN * |
422 | | ReorderBufferAllocTXN(ReorderBuffer *rb) |
423 | 0 | { |
424 | 0 | ReorderBufferTXN *txn; |
425 | |
|
426 | 0 | txn = (ReorderBufferTXN *) |
427 | 0 | MemoryContextAlloc(rb->txn_context, sizeof(ReorderBufferTXN)); |
428 | |
|
429 | 0 | memset(txn, 0, sizeof(ReorderBufferTXN)); |
430 | |
|
431 | 0 | dlist_init(&txn->changes); |
432 | 0 | dlist_init(&txn->tuplecids); |
433 | 0 | dlist_init(&txn->subtxns); |
434 | | |
435 | | /* InvalidCommandId is not zero, so set it explicitly */ |
436 | 0 | txn->command_id = InvalidCommandId; |
437 | 0 | txn->output_plugin_private = NULL; |
438 | |
|
439 | 0 | return txn; |
440 | 0 | } |
441 | | |
442 | | /* |
443 | | * Free a ReorderBufferTXN. |
444 | | */ |
445 | | static void |
446 | | ReorderBufferFreeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) |
447 | 0 | { |
448 | | /* clean the lookup cache if we were cached (quite likely) */ |
449 | 0 | if (rb->by_txn_last_xid == txn->xid) |
450 | 0 | { |
451 | 0 | rb->by_txn_last_xid = InvalidTransactionId; |
452 | 0 | rb->by_txn_last_txn = NULL; |
453 | 0 | } |
454 | | |
455 | | /* free data that's contained */ |
456 | |
|
457 | 0 | if (txn->gid != NULL) |
458 | 0 | { |
459 | 0 | pfree(txn->gid); |
460 | 0 | txn->gid = NULL; |
461 | 0 | } |
462 | |
|
463 | 0 | if (txn->tuplecid_hash != NULL) |
464 | 0 | { |
465 | 0 | hash_destroy(txn->tuplecid_hash); |
466 | 0 | txn->tuplecid_hash = NULL; |
467 | 0 | } |
468 | |
|
469 | 0 | if (txn->invalidations) |
470 | 0 | { |
471 | 0 | pfree(txn->invalidations); |
472 | 0 | txn->invalidations = NULL; |
473 | 0 | } |
474 | | |
475 | | /* Reset the toast hash */ |
476 | 0 | ReorderBufferToastReset(rb, txn); |
477 | | |
478 | | /* All changes must be deallocated */ |
479 | 0 | Assert(txn->size == 0); |
480 | |
|
481 | 0 | pfree(txn); |
482 | 0 | } |
483 | | |
484 | | /* |
485 | | * Allocate a ReorderBufferChange. |
486 | | */ |
487 | | ReorderBufferChange * |
488 | | ReorderBufferAllocChange(ReorderBuffer *rb) |
489 | 0 | { |
490 | 0 | ReorderBufferChange *change; |
491 | |
|
492 | 0 | change = (ReorderBufferChange *) |
493 | 0 | MemoryContextAlloc(rb->change_context, sizeof(ReorderBufferChange)); |
494 | |
|
495 | 0 | memset(change, 0, sizeof(ReorderBufferChange)); |
496 | 0 | return change; |
497 | 0 | } |
498 | | |
499 | | /* |
500 | | * Free a ReorderBufferChange and update memory accounting, if requested. |
501 | | */ |
502 | | void |
503 | | ReorderBufferFreeChange(ReorderBuffer *rb, ReorderBufferChange *change, |
504 | | bool upd_mem) |
505 | 0 | { |
506 | | /* update memory accounting info */ |
507 | 0 | if (upd_mem) |
508 | 0 | ReorderBufferChangeMemoryUpdate(rb, change, NULL, false, |
509 | 0 | ReorderBufferChangeSize(change)); |
510 | | |
511 | | /* free contained data */ |
512 | 0 | switch (change->action) |
513 | 0 | { |
514 | 0 | case REORDER_BUFFER_CHANGE_INSERT: |
515 | 0 | case REORDER_BUFFER_CHANGE_UPDATE: |
516 | 0 | case REORDER_BUFFER_CHANGE_DELETE: |
517 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: |
518 | 0 | if (change->data.tp.newtuple) |
519 | 0 | { |
520 | 0 | ReorderBufferFreeTupleBuf(change->data.tp.newtuple); |
521 | 0 | change->data.tp.newtuple = NULL; |
522 | 0 | } |
523 | |
|
524 | 0 | if (change->data.tp.oldtuple) |
525 | 0 | { |
526 | 0 | ReorderBufferFreeTupleBuf(change->data.tp.oldtuple); |
527 | 0 | change->data.tp.oldtuple = NULL; |
528 | 0 | } |
529 | 0 | break; |
530 | 0 | case REORDER_BUFFER_CHANGE_MESSAGE: |
531 | 0 | if (change->data.msg.prefix != NULL) |
532 | 0 | pfree(change->data.msg.prefix); |
533 | 0 | change->data.msg.prefix = NULL; |
534 | 0 | if (change->data.msg.message != NULL) |
535 | 0 | pfree(change->data.msg.message); |
536 | 0 | change->data.msg.message = NULL; |
537 | 0 | break; |
538 | 0 | case REORDER_BUFFER_CHANGE_INVALIDATION: |
539 | 0 | if (change->data.inval.invalidations) |
540 | 0 | pfree(change->data.inval.invalidations); |
541 | 0 | change->data.inval.invalidations = NULL; |
542 | 0 | break; |
543 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: |
544 | 0 | if (change->data.snapshot) |
545 | 0 | { |
546 | 0 | ReorderBufferFreeSnap(rb, change->data.snapshot); |
547 | 0 | change->data.snapshot = NULL; |
548 | 0 | } |
549 | 0 | break; |
550 | | /* no data in addition to the struct itself */ |
551 | 0 | case REORDER_BUFFER_CHANGE_TRUNCATE: |
552 | 0 | if (change->data.truncate.relids != NULL) |
553 | 0 | { |
554 | 0 | ReorderBufferFreeRelids(rb, change->data.truncate.relids); |
555 | 0 | change->data.truncate.relids = NULL; |
556 | 0 | } |
557 | 0 | break; |
558 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: |
559 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: |
560 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: |
561 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: |
562 | 0 | break; |
563 | 0 | } |
564 | | |
565 | 0 | pfree(change); |
566 | 0 | } |
567 | | |
568 | | /* |
569 | | * Allocate a HeapTuple fitting a tuple of size tuple_len (excluding header |
570 | | * overhead). |
571 | | */ |
572 | | HeapTuple |
573 | | ReorderBufferAllocTupleBuf(ReorderBuffer *rb, Size tuple_len) |
574 | 0 | { |
575 | 0 | HeapTuple tuple; |
576 | 0 | Size alloc_len; |
577 | |
|
578 | 0 | alloc_len = tuple_len + SizeofHeapTupleHeader; |
579 | |
|
580 | 0 | tuple = (HeapTuple) MemoryContextAlloc(rb->tup_context, |
581 | 0 | HEAPTUPLESIZE + alloc_len); |
582 | 0 | tuple->t_data = (HeapTupleHeader) ((char *) tuple + HEAPTUPLESIZE); |
583 | |
|
584 | 0 | return tuple; |
585 | 0 | } |
586 | | |
587 | | /* |
588 | | * Free a HeapTuple returned by ReorderBufferAllocTupleBuf(). |
589 | | */ |
590 | | void |
591 | | ReorderBufferFreeTupleBuf(HeapTuple tuple) |
592 | 0 | { |
593 | 0 | pfree(tuple); |
594 | 0 | } |
595 | | |
596 | | /* |
597 | | * Allocate an array for relids of truncated relations. |
598 | | * |
599 | | * We use the global memory context (for the whole reorder buffer), because |
600 | | * none of the existing ones seems like a good match (some are SLAB, so we |
601 | | * can't use those, and tup_context is meant for tuple data, not relids). We |
602 | | * could add yet another context, but it seems like an overkill - TRUNCATE is |
603 | | * not particularly common operation, so it does not seem worth it. |
604 | | */ |
605 | | Oid * |
606 | | ReorderBufferAllocRelids(ReorderBuffer *rb, int nrelids) |
607 | 0 | { |
608 | 0 | Oid *relids; |
609 | 0 | Size alloc_len; |
610 | |
|
611 | 0 | alloc_len = sizeof(Oid) * nrelids; |
612 | |
|
613 | 0 | relids = (Oid *) MemoryContextAlloc(rb->context, alloc_len); |
614 | |
|
615 | 0 | return relids; |
616 | 0 | } |
617 | | |
618 | | /* |
619 | | * Free an array of relids. |
620 | | */ |
621 | | void |
622 | | ReorderBufferFreeRelids(ReorderBuffer *rb, Oid *relids) |
623 | 0 | { |
624 | 0 | pfree(relids); |
625 | 0 | } |
626 | | |
627 | | /* |
628 | | * Return the ReorderBufferTXN from the given buffer, specified by Xid. |
629 | | * If create is true, and a transaction doesn't already exist, create it |
630 | | * (with the given LSN, and as top transaction if that's specified); |
631 | | * when this happens, is_new is set to true. |
632 | | */ |
633 | | static ReorderBufferTXN * |
634 | | ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create, |
635 | | bool *is_new, XLogRecPtr lsn, bool create_as_top) |
636 | 0 | { |
637 | 0 | ReorderBufferTXN *txn; |
638 | 0 | ReorderBufferTXNByIdEnt *ent; |
639 | 0 | bool found; |
640 | |
|
641 | 0 | Assert(TransactionIdIsValid(xid)); |
642 | | |
643 | | /* |
644 | | * Check the one-entry lookup cache first |
645 | | */ |
646 | 0 | if (TransactionIdIsValid(rb->by_txn_last_xid) && |
647 | 0 | rb->by_txn_last_xid == xid) |
648 | 0 | { |
649 | 0 | txn = rb->by_txn_last_txn; |
650 | |
|
651 | 0 | if (txn != NULL) |
652 | 0 | { |
653 | | /* found it, and it's valid */ |
654 | 0 | if (is_new) |
655 | 0 | *is_new = false; |
656 | 0 | return txn; |
657 | 0 | } |
658 | | |
659 | | /* |
660 | | * cached as non-existent, and asked not to create? Then nothing else |
661 | | * to do. |
662 | | */ |
663 | 0 | if (!create) |
664 | 0 | return NULL; |
665 | | /* otherwise fall through to create it */ |
666 | 0 | } |
667 | | |
668 | | /* |
669 | | * If the cache wasn't hit or it yielded a "does-not-exist" and we want to |
670 | | * create an entry. |
671 | | */ |
672 | | |
673 | | /* search the lookup table */ |
674 | 0 | ent = (ReorderBufferTXNByIdEnt *) |
675 | 0 | hash_search(rb->by_txn, |
676 | 0 | &xid, |
677 | 0 | create ? HASH_ENTER : HASH_FIND, |
678 | 0 | &found); |
679 | 0 | if (found) |
680 | 0 | txn = ent->txn; |
681 | 0 | else if (create) |
682 | 0 | { |
683 | | /* initialize the new entry, if creation was requested */ |
684 | 0 | Assert(ent != NULL); |
685 | 0 | Assert(lsn != InvalidXLogRecPtr); |
686 | |
|
687 | 0 | ent->txn = ReorderBufferAllocTXN(rb); |
688 | 0 | ent->txn->xid = xid; |
689 | 0 | txn = ent->txn; |
690 | 0 | txn->first_lsn = lsn; |
691 | 0 | txn->restart_decoding_lsn = rb->current_restart_decoding_lsn; |
692 | |
|
693 | 0 | if (create_as_top) |
694 | 0 | { |
695 | 0 | dlist_push_tail(&rb->toplevel_by_lsn, &txn->node); |
696 | 0 | AssertTXNLsnOrder(rb); |
697 | 0 | } |
698 | 0 | } |
699 | 0 | else |
700 | 0 | txn = NULL; /* not found and not asked to create */ |
701 | | |
702 | | /* update cache */ |
703 | 0 | rb->by_txn_last_xid = xid; |
704 | 0 | rb->by_txn_last_txn = txn; |
705 | |
|
706 | 0 | if (is_new) |
707 | 0 | *is_new = !found; |
708 | |
|
709 | 0 | Assert(!create || txn != NULL); |
710 | 0 | return txn; |
711 | 0 | } |
712 | | |
713 | | /* |
714 | | * Record the partial change for the streaming of in-progress transactions. We |
715 | | * can stream only complete changes so if we have a partial change like toast |
716 | | * table insert or speculative insert then we mark such a 'txn' so that it |
717 | | * can't be streamed. We also ensure that if the changes in such a 'txn' can |
718 | | * be streamed and are above logical_decoding_work_mem threshold then we stream |
719 | | * them as soon as we have a complete change. |
720 | | */ |
721 | | static void |
722 | | ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn, |
723 | | ReorderBufferChange *change, |
724 | | bool toast_insert) |
725 | 0 | { |
726 | 0 | ReorderBufferTXN *toptxn; |
727 | | |
728 | | /* |
729 | | * The partial changes need to be processed only while streaming |
730 | | * in-progress transactions. |
731 | | */ |
732 | 0 | if (!ReorderBufferCanStream(rb)) |
733 | 0 | return; |
734 | | |
735 | | /* Get the top transaction. */ |
736 | 0 | toptxn = rbtxn_get_toptxn(txn); |
737 | | |
738 | | /* |
739 | | * Indicate a partial change for toast inserts. The change will be |
740 | | * considered as complete once we get the insert or update on the main |
741 | | * table and we are sure that the pending toast chunks are not required |
742 | | * anymore. |
743 | | * |
744 | | * If we allow streaming when there are pending toast chunks then such |
745 | | * chunks won't be released till the insert (multi_insert) is complete and |
746 | | * we expect the txn to have streamed all changes after streaming. This |
747 | | * restriction is mainly to ensure the correctness of streamed |
748 | | * transactions and it doesn't seem worth uplifting such a restriction |
749 | | * just to allow this case because anyway we will stream the transaction |
750 | | * once such an insert is complete. |
751 | | */ |
752 | 0 | if (toast_insert) |
753 | 0 | toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE; |
754 | 0 | else if (rbtxn_has_partial_change(toptxn) && |
755 | 0 | IsInsertOrUpdate(change->action) && |
756 | 0 | change->data.tp.clear_toast_afterwards) |
757 | 0 | toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE; |
758 | | |
759 | | /* |
760 | | * Indicate a partial change for speculative inserts. The change will be |
761 | | * considered as complete once we get the speculative confirm or abort |
762 | | * token. |
763 | | */ |
764 | 0 | if (IsSpecInsert(change->action)) |
765 | 0 | toptxn->txn_flags |= RBTXN_HAS_PARTIAL_CHANGE; |
766 | 0 | else if (rbtxn_has_partial_change(toptxn) && |
767 | 0 | IsSpecConfirmOrAbort(change->action)) |
768 | 0 | toptxn->txn_flags &= ~RBTXN_HAS_PARTIAL_CHANGE; |
769 | | |
770 | | /* |
771 | | * Stream the transaction if it is serialized before and the changes are |
772 | | * now complete in the top-level transaction. |
773 | | * |
774 | | * The reason for doing the streaming of such a transaction as soon as we |
775 | | * get the complete change for it is that previously it would have reached |
776 | | * the memory threshold and wouldn't get streamed because of incomplete |
777 | | * changes. Delaying such transactions would increase apply lag for them. |
778 | | */ |
779 | 0 | if (ReorderBufferCanStartStreaming(rb) && |
780 | 0 | !(rbtxn_has_partial_change(toptxn)) && |
781 | 0 | rbtxn_is_serialized(txn) && |
782 | 0 | rbtxn_has_streamable_change(toptxn)) |
783 | 0 | ReorderBufferStreamTXN(rb, toptxn); |
784 | 0 | } |
785 | | |
786 | | /* |
787 | | * Queue a change into a transaction so it can be replayed upon commit or will be |
788 | | * streamed when we reach logical_decoding_work_mem threshold. |
789 | | */ |
790 | | void |
791 | | ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, |
792 | | ReorderBufferChange *change, bool toast_insert) |
793 | 0 | { |
794 | 0 | ReorderBufferTXN *txn; |
795 | |
|
796 | 0 | txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); |
797 | | |
798 | | /* |
799 | | * If we have detected that the transaction is aborted while streaming the |
800 | | * previous changes or by checking its CLOG, there is no point in |
801 | | * collecting further changes for it. |
802 | | */ |
803 | 0 | if (rbtxn_is_aborted(txn)) |
804 | 0 | { |
805 | | /* |
806 | | * We don't need to update memory accounting for this change as we |
807 | | * have not added it to the queue yet. |
808 | | */ |
809 | 0 | ReorderBufferFreeChange(rb, change, false); |
810 | 0 | return; |
811 | 0 | } |
812 | | |
813 | | /* |
814 | | * The changes that are sent downstream are considered streamable. We |
815 | | * remember such transactions so that only those will later be considered |
816 | | * for streaming. |
817 | | */ |
818 | 0 | if (change->action == REORDER_BUFFER_CHANGE_INSERT || |
819 | 0 | change->action == REORDER_BUFFER_CHANGE_UPDATE || |
820 | 0 | change->action == REORDER_BUFFER_CHANGE_DELETE || |
821 | 0 | change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT || |
822 | 0 | change->action == REORDER_BUFFER_CHANGE_TRUNCATE || |
823 | 0 | change->action == REORDER_BUFFER_CHANGE_MESSAGE) |
824 | 0 | { |
825 | 0 | ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn); |
826 | |
|
827 | 0 | toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE; |
828 | 0 | } |
829 | |
|
830 | 0 | change->lsn = lsn; |
831 | 0 | change->txn = txn; |
832 | |
|
833 | 0 | Assert(InvalidXLogRecPtr != lsn); |
834 | 0 | dlist_push_tail(&txn->changes, &change->node); |
835 | 0 | txn->nentries++; |
836 | 0 | txn->nentries_mem++; |
837 | | |
838 | | /* update memory accounting information */ |
839 | 0 | ReorderBufferChangeMemoryUpdate(rb, change, NULL, true, |
840 | 0 | ReorderBufferChangeSize(change)); |
841 | | |
842 | | /* process partial change */ |
843 | 0 | ReorderBufferProcessPartialChange(rb, txn, change, toast_insert); |
844 | | |
845 | | /* check the memory limits and evict something if needed */ |
846 | 0 | ReorderBufferCheckMemoryLimit(rb); |
847 | 0 | } |
848 | | |
849 | | /* |
850 | | * A transactional message is queued to be processed upon commit and a |
851 | | * non-transactional message gets processed immediately. |
852 | | */ |
853 | | void |
854 | | ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid, |
855 | | Snapshot snap, XLogRecPtr lsn, |
856 | | bool transactional, const char *prefix, |
857 | | Size message_size, const char *message) |
858 | 0 | { |
859 | 0 | if (transactional) |
860 | 0 | { |
861 | 0 | MemoryContext oldcontext; |
862 | 0 | ReorderBufferChange *change; |
863 | |
|
864 | 0 | Assert(xid != InvalidTransactionId); |
865 | | |
866 | | /* |
867 | | * We don't expect snapshots for transactional changes - we'll use the |
868 | | * snapshot derived later during apply (unless the change gets |
869 | | * skipped). |
870 | | */ |
871 | 0 | Assert(!snap); |
872 | |
|
873 | 0 | oldcontext = MemoryContextSwitchTo(rb->context); |
874 | |
|
875 | 0 | change = ReorderBufferAllocChange(rb); |
876 | 0 | change->action = REORDER_BUFFER_CHANGE_MESSAGE; |
877 | 0 | change->data.msg.prefix = pstrdup(prefix); |
878 | 0 | change->data.msg.message_size = message_size; |
879 | 0 | change->data.msg.message = palloc(message_size); |
880 | 0 | memcpy(change->data.msg.message, message, message_size); |
881 | |
|
882 | 0 | ReorderBufferQueueChange(rb, xid, lsn, change, false); |
883 | |
|
884 | 0 | MemoryContextSwitchTo(oldcontext); |
885 | 0 | } |
886 | 0 | else |
887 | 0 | { |
888 | 0 | ReorderBufferTXN *txn = NULL; |
889 | 0 | volatile Snapshot snapshot_now = snap; |
890 | | |
891 | | /* Non-transactional changes require a valid snapshot. */ |
892 | 0 | Assert(snapshot_now); |
893 | |
|
894 | 0 | if (xid != InvalidTransactionId) |
895 | 0 | txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); |
896 | | |
897 | | /* setup snapshot to allow catalog access */ |
898 | 0 | SetupHistoricSnapshot(snapshot_now, NULL); |
899 | 0 | PG_TRY(); |
900 | 0 | { |
901 | 0 | rb->message(rb, txn, lsn, false, prefix, message_size, message); |
902 | |
|
903 | 0 | TeardownHistoricSnapshot(false); |
904 | 0 | } |
905 | 0 | PG_CATCH(); |
906 | 0 | { |
907 | 0 | TeardownHistoricSnapshot(true); |
908 | 0 | PG_RE_THROW(); |
909 | 0 | } |
910 | 0 | PG_END_TRY(); |
911 | 0 | } |
912 | 0 | } |
913 | | |
914 | | /* |
915 | | * AssertTXNLsnOrder |
916 | | * Verify LSN ordering of transaction lists in the reorderbuffer |
917 | | * |
918 | | * Other LSN-related invariants are checked too. |
919 | | * |
920 | | * No-op if assertions are not in use. |
921 | | */ |
922 | | static void |
923 | | AssertTXNLsnOrder(ReorderBuffer *rb) |
924 | 0 | { |
925 | | #ifdef USE_ASSERT_CHECKING |
926 | | LogicalDecodingContext *ctx = rb->private_data; |
927 | | dlist_iter iter; |
928 | | XLogRecPtr prev_first_lsn = InvalidXLogRecPtr; |
929 | | XLogRecPtr prev_base_snap_lsn = InvalidXLogRecPtr; |
930 | | |
931 | | /* |
932 | | * Skip the verification if we don't reach the LSN at which we start |
933 | | * decoding the contents of transactions yet because until we reach the |
934 | | * LSN, we could have transactions that don't have the association between |
935 | | * the top-level transaction and subtransaction yet and consequently have |
936 | | * the same LSN. We don't guarantee this association until we try to |
937 | | * decode the actual contents of transaction. The ordering of the records |
938 | | * prior to the start_decoding_at LSN should have been checked before the |
939 | | * restart. |
940 | | */ |
941 | | if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, ctx->reader->EndRecPtr)) |
942 | | return; |
943 | | |
944 | | dlist_foreach(iter, &rb->toplevel_by_lsn) |
945 | | { |
946 | | ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, node, |
947 | | iter.cur); |
948 | | |
949 | | /* start LSN must be set */ |
950 | | Assert(cur_txn->first_lsn != InvalidXLogRecPtr); |
951 | | |
952 | | /* If there is an end LSN, it must be higher than start LSN */ |
953 | | if (cur_txn->end_lsn != InvalidXLogRecPtr) |
954 | | Assert(cur_txn->first_lsn <= cur_txn->end_lsn); |
955 | | |
956 | | /* Current initial LSN must be strictly higher than previous */ |
957 | | if (prev_first_lsn != InvalidXLogRecPtr) |
958 | | Assert(prev_first_lsn < cur_txn->first_lsn); |
959 | | |
960 | | /* known-as-subtxn txns must not be listed */ |
961 | | Assert(!rbtxn_is_known_subxact(cur_txn)); |
962 | | |
963 | | prev_first_lsn = cur_txn->first_lsn; |
964 | | } |
965 | | |
966 | | dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn) |
967 | | { |
968 | | ReorderBufferTXN *cur_txn = dlist_container(ReorderBufferTXN, |
969 | | base_snapshot_node, |
970 | | iter.cur); |
971 | | |
972 | | /* base snapshot (and its LSN) must be set */ |
973 | | Assert(cur_txn->base_snapshot != NULL); |
974 | | Assert(cur_txn->base_snapshot_lsn != InvalidXLogRecPtr); |
975 | | |
976 | | /* current LSN must be strictly higher than previous */ |
977 | | if (prev_base_snap_lsn != InvalidXLogRecPtr) |
978 | | Assert(prev_base_snap_lsn < cur_txn->base_snapshot_lsn); |
979 | | |
980 | | /* known-as-subtxn txns must not be listed */ |
981 | | Assert(!rbtxn_is_known_subxact(cur_txn)); |
982 | | |
983 | | prev_base_snap_lsn = cur_txn->base_snapshot_lsn; |
984 | | } |
985 | | #endif |
986 | 0 | } |
987 | | |
988 | | /* |
989 | | * AssertChangeLsnOrder |
990 | | * |
991 | | * Check ordering of changes in the (sub)transaction. |
992 | | */ |
993 | | static void |
994 | | AssertChangeLsnOrder(ReorderBufferTXN *txn) |
995 | 0 | { |
996 | | #ifdef USE_ASSERT_CHECKING |
997 | | dlist_iter iter; |
998 | | XLogRecPtr prev_lsn = txn->first_lsn; |
999 | | |
1000 | | dlist_foreach(iter, &txn->changes) |
1001 | | { |
1002 | | ReorderBufferChange *cur_change; |
1003 | | |
1004 | | cur_change = dlist_container(ReorderBufferChange, node, iter.cur); |
1005 | | |
1006 | | Assert(txn->first_lsn != InvalidXLogRecPtr); |
1007 | | Assert(cur_change->lsn != InvalidXLogRecPtr); |
1008 | | Assert(txn->first_lsn <= cur_change->lsn); |
1009 | | |
1010 | | if (txn->end_lsn != InvalidXLogRecPtr) |
1011 | | Assert(cur_change->lsn <= txn->end_lsn); |
1012 | | |
1013 | | Assert(prev_lsn <= cur_change->lsn); |
1014 | | |
1015 | | prev_lsn = cur_change->lsn; |
1016 | | } |
1017 | | #endif |
1018 | 0 | } |
1019 | | |
1020 | | /* |
1021 | | * ReorderBufferGetOldestTXN |
1022 | | * Return oldest transaction in reorderbuffer |
1023 | | */ |
1024 | | ReorderBufferTXN * |
1025 | | ReorderBufferGetOldestTXN(ReorderBuffer *rb) |
1026 | 0 | { |
1027 | 0 | ReorderBufferTXN *txn; |
1028 | |
|
1029 | 0 | AssertTXNLsnOrder(rb); |
1030 | |
|
1031 | 0 | if (dlist_is_empty(&rb->toplevel_by_lsn)) |
1032 | 0 | return NULL; |
1033 | | |
1034 | 0 | txn = dlist_head_element(ReorderBufferTXN, node, &rb->toplevel_by_lsn); |
1035 | |
|
1036 | 0 | Assert(!rbtxn_is_known_subxact(txn)); |
1037 | 0 | Assert(txn->first_lsn != InvalidXLogRecPtr); |
1038 | 0 | return txn; |
1039 | 0 | } |
1040 | | |
1041 | | /* |
1042 | | * ReorderBufferGetOldestXmin |
1043 | | * Return oldest Xmin in reorderbuffer |
1044 | | * |
1045 | | * Returns oldest possibly running Xid from the point of view of snapshots |
1046 | | * used in the transactions kept by reorderbuffer, or InvalidTransactionId if |
1047 | | * there are none. |
1048 | | * |
1049 | | * Since snapshots are assigned monotonically, this equals the Xmin of the |
1050 | | * base snapshot with minimal base_snapshot_lsn. |
1051 | | */ |
1052 | | TransactionId |
1053 | | ReorderBufferGetOldestXmin(ReorderBuffer *rb) |
1054 | 0 | { |
1055 | 0 | ReorderBufferTXN *txn; |
1056 | |
|
1057 | 0 | AssertTXNLsnOrder(rb); |
1058 | |
|
1059 | 0 | if (dlist_is_empty(&rb->txns_by_base_snapshot_lsn)) |
1060 | 0 | return InvalidTransactionId; |
1061 | | |
1062 | 0 | txn = dlist_head_element(ReorderBufferTXN, base_snapshot_node, |
1063 | 0 | &rb->txns_by_base_snapshot_lsn); |
1064 | 0 | return txn->base_snapshot->xmin; |
1065 | 0 | } |
1066 | | |
1067 | | void |
1068 | | ReorderBufferSetRestartPoint(ReorderBuffer *rb, XLogRecPtr ptr) |
1069 | 0 | { |
1070 | 0 | rb->current_restart_decoding_lsn = ptr; |
1071 | 0 | } |
1072 | | |
1073 | | /* |
1074 | | * ReorderBufferAssignChild |
1075 | | * |
1076 | | * Make note that we know that subxid is a subtransaction of xid, seen as of |
1077 | | * the given lsn. |
1078 | | */ |
1079 | | void |
1080 | | ReorderBufferAssignChild(ReorderBuffer *rb, TransactionId xid, |
1081 | | TransactionId subxid, XLogRecPtr lsn) |
1082 | 0 | { |
1083 | 0 | ReorderBufferTXN *txn; |
1084 | 0 | ReorderBufferTXN *subtxn; |
1085 | 0 | bool new_top; |
1086 | 0 | bool new_sub; |
1087 | |
|
1088 | 0 | txn = ReorderBufferTXNByXid(rb, xid, true, &new_top, lsn, true); |
1089 | 0 | subtxn = ReorderBufferTXNByXid(rb, subxid, true, &new_sub, lsn, false); |
1090 | |
|
1091 | 0 | if (!new_sub) |
1092 | 0 | { |
1093 | 0 | if (rbtxn_is_known_subxact(subtxn)) |
1094 | 0 | { |
1095 | | /* already associated, nothing to do */ |
1096 | 0 | return; |
1097 | 0 | } |
1098 | 0 | else |
1099 | 0 | { |
1100 | | /* |
1101 | | * We already saw this transaction, but initially added it to the |
1102 | | * list of top-level txns. Now that we know it's not top-level, |
1103 | | * remove it from there. |
1104 | | */ |
1105 | 0 | dlist_delete(&subtxn->node); |
1106 | 0 | } |
1107 | 0 | } |
1108 | | |
1109 | 0 | subtxn->txn_flags |= RBTXN_IS_SUBXACT; |
1110 | 0 | subtxn->toplevel_xid = xid; |
1111 | 0 | Assert(subtxn->nsubtxns == 0); |
1112 | | |
1113 | | /* set the reference to top-level transaction */ |
1114 | 0 | subtxn->toptxn = txn; |
1115 | | |
1116 | | /* add to subtransaction list */ |
1117 | 0 | dlist_push_tail(&txn->subtxns, &subtxn->node); |
1118 | 0 | txn->nsubtxns++; |
1119 | | |
1120 | | /* Possibly transfer the subtxn's snapshot to its top-level txn. */ |
1121 | 0 | ReorderBufferTransferSnapToParent(txn, subtxn); |
1122 | | |
1123 | | /* Verify LSN-ordering invariant */ |
1124 | 0 | AssertTXNLsnOrder(rb); |
1125 | 0 | } |
1126 | | |
1127 | | /* |
1128 | | * ReorderBufferTransferSnapToParent |
1129 | | * Transfer base snapshot from subtxn to top-level txn, if needed |
1130 | | * |
1131 | | * This is done if the top-level txn doesn't have a base snapshot, or if the |
1132 | | * subtxn's base snapshot has an earlier LSN than the top-level txn's base |
1133 | | * snapshot's LSN. This can happen if there are no changes in the toplevel |
1134 | | * txn but there are some in the subtxn, or the first change in subtxn has |
1135 | | * earlier LSN than first change in the top-level txn and we learned about |
1136 | | * their kinship only now. |
1137 | | * |
1138 | | * The subtransaction's snapshot is cleared regardless of the transfer |
1139 | | * happening, since it's not needed anymore in either case. |
1140 | | * |
1141 | | * We do this as soon as we become aware of their kinship, to avoid queueing |
1142 | | * extra snapshots to txns known-as-subtxns -- only top-level txns will |
1143 | | * receive further snapshots. |
1144 | | */ |
1145 | | static void |
1146 | | ReorderBufferTransferSnapToParent(ReorderBufferTXN *txn, |
1147 | | ReorderBufferTXN *subtxn) |
1148 | 0 | { |
1149 | 0 | Assert(subtxn->toplevel_xid == txn->xid); |
1150 | |
|
1151 | 0 | if (subtxn->base_snapshot != NULL) |
1152 | 0 | { |
1153 | 0 | if (txn->base_snapshot == NULL || |
1154 | 0 | subtxn->base_snapshot_lsn < txn->base_snapshot_lsn) |
1155 | 0 | { |
1156 | | /* |
1157 | | * If the toplevel transaction already has a base snapshot but |
1158 | | * it's newer than the subxact's, purge it. |
1159 | | */ |
1160 | 0 | if (txn->base_snapshot != NULL) |
1161 | 0 | { |
1162 | 0 | SnapBuildSnapDecRefcount(txn->base_snapshot); |
1163 | 0 | dlist_delete(&txn->base_snapshot_node); |
1164 | 0 | } |
1165 | | |
1166 | | /* |
1167 | | * The snapshot is now the top transaction's; transfer it, and |
1168 | | * adjust the list position of the top transaction in the list by |
1169 | | * moving it to where the subtransaction is. |
1170 | | */ |
1171 | 0 | txn->base_snapshot = subtxn->base_snapshot; |
1172 | 0 | txn->base_snapshot_lsn = subtxn->base_snapshot_lsn; |
1173 | 0 | dlist_insert_before(&subtxn->base_snapshot_node, |
1174 | 0 | &txn->base_snapshot_node); |
1175 | | |
1176 | | /* |
1177 | | * The subtransaction doesn't have a snapshot anymore (so it |
1178 | | * mustn't be in the list.) |
1179 | | */ |
1180 | 0 | subtxn->base_snapshot = NULL; |
1181 | 0 | subtxn->base_snapshot_lsn = InvalidXLogRecPtr; |
1182 | 0 | dlist_delete(&subtxn->base_snapshot_node); |
1183 | 0 | } |
1184 | 0 | else |
1185 | 0 | { |
1186 | | /* Base snap of toplevel is fine, so subxact's is not needed */ |
1187 | 0 | SnapBuildSnapDecRefcount(subtxn->base_snapshot); |
1188 | 0 | dlist_delete(&subtxn->base_snapshot_node); |
1189 | 0 | subtxn->base_snapshot = NULL; |
1190 | 0 | subtxn->base_snapshot_lsn = InvalidXLogRecPtr; |
1191 | 0 | } |
1192 | 0 | } |
1193 | 0 | } |
1194 | | |
1195 | | /* |
1196 | | * Associate a subtransaction with its toplevel transaction at commit |
1197 | | * time. There may be no further changes added after this. |
1198 | | */ |
1199 | | void |
1200 | | ReorderBufferCommitChild(ReorderBuffer *rb, TransactionId xid, |
1201 | | TransactionId subxid, XLogRecPtr commit_lsn, |
1202 | | XLogRecPtr end_lsn) |
1203 | 0 | { |
1204 | 0 | ReorderBufferTXN *subtxn; |
1205 | |
|
1206 | 0 | subtxn = ReorderBufferTXNByXid(rb, subxid, false, NULL, |
1207 | 0 | InvalidXLogRecPtr, false); |
1208 | | |
1209 | | /* |
1210 | | * No need to do anything if that subtxn didn't contain any changes |
1211 | | */ |
1212 | 0 | if (!subtxn) |
1213 | 0 | return; |
1214 | | |
1215 | 0 | subtxn->final_lsn = commit_lsn; |
1216 | 0 | subtxn->end_lsn = end_lsn; |
1217 | | |
1218 | | /* |
1219 | | * Assign this subxact as a child of the toplevel xact (no-op if already |
1220 | | * done.) |
1221 | | */ |
1222 | 0 | ReorderBufferAssignChild(rb, xid, subxid, InvalidXLogRecPtr); |
1223 | 0 | } |
1224 | | |
1225 | | |
1226 | | /* |
1227 | | * Support for efficiently iterating over a transaction's and its |
1228 | | * subtransactions' changes. |
1229 | | * |
1230 | | * We do by doing a k-way merge between transactions/subtransactions. For that |
1231 | | * we model the current heads of the different transactions as a binary heap |
1232 | | * so we easily know which (sub-)transaction has the change with the smallest |
1233 | | * lsn next. |
1234 | | * |
1235 | | * We assume the changes in individual transactions are already sorted by LSN. |
1236 | | */ |
1237 | | |
1238 | | /* |
1239 | | * Binary heap comparison function. |
1240 | | */ |
1241 | | static int |
1242 | | ReorderBufferIterCompare(Datum a, Datum b, void *arg) |
1243 | 0 | { |
1244 | 0 | ReorderBufferIterTXNState *state = (ReorderBufferIterTXNState *) arg; |
1245 | 0 | XLogRecPtr pos_a = state->entries[DatumGetInt32(a)].lsn; |
1246 | 0 | XLogRecPtr pos_b = state->entries[DatumGetInt32(b)].lsn; |
1247 | |
|
1248 | 0 | if (pos_a < pos_b) |
1249 | 0 | return 1; |
1250 | 0 | else if (pos_a == pos_b) |
1251 | 0 | return 0; |
1252 | 0 | return -1; |
1253 | 0 | } |
1254 | | |
1255 | | /* |
1256 | | * Allocate & initialize an iterator which iterates in lsn order over a |
1257 | | * transaction and all its subtransactions. |
1258 | | * |
1259 | | * Note: The iterator state is returned through iter_state parameter rather |
1260 | | * than the function's return value. This is because the state gets cleaned up |
1261 | | * in a PG_CATCH block in the caller, so we want to make sure the caller gets |
1262 | | * back the state even if this function throws an exception. |
1263 | | */ |
1264 | | static void |
1265 | | ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn, |
1266 | | ReorderBufferIterTXNState *volatile *iter_state) |
1267 | 0 | { |
1268 | 0 | Size nr_txns = 0; |
1269 | 0 | ReorderBufferIterTXNState *state; |
1270 | 0 | dlist_iter cur_txn_i; |
1271 | 0 | int32 off; |
1272 | |
|
1273 | 0 | *iter_state = NULL; |
1274 | | |
1275 | | /* Check ordering of changes in the toplevel transaction. */ |
1276 | 0 | AssertChangeLsnOrder(txn); |
1277 | | |
1278 | | /* |
1279 | | * Calculate the size of our heap: one element for every transaction that |
1280 | | * contains changes. (Besides the transactions already in the reorder |
1281 | | * buffer, we count the one we were directly passed.) |
1282 | | */ |
1283 | 0 | if (txn->nentries > 0) |
1284 | 0 | nr_txns++; |
1285 | |
|
1286 | 0 | dlist_foreach(cur_txn_i, &txn->subtxns) |
1287 | 0 | { |
1288 | 0 | ReorderBufferTXN *cur_txn; |
1289 | |
|
1290 | 0 | cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur); |
1291 | | |
1292 | | /* Check ordering of changes in this subtransaction. */ |
1293 | 0 | AssertChangeLsnOrder(cur_txn); |
1294 | |
|
1295 | 0 | if (cur_txn->nentries > 0) |
1296 | 0 | nr_txns++; |
1297 | 0 | } |
1298 | | |
1299 | | /* allocate iteration state */ |
1300 | 0 | state = (ReorderBufferIterTXNState *) |
1301 | 0 | MemoryContextAllocZero(rb->context, |
1302 | 0 | sizeof(ReorderBufferIterTXNState) + |
1303 | 0 | sizeof(ReorderBufferIterTXNEntry) * nr_txns); |
1304 | |
|
1305 | 0 | state->nr_txns = nr_txns; |
1306 | 0 | dlist_init(&state->old_change); |
1307 | |
|
1308 | 0 | for (off = 0; off < state->nr_txns; off++) |
1309 | 0 | { |
1310 | 0 | state->entries[off].file.vfd = -1; |
1311 | 0 | state->entries[off].segno = 0; |
1312 | 0 | } |
1313 | | |
1314 | | /* allocate heap */ |
1315 | 0 | state->heap = binaryheap_allocate(state->nr_txns, |
1316 | 0 | ReorderBufferIterCompare, |
1317 | 0 | state); |
1318 | | |
1319 | | /* Now that the state fields are initialized, it is safe to return it. */ |
1320 | 0 | *iter_state = state; |
1321 | | |
1322 | | /* |
1323 | | * Now insert items into the binary heap, in an unordered fashion. (We |
1324 | | * will run a heap assembly step at the end; this is more efficient.) |
1325 | | */ |
1326 | |
|
1327 | 0 | off = 0; |
1328 | | |
1329 | | /* add toplevel transaction if it contains changes */ |
1330 | 0 | if (txn->nentries > 0) |
1331 | 0 | { |
1332 | 0 | ReorderBufferChange *cur_change; |
1333 | |
|
1334 | 0 | if (rbtxn_is_serialized(txn)) |
1335 | 0 | { |
1336 | | /* serialize remaining changes */ |
1337 | 0 | ReorderBufferSerializeTXN(rb, txn); |
1338 | 0 | ReorderBufferRestoreChanges(rb, txn, &state->entries[off].file, |
1339 | 0 | &state->entries[off].segno); |
1340 | 0 | } |
1341 | |
|
1342 | 0 | cur_change = dlist_head_element(ReorderBufferChange, node, |
1343 | 0 | &txn->changes); |
1344 | |
|
1345 | 0 | state->entries[off].lsn = cur_change->lsn; |
1346 | 0 | state->entries[off].change = cur_change; |
1347 | 0 | state->entries[off].txn = txn; |
1348 | |
|
1349 | 0 | binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); |
1350 | 0 | } |
1351 | | |
1352 | | /* add subtransactions if they contain changes */ |
1353 | 0 | dlist_foreach(cur_txn_i, &txn->subtxns) |
1354 | 0 | { |
1355 | 0 | ReorderBufferTXN *cur_txn; |
1356 | |
|
1357 | 0 | cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur); |
1358 | |
|
1359 | 0 | if (cur_txn->nentries > 0) |
1360 | 0 | { |
1361 | 0 | ReorderBufferChange *cur_change; |
1362 | |
|
1363 | 0 | if (rbtxn_is_serialized(cur_txn)) |
1364 | 0 | { |
1365 | | /* serialize remaining changes */ |
1366 | 0 | ReorderBufferSerializeTXN(rb, cur_txn); |
1367 | 0 | ReorderBufferRestoreChanges(rb, cur_txn, |
1368 | 0 | &state->entries[off].file, |
1369 | 0 | &state->entries[off].segno); |
1370 | 0 | } |
1371 | 0 | cur_change = dlist_head_element(ReorderBufferChange, node, |
1372 | 0 | &cur_txn->changes); |
1373 | |
|
1374 | 0 | state->entries[off].lsn = cur_change->lsn; |
1375 | 0 | state->entries[off].change = cur_change; |
1376 | 0 | state->entries[off].txn = cur_txn; |
1377 | |
|
1378 | 0 | binaryheap_add_unordered(state->heap, Int32GetDatum(off++)); |
1379 | 0 | } |
1380 | 0 | } |
1381 | | |
1382 | | /* assemble a valid binary heap */ |
1383 | 0 | binaryheap_build(state->heap); |
1384 | 0 | } |
1385 | | |
1386 | | /* |
1387 | | * Return the next change when iterating over a transaction and its |
1388 | | * subtransactions. |
1389 | | * |
1390 | | * Returns NULL when no further changes exist. |
1391 | | */ |
1392 | | static ReorderBufferChange * |
1393 | | ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state) |
1394 | 0 | { |
1395 | 0 | ReorderBufferChange *change; |
1396 | 0 | ReorderBufferIterTXNEntry *entry; |
1397 | 0 | int32 off; |
1398 | | |
1399 | | /* nothing there anymore */ |
1400 | 0 | if (state->heap->bh_size == 0) |
1401 | 0 | return NULL; |
1402 | | |
1403 | 0 | off = DatumGetInt32(binaryheap_first(state->heap)); |
1404 | 0 | entry = &state->entries[off]; |
1405 | | |
1406 | | /* free memory we might have "leaked" in the previous *Next call */ |
1407 | 0 | if (!dlist_is_empty(&state->old_change)) |
1408 | 0 | { |
1409 | 0 | change = dlist_container(ReorderBufferChange, node, |
1410 | 0 | dlist_pop_head_node(&state->old_change)); |
1411 | 0 | ReorderBufferFreeChange(rb, change, true); |
1412 | 0 | Assert(dlist_is_empty(&state->old_change)); |
1413 | 0 | } |
1414 | |
|
1415 | 0 | change = entry->change; |
1416 | | |
1417 | | /* |
1418 | | * update heap with information about which transaction has the next |
1419 | | * relevant change in LSN order |
1420 | | */ |
1421 | | |
1422 | | /* there are in-memory changes */ |
1423 | 0 | if (dlist_has_next(&entry->txn->changes, &entry->change->node)) |
1424 | 0 | { |
1425 | 0 | dlist_node *next = dlist_next_node(&entry->txn->changes, &change->node); |
1426 | 0 | ReorderBufferChange *next_change = |
1427 | 0 | dlist_container(ReorderBufferChange, node, next); |
1428 | | |
1429 | | /* txn stays the same */ |
1430 | 0 | state->entries[off].lsn = next_change->lsn; |
1431 | 0 | state->entries[off].change = next_change; |
1432 | |
|
1433 | 0 | binaryheap_replace_first(state->heap, Int32GetDatum(off)); |
1434 | 0 | return change; |
1435 | 0 | } |
1436 | | |
1437 | | /* try to load changes from disk */ |
1438 | 0 | if (entry->txn->nentries != entry->txn->nentries_mem) |
1439 | 0 | { |
1440 | | /* |
1441 | | * Ugly: restoring changes will reuse *Change records, thus delete the |
1442 | | * current one from the per-tx list and only free in the next call. |
1443 | | */ |
1444 | 0 | dlist_delete(&change->node); |
1445 | 0 | dlist_push_tail(&state->old_change, &change->node); |
1446 | | |
1447 | | /* |
1448 | | * Update the total bytes processed by the txn for which we are |
1449 | | * releasing the current set of changes and restoring the new set of |
1450 | | * changes. |
1451 | | */ |
1452 | 0 | rb->totalBytes += entry->txn->size; |
1453 | 0 | if (ReorderBufferRestoreChanges(rb, entry->txn, &entry->file, |
1454 | 0 | &state->entries[off].segno)) |
1455 | 0 | { |
1456 | | /* successfully restored changes from disk */ |
1457 | 0 | ReorderBufferChange *next_change = |
1458 | 0 | dlist_head_element(ReorderBufferChange, node, |
1459 | 0 | &entry->txn->changes); |
1460 | |
|
1461 | 0 | elog(DEBUG2, "restored %u/%u changes from disk", |
1462 | 0 | (uint32) entry->txn->nentries_mem, |
1463 | 0 | (uint32) entry->txn->nentries); |
1464 | | |
1465 | 0 | Assert(entry->txn->nentries_mem); |
1466 | | /* txn stays the same */ |
1467 | 0 | state->entries[off].lsn = next_change->lsn; |
1468 | 0 | state->entries[off].change = next_change; |
1469 | 0 | binaryheap_replace_first(state->heap, Int32GetDatum(off)); |
1470 | |
|
1471 | 0 | return change; |
1472 | 0 | } |
1473 | 0 | } |
1474 | | |
1475 | | /* ok, no changes there anymore, remove */ |
1476 | 0 | binaryheap_remove_first(state->heap); |
1477 | |
|
1478 | 0 | return change; |
1479 | 0 | } |
1480 | | |
1481 | | /* |
1482 | | * Deallocate the iterator |
1483 | | */ |
1484 | | static void |
1485 | | ReorderBufferIterTXNFinish(ReorderBuffer *rb, |
1486 | | ReorderBufferIterTXNState *state) |
1487 | 0 | { |
1488 | 0 | int32 off; |
1489 | |
|
1490 | 0 | for (off = 0; off < state->nr_txns; off++) |
1491 | 0 | { |
1492 | 0 | if (state->entries[off].file.vfd != -1) |
1493 | 0 | FileClose(state->entries[off].file.vfd); |
1494 | 0 | } |
1495 | | |
1496 | | /* free memory we might have "leaked" in the last *Next call */ |
1497 | 0 | if (!dlist_is_empty(&state->old_change)) |
1498 | 0 | { |
1499 | 0 | ReorderBufferChange *change; |
1500 | |
|
1501 | 0 | change = dlist_container(ReorderBufferChange, node, |
1502 | 0 | dlist_pop_head_node(&state->old_change)); |
1503 | 0 | ReorderBufferFreeChange(rb, change, true); |
1504 | 0 | Assert(dlist_is_empty(&state->old_change)); |
1505 | 0 | } |
1506 | |
|
1507 | 0 | binaryheap_free(state->heap); |
1508 | 0 | pfree(state); |
1509 | 0 | } |
1510 | | |
1511 | | /* |
1512 | | * Cleanup the contents of a transaction, usually after the transaction |
1513 | | * committed or aborted. |
1514 | | */ |
1515 | | static void |
1516 | | ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) |
1517 | 0 | { |
1518 | 0 | bool found; |
1519 | 0 | dlist_mutable_iter iter; |
1520 | 0 | Size mem_freed = 0; |
1521 | | |
1522 | | /* cleanup subtransactions & their changes */ |
1523 | 0 | dlist_foreach_modify(iter, &txn->subtxns) |
1524 | 0 | { |
1525 | 0 | ReorderBufferTXN *subtxn; |
1526 | |
|
1527 | 0 | subtxn = dlist_container(ReorderBufferTXN, node, iter.cur); |
1528 | | |
1529 | | /* |
1530 | | * Subtransactions are always associated to the toplevel TXN, even if |
1531 | | * they originally were happening inside another subtxn, so we won't |
1532 | | * ever recurse more than one level deep here. |
1533 | | */ |
1534 | 0 | Assert(rbtxn_is_known_subxact(subtxn)); |
1535 | 0 | Assert(subtxn->nsubtxns == 0); |
1536 | |
|
1537 | 0 | ReorderBufferCleanupTXN(rb, subtxn); |
1538 | 0 | } |
1539 | | |
1540 | | /* cleanup changes in the txn */ |
1541 | 0 | dlist_foreach_modify(iter, &txn->changes) |
1542 | 0 | { |
1543 | 0 | ReorderBufferChange *change; |
1544 | |
|
1545 | 0 | change = dlist_container(ReorderBufferChange, node, iter.cur); |
1546 | | |
1547 | | /* Check we're not mixing changes from different transactions. */ |
1548 | 0 | Assert(change->txn == txn); |
1549 | | |
1550 | | /* |
1551 | | * Instead of updating the memory counter for individual changes, we |
1552 | | * sum up the size of memory to free so we can update the memory |
1553 | | * counter all together below. This saves costs of maintaining the |
1554 | | * max-heap. |
1555 | | */ |
1556 | 0 | mem_freed += ReorderBufferChangeSize(change); |
1557 | |
|
1558 | 0 | ReorderBufferFreeChange(rb, change, false); |
1559 | 0 | } |
1560 | | |
1561 | | /* Update the memory counter */ |
1562 | 0 | ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed); |
1563 | | |
1564 | | /* |
1565 | | * Cleanup the tuplecids we stored for decoding catalog snapshot access. |
1566 | | * They are always stored in the toplevel transaction. |
1567 | | */ |
1568 | 0 | dlist_foreach_modify(iter, &txn->tuplecids) |
1569 | 0 | { |
1570 | 0 | ReorderBufferChange *change; |
1571 | |
|
1572 | 0 | change = dlist_container(ReorderBufferChange, node, iter.cur); |
1573 | | |
1574 | | /* Check we're not mixing changes from different transactions. */ |
1575 | 0 | Assert(change->txn == txn); |
1576 | 0 | Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID); |
1577 | |
|
1578 | 0 | ReorderBufferFreeChange(rb, change, true); |
1579 | 0 | } |
1580 | | |
1581 | | /* |
1582 | | * Cleanup the base snapshot, if set. |
1583 | | */ |
1584 | 0 | if (txn->base_snapshot != NULL) |
1585 | 0 | { |
1586 | 0 | SnapBuildSnapDecRefcount(txn->base_snapshot); |
1587 | 0 | dlist_delete(&txn->base_snapshot_node); |
1588 | 0 | } |
1589 | | |
1590 | | /* |
1591 | | * Cleanup the snapshot for the last streamed run. |
1592 | | */ |
1593 | 0 | if (txn->snapshot_now != NULL) |
1594 | 0 | { |
1595 | 0 | Assert(rbtxn_is_streamed(txn)); |
1596 | 0 | ReorderBufferFreeSnap(rb, txn->snapshot_now); |
1597 | 0 | } |
1598 | | |
1599 | | /* |
1600 | | * Remove TXN from its containing lists. |
1601 | | * |
1602 | | * Note: if txn is known as subxact, we are deleting the TXN from its |
1603 | | * parent's list of known subxacts; this leaves the parent's nsubxacts |
1604 | | * count too high, but we don't care. Otherwise, we are deleting the TXN |
1605 | | * from the LSN-ordered list of toplevel TXNs. We remove the TXN from the |
1606 | | * list of catalog modifying transactions as well. |
1607 | | */ |
1608 | 0 | dlist_delete(&txn->node); |
1609 | 0 | if (rbtxn_has_catalog_changes(txn)) |
1610 | 0 | dclist_delete_from(&rb->catchange_txns, &txn->catchange_node); |
1611 | | |
1612 | | /* now remove reference from buffer */ |
1613 | 0 | hash_search(rb->by_txn, &txn->xid, HASH_REMOVE, &found); |
1614 | 0 | Assert(found); |
1615 | | |
1616 | | /* remove entries spilled to disk */ |
1617 | 0 | if (rbtxn_is_serialized(txn)) |
1618 | 0 | ReorderBufferRestoreCleanup(rb, txn); |
1619 | | |
1620 | | /* deallocate */ |
1621 | 0 | ReorderBufferFreeTXN(rb, txn); |
1622 | 0 | } |
1623 | | |
1624 | | /* |
1625 | | * Discard changes from a transaction (and subtransactions), either after |
1626 | | * streaming, decoding them at PREPARE, or detecting the transaction abort. |
1627 | | * Keep the remaining info - transactions, tuplecids, invalidations and |
1628 | | * snapshots. |
1629 | | * |
1630 | | * We additionally remove tuplecids after decoding the transaction at prepare |
1631 | | * time as we only need to perform invalidation at rollback or commit prepared. |
1632 | | * |
1633 | | * 'txn_prepared' indicates that we have decoded the transaction at prepare |
1634 | | * time. |
1635 | | */ |
1636 | | static void |
1637 | | ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared) |
1638 | 0 | { |
1639 | 0 | dlist_mutable_iter iter; |
1640 | 0 | Size mem_freed = 0; |
1641 | | |
1642 | | /* cleanup subtransactions & their changes */ |
1643 | 0 | dlist_foreach_modify(iter, &txn->subtxns) |
1644 | 0 | { |
1645 | 0 | ReorderBufferTXN *subtxn; |
1646 | |
|
1647 | 0 | subtxn = dlist_container(ReorderBufferTXN, node, iter.cur); |
1648 | | |
1649 | | /* |
1650 | | * Subtransactions are always associated to the toplevel TXN, even if |
1651 | | * they originally were happening inside another subtxn, so we won't |
1652 | | * ever recurse more than one level deep here. |
1653 | | */ |
1654 | 0 | Assert(rbtxn_is_known_subxact(subtxn)); |
1655 | 0 | Assert(subtxn->nsubtxns == 0); |
1656 | |
|
1657 | 0 | ReorderBufferMaybeMarkTXNStreamed(rb, subtxn); |
1658 | 0 | ReorderBufferTruncateTXN(rb, subtxn, txn_prepared); |
1659 | 0 | } |
1660 | | |
1661 | | /* cleanup changes in the txn */ |
1662 | 0 | dlist_foreach_modify(iter, &txn->changes) |
1663 | 0 | { |
1664 | 0 | ReorderBufferChange *change; |
1665 | |
|
1666 | 0 | change = dlist_container(ReorderBufferChange, node, iter.cur); |
1667 | | |
1668 | | /* Check we're not mixing changes from different transactions. */ |
1669 | 0 | Assert(change->txn == txn); |
1670 | | |
1671 | | /* remove the change from its containing list */ |
1672 | 0 | dlist_delete(&change->node); |
1673 | | |
1674 | | /* |
1675 | | * Instead of updating the memory counter for individual changes, we |
1676 | | * sum up the size of memory to free so we can update the memory |
1677 | | * counter all together below. This saves costs of maintaining the |
1678 | | * max-heap. |
1679 | | */ |
1680 | 0 | mem_freed += ReorderBufferChangeSize(change); |
1681 | |
|
1682 | 0 | ReorderBufferFreeChange(rb, change, false); |
1683 | 0 | } |
1684 | | |
1685 | | /* Update the memory counter */ |
1686 | 0 | ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed); |
1687 | |
|
1688 | 0 | if (txn_prepared) |
1689 | 0 | { |
1690 | | /* |
1691 | | * If this is a prepared txn, cleanup the tuplecids we stored for |
1692 | | * decoding catalog snapshot access. They are always stored in the |
1693 | | * toplevel transaction. |
1694 | | */ |
1695 | 0 | dlist_foreach_modify(iter, &txn->tuplecids) |
1696 | 0 | { |
1697 | 0 | ReorderBufferChange *change; |
1698 | |
|
1699 | 0 | change = dlist_container(ReorderBufferChange, node, iter.cur); |
1700 | | |
1701 | | /* Check we're not mixing changes from different transactions. */ |
1702 | 0 | Assert(change->txn == txn); |
1703 | 0 | Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID); |
1704 | | |
1705 | | /* Remove the change from its containing list. */ |
1706 | 0 | dlist_delete(&change->node); |
1707 | |
|
1708 | 0 | ReorderBufferFreeChange(rb, change, true); |
1709 | 0 | } |
1710 | 0 | } |
1711 | | |
1712 | | /* |
1713 | | * Destroy the (relfilelocator, ctid) hashtable, so that we don't leak any |
1714 | | * memory. We could also keep the hash table and update it with new ctid |
1715 | | * values, but this seems simpler and good enough for now. |
1716 | | */ |
1717 | 0 | if (txn->tuplecid_hash != NULL) |
1718 | 0 | { |
1719 | 0 | hash_destroy(txn->tuplecid_hash); |
1720 | 0 | txn->tuplecid_hash = NULL; |
1721 | 0 | } |
1722 | | |
1723 | | /* If this txn is serialized then clean the disk space. */ |
1724 | 0 | if (rbtxn_is_serialized(txn)) |
1725 | 0 | { |
1726 | 0 | ReorderBufferRestoreCleanup(rb, txn); |
1727 | 0 | txn->txn_flags &= ~RBTXN_IS_SERIALIZED; |
1728 | | |
1729 | | /* |
1730 | | * We set this flag to indicate if the transaction is ever serialized. |
1731 | | * We need this to accurately update the stats as otherwise the same |
1732 | | * transaction can be counted as serialized multiple times. |
1733 | | */ |
1734 | 0 | txn->txn_flags |= RBTXN_IS_SERIALIZED_CLEAR; |
1735 | 0 | } |
1736 | | |
1737 | | /* also reset the number of entries in the transaction */ |
1738 | 0 | txn->nentries_mem = 0; |
1739 | 0 | txn->nentries = 0; |
1740 | 0 | } |
1741 | | |
1742 | | /* |
1743 | | * Check the transaction status by CLOG lookup and discard all changes if |
1744 | | * the transaction is aborted. The transaction status is cached in |
1745 | | * txn->txn_flags so we can skip future changes and avoid CLOG lookups on the |
1746 | | * next call. |
1747 | | * |
1748 | | * Return true if the transaction is aborted, otherwise return false. |
1749 | | * |
1750 | | * When the 'debug_logical_replication_streaming' is set to "immediate", we |
1751 | | * don't check the transaction status, meaning the caller will always process |
1752 | | * this transaction. |
1753 | | */ |
1754 | | static bool |
1755 | | ReorderBufferCheckAndTruncateAbortedTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) |
1756 | 0 | { |
1757 | | /* Quick return for regression tests */ |
1758 | 0 | if (unlikely(debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE)) |
1759 | 0 | return false; |
1760 | | |
1761 | | /* |
1762 | | * Quick return if the transaction status is already known. |
1763 | | */ |
1764 | | |
1765 | 0 | if (rbtxn_is_committed(txn)) |
1766 | 0 | return false; |
1767 | 0 | if (rbtxn_is_aborted(txn)) |
1768 | 0 | { |
1769 | | /* Already-aborted transactions should not have any changes */ |
1770 | 0 | Assert(txn->size == 0); |
1771 | |
|
1772 | 0 | return true; |
1773 | 0 | } |
1774 | | |
1775 | | /* Otherwise, check the transaction status using CLOG lookup */ |
1776 | | |
1777 | 0 | if (TransactionIdIsInProgress(txn->xid)) |
1778 | 0 | return false; |
1779 | | |
1780 | 0 | if (TransactionIdDidCommit(txn->xid)) |
1781 | 0 | { |
1782 | | /* |
1783 | | * Remember the transaction is committed so that we can skip CLOG |
1784 | | * check next time, avoiding the pressure on CLOG lookup. |
1785 | | */ |
1786 | 0 | Assert(!rbtxn_is_aborted(txn)); |
1787 | 0 | txn->txn_flags |= RBTXN_IS_COMMITTED; |
1788 | 0 | return false; |
1789 | 0 | } |
1790 | | |
1791 | | /* |
1792 | | * The transaction aborted. We discard both the changes collected so far |
1793 | | * and the toast reconstruction data. The full cleanup will happen as part |
1794 | | * of decoding ABORT record of this transaction. |
1795 | | */ |
1796 | 0 | ReorderBufferTruncateTXN(rb, txn, rbtxn_is_prepared(txn)); |
1797 | 0 | ReorderBufferToastReset(rb, txn); |
1798 | | |
1799 | | /* All changes should be discarded */ |
1800 | 0 | Assert(txn->size == 0); |
1801 | | |
1802 | | /* |
1803 | | * Mark the transaction as aborted so we can ignore future changes of this |
1804 | | * transaction. |
1805 | | */ |
1806 | 0 | Assert(!rbtxn_is_committed(txn)); |
1807 | 0 | txn->txn_flags |= RBTXN_IS_ABORTED; |
1808 | |
|
1809 | 0 | return true; |
1810 | 0 | } |
1811 | | |
1812 | | /* |
1813 | | * Build a hash with a (relfilelocator, ctid) -> (cmin, cmax) mapping for use by |
1814 | | * HeapTupleSatisfiesHistoricMVCC. |
1815 | | */ |
1816 | | static void |
1817 | | ReorderBufferBuildTupleCidHash(ReorderBuffer *rb, ReorderBufferTXN *txn) |
1818 | 0 | { |
1819 | 0 | dlist_iter iter; |
1820 | 0 | HASHCTL hash_ctl; |
1821 | |
|
1822 | 0 | if (!rbtxn_has_catalog_changes(txn) || dlist_is_empty(&txn->tuplecids)) |
1823 | 0 | return; |
1824 | | |
1825 | 0 | hash_ctl.keysize = sizeof(ReorderBufferTupleCidKey); |
1826 | 0 | hash_ctl.entrysize = sizeof(ReorderBufferTupleCidEnt); |
1827 | 0 | hash_ctl.hcxt = rb->context; |
1828 | | |
1829 | | /* |
1830 | | * create the hash with the exact number of to-be-stored tuplecids from |
1831 | | * the start |
1832 | | */ |
1833 | 0 | txn->tuplecid_hash = |
1834 | 0 | hash_create("ReorderBufferTupleCid", txn->ntuplecids, &hash_ctl, |
1835 | 0 | HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); |
1836 | |
|
1837 | 0 | dlist_foreach(iter, &txn->tuplecids) |
1838 | 0 | { |
1839 | 0 | ReorderBufferTupleCidKey key; |
1840 | 0 | ReorderBufferTupleCidEnt *ent; |
1841 | 0 | bool found; |
1842 | 0 | ReorderBufferChange *change; |
1843 | |
|
1844 | 0 | change = dlist_container(ReorderBufferChange, node, iter.cur); |
1845 | |
|
1846 | 0 | Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID); |
1847 | | |
1848 | | /* be careful about padding */ |
1849 | 0 | memset(&key, 0, sizeof(ReorderBufferTupleCidKey)); |
1850 | |
|
1851 | 0 | key.rlocator = change->data.tuplecid.locator; |
1852 | |
|
1853 | 0 | ItemPointerCopy(&change->data.tuplecid.tid, |
1854 | 0 | &key.tid); |
1855 | |
|
1856 | 0 | ent = (ReorderBufferTupleCidEnt *) |
1857 | 0 | hash_search(txn->tuplecid_hash, &key, HASH_ENTER, &found); |
1858 | 0 | if (!found) |
1859 | 0 | { |
1860 | 0 | ent->cmin = change->data.tuplecid.cmin; |
1861 | 0 | ent->cmax = change->data.tuplecid.cmax; |
1862 | 0 | ent->combocid = change->data.tuplecid.combocid; |
1863 | 0 | } |
1864 | 0 | else |
1865 | 0 | { |
1866 | | /* |
1867 | | * Maybe we already saw this tuple before in this transaction, but |
1868 | | * if so it must have the same cmin. |
1869 | | */ |
1870 | 0 | Assert(ent->cmin == change->data.tuplecid.cmin); |
1871 | | |
1872 | | /* |
1873 | | * cmax may be initially invalid, but once set it can only grow, |
1874 | | * and never become invalid again. |
1875 | | */ |
1876 | 0 | Assert((ent->cmax == InvalidCommandId) || |
1877 | 0 | ((change->data.tuplecid.cmax != InvalidCommandId) && |
1878 | 0 | (change->data.tuplecid.cmax > ent->cmax))); |
1879 | 0 | ent->cmax = change->data.tuplecid.cmax; |
1880 | 0 | } |
1881 | 0 | } |
1882 | 0 | } |
1883 | | |
1884 | | /* |
1885 | | * Copy a provided snapshot so we can modify it privately. This is needed so |
1886 | | * that catalog modifying transactions can look into intermediate catalog |
1887 | | * states. |
1888 | | */ |
1889 | | static Snapshot |
1890 | | ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap, |
1891 | | ReorderBufferTXN *txn, CommandId cid) |
1892 | 0 | { |
1893 | 0 | Snapshot snap; |
1894 | 0 | dlist_iter iter; |
1895 | 0 | int i = 0; |
1896 | 0 | Size size; |
1897 | |
|
1898 | 0 | size = sizeof(SnapshotData) + |
1899 | 0 | sizeof(TransactionId) * orig_snap->xcnt + |
1900 | 0 | sizeof(TransactionId) * (txn->nsubtxns + 1); |
1901 | |
|
1902 | 0 | snap = MemoryContextAllocZero(rb->context, size); |
1903 | 0 | memcpy(snap, orig_snap, sizeof(SnapshotData)); |
1904 | |
|
1905 | 0 | snap->copied = true; |
1906 | 0 | snap->active_count = 1; /* mark as active so nobody frees it */ |
1907 | 0 | snap->regd_count = 0; |
1908 | 0 | snap->xip = (TransactionId *) (snap + 1); |
1909 | |
|
1910 | 0 | memcpy(snap->xip, orig_snap->xip, sizeof(TransactionId) * snap->xcnt); |
1911 | | |
1912 | | /* |
1913 | | * snap->subxip contains all txids that belong to our transaction which we |
1914 | | * need to check via cmin/cmax. That's why we store the toplevel |
1915 | | * transaction in there as well. |
1916 | | */ |
1917 | 0 | snap->subxip = snap->xip + snap->xcnt; |
1918 | 0 | snap->subxip[i++] = txn->xid; |
1919 | | |
1920 | | /* |
1921 | | * txn->nsubtxns isn't decreased when subtransactions abort, so count |
1922 | | * manually. Since it's an upper boundary it is safe to use it for the |
1923 | | * allocation above. |
1924 | | */ |
1925 | 0 | snap->subxcnt = 1; |
1926 | |
|
1927 | 0 | dlist_foreach(iter, &txn->subtxns) |
1928 | 0 | { |
1929 | 0 | ReorderBufferTXN *sub_txn; |
1930 | |
|
1931 | 0 | sub_txn = dlist_container(ReorderBufferTXN, node, iter.cur); |
1932 | 0 | snap->subxip[i++] = sub_txn->xid; |
1933 | 0 | snap->subxcnt++; |
1934 | 0 | } |
1935 | | |
1936 | | /* sort so we can bsearch() later */ |
1937 | 0 | qsort(snap->subxip, snap->subxcnt, sizeof(TransactionId), xidComparator); |
1938 | | |
1939 | | /* store the specified current CommandId */ |
1940 | 0 | snap->curcid = cid; |
1941 | |
|
1942 | 0 | return snap; |
1943 | 0 | } |
1944 | | |
1945 | | /* |
1946 | | * Free a previously ReorderBufferCopySnap'ed snapshot |
1947 | | */ |
1948 | | static void |
1949 | | ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap) |
1950 | 0 | { |
1951 | 0 | if (snap->copied) |
1952 | 0 | pfree(snap); |
1953 | 0 | else |
1954 | 0 | SnapBuildSnapDecRefcount(snap); |
1955 | 0 | } |
1956 | | |
1957 | | /* |
1958 | | * If the transaction was (partially) streamed, we need to prepare or commit |
1959 | | * it in a 'streamed' way. That is, we first stream the remaining part of the |
1960 | | * transaction, and then invoke stream_prepare or stream_commit message as per |
1961 | | * the case. |
1962 | | */ |
1963 | | static void |
1964 | | ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn) |
1965 | 0 | { |
1966 | | /* we should only call this for previously streamed transactions */ |
1967 | 0 | Assert(rbtxn_is_streamed(txn)); |
1968 | |
|
1969 | 0 | ReorderBufferStreamTXN(rb, txn); |
1970 | |
|
1971 | 0 | if (rbtxn_is_prepared(txn)) |
1972 | 0 | { |
1973 | | /* |
1974 | | * Note, we send stream prepare even if a concurrent abort is |
1975 | | * detected. See DecodePrepare for more information. |
1976 | | */ |
1977 | 0 | Assert(!rbtxn_sent_prepare(txn)); |
1978 | 0 | rb->stream_prepare(rb, txn, txn->final_lsn); |
1979 | 0 | txn->txn_flags |= RBTXN_SENT_PREPARE; |
1980 | | |
1981 | | /* |
1982 | | * This is a PREPARED transaction, part of a two-phase commit. The |
1983 | | * full cleanup will happen as part of the COMMIT PREPAREDs, so now |
1984 | | * just truncate txn by removing changes and tuplecids. |
1985 | | */ |
1986 | 0 | ReorderBufferTruncateTXN(rb, txn, true); |
1987 | | /* Reset the CheckXidAlive */ |
1988 | 0 | CheckXidAlive = InvalidTransactionId; |
1989 | 0 | } |
1990 | 0 | else |
1991 | 0 | { |
1992 | 0 | rb->stream_commit(rb, txn, txn->final_lsn); |
1993 | 0 | ReorderBufferCleanupTXN(rb, txn); |
1994 | 0 | } |
1995 | 0 | } |
1996 | | |
1997 | | /* |
1998 | | * Set xid to detect concurrent aborts. |
1999 | | * |
2000 | | * While streaming an in-progress transaction or decoding a prepared |
2001 | | * transaction there is a possibility that the (sub)transaction might get |
2002 | | * aborted concurrently. In such case if the (sub)transaction has catalog |
2003 | | * update then we might decode the tuple using wrong catalog version. For |
2004 | | * example, suppose there is one catalog tuple with (xmin: 500, xmax: 0). Now, |
2005 | | * the transaction 501 updates the catalog tuple and after that we will have |
2006 | | * two tuples (xmin: 500, xmax: 501) and (xmin: 501, xmax: 0). Now, if 501 is |
2007 | | * aborted and some other transaction say 502 updates the same catalog tuple |
2008 | | * then the first tuple will be changed to (xmin: 500, xmax: 502). So, the |
2009 | | * problem is that when we try to decode the tuple inserted/updated in 501 |
2010 | | * after the catalog update, we will see the catalog tuple with (xmin: 500, |
2011 | | * xmax: 502) as visible because it will consider that the tuple is deleted by |
2012 | | * xid 502 which is not visible to our snapshot. And when we will try to |
2013 | | * decode with that catalog tuple, it can lead to a wrong result or a crash. |
2014 | | * So, it is necessary to detect concurrent aborts to allow streaming of |
2015 | | * in-progress transactions or decoding of prepared transactions. |
2016 | | * |
2017 | | * For detecting the concurrent abort we set CheckXidAlive to the current |
2018 | | * (sub)transaction's xid for which this change belongs to. And, during |
2019 | | * catalog scan we can check the status of the xid and if it is aborted we will |
2020 | | * report a specific error so that we can stop streaming current transaction |
2021 | | * and discard the already streamed changes on such an error. We might have |
2022 | | * already streamed some of the changes for the aborted (sub)transaction, but |
2023 | | * that is fine because when we decode the abort we will stream abort message |
2024 | | * to truncate the changes in the subscriber. Similarly, for prepared |
2025 | | * transactions, we stop decoding if concurrent abort is detected and then |
2026 | | * rollback the changes when rollback prepared is encountered. See |
2027 | | * DecodePrepare. |
2028 | | */ |
2029 | | static inline void |
2030 | | SetupCheckXidLive(TransactionId xid) |
2031 | 0 | { |
2032 | | /* |
2033 | | * If the input transaction id is already set as a CheckXidAlive then |
2034 | | * nothing to do. |
2035 | | */ |
2036 | 0 | if (TransactionIdEquals(CheckXidAlive, xid)) |
2037 | 0 | return; |
2038 | | |
2039 | | /* |
2040 | | * setup CheckXidAlive if it's not committed yet. We don't check if the |
2041 | | * xid is aborted. That will happen during catalog access. |
2042 | | */ |
2043 | 0 | if (!TransactionIdDidCommit(xid)) |
2044 | 0 | CheckXidAlive = xid; |
2045 | 0 | else |
2046 | 0 | CheckXidAlive = InvalidTransactionId; |
2047 | 0 | } |
2048 | | |
2049 | | /* |
2050 | | * Helper function for ReorderBufferProcessTXN for applying change. |
2051 | | */ |
2052 | | static inline void |
2053 | | ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn, |
2054 | | Relation relation, ReorderBufferChange *change, |
2055 | | bool streaming) |
2056 | 0 | { |
2057 | 0 | if (streaming) |
2058 | 0 | rb->stream_change(rb, txn, relation, change); |
2059 | 0 | else |
2060 | 0 | rb->apply_change(rb, txn, relation, change); |
2061 | 0 | } |
2062 | | |
2063 | | /* |
2064 | | * Helper function for ReorderBufferProcessTXN for applying the truncate. |
2065 | | */ |
2066 | | static inline void |
2067 | | ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn, |
2068 | | int nrelations, Relation *relations, |
2069 | | ReorderBufferChange *change, bool streaming) |
2070 | 0 | { |
2071 | 0 | if (streaming) |
2072 | 0 | rb->stream_truncate(rb, txn, nrelations, relations, change); |
2073 | 0 | else |
2074 | 0 | rb->apply_truncate(rb, txn, nrelations, relations, change); |
2075 | 0 | } |
2076 | | |
2077 | | /* |
2078 | | * Helper function for ReorderBufferProcessTXN for applying the message. |
2079 | | */ |
2080 | | static inline void |
2081 | | ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn, |
2082 | | ReorderBufferChange *change, bool streaming) |
2083 | 0 | { |
2084 | 0 | if (streaming) |
2085 | 0 | rb->stream_message(rb, txn, change->lsn, true, |
2086 | 0 | change->data.msg.prefix, |
2087 | 0 | change->data.msg.message_size, |
2088 | 0 | change->data.msg.message); |
2089 | 0 | else |
2090 | 0 | rb->message(rb, txn, change->lsn, true, |
2091 | 0 | change->data.msg.prefix, |
2092 | 0 | change->data.msg.message_size, |
2093 | 0 | change->data.msg.message); |
2094 | 0 | } |
2095 | | |
2096 | | /* |
2097 | | * Function to store the command id and snapshot at the end of the current |
2098 | | * stream so that we can reuse the same while sending the next stream. |
2099 | | */ |
2100 | | static inline void |
2101 | | ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn, |
2102 | | Snapshot snapshot_now, CommandId command_id) |
2103 | 0 | { |
2104 | 0 | txn->command_id = command_id; |
2105 | | |
2106 | | /* Avoid copying if it's already copied. */ |
2107 | 0 | if (snapshot_now->copied) |
2108 | 0 | txn->snapshot_now = snapshot_now; |
2109 | 0 | else |
2110 | 0 | txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now, |
2111 | 0 | txn, command_id); |
2112 | 0 | } |
2113 | | |
2114 | | /* |
2115 | | * Mark the given transaction as streamed if it's a top-level transaction |
2116 | | * or has changes. |
2117 | | */ |
2118 | | static void |
2119 | | ReorderBufferMaybeMarkTXNStreamed(ReorderBuffer *rb, ReorderBufferTXN *txn) |
2120 | 0 | { |
2121 | | /* |
2122 | | * The top-level transaction, is marked as streamed always, even if it |
2123 | | * does not contain any changes (that is, when all the changes are in |
2124 | | * subtransactions). |
2125 | | * |
2126 | | * For subtransactions, we only mark them as streamed when there are |
2127 | | * changes in them. |
2128 | | * |
2129 | | * We do it this way because of aborts - we don't want to send aborts for |
2130 | | * XIDs the downstream is not aware of. And of course, it always knows |
2131 | | * about the top-level xact (we send the XID in all messages), but we |
2132 | | * never stream XIDs of empty subxacts. |
2133 | | */ |
2134 | 0 | if (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)) |
2135 | 0 | txn->txn_flags |= RBTXN_IS_STREAMED; |
2136 | 0 | } |
2137 | | |
2138 | | /* |
2139 | | * Helper function for ReorderBufferProcessTXN to handle the concurrent |
2140 | | * abort of the streaming transaction. This resets the TXN such that it |
2141 | | * can be used to stream the remaining data of transaction being processed. |
2142 | | * This can happen when the subtransaction is aborted and we still want to |
2143 | | * continue processing the main or other subtransactions data. |
2144 | | */ |
2145 | | static void |
2146 | | ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, |
2147 | | Snapshot snapshot_now, |
2148 | | CommandId command_id, |
2149 | | XLogRecPtr last_lsn, |
2150 | | ReorderBufferChange *specinsert) |
2151 | 0 | { |
2152 | | /* Discard the changes that we just streamed */ |
2153 | 0 | ReorderBufferTruncateTXN(rb, txn, rbtxn_is_prepared(txn)); |
2154 | | |
2155 | | /* Free all resources allocated for toast reconstruction */ |
2156 | 0 | ReorderBufferToastReset(rb, txn); |
2157 | | |
2158 | | /* Return the spec insert change if it is not NULL */ |
2159 | 0 | if (specinsert != NULL) |
2160 | 0 | { |
2161 | 0 | ReorderBufferFreeChange(rb, specinsert, true); |
2162 | 0 | specinsert = NULL; |
2163 | 0 | } |
2164 | | |
2165 | | /* |
2166 | | * For the streaming case, stop the stream and remember the command ID and |
2167 | | * snapshot for the streaming run. |
2168 | | */ |
2169 | 0 | if (rbtxn_is_streamed(txn)) |
2170 | 0 | { |
2171 | 0 | rb->stream_stop(rb, txn, last_lsn); |
2172 | 0 | ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id); |
2173 | 0 | } |
2174 | | |
2175 | | /* All changes must be deallocated */ |
2176 | 0 | Assert(txn->size == 0); |
2177 | 0 | } |
2178 | | |
2179 | | /* |
2180 | | * Helper function for ReorderBufferReplay and ReorderBufferStreamTXN. |
2181 | | * |
2182 | | * Send data of a transaction (and its subtransactions) to the |
2183 | | * output plugin. We iterate over the top and subtransactions (using a k-way |
2184 | | * merge) and replay the changes in lsn order. |
2185 | | * |
2186 | | * If streaming is true then data will be sent using stream API. |
2187 | | * |
2188 | | * Note: "volatile" markers on some parameters are to avoid trouble with |
2189 | | * PG_TRY inside the function. |
2190 | | */ |
2191 | | static void |
2192 | | ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, |
2193 | | XLogRecPtr commit_lsn, |
2194 | | volatile Snapshot snapshot_now, |
2195 | | volatile CommandId command_id, |
2196 | | bool streaming) |
2197 | 0 | { |
2198 | 0 | bool using_subtxn; |
2199 | 0 | MemoryContext ccxt = CurrentMemoryContext; |
2200 | 0 | ReorderBufferIterTXNState *volatile iterstate = NULL; |
2201 | 0 | volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr; |
2202 | 0 | ReorderBufferChange *volatile specinsert = NULL; |
2203 | 0 | volatile bool stream_started = false; |
2204 | 0 | ReorderBufferTXN *volatile curtxn = NULL; |
2205 | | |
2206 | | /* build data to be able to lookup the CommandIds of catalog tuples */ |
2207 | 0 | ReorderBufferBuildTupleCidHash(rb, txn); |
2208 | | |
2209 | | /* setup the initial snapshot */ |
2210 | 0 | SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash); |
2211 | | |
2212 | | /* |
2213 | | * Decoding needs access to syscaches et al., which in turn use |
2214 | | * heavyweight locks and such. Thus we need to have enough state around to |
2215 | | * keep track of those. The easiest way is to simply use a transaction |
2216 | | * internally. That also allows us to easily enforce that nothing writes |
2217 | | * to the database by checking for xid assignments. |
2218 | | * |
2219 | | * When we're called via the SQL SRF there's already a transaction |
2220 | | * started, so start an explicit subtransaction there. |
2221 | | */ |
2222 | 0 | using_subtxn = IsTransactionOrTransactionBlock(); |
2223 | |
|
2224 | 0 | PG_TRY(); |
2225 | 0 | { |
2226 | 0 | ReorderBufferChange *change; |
2227 | 0 | int changes_count = 0; /* used to accumulate the number of |
2228 | | * changes */ |
2229 | |
|
2230 | 0 | if (using_subtxn) |
2231 | 0 | BeginInternalSubTransaction(streaming ? "stream" : "replay"); |
2232 | 0 | else |
2233 | 0 | StartTransactionCommand(); |
2234 | | |
2235 | | /* |
2236 | | * We only need to send begin/begin-prepare for non-streamed |
2237 | | * transactions. |
2238 | | */ |
2239 | 0 | if (!streaming) |
2240 | 0 | { |
2241 | 0 | if (rbtxn_is_prepared(txn)) |
2242 | 0 | rb->begin_prepare(rb, txn); |
2243 | 0 | else |
2244 | 0 | rb->begin(rb, txn); |
2245 | 0 | } |
2246 | |
|
2247 | 0 | ReorderBufferIterTXNInit(rb, txn, &iterstate); |
2248 | 0 | while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL) |
2249 | 0 | { |
2250 | 0 | Relation relation = NULL; |
2251 | 0 | Oid reloid; |
2252 | |
|
2253 | 0 | CHECK_FOR_INTERRUPTS(); |
2254 | | |
2255 | | /* |
2256 | | * We can't call start stream callback before processing first |
2257 | | * change. |
2258 | | */ |
2259 | 0 | if (prev_lsn == InvalidXLogRecPtr) |
2260 | 0 | { |
2261 | 0 | if (streaming) |
2262 | 0 | { |
2263 | 0 | txn->origin_id = change->origin_id; |
2264 | 0 | rb->stream_start(rb, txn, change->lsn); |
2265 | 0 | stream_started = true; |
2266 | 0 | } |
2267 | 0 | } |
2268 | | |
2269 | | /* |
2270 | | * Enforce correct ordering of changes, merged from multiple |
2271 | | * subtransactions. The changes may have the same LSN due to |
2272 | | * MULTI_INSERT xlog records. |
2273 | | */ |
2274 | 0 | Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn); |
2275 | |
|
2276 | 0 | prev_lsn = change->lsn; |
2277 | | |
2278 | | /* |
2279 | | * Set the current xid to detect concurrent aborts. This is |
2280 | | * required for the cases when we decode the changes before the |
2281 | | * COMMIT record is processed. |
2282 | | */ |
2283 | 0 | if (streaming || rbtxn_is_prepared(change->txn)) |
2284 | 0 | { |
2285 | 0 | curtxn = change->txn; |
2286 | 0 | SetupCheckXidLive(curtxn->xid); |
2287 | 0 | } |
2288 | |
|
2289 | 0 | switch (change->action) |
2290 | 0 | { |
2291 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: |
2292 | | |
2293 | | /* |
2294 | | * Confirmation for speculative insertion arrived. Simply |
2295 | | * use as a normal record. It'll be cleaned up at the end |
2296 | | * of INSERT processing. |
2297 | | */ |
2298 | 0 | if (specinsert == NULL) |
2299 | 0 | elog(ERROR, "invalid ordering of speculative insertion changes"); |
2300 | 0 | Assert(specinsert->data.tp.oldtuple == NULL); |
2301 | 0 | change = specinsert; |
2302 | 0 | change->action = REORDER_BUFFER_CHANGE_INSERT; |
2303 | | |
2304 | | /* intentionally fall through */ |
2305 | 0 | case REORDER_BUFFER_CHANGE_INSERT: |
2306 | 0 | case REORDER_BUFFER_CHANGE_UPDATE: |
2307 | 0 | case REORDER_BUFFER_CHANGE_DELETE: |
2308 | 0 | Assert(snapshot_now); |
2309 | |
|
2310 | 0 | reloid = RelidByRelfilenumber(change->data.tp.rlocator.spcOid, |
2311 | 0 | change->data.tp.rlocator.relNumber); |
2312 | | |
2313 | | /* |
2314 | | * Mapped catalog tuple without data, emitted while |
2315 | | * catalog table was in the process of being rewritten. We |
2316 | | * can fail to look up the relfilenumber, because the |
2317 | | * relmapper has no "historic" view, in contrast to the |
2318 | | * normal catalog during decoding. Thus repeated rewrites |
2319 | | * can cause a lookup failure. That's OK because we do not |
2320 | | * decode catalog changes anyway. Normally such tuples |
2321 | | * would be skipped over below, but we can't identify |
2322 | | * whether the table should be logically logged without |
2323 | | * mapping the relfilenumber to the oid. |
2324 | | */ |
2325 | 0 | if (reloid == InvalidOid && |
2326 | 0 | change->data.tp.newtuple == NULL && |
2327 | 0 | change->data.tp.oldtuple == NULL) |
2328 | 0 | goto change_done; |
2329 | 0 | else if (reloid == InvalidOid) |
2330 | 0 | elog(ERROR, "could not map filenumber \"%s\" to relation OID", |
2331 | 0 | relpathperm(change->data.tp.rlocator, |
2332 | 0 | MAIN_FORKNUM).str); |
2333 | | |
2334 | 0 | relation = RelationIdGetRelation(reloid); |
2335 | |
|
2336 | 0 | if (!RelationIsValid(relation)) |
2337 | 0 | elog(ERROR, "could not open relation with OID %u (for filenumber \"%s\")", |
2338 | 0 | reloid, |
2339 | 0 | relpathperm(change->data.tp.rlocator, |
2340 | 0 | MAIN_FORKNUM).str); |
2341 | | |
2342 | 0 | if (!RelationIsLogicallyLogged(relation)) |
2343 | 0 | goto change_done; |
2344 | | |
2345 | | /* |
2346 | | * Ignore temporary heaps created during DDL unless the |
2347 | | * plugin has asked for them. |
2348 | | */ |
2349 | 0 | if (relation->rd_rel->relrewrite && !rb->output_rewrites) |
2350 | 0 | goto change_done; |
2351 | | |
2352 | | /* |
2353 | | * For now ignore sequence changes entirely. Most of the |
2354 | | * time they don't log changes using records we |
2355 | | * understand, so it doesn't make sense to handle the few |
2356 | | * cases we do. |
2357 | | */ |
2358 | 0 | if (relation->rd_rel->relkind == RELKIND_SEQUENCE) |
2359 | 0 | goto change_done; |
2360 | | |
2361 | | /* user-triggered change */ |
2362 | 0 | if (!IsToastRelation(relation)) |
2363 | 0 | { |
2364 | 0 | ReorderBufferToastReplace(rb, txn, relation, change); |
2365 | 0 | ReorderBufferApplyChange(rb, txn, relation, change, |
2366 | 0 | streaming); |
2367 | | |
2368 | | /* |
2369 | | * Only clear reassembled toast chunks if we're sure |
2370 | | * they're not required anymore. The creator of the |
2371 | | * tuple tells us. |
2372 | | */ |
2373 | 0 | if (change->data.tp.clear_toast_afterwards) |
2374 | 0 | ReorderBufferToastReset(rb, txn); |
2375 | 0 | } |
2376 | | /* we're not interested in toast deletions */ |
2377 | 0 | else if (change->action == REORDER_BUFFER_CHANGE_INSERT) |
2378 | 0 | { |
2379 | | /* |
2380 | | * Need to reassemble the full toasted Datum in |
2381 | | * memory, to ensure the chunks don't get reused till |
2382 | | * we're done remove it from the list of this |
2383 | | * transaction's changes. Otherwise it will get |
2384 | | * freed/reused while restoring spooled data from |
2385 | | * disk. |
2386 | | */ |
2387 | 0 | Assert(change->data.tp.newtuple != NULL); |
2388 | |
|
2389 | 0 | dlist_delete(&change->node); |
2390 | 0 | ReorderBufferToastAppendChunk(rb, txn, relation, |
2391 | 0 | change); |
2392 | 0 | } |
2393 | |
|
2394 | 0 | change_done: |
2395 | | |
2396 | | /* |
2397 | | * If speculative insertion was confirmed, the record |
2398 | | * isn't needed anymore. |
2399 | | */ |
2400 | 0 | if (specinsert != NULL) |
2401 | 0 | { |
2402 | 0 | ReorderBufferFreeChange(rb, specinsert, true); |
2403 | 0 | specinsert = NULL; |
2404 | 0 | } |
2405 | |
|
2406 | 0 | if (RelationIsValid(relation)) |
2407 | 0 | { |
2408 | 0 | RelationClose(relation); |
2409 | 0 | relation = NULL; |
2410 | 0 | } |
2411 | 0 | break; |
2412 | | |
2413 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: |
2414 | | |
2415 | | /* |
2416 | | * Speculative insertions are dealt with by delaying the |
2417 | | * processing of the insert until the confirmation record |
2418 | | * arrives. For that we simply unlink the record from the |
2419 | | * chain, so it does not get freed/reused while restoring |
2420 | | * spooled data from disk. |
2421 | | * |
2422 | | * This is safe in the face of concurrent catalog changes |
2423 | | * because the relevant relation can't be changed between |
2424 | | * speculative insertion and confirmation due to |
2425 | | * CheckTableNotInUse() and locking. |
2426 | | */ |
2427 | | |
2428 | | /* clear out a pending (and thus failed) speculation */ |
2429 | 0 | if (specinsert != NULL) |
2430 | 0 | { |
2431 | 0 | ReorderBufferFreeChange(rb, specinsert, true); |
2432 | 0 | specinsert = NULL; |
2433 | 0 | } |
2434 | | |
2435 | | /* and memorize the pending insertion */ |
2436 | 0 | dlist_delete(&change->node); |
2437 | 0 | specinsert = change; |
2438 | 0 | break; |
2439 | | |
2440 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: |
2441 | | |
2442 | | /* |
2443 | | * Abort for speculative insertion arrived. So cleanup the |
2444 | | * specinsert tuple and toast hash. |
2445 | | * |
2446 | | * Note that we get the spec abort change for each toast |
2447 | | * entry but we need to perform the cleanup only the first |
2448 | | * time we get it for the main table. |
2449 | | */ |
2450 | 0 | if (specinsert != NULL) |
2451 | 0 | { |
2452 | | /* |
2453 | | * We must clean the toast hash before processing a |
2454 | | * completely new tuple to avoid confusion about the |
2455 | | * previous tuple's toast chunks. |
2456 | | */ |
2457 | 0 | Assert(change->data.tp.clear_toast_afterwards); |
2458 | 0 | ReorderBufferToastReset(rb, txn); |
2459 | | |
2460 | | /* We don't need this record anymore. */ |
2461 | 0 | ReorderBufferFreeChange(rb, specinsert, true); |
2462 | 0 | specinsert = NULL; |
2463 | 0 | } |
2464 | 0 | break; |
2465 | | |
2466 | 0 | case REORDER_BUFFER_CHANGE_TRUNCATE: |
2467 | 0 | { |
2468 | 0 | int i; |
2469 | 0 | int nrelids = change->data.truncate.nrelids; |
2470 | 0 | int nrelations = 0; |
2471 | 0 | Relation *relations; |
2472 | |
|
2473 | 0 | relations = palloc0(nrelids * sizeof(Relation)); |
2474 | 0 | for (i = 0; i < nrelids; i++) |
2475 | 0 | { |
2476 | 0 | Oid relid = change->data.truncate.relids[i]; |
2477 | 0 | Relation rel; |
2478 | |
|
2479 | 0 | rel = RelationIdGetRelation(relid); |
2480 | |
|
2481 | 0 | if (!RelationIsValid(rel)) |
2482 | 0 | elog(ERROR, "could not open relation with OID %u", relid); |
2483 | | |
2484 | 0 | if (!RelationIsLogicallyLogged(rel)) |
2485 | 0 | continue; |
2486 | | |
2487 | 0 | relations[nrelations++] = rel; |
2488 | 0 | } |
2489 | | |
2490 | | /* Apply the truncate. */ |
2491 | 0 | ReorderBufferApplyTruncate(rb, txn, nrelations, |
2492 | 0 | relations, change, |
2493 | 0 | streaming); |
2494 | |
|
2495 | 0 | for (i = 0; i < nrelations; i++) |
2496 | 0 | RelationClose(relations[i]); |
2497 | |
|
2498 | 0 | break; |
2499 | 0 | } |
2500 | | |
2501 | 0 | case REORDER_BUFFER_CHANGE_MESSAGE: |
2502 | 0 | ReorderBufferApplyMessage(rb, txn, change, streaming); |
2503 | 0 | break; |
2504 | | |
2505 | 0 | case REORDER_BUFFER_CHANGE_INVALIDATION: |
2506 | | /* Execute the invalidation messages locally */ |
2507 | 0 | ReorderBufferExecuteInvalidations(change->data.inval.ninvalidations, |
2508 | 0 | change->data.inval.invalidations); |
2509 | 0 | break; |
2510 | | |
2511 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: |
2512 | | /* get rid of the old */ |
2513 | 0 | TeardownHistoricSnapshot(false); |
2514 | |
|
2515 | 0 | if (snapshot_now->copied) |
2516 | 0 | { |
2517 | 0 | ReorderBufferFreeSnap(rb, snapshot_now); |
2518 | 0 | snapshot_now = |
2519 | 0 | ReorderBufferCopySnap(rb, change->data.snapshot, |
2520 | 0 | txn, command_id); |
2521 | 0 | } |
2522 | | |
2523 | | /* |
2524 | | * Restored from disk, need to be careful not to double |
2525 | | * free. We could introduce refcounting for that, but for |
2526 | | * now this seems infrequent enough not to care. |
2527 | | */ |
2528 | 0 | else if (change->data.snapshot->copied) |
2529 | 0 | { |
2530 | 0 | snapshot_now = |
2531 | 0 | ReorderBufferCopySnap(rb, change->data.snapshot, |
2532 | 0 | txn, command_id); |
2533 | 0 | } |
2534 | 0 | else |
2535 | 0 | { |
2536 | 0 | snapshot_now = change->data.snapshot; |
2537 | 0 | } |
2538 | | |
2539 | | /* and continue with the new one */ |
2540 | 0 | SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash); |
2541 | 0 | break; |
2542 | | |
2543 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: |
2544 | 0 | Assert(change->data.command_id != InvalidCommandId); |
2545 | |
|
2546 | 0 | if (command_id < change->data.command_id) |
2547 | 0 | { |
2548 | 0 | command_id = change->data.command_id; |
2549 | |
|
2550 | 0 | if (!snapshot_now->copied) |
2551 | 0 | { |
2552 | | /* we don't use the global one anymore */ |
2553 | 0 | snapshot_now = ReorderBufferCopySnap(rb, snapshot_now, |
2554 | 0 | txn, command_id); |
2555 | 0 | } |
2556 | |
|
2557 | 0 | snapshot_now->curcid = command_id; |
2558 | |
|
2559 | 0 | TeardownHistoricSnapshot(false); |
2560 | 0 | SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash); |
2561 | 0 | } |
2562 | |
|
2563 | 0 | break; |
2564 | | |
2565 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: |
2566 | 0 | elog(ERROR, "tuplecid value in changequeue"); |
2567 | 0 | break; |
2568 | 0 | } |
2569 | | |
2570 | | /* |
2571 | | * It is possible that the data is not sent to downstream for a |
2572 | | * long time either because the output plugin filtered it or there |
2573 | | * is a DDL that generates a lot of data that is not processed by |
2574 | | * the plugin. So, in such cases, the downstream can timeout. To |
2575 | | * avoid that we try to send a keepalive message if required. |
2576 | | * Trying to send a keepalive message after every change has some |
2577 | | * overhead, but testing showed there is no noticeable overhead if |
2578 | | * we do it after every ~100 changes. |
2579 | | */ |
2580 | 0 | #define CHANGES_THRESHOLD 100 |
2581 | | |
2582 | 0 | if (++changes_count >= CHANGES_THRESHOLD) |
2583 | 0 | { |
2584 | 0 | rb->update_progress_txn(rb, txn, change->lsn); |
2585 | 0 | changes_count = 0; |
2586 | 0 | } |
2587 | 0 | } |
2588 | | |
2589 | | /* speculative insertion record must be freed by now */ |
2590 | 0 | Assert(!specinsert); |
2591 | | |
2592 | | /* clean up the iterator */ |
2593 | 0 | ReorderBufferIterTXNFinish(rb, iterstate); |
2594 | 0 | iterstate = NULL; |
2595 | | |
2596 | | /* |
2597 | | * Update total transaction count and total bytes processed by the |
2598 | | * transaction and its subtransactions. Ensure to not count the |
2599 | | * streamed transaction multiple times. |
2600 | | * |
2601 | | * Note that the statistics computation has to be done after |
2602 | | * ReorderBufferIterTXNFinish as it releases the serialized change |
2603 | | * which we have already accounted in ReorderBufferIterTXNNext. |
2604 | | */ |
2605 | 0 | if (!rbtxn_is_streamed(txn)) |
2606 | 0 | rb->totalTxns++; |
2607 | |
|
2608 | 0 | rb->totalBytes += txn->total_size; |
2609 | | |
2610 | | /* |
2611 | | * Done with current changes, send the last message for this set of |
2612 | | * changes depending upon streaming mode. |
2613 | | */ |
2614 | 0 | if (streaming) |
2615 | 0 | { |
2616 | 0 | if (stream_started) |
2617 | 0 | { |
2618 | 0 | rb->stream_stop(rb, txn, prev_lsn); |
2619 | 0 | stream_started = false; |
2620 | 0 | } |
2621 | 0 | } |
2622 | 0 | else |
2623 | 0 | { |
2624 | | /* |
2625 | | * Call either PREPARE (for two-phase transactions) or COMMIT (for |
2626 | | * regular ones). |
2627 | | */ |
2628 | 0 | if (rbtxn_is_prepared(txn)) |
2629 | 0 | { |
2630 | 0 | Assert(!rbtxn_sent_prepare(txn)); |
2631 | 0 | rb->prepare(rb, txn, commit_lsn); |
2632 | 0 | txn->txn_flags |= RBTXN_SENT_PREPARE; |
2633 | 0 | } |
2634 | 0 | else |
2635 | 0 | rb->commit(rb, txn, commit_lsn); |
2636 | 0 | } |
2637 | | |
2638 | | /* this is just a sanity check against bad output plugin behaviour */ |
2639 | 0 | if (GetCurrentTransactionIdIfAny() != InvalidTransactionId) |
2640 | 0 | elog(ERROR, "output plugin used XID %u", |
2641 | 0 | GetCurrentTransactionId()); |
2642 | | |
2643 | | /* |
2644 | | * Remember the command ID and snapshot for the next set of changes in |
2645 | | * streaming mode. |
2646 | | */ |
2647 | 0 | if (streaming) |
2648 | 0 | ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id); |
2649 | 0 | else if (snapshot_now->copied) |
2650 | 0 | ReorderBufferFreeSnap(rb, snapshot_now); |
2651 | | |
2652 | | /* cleanup */ |
2653 | 0 | TeardownHistoricSnapshot(false); |
2654 | | |
2655 | | /* |
2656 | | * Aborting the current (sub-)transaction as a whole has the right |
2657 | | * semantics. We want all locks acquired in here to be released, not |
2658 | | * reassigned to the parent and we do not want any database access |
2659 | | * have persistent effects. |
2660 | | */ |
2661 | 0 | AbortCurrentTransaction(); |
2662 | | |
2663 | | /* make sure there's no cache pollution */ |
2664 | 0 | ReorderBufferExecuteInvalidations(txn->ninvalidations, txn->invalidations); |
2665 | |
|
2666 | 0 | if (using_subtxn) |
2667 | 0 | RollbackAndReleaseCurrentSubTransaction(); |
2668 | | |
2669 | | /* |
2670 | | * We are here due to one of the four reasons: 1. Decoding an |
2671 | | * in-progress txn. 2. Decoding a prepared txn. 3. Decoding of a |
2672 | | * prepared txn that was (partially) streamed. 4. Decoding a committed |
2673 | | * txn. |
2674 | | * |
2675 | | * For 1, we allow truncation of txn data by removing the changes |
2676 | | * already streamed but still keeping other things like invalidations, |
2677 | | * snapshot, and tuplecids. For 2 and 3, we indicate |
2678 | | * ReorderBufferTruncateTXN to do more elaborate truncation of txn |
2679 | | * data as the entire transaction has been decoded except for commit. |
2680 | | * For 4, as the entire txn has been decoded, we can fully clean up |
2681 | | * the TXN reorder buffer. |
2682 | | */ |
2683 | 0 | if (streaming || rbtxn_is_prepared(txn)) |
2684 | 0 | { |
2685 | 0 | if (streaming) |
2686 | 0 | ReorderBufferMaybeMarkTXNStreamed(rb, txn); |
2687 | |
|
2688 | 0 | ReorderBufferTruncateTXN(rb, txn, rbtxn_is_prepared(txn)); |
2689 | | /* Reset the CheckXidAlive */ |
2690 | 0 | CheckXidAlive = InvalidTransactionId; |
2691 | 0 | } |
2692 | 0 | else |
2693 | 0 | ReorderBufferCleanupTXN(rb, txn); |
2694 | 0 | } |
2695 | 0 | PG_CATCH(); |
2696 | 0 | { |
2697 | 0 | MemoryContext ecxt = MemoryContextSwitchTo(ccxt); |
2698 | 0 | ErrorData *errdata = CopyErrorData(); |
2699 | | |
2700 | | /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */ |
2701 | 0 | if (iterstate) |
2702 | 0 | ReorderBufferIterTXNFinish(rb, iterstate); |
2703 | |
|
2704 | 0 | TeardownHistoricSnapshot(true); |
2705 | | |
2706 | | /* |
2707 | | * Force cache invalidation to happen outside of a valid transaction |
2708 | | * to prevent catalog access as we just caught an error. |
2709 | | */ |
2710 | 0 | AbortCurrentTransaction(); |
2711 | | |
2712 | | /* make sure there's no cache pollution */ |
2713 | 0 | ReorderBufferExecuteInvalidations(txn->ninvalidations, |
2714 | 0 | txn->invalidations); |
2715 | |
|
2716 | 0 | if (using_subtxn) |
2717 | 0 | RollbackAndReleaseCurrentSubTransaction(); |
2718 | | |
2719 | | /* |
2720 | | * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent |
2721 | | * abort of the (sub)transaction we are streaming or preparing. We |
2722 | | * need to do the cleanup and return gracefully on this error, see |
2723 | | * SetupCheckXidLive. |
2724 | | * |
2725 | | * This error code can be thrown by one of the callbacks we call |
2726 | | * during decoding so we need to ensure that we return gracefully only |
2727 | | * when we are sending the data in streaming mode and the streaming is |
2728 | | * not finished yet or when we are sending the data out on a PREPARE |
2729 | | * during a two-phase commit. |
2730 | | */ |
2731 | 0 | if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK && |
2732 | 0 | (stream_started || rbtxn_is_prepared(txn))) |
2733 | 0 | { |
2734 | | /* curtxn must be set for streaming or prepared transactions */ |
2735 | 0 | Assert(curtxn); |
2736 | | |
2737 | | /* Cleanup the temporary error state. */ |
2738 | 0 | FlushErrorState(); |
2739 | 0 | FreeErrorData(errdata); |
2740 | 0 | errdata = NULL; |
2741 | | |
2742 | | /* Remember the transaction is aborted. */ |
2743 | 0 | Assert(!rbtxn_is_committed(curtxn)); |
2744 | 0 | curtxn->txn_flags |= RBTXN_IS_ABORTED; |
2745 | | |
2746 | | /* Mark the transaction is streamed if appropriate */ |
2747 | 0 | if (stream_started) |
2748 | 0 | ReorderBufferMaybeMarkTXNStreamed(rb, txn); |
2749 | | |
2750 | | /* Reset the TXN so that it is allowed to stream remaining data. */ |
2751 | 0 | ReorderBufferResetTXN(rb, txn, snapshot_now, |
2752 | 0 | command_id, prev_lsn, |
2753 | 0 | specinsert); |
2754 | 0 | } |
2755 | 0 | else |
2756 | 0 | { |
2757 | 0 | ReorderBufferCleanupTXN(rb, txn); |
2758 | 0 | MemoryContextSwitchTo(ecxt); |
2759 | 0 | PG_RE_THROW(); |
2760 | 0 | } |
2761 | 0 | } |
2762 | 0 | PG_END_TRY(); |
2763 | 0 | } |
2764 | | |
2765 | | /* |
2766 | | * Perform the replay of a transaction and its non-aborted subtransactions. |
2767 | | * |
2768 | | * Subtransactions previously have to be processed by |
2769 | | * ReorderBufferCommitChild(), even if previously assigned to the toplevel |
2770 | | * transaction with ReorderBufferAssignChild. |
2771 | | * |
2772 | | * This interface is called once a prepare or toplevel commit is read for both |
2773 | | * streamed as well as non-streamed transactions. |
2774 | | */ |
2775 | | static void |
2776 | | ReorderBufferReplay(ReorderBufferTXN *txn, |
2777 | | ReorderBuffer *rb, TransactionId xid, |
2778 | | XLogRecPtr commit_lsn, XLogRecPtr end_lsn, |
2779 | | TimestampTz commit_time, |
2780 | | RepOriginId origin_id, XLogRecPtr origin_lsn) |
2781 | 0 | { |
2782 | 0 | Snapshot snapshot_now; |
2783 | 0 | CommandId command_id = FirstCommandId; |
2784 | |
|
2785 | 0 | txn->final_lsn = commit_lsn; |
2786 | 0 | txn->end_lsn = end_lsn; |
2787 | 0 | txn->xact_time.commit_time = commit_time; |
2788 | 0 | txn->origin_id = origin_id; |
2789 | 0 | txn->origin_lsn = origin_lsn; |
2790 | | |
2791 | | /* |
2792 | | * If the transaction was (partially) streamed, we need to commit it in a |
2793 | | * 'streamed' way. That is, we first stream the remaining part of the |
2794 | | * transaction, and then invoke stream_commit message. |
2795 | | * |
2796 | | * Called after everything (origin ID, LSN, ...) is stored in the |
2797 | | * transaction to avoid passing that information directly. |
2798 | | */ |
2799 | 0 | if (rbtxn_is_streamed(txn)) |
2800 | 0 | { |
2801 | 0 | ReorderBufferStreamCommit(rb, txn); |
2802 | 0 | return; |
2803 | 0 | } |
2804 | | |
2805 | | /* |
2806 | | * If this transaction has no snapshot, it didn't make any changes to the |
2807 | | * database, so there's nothing to decode. Note that |
2808 | | * ReorderBufferCommitChild will have transferred any snapshots from |
2809 | | * subtransactions if there were any. |
2810 | | */ |
2811 | 0 | if (txn->base_snapshot == NULL) |
2812 | 0 | { |
2813 | 0 | Assert(txn->ninvalidations == 0); |
2814 | | |
2815 | | /* |
2816 | | * Removing this txn before a commit might result in the computation |
2817 | | * of an incorrect restart_lsn. See SnapBuildProcessRunningXacts. |
2818 | | */ |
2819 | 0 | if (!rbtxn_is_prepared(txn)) |
2820 | 0 | ReorderBufferCleanupTXN(rb, txn); |
2821 | 0 | return; |
2822 | 0 | } |
2823 | | |
2824 | 0 | snapshot_now = txn->base_snapshot; |
2825 | | |
2826 | | /* Process and send the changes to output plugin. */ |
2827 | 0 | ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now, |
2828 | 0 | command_id, false); |
2829 | 0 | } |
2830 | | |
2831 | | /* |
2832 | | * Commit a transaction. |
2833 | | * |
2834 | | * See comments for ReorderBufferReplay(). |
2835 | | */ |
2836 | | void |
2837 | | ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid, |
2838 | | XLogRecPtr commit_lsn, XLogRecPtr end_lsn, |
2839 | | TimestampTz commit_time, |
2840 | | RepOriginId origin_id, XLogRecPtr origin_lsn) |
2841 | 0 | { |
2842 | 0 | ReorderBufferTXN *txn; |
2843 | |
|
2844 | 0 | txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, |
2845 | 0 | false); |
2846 | | |
2847 | | /* unknown transaction, nothing to replay */ |
2848 | 0 | if (txn == NULL) |
2849 | 0 | return; |
2850 | | |
2851 | 0 | ReorderBufferReplay(txn, rb, xid, commit_lsn, end_lsn, commit_time, |
2852 | 0 | origin_id, origin_lsn); |
2853 | 0 | } |
2854 | | |
2855 | | /* |
2856 | | * Record the prepare information for a transaction. Also, mark the transaction |
2857 | | * as a prepared transaction. |
2858 | | */ |
2859 | | bool |
2860 | | ReorderBufferRememberPrepareInfo(ReorderBuffer *rb, TransactionId xid, |
2861 | | XLogRecPtr prepare_lsn, XLogRecPtr end_lsn, |
2862 | | TimestampTz prepare_time, |
2863 | | RepOriginId origin_id, XLogRecPtr origin_lsn) |
2864 | 0 | { |
2865 | 0 | ReorderBufferTXN *txn; |
2866 | |
|
2867 | 0 | txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false); |
2868 | | |
2869 | | /* unknown transaction, nothing to do */ |
2870 | 0 | if (txn == NULL) |
2871 | 0 | return false; |
2872 | | |
2873 | | /* |
2874 | | * Remember the prepare information to be later used by commit prepared in |
2875 | | * case we skip doing prepare. |
2876 | | */ |
2877 | 0 | txn->final_lsn = prepare_lsn; |
2878 | 0 | txn->end_lsn = end_lsn; |
2879 | 0 | txn->xact_time.prepare_time = prepare_time; |
2880 | 0 | txn->origin_id = origin_id; |
2881 | 0 | txn->origin_lsn = origin_lsn; |
2882 | | |
2883 | | /* Mark this transaction as a prepared transaction */ |
2884 | 0 | Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == 0); |
2885 | 0 | txn->txn_flags |= RBTXN_IS_PREPARED; |
2886 | |
|
2887 | 0 | return true; |
2888 | 0 | } |
2889 | | |
2890 | | /* Remember that we have skipped prepare */ |
2891 | | void |
2892 | | ReorderBufferSkipPrepare(ReorderBuffer *rb, TransactionId xid) |
2893 | 0 | { |
2894 | 0 | ReorderBufferTXN *txn; |
2895 | |
|
2896 | 0 | txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, false); |
2897 | | |
2898 | | /* unknown transaction, nothing to do */ |
2899 | 0 | if (txn == NULL) |
2900 | 0 | return; |
2901 | | |
2902 | | /* txn must have been marked as a prepared transaction */ |
2903 | 0 | Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == RBTXN_IS_PREPARED); |
2904 | 0 | txn->txn_flags |= RBTXN_SKIPPED_PREPARE; |
2905 | 0 | } |
2906 | | |
2907 | | /* |
2908 | | * Prepare a two-phase transaction. |
2909 | | * |
2910 | | * See comments for ReorderBufferReplay(). |
2911 | | */ |
2912 | | void |
2913 | | ReorderBufferPrepare(ReorderBuffer *rb, TransactionId xid, |
2914 | | char *gid) |
2915 | 0 | { |
2916 | 0 | ReorderBufferTXN *txn; |
2917 | |
|
2918 | 0 | txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, |
2919 | 0 | false); |
2920 | | |
2921 | | /* unknown transaction, nothing to replay */ |
2922 | 0 | if (txn == NULL) |
2923 | 0 | return; |
2924 | | |
2925 | | /* |
2926 | | * txn must have been marked as a prepared transaction and must have |
2927 | | * neither been skipped nor sent a prepare. Also, the prepare info must |
2928 | | * have been updated in it by now. |
2929 | | */ |
2930 | 0 | Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == RBTXN_IS_PREPARED); |
2931 | 0 | Assert(txn->final_lsn != InvalidXLogRecPtr); |
2932 | |
|
2933 | 0 | txn->gid = pstrdup(gid); |
2934 | |
|
2935 | 0 | ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn, |
2936 | 0 | txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn); |
2937 | | |
2938 | | /* |
2939 | | * Send a prepare if not already done so. This might occur if we have |
2940 | | * detected a concurrent abort while replaying the non-streaming |
2941 | | * transaction. |
2942 | | */ |
2943 | 0 | if (!rbtxn_sent_prepare(txn)) |
2944 | 0 | { |
2945 | 0 | rb->prepare(rb, txn, txn->final_lsn); |
2946 | 0 | txn->txn_flags |= RBTXN_SENT_PREPARE; |
2947 | 0 | } |
2948 | 0 | } |
2949 | | |
2950 | | /* |
2951 | | * This is used to handle COMMIT/ROLLBACK PREPARED. |
2952 | | */ |
2953 | | void |
2954 | | ReorderBufferFinishPrepared(ReorderBuffer *rb, TransactionId xid, |
2955 | | XLogRecPtr commit_lsn, XLogRecPtr end_lsn, |
2956 | | XLogRecPtr two_phase_at, |
2957 | | TimestampTz commit_time, RepOriginId origin_id, |
2958 | | XLogRecPtr origin_lsn, char *gid, bool is_commit) |
2959 | 0 | { |
2960 | 0 | ReorderBufferTXN *txn; |
2961 | 0 | XLogRecPtr prepare_end_lsn; |
2962 | 0 | TimestampTz prepare_time; |
2963 | |
|
2964 | 0 | txn = ReorderBufferTXNByXid(rb, xid, false, NULL, commit_lsn, false); |
2965 | | |
2966 | | /* unknown transaction, nothing to do */ |
2967 | 0 | if (txn == NULL) |
2968 | 0 | return; |
2969 | | |
2970 | | /* |
2971 | | * By this time the txn has the prepare record information, remember it to |
2972 | | * be later used for rollback. |
2973 | | */ |
2974 | 0 | prepare_end_lsn = txn->end_lsn; |
2975 | 0 | prepare_time = txn->xact_time.prepare_time; |
2976 | | |
2977 | | /* add the gid in the txn */ |
2978 | 0 | txn->gid = pstrdup(gid); |
2979 | | |
2980 | | /* |
2981 | | * It is possible that this transaction is not decoded at prepare time |
2982 | | * either because by that time we didn't have a consistent snapshot, or |
2983 | | * two_phase was not enabled, or it was decoded earlier but we have |
2984 | | * restarted. We only need to send the prepare if it was not decoded |
2985 | | * earlier. We don't need to decode the xact for aborts if it is not done |
2986 | | * already. |
2987 | | */ |
2988 | 0 | if ((txn->final_lsn < two_phase_at) && is_commit) |
2989 | 0 | { |
2990 | | /* |
2991 | | * txn must have been marked as a prepared transaction and skipped but |
2992 | | * not sent a prepare. Also, the prepare info must have been updated |
2993 | | * in txn even if we skip prepare. |
2994 | | */ |
2995 | 0 | Assert((txn->txn_flags & RBTXN_PREPARE_STATUS_MASK) == |
2996 | 0 | (RBTXN_IS_PREPARED | RBTXN_SKIPPED_PREPARE)); |
2997 | 0 | Assert(txn->final_lsn != InvalidXLogRecPtr); |
2998 | | |
2999 | | /* |
3000 | | * By this time the txn has the prepare record information and it is |
3001 | | * important to use that so that downstream gets the accurate |
3002 | | * information. If instead, we have passed commit information here |
3003 | | * then downstream can behave as it has already replayed commit |
3004 | | * prepared after the restart. |
3005 | | */ |
3006 | 0 | ReorderBufferReplay(txn, rb, xid, txn->final_lsn, txn->end_lsn, |
3007 | 0 | txn->xact_time.prepare_time, txn->origin_id, txn->origin_lsn); |
3008 | 0 | } |
3009 | |
|
3010 | 0 | txn->final_lsn = commit_lsn; |
3011 | 0 | txn->end_lsn = end_lsn; |
3012 | 0 | txn->xact_time.commit_time = commit_time; |
3013 | 0 | txn->origin_id = origin_id; |
3014 | 0 | txn->origin_lsn = origin_lsn; |
3015 | |
|
3016 | 0 | if (is_commit) |
3017 | 0 | rb->commit_prepared(rb, txn, commit_lsn); |
3018 | 0 | else |
3019 | 0 | rb->rollback_prepared(rb, txn, prepare_end_lsn, prepare_time); |
3020 | | |
3021 | | /* cleanup: make sure there's no cache pollution */ |
3022 | 0 | ReorderBufferExecuteInvalidations(txn->ninvalidations, |
3023 | 0 | txn->invalidations); |
3024 | 0 | ReorderBufferCleanupTXN(rb, txn); |
3025 | 0 | } |
3026 | | |
3027 | | /* |
3028 | | * Abort a transaction that possibly has previous changes. Needs to be first |
3029 | | * called for subtransactions and then for the toplevel xid. |
3030 | | * |
3031 | | * NB: Transactions handled here have to have actively aborted (i.e. have |
3032 | | * produced an abort record). Implicitly aborted transactions are handled via |
3033 | | * ReorderBufferAbortOld(); transactions we're just not interested in, but |
3034 | | * which have committed are handled in ReorderBufferForget(). |
3035 | | * |
3036 | | * This function purges this transaction and its contents from memory and |
3037 | | * disk. |
3038 | | */ |
3039 | | void |
3040 | | ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn, |
3041 | | TimestampTz abort_time) |
3042 | 0 | { |
3043 | 0 | ReorderBufferTXN *txn; |
3044 | |
|
3045 | 0 | txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, |
3046 | 0 | false); |
3047 | | |
3048 | | /* unknown, nothing to remove */ |
3049 | 0 | if (txn == NULL) |
3050 | 0 | return; |
3051 | | |
3052 | 0 | txn->xact_time.abort_time = abort_time; |
3053 | | |
3054 | | /* For streamed transactions notify the remote node about the abort. */ |
3055 | 0 | if (rbtxn_is_streamed(txn)) |
3056 | 0 | { |
3057 | 0 | rb->stream_abort(rb, txn, lsn); |
3058 | | |
3059 | | /* |
3060 | | * We might have decoded changes for this transaction that could load |
3061 | | * the cache as per the current transaction's view (consider DDL's |
3062 | | * happened in this transaction). We don't want the decoding of future |
3063 | | * transactions to use those cache entries so execute invalidations. |
3064 | | */ |
3065 | 0 | if (txn->ninvalidations > 0) |
3066 | 0 | ReorderBufferImmediateInvalidation(rb, txn->ninvalidations, |
3067 | 0 | txn->invalidations); |
3068 | 0 | } |
3069 | | |
3070 | | /* cosmetic... */ |
3071 | 0 | txn->final_lsn = lsn; |
3072 | | |
3073 | | /* remove potential on-disk data, and deallocate */ |
3074 | 0 | ReorderBufferCleanupTXN(rb, txn); |
3075 | 0 | } |
3076 | | |
3077 | | /* |
3078 | | * Abort all transactions that aren't actually running anymore because the |
3079 | | * server restarted. |
3080 | | * |
3081 | | * NB: These really have to be transactions that have aborted due to a server |
3082 | | * crash/immediate restart, as we don't deal with invalidations here. |
3083 | | */ |
3084 | | void |
3085 | | ReorderBufferAbortOld(ReorderBuffer *rb, TransactionId oldestRunningXid) |
3086 | 0 | { |
3087 | 0 | dlist_mutable_iter it; |
3088 | | |
3089 | | /* |
3090 | | * Iterate through all (potential) toplevel TXNs and abort all that are |
3091 | | * older than what possibly can be running. Once we've found the first |
3092 | | * that is alive we stop, there might be some that acquired an xid earlier |
3093 | | * but started writing later, but it's unlikely and they will be cleaned |
3094 | | * up in a later call to this function. |
3095 | | */ |
3096 | 0 | dlist_foreach_modify(it, &rb->toplevel_by_lsn) |
3097 | 0 | { |
3098 | 0 | ReorderBufferTXN *txn; |
3099 | |
|
3100 | 0 | txn = dlist_container(ReorderBufferTXN, node, it.cur); |
3101 | |
|
3102 | 0 | if (TransactionIdPrecedes(txn->xid, oldestRunningXid)) |
3103 | 0 | { |
3104 | 0 | elog(DEBUG2, "aborting old transaction %u", txn->xid); |
3105 | | |
3106 | | /* Notify the remote node about the crash/immediate restart. */ |
3107 | 0 | if (rbtxn_is_streamed(txn)) |
3108 | 0 | rb->stream_abort(rb, txn, InvalidXLogRecPtr); |
3109 | | |
3110 | | /* remove potential on-disk data, and deallocate this tx */ |
3111 | 0 | ReorderBufferCleanupTXN(rb, txn); |
3112 | 0 | } |
3113 | 0 | else |
3114 | 0 | return; |
3115 | 0 | } |
3116 | 0 | } |
3117 | | |
3118 | | /* |
3119 | | * Forget the contents of a transaction if we aren't interested in its |
3120 | | * contents. Needs to be first called for subtransactions and then for the |
3121 | | * toplevel xid. |
3122 | | * |
3123 | | * This is significantly different to ReorderBufferAbort() because |
3124 | | * transactions that have committed need to be treated differently from aborted |
3125 | | * ones since they may have modified the catalog. |
3126 | | * |
3127 | | * Note that this is only allowed to be called in the moment a transaction |
3128 | | * commit has just been read, not earlier; otherwise later records referring |
3129 | | * to this xid might re-create the transaction incompletely. |
3130 | | */ |
3131 | | void |
3132 | | ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) |
3133 | 0 | { |
3134 | 0 | ReorderBufferTXN *txn; |
3135 | |
|
3136 | 0 | txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, |
3137 | 0 | false); |
3138 | | |
3139 | | /* unknown, nothing to forget */ |
3140 | 0 | if (txn == NULL) |
3141 | 0 | return; |
3142 | | |
3143 | | /* this transaction mustn't be streamed */ |
3144 | 0 | Assert(!rbtxn_is_streamed(txn)); |
3145 | | |
3146 | | /* cosmetic... */ |
3147 | 0 | txn->final_lsn = lsn; |
3148 | | |
3149 | | /* |
3150 | | * Process cache invalidation messages if there are any. Even if we're not |
3151 | | * interested in the transaction's contents, it could have manipulated the |
3152 | | * catalog and we need to update the caches according to that. |
3153 | | */ |
3154 | 0 | if (txn->base_snapshot != NULL && txn->ninvalidations > 0) |
3155 | 0 | ReorderBufferImmediateInvalidation(rb, txn->ninvalidations, |
3156 | 0 | txn->invalidations); |
3157 | 0 | else |
3158 | 0 | Assert(txn->ninvalidations == 0); |
3159 | | |
3160 | | /* remove potential on-disk data, and deallocate */ |
3161 | 0 | ReorderBufferCleanupTXN(rb, txn); |
3162 | 0 | } |
3163 | | |
3164 | | /* |
3165 | | * Invalidate cache for those transactions that need to be skipped just in case |
3166 | | * catalogs were manipulated as part of the transaction. |
3167 | | * |
3168 | | * Note that this is a special-purpose function for prepared transactions where |
3169 | | * we don't want to clean up the TXN even when we decide to skip it. See |
3170 | | * DecodePrepare. |
3171 | | */ |
3172 | | void |
3173 | | ReorderBufferInvalidate(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) |
3174 | 0 | { |
3175 | 0 | ReorderBufferTXN *txn; |
3176 | |
|
3177 | 0 | txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, |
3178 | 0 | false); |
3179 | | |
3180 | | /* unknown, nothing to do */ |
3181 | 0 | if (txn == NULL) |
3182 | 0 | return; |
3183 | | |
3184 | | /* |
3185 | | * Process cache invalidation messages if there are any. Even if we're not |
3186 | | * interested in the transaction's contents, it could have manipulated the |
3187 | | * catalog and we need to update the caches according to that. |
3188 | | */ |
3189 | 0 | if (txn->base_snapshot != NULL && txn->ninvalidations > 0) |
3190 | 0 | ReorderBufferImmediateInvalidation(rb, txn->ninvalidations, |
3191 | 0 | txn->invalidations); |
3192 | 0 | else |
3193 | 0 | Assert(txn->ninvalidations == 0); |
3194 | 0 | } |
3195 | | |
3196 | | |
3197 | | /* |
3198 | | * Execute invalidations happening outside the context of a decoded |
3199 | | * transaction. That currently happens either for xid-less commits |
3200 | | * (cf. RecordTransactionCommit()) or for invalidations in uninteresting |
3201 | | * transactions (via ReorderBufferForget()). |
3202 | | */ |
3203 | | void |
3204 | | ReorderBufferImmediateInvalidation(ReorderBuffer *rb, uint32 ninvalidations, |
3205 | | SharedInvalidationMessage *invalidations) |
3206 | 0 | { |
3207 | 0 | bool use_subtxn = IsTransactionOrTransactionBlock(); |
3208 | 0 | int i; |
3209 | |
|
3210 | 0 | if (use_subtxn) |
3211 | 0 | BeginInternalSubTransaction("replay"); |
3212 | | |
3213 | | /* |
3214 | | * Force invalidations to happen outside of a valid transaction - that way |
3215 | | * entries will just be marked as invalid without accessing the catalog. |
3216 | | * That's advantageous because we don't need to setup the full state |
3217 | | * necessary for catalog access. |
3218 | | */ |
3219 | 0 | if (use_subtxn) |
3220 | 0 | AbortCurrentTransaction(); |
3221 | |
|
3222 | 0 | for (i = 0; i < ninvalidations; i++) |
3223 | 0 | LocalExecuteInvalidationMessage(&invalidations[i]); |
3224 | |
|
3225 | 0 | if (use_subtxn) |
3226 | 0 | RollbackAndReleaseCurrentSubTransaction(); |
3227 | 0 | } |
3228 | | |
3229 | | /* |
3230 | | * Tell reorderbuffer about an xid seen in the WAL stream. Has to be called at |
3231 | | * least once for every xid in XLogRecord->xl_xid (other places in records |
3232 | | * may, but do not have to be passed through here). |
3233 | | * |
3234 | | * Reorderbuffer keeps some data structures about transactions in LSN order, |
3235 | | * for efficiency. To do that it has to know about when transactions are seen |
3236 | | * first in the WAL. As many types of records are not actually interesting for |
3237 | | * logical decoding, they do not necessarily pass through here. |
3238 | | */ |
3239 | | void |
3240 | | ReorderBufferProcessXid(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn) |
3241 | 0 | { |
3242 | | /* many records won't have an xid assigned, centralize check here */ |
3243 | 0 | if (xid != InvalidTransactionId) |
3244 | 0 | ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); |
3245 | 0 | } |
3246 | | |
3247 | | /* |
3248 | | * Add a new snapshot to this transaction that may only used after lsn 'lsn' |
3249 | | * because the previous snapshot doesn't describe the catalog correctly for |
3250 | | * following rows. |
3251 | | */ |
3252 | | void |
3253 | | ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid, |
3254 | | XLogRecPtr lsn, Snapshot snap) |
3255 | 0 | { |
3256 | 0 | ReorderBufferChange *change = ReorderBufferAllocChange(rb); |
3257 | |
|
3258 | 0 | change->data.snapshot = snap; |
3259 | 0 | change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT; |
3260 | |
|
3261 | 0 | ReorderBufferQueueChange(rb, xid, lsn, change, false); |
3262 | 0 | } |
3263 | | |
3264 | | /* |
3265 | | * Set up the transaction's base snapshot. |
3266 | | * |
3267 | | * If we know that xid is a subtransaction, set the base snapshot on the |
3268 | | * top-level transaction instead. |
3269 | | */ |
3270 | | void |
3271 | | ReorderBufferSetBaseSnapshot(ReorderBuffer *rb, TransactionId xid, |
3272 | | XLogRecPtr lsn, Snapshot snap) |
3273 | 0 | { |
3274 | 0 | ReorderBufferTXN *txn; |
3275 | 0 | bool is_new; |
3276 | |
|
3277 | 0 | Assert(snap != NULL); |
3278 | | |
3279 | | /* |
3280 | | * Fetch the transaction to operate on. If we know it's a subtransaction, |
3281 | | * operate on its top-level transaction instead. |
3282 | | */ |
3283 | 0 | txn = ReorderBufferTXNByXid(rb, xid, true, &is_new, lsn, true); |
3284 | 0 | if (rbtxn_is_known_subxact(txn)) |
3285 | 0 | txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false, |
3286 | 0 | NULL, InvalidXLogRecPtr, false); |
3287 | 0 | Assert(txn->base_snapshot == NULL); |
3288 | |
|
3289 | 0 | txn->base_snapshot = snap; |
3290 | 0 | txn->base_snapshot_lsn = lsn; |
3291 | 0 | dlist_push_tail(&rb->txns_by_base_snapshot_lsn, &txn->base_snapshot_node); |
3292 | |
|
3293 | 0 | AssertTXNLsnOrder(rb); |
3294 | 0 | } |
3295 | | |
3296 | | /* |
3297 | | * Access the catalog with this CommandId at this point in the changestream. |
3298 | | * |
3299 | | * May only be called for command ids > 1 |
3300 | | */ |
3301 | | void |
3302 | | ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid, |
3303 | | XLogRecPtr lsn, CommandId cid) |
3304 | 0 | { |
3305 | 0 | ReorderBufferChange *change = ReorderBufferAllocChange(rb); |
3306 | |
|
3307 | 0 | change->data.command_id = cid; |
3308 | 0 | change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID; |
3309 | |
|
3310 | 0 | ReorderBufferQueueChange(rb, xid, lsn, change, false); |
3311 | 0 | } |
3312 | | |
3313 | | /* |
3314 | | * Update memory counters to account for the new or removed change. |
3315 | | * |
3316 | | * We update two counters - in the reorder buffer, and in the transaction |
3317 | | * containing the change. The reorder buffer counter allows us to quickly |
3318 | | * decide if we reached the memory limit, the transaction counter allows |
3319 | | * us to quickly pick the largest transaction for eviction. |
3320 | | * |
3321 | | * Either txn or change must be non-NULL at least. We update the memory |
3322 | | * counter of txn if it's non-NULL, otherwise change->txn. |
3323 | | * |
3324 | | * When streaming is enabled, we need to update the toplevel transaction |
3325 | | * counters instead - we don't really care about subtransactions as we |
3326 | | * can't stream them individually anyway, and we only pick toplevel |
3327 | | * transactions for eviction. So only toplevel transactions matter. |
3328 | | */ |
3329 | | static void |
3330 | | ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb, |
3331 | | ReorderBufferChange *change, |
3332 | | ReorderBufferTXN *txn, |
3333 | | bool addition, Size sz) |
3334 | 0 | { |
3335 | 0 | ReorderBufferTXN *toptxn; |
3336 | |
|
3337 | 0 | Assert(txn || change); |
3338 | | |
3339 | | /* |
3340 | | * Ignore tuple CID changes, because those are not evicted when reaching |
3341 | | * memory limit. So we just don't count them, because it might easily |
3342 | | * trigger a pointless attempt to spill. |
3343 | | */ |
3344 | 0 | if (change && change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID) |
3345 | 0 | return; |
3346 | | |
3347 | 0 | if (sz == 0) |
3348 | 0 | return; |
3349 | | |
3350 | 0 | if (txn == NULL) |
3351 | 0 | txn = change->txn; |
3352 | 0 | Assert(txn != NULL); |
3353 | | |
3354 | | /* |
3355 | | * Update the total size in top level as well. This is later used to |
3356 | | * compute the decoding stats. |
3357 | | */ |
3358 | 0 | toptxn = rbtxn_get_toptxn(txn); |
3359 | |
|
3360 | 0 | if (addition) |
3361 | 0 | { |
3362 | 0 | Size oldsize = txn->size; |
3363 | |
|
3364 | 0 | txn->size += sz; |
3365 | 0 | rb->size += sz; |
3366 | | |
3367 | | /* Update the total size in the top transaction. */ |
3368 | 0 | toptxn->total_size += sz; |
3369 | | |
3370 | | /* Update the max-heap */ |
3371 | 0 | if (oldsize != 0) |
3372 | 0 | pairingheap_remove(rb->txn_heap, &txn->txn_node); |
3373 | 0 | pairingheap_add(rb->txn_heap, &txn->txn_node); |
3374 | 0 | } |
3375 | 0 | else |
3376 | 0 | { |
3377 | 0 | Assert((rb->size >= sz) && (txn->size >= sz)); |
3378 | 0 | txn->size -= sz; |
3379 | 0 | rb->size -= sz; |
3380 | | |
3381 | | /* Update the total size in the top transaction. */ |
3382 | 0 | toptxn->total_size -= sz; |
3383 | | |
3384 | | /* Update the max-heap */ |
3385 | 0 | pairingheap_remove(rb->txn_heap, &txn->txn_node); |
3386 | 0 | if (txn->size != 0) |
3387 | 0 | pairingheap_add(rb->txn_heap, &txn->txn_node); |
3388 | 0 | } |
3389 | |
|
3390 | 0 | Assert(txn->size <= rb->size); |
3391 | 0 | } |
3392 | | |
3393 | | /* |
3394 | | * Add new (relfilelocator, tid) -> (cmin, cmax) mappings. |
3395 | | * |
3396 | | * We do not include this change type in memory accounting, because we |
3397 | | * keep CIDs in a separate list and do not evict them when reaching |
3398 | | * the memory limit. |
3399 | | */ |
3400 | | void |
3401 | | ReorderBufferAddNewTupleCids(ReorderBuffer *rb, TransactionId xid, |
3402 | | XLogRecPtr lsn, RelFileLocator locator, |
3403 | | ItemPointerData tid, CommandId cmin, |
3404 | | CommandId cmax, CommandId combocid) |
3405 | 0 | { |
3406 | 0 | ReorderBufferChange *change = ReorderBufferAllocChange(rb); |
3407 | 0 | ReorderBufferTXN *txn; |
3408 | |
|
3409 | 0 | txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); |
3410 | |
|
3411 | 0 | change->data.tuplecid.locator = locator; |
3412 | 0 | change->data.tuplecid.tid = tid; |
3413 | 0 | change->data.tuplecid.cmin = cmin; |
3414 | 0 | change->data.tuplecid.cmax = cmax; |
3415 | 0 | change->data.tuplecid.combocid = combocid; |
3416 | 0 | change->lsn = lsn; |
3417 | 0 | change->txn = txn; |
3418 | 0 | change->action = REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID; |
3419 | |
|
3420 | 0 | dlist_push_tail(&txn->tuplecids, &change->node); |
3421 | 0 | txn->ntuplecids++; |
3422 | 0 | } |
3423 | | |
3424 | | /* |
3425 | | * Accumulate the invalidations for executing them later. |
3426 | | * |
3427 | | * This needs to be called for each XLOG_XACT_INVALIDATIONS message and |
3428 | | * accumulates all the invalidation messages in the toplevel transaction, if |
3429 | | * available, otherwise in the current transaction, as well as in the form of |
3430 | | * change in reorder buffer. We require to record it in form of the change |
3431 | | * so that we can execute only the required invalidations instead of executing |
3432 | | * all the invalidations on each CommandId increment. We also need to |
3433 | | * accumulate these in the txn buffer because in some cases where we skip |
3434 | | * processing the transaction (see ReorderBufferForget), we need to execute |
3435 | | * all the invalidations together. |
3436 | | */ |
3437 | | void |
3438 | | ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid, |
3439 | | XLogRecPtr lsn, Size nmsgs, |
3440 | | SharedInvalidationMessage *msgs) |
3441 | 0 | { |
3442 | 0 | ReorderBufferTXN *txn; |
3443 | 0 | MemoryContext oldcontext; |
3444 | 0 | ReorderBufferChange *change; |
3445 | |
|
3446 | 0 | txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); |
3447 | |
|
3448 | 0 | oldcontext = MemoryContextSwitchTo(rb->context); |
3449 | | |
3450 | | /* |
3451 | | * Collect all the invalidations under the top transaction, if available, |
3452 | | * so that we can execute them all together. See comments atop this |
3453 | | * function. |
3454 | | */ |
3455 | 0 | txn = rbtxn_get_toptxn(txn); |
3456 | |
|
3457 | 0 | Assert(nmsgs > 0); |
3458 | | |
3459 | | /* Accumulate invalidations. */ |
3460 | 0 | if (txn->ninvalidations == 0) |
3461 | 0 | { |
3462 | 0 | txn->ninvalidations = nmsgs; |
3463 | 0 | txn->invalidations = (SharedInvalidationMessage *) |
3464 | 0 | palloc(sizeof(SharedInvalidationMessage) * nmsgs); |
3465 | 0 | memcpy(txn->invalidations, msgs, |
3466 | 0 | sizeof(SharedInvalidationMessage) * nmsgs); |
3467 | 0 | } |
3468 | 0 | else |
3469 | 0 | { |
3470 | 0 | txn->invalidations = (SharedInvalidationMessage *) |
3471 | 0 | repalloc(txn->invalidations, sizeof(SharedInvalidationMessage) * |
3472 | 0 | (txn->ninvalidations + nmsgs)); |
3473 | |
|
3474 | 0 | memcpy(txn->invalidations + txn->ninvalidations, msgs, |
3475 | 0 | nmsgs * sizeof(SharedInvalidationMessage)); |
3476 | 0 | txn->ninvalidations += nmsgs; |
3477 | 0 | } |
3478 | |
|
3479 | 0 | change = ReorderBufferAllocChange(rb); |
3480 | 0 | change->action = REORDER_BUFFER_CHANGE_INVALIDATION; |
3481 | 0 | change->data.inval.ninvalidations = nmsgs; |
3482 | 0 | change->data.inval.invalidations = (SharedInvalidationMessage *) |
3483 | 0 | palloc(sizeof(SharedInvalidationMessage) * nmsgs); |
3484 | 0 | memcpy(change->data.inval.invalidations, msgs, |
3485 | 0 | sizeof(SharedInvalidationMessage) * nmsgs); |
3486 | |
|
3487 | 0 | ReorderBufferQueueChange(rb, xid, lsn, change, false); |
3488 | |
|
3489 | 0 | MemoryContextSwitchTo(oldcontext); |
3490 | 0 | } |
3491 | | |
3492 | | /* |
3493 | | * Apply all invalidations we know. Possibly we only need parts at this point |
3494 | | * in the changestream but we don't know which those are. |
3495 | | */ |
3496 | | static void |
3497 | | ReorderBufferExecuteInvalidations(uint32 nmsgs, SharedInvalidationMessage *msgs) |
3498 | 0 | { |
3499 | 0 | int i; |
3500 | |
|
3501 | 0 | for (i = 0; i < nmsgs; i++) |
3502 | 0 | LocalExecuteInvalidationMessage(&msgs[i]); |
3503 | 0 | } |
3504 | | |
3505 | | /* |
3506 | | * Mark a transaction as containing catalog changes |
3507 | | */ |
3508 | | void |
3509 | | ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid, |
3510 | | XLogRecPtr lsn) |
3511 | 0 | { |
3512 | 0 | ReorderBufferTXN *txn; |
3513 | |
|
3514 | 0 | txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true); |
3515 | |
|
3516 | 0 | if (!rbtxn_has_catalog_changes(txn)) |
3517 | 0 | { |
3518 | 0 | txn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; |
3519 | 0 | dclist_push_tail(&rb->catchange_txns, &txn->catchange_node); |
3520 | 0 | } |
3521 | | |
3522 | | /* |
3523 | | * Mark top-level transaction as having catalog changes too if one of its |
3524 | | * children has so that the ReorderBufferBuildTupleCidHash can |
3525 | | * conveniently check just top-level transaction and decide whether to |
3526 | | * build the hash table or not. |
3527 | | */ |
3528 | 0 | if (rbtxn_is_subtxn(txn)) |
3529 | 0 | { |
3530 | 0 | ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn); |
3531 | |
|
3532 | 0 | if (!rbtxn_has_catalog_changes(toptxn)) |
3533 | 0 | { |
3534 | 0 | toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES; |
3535 | 0 | dclist_push_tail(&rb->catchange_txns, &toptxn->catchange_node); |
3536 | 0 | } |
3537 | 0 | } |
3538 | 0 | } |
3539 | | |
3540 | | /* |
3541 | | * Return palloc'ed array of the transactions that have changed catalogs. |
3542 | | * The returned array is sorted in xidComparator order. |
3543 | | * |
3544 | | * The caller must free the returned array when done with it. |
3545 | | */ |
3546 | | TransactionId * |
3547 | | ReorderBufferGetCatalogChangesXacts(ReorderBuffer *rb) |
3548 | 0 | { |
3549 | 0 | dlist_iter iter; |
3550 | 0 | TransactionId *xids = NULL; |
3551 | 0 | size_t xcnt = 0; |
3552 | | |
3553 | | /* Quick return if the list is empty */ |
3554 | 0 | if (dclist_count(&rb->catchange_txns) == 0) |
3555 | 0 | return NULL; |
3556 | | |
3557 | | /* Initialize XID array */ |
3558 | 0 | xids = (TransactionId *) palloc(sizeof(TransactionId) * |
3559 | 0 | dclist_count(&rb->catchange_txns)); |
3560 | 0 | dclist_foreach(iter, &rb->catchange_txns) |
3561 | 0 | { |
3562 | 0 | ReorderBufferTXN *txn = dclist_container(ReorderBufferTXN, |
3563 | 0 | catchange_node, |
3564 | 0 | iter.cur); |
3565 | |
|
3566 | 0 | Assert(rbtxn_has_catalog_changes(txn)); |
3567 | |
|
3568 | 0 | xids[xcnt++] = txn->xid; |
3569 | 0 | } |
3570 | |
|
3571 | 0 | qsort(xids, xcnt, sizeof(TransactionId), xidComparator); |
3572 | |
|
3573 | 0 | Assert(xcnt == dclist_count(&rb->catchange_txns)); |
3574 | 0 | return xids; |
3575 | 0 | } |
3576 | | |
3577 | | /* |
3578 | | * Query whether a transaction is already *known* to contain catalog |
3579 | | * changes. This can be wrong until directly before the commit! |
3580 | | */ |
3581 | | bool |
3582 | | ReorderBufferXidHasCatalogChanges(ReorderBuffer *rb, TransactionId xid) |
3583 | 0 | { |
3584 | 0 | ReorderBufferTXN *txn; |
3585 | |
|
3586 | 0 | txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, |
3587 | 0 | false); |
3588 | 0 | if (txn == NULL) |
3589 | 0 | return false; |
3590 | | |
3591 | 0 | return rbtxn_has_catalog_changes(txn); |
3592 | 0 | } |
3593 | | |
3594 | | /* |
3595 | | * ReorderBufferXidHasBaseSnapshot |
3596 | | * Have we already set the base snapshot for the given txn/subtxn? |
3597 | | */ |
3598 | | bool |
3599 | | ReorderBufferXidHasBaseSnapshot(ReorderBuffer *rb, TransactionId xid) |
3600 | 0 | { |
3601 | 0 | ReorderBufferTXN *txn; |
3602 | |
|
3603 | 0 | txn = ReorderBufferTXNByXid(rb, xid, false, |
3604 | 0 | NULL, InvalidXLogRecPtr, false); |
3605 | | |
3606 | | /* transaction isn't known yet, ergo no snapshot */ |
3607 | 0 | if (txn == NULL) |
3608 | 0 | return false; |
3609 | | |
3610 | | /* a known subtxn? operate on top-level txn instead */ |
3611 | 0 | if (rbtxn_is_known_subxact(txn)) |
3612 | 0 | txn = ReorderBufferTXNByXid(rb, txn->toplevel_xid, false, |
3613 | 0 | NULL, InvalidXLogRecPtr, false); |
3614 | |
|
3615 | 0 | return txn->base_snapshot != NULL; |
3616 | 0 | } |
3617 | | |
3618 | | |
3619 | | /* |
3620 | | * --------------------------------------- |
3621 | | * Disk serialization support |
3622 | | * --------------------------------------- |
3623 | | */ |
3624 | | |
3625 | | /* |
3626 | | * Ensure the IO buffer is >= sz. |
3627 | | */ |
3628 | | static void |
3629 | | ReorderBufferSerializeReserve(ReorderBuffer *rb, Size sz) |
3630 | 0 | { |
3631 | 0 | if (!rb->outbufsize) |
3632 | 0 | { |
3633 | 0 | rb->outbuf = MemoryContextAlloc(rb->context, sz); |
3634 | 0 | rb->outbufsize = sz; |
3635 | 0 | } |
3636 | 0 | else if (rb->outbufsize < sz) |
3637 | 0 | { |
3638 | 0 | rb->outbuf = repalloc(rb->outbuf, sz); |
3639 | 0 | rb->outbufsize = sz; |
3640 | 0 | } |
3641 | 0 | } |
3642 | | |
3643 | | |
3644 | | /* Compare two transactions by size */ |
3645 | | static int |
3646 | | ReorderBufferTXNSizeCompare(const pairingheap_node *a, const pairingheap_node *b, void *arg) |
3647 | 0 | { |
3648 | 0 | const ReorderBufferTXN *ta = pairingheap_const_container(ReorderBufferTXN, txn_node, a); |
3649 | 0 | const ReorderBufferTXN *tb = pairingheap_const_container(ReorderBufferTXN, txn_node, b); |
3650 | |
|
3651 | 0 | if (ta->size < tb->size) |
3652 | 0 | return -1; |
3653 | 0 | if (ta->size > tb->size) |
3654 | 0 | return 1; |
3655 | 0 | return 0; |
3656 | 0 | } |
3657 | | |
3658 | | /* |
3659 | | * Find the largest transaction (toplevel or subxact) to evict (spill to disk). |
3660 | | */ |
3661 | | static ReorderBufferTXN * |
3662 | | ReorderBufferLargestTXN(ReorderBuffer *rb) |
3663 | 0 | { |
3664 | 0 | ReorderBufferTXN *largest; |
3665 | | |
3666 | | /* Get the largest transaction from the max-heap */ |
3667 | 0 | largest = pairingheap_container(ReorderBufferTXN, txn_node, |
3668 | 0 | pairingheap_first(rb->txn_heap)); |
3669 | |
|
3670 | 0 | Assert(largest); |
3671 | 0 | Assert(largest->size > 0); |
3672 | 0 | Assert(largest->size <= rb->size); |
3673 | |
|
3674 | 0 | return largest; |
3675 | 0 | } |
3676 | | |
3677 | | /* |
3678 | | * Find the largest streamable (and non-aborted) toplevel transaction to evict |
3679 | | * (by streaming). |
3680 | | * |
3681 | | * This can be seen as an optimized version of ReorderBufferLargestTXN, which |
3682 | | * should give us the same transaction (because we don't update memory account |
3683 | | * for subtransaction with streaming, so it's always 0). But we can simply |
3684 | | * iterate over the limited number of toplevel transactions that have a base |
3685 | | * snapshot. There is no use of selecting a transaction that doesn't have base |
3686 | | * snapshot because we don't decode such transactions. Also, we do not select |
3687 | | * the transaction which doesn't have any streamable change. |
3688 | | * |
3689 | | * Note that, we skip transactions that contain incomplete changes. There |
3690 | | * is a scope of optimization here such that we can select the largest |
3691 | | * transaction which has incomplete changes. But that will make the code and |
3692 | | * design quite complex and that might not be worth the benefit. If we plan to |
3693 | | * stream the transactions that contain incomplete changes then we need to |
3694 | | * find a way to partially stream/truncate the transaction changes in-memory |
3695 | | * and build a mechanism to partially truncate the spilled files. |
3696 | | * Additionally, whenever we partially stream the transaction we need to |
3697 | | * maintain the last streamed lsn and next time we need to restore from that |
3698 | | * segment and the offset in WAL. As we stream the changes from the top |
3699 | | * transaction and restore them subtransaction wise, we need to even remember |
3700 | | * the subxact from where we streamed the last change. |
3701 | | */ |
3702 | | static ReorderBufferTXN * |
3703 | | ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb) |
3704 | 0 | { |
3705 | 0 | dlist_iter iter; |
3706 | 0 | Size largest_size = 0; |
3707 | 0 | ReorderBufferTXN *largest = NULL; |
3708 | | |
3709 | | /* Find the largest top-level transaction having a base snapshot. */ |
3710 | 0 | dlist_foreach(iter, &rb->txns_by_base_snapshot_lsn) |
3711 | 0 | { |
3712 | 0 | ReorderBufferTXN *txn; |
3713 | |
|
3714 | 0 | txn = dlist_container(ReorderBufferTXN, base_snapshot_node, iter.cur); |
3715 | | |
3716 | | /* must not be a subtxn */ |
3717 | 0 | Assert(!rbtxn_is_known_subxact(txn)); |
3718 | | /* base_snapshot must be set */ |
3719 | 0 | Assert(txn->base_snapshot != NULL); |
3720 | | |
3721 | | /* Don't consider these kinds of transactions for eviction. */ |
3722 | 0 | if (rbtxn_has_partial_change(txn) || |
3723 | 0 | !rbtxn_has_streamable_change(txn) || |
3724 | 0 | rbtxn_is_aborted(txn)) |
3725 | 0 | continue; |
3726 | | |
3727 | | /* Find the largest of the eviction candidates. */ |
3728 | 0 | if ((largest == NULL || txn->total_size > largest_size) && |
3729 | 0 | (txn->total_size > 0)) |
3730 | 0 | { |
3731 | 0 | largest = txn; |
3732 | 0 | largest_size = txn->total_size; |
3733 | 0 | } |
3734 | 0 | } |
3735 | |
|
3736 | 0 | return largest; |
3737 | 0 | } |
3738 | | |
3739 | | /* |
3740 | | * Check whether the logical_decoding_work_mem limit was reached, and if yes |
3741 | | * pick the largest (sub)transaction at-a-time to evict and spill its changes to |
3742 | | * disk or send to the output plugin until we reach under the memory limit. |
3743 | | * |
3744 | | * If debug_logical_replication_streaming is set to "immediate", stream or |
3745 | | * serialize the changes immediately. |
3746 | | * |
3747 | | * XXX At this point we select the transactions until we reach under the memory |
3748 | | * limit, but we might also adapt a more elaborate eviction strategy - for example |
3749 | | * evicting enough transactions to free certain fraction (e.g. 50%) of the memory |
3750 | | * limit. |
3751 | | */ |
3752 | | static void |
3753 | | ReorderBufferCheckMemoryLimit(ReorderBuffer *rb) |
3754 | 0 | { |
3755 | 0 | ReorderBufferTXN *txn; |
3756 | | |
3757 | | /* |
3758 | | * Bail out if debug_logical_replication_streaming is buffered and we |
3759 | | * haven't exceeded the memory limit. |
3760 | | */ |
3761 | 0 | if (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_BUFFERED && |
3762 | 0 | rb->size < logical_decoding_work_mem * (Size) 1024) |
3763 | 0 | return; |
3764 | | |
3765 | | /* |
3766 | | * If debug_logical_replication_streaming is immediate, loop until there's |
3767 | | * no change. Otherwise, loop until we reach under the memory limit. One |
3768 | | * might think that just by evicting the largest (sub)transaction we will |
3769 | | * come under the memory limit based on assumption that the selected |
3770 | | * transaction is at least as large as the most recent change (which |
3771 | | * caused us to go over the memory limit). However, that is not true |
3772 | | * because a user can reduce the logical_decoding_work_mem to a smaller |
3773 | | * value before the most recent change. |
3774 | | */ |
3775 | 0 | while (rb->size >= logical_decoding_work_mem * (Size) 1024 || |
3776 | 0 | (debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE && |
3777 | 0 | rb->size > 0)) |
3778 | 0 | { |
3779 | | /* |
3780 | | * Pick the largest non-aborted transaction and evict it from memory |
3781 | | * by streaming, if possible. Otherwise, spill to disk. |
3782 | | */ |
3783 | 0 | if (ReorderBufferCanStartStreaming(rb) && |
3784 | 0 | (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL) |
3785 | 0 | { |
3786 | | /* we know there has to be one, because the size is not zero */ |
3787 | 0 | Assert(txn && rbtxn_is_toptxn(txn)); |
3788 | 0 | Assert(txn->total_size > 0); |
3789 | 0 | Assert(rb->size >= txn->total_size); |
3790 | | |
3791 | | /* skip the transaction if aborted */ |
3792 | 0 | if (ReorderBufferCheckAndTruncateAbortedTXN(rb, txn)) |
3793 | 0 | continue; |
3794 | | |
3795 | 0 | ReorderBufferStreamTXN(rb, txn); |
3796 | 0 | } |
3797 | 0 | else |
3798 | 0 | { |
3799 | | /* |
3800 | | * Pick the largest transaction (or subtransaction) and evict it |
3801 | | * from memory by serializing it to disk. |
3802 | | */ |
3803 | 0 | txn = ReorderBufferLargestTXN(rb); |
3804 | | |
3805 | | /* we know there has to be one, because the size is not zero */ |
3806 | 0 | Assert(txn); |
3807 | 0 | Assert(txn->size > 0); |
3808 | 0 | Assert(rb->size >= txn->size); |
3809 | | |
3810 | | /* skip the transaction if aborted */ |
3811 | 0 | if (ReorderBufferCheckAndTruncateAbortedTXN(rb, txn)) |
3812 | 0 | continue; |
3813 | | |
3814 | 0 | ReorderBufferSerializeTXN(rb, txn); |
3815 | 0 | } |
3816 | | |
3817 | | /* |
3818 | | * After eviction, the transaction should have no entries in memory, |
3819 | | * and should use 0 bytes for changes. |
3820 | | */ |
3821 | 0 | Assert(txn->size == 0); |
3822 | 0 | Assert(txn->nentries_mem == 0); |
3823 | 0 | } |
3824 | | |
3825 | | /* We must be under the memory limit now. */ |
3826 | 0 | Assert(rb->size < logical_decoding_work_mem * (Size) 1024); |
3827 | 0 | } |
3828 | | |
3829 | | /* |
3830 | | * Spill data of a large transaction (and its subtransactions) to disk. |
3831 | | */ |
3832 | | static void |
3833 | | ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) |
3834 | 0 | { |
3835 | 0 | dlist_iter subtxn_i; |
3836 | 0 | dlist_mutable_iter change_i; |
3837 | 0 | int fd = -1; |
3838 | 0 | XLogSegNo curOpenSegNo = 0; |
3839 | 0 | Size spilled = 0; |
3840 | 0 | Size size = txn->size; |
3841 | |
|
3842 | 0 | elog(DEBUG2, "spill %u changes in XID %u to disk", |
3843 | 0 | (uint32) txn->nentries_mem, txn->xid); |
3844 | | |
3845 | | /* do the same to all child TXs */ |
3846 | 0 | dlist_foreach(subtxn_i, &txn->subtxns) |
3847 | 0 | { |
3848 | 0 | ReorderBufferTXN *subtxn; |
3849 | |
|
3850 | 0 | subtxn = dlist_container(ReorderBufferTXN, node, subtxn_i.cur); |
3851 | 0 | ReorderBufferSerializeTXN(rb, subtxn); |
3852 | 0 | } |
3853 | | |
3854 | | /* serialize changestream */ |
3855 | 0 | dlist_foreach_modify(change_i, &txn->changes) |
3856 | 0 | { |
3857 | 0 | ReorderBufferChange *change; |
3858 | |
|
3859 | 0 | change = dlist_container(ReorderBufferChange, node, change_i.cur); |
3860 | | |
3861 | | /* |
3862 | | * store in segment in which it belongs by start lsn, don't split over |
3863 | | * multiple segments tho |
3864 | | */ |
3865 | 0 | if (fd == -1 || |
3866 | 0 | !XLByteInSeg(change->lsn, curOpenSegNo, wal_segment_size)) |
3867 | 0 | { |
3868 | 0 | char path[MAXPGPATH]; |
3869 | |
|
3870 | 0 | if (fd != -1) |
3871 | 0 | CloseTransientFile(fd); |
3872 | |
|
3873 | 0 | XLByteToSeg(change->lsn, curOpenSegNo, wal_segment_size); |
3874 | | |
3875 | | /* |
3876 | | * No need to care about TLIs here, only used during a single run, |
3877 | | * so each LSN only maps to a specific WAL record. |
3878 | | */ |
3879 | 0 | ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, |
3880 | 0 | curOpenSegNo); |
3881 | | |
3882 | | /* open segment, create it if necessary */ |
3883 | 0 | fd = OpenTransientFile(path, |
3884 | 0 | O_CREAT | O_WRONLY | O_APPEND | PG_BINARY); |
3885 | |
|
3886 | 0 | if (fd < 0) |
3887 | 0 | ereport(ERROR, |
3888 | 0 | (errcode_for_file_access(), |
3889 | 0 | errmsg("could not open file \"%s\": %m", path))); |
3890 | 0 | } |
3891 | | |
3892 | 0 | ReorderBufferSerializeChange(rb, txn, fd, change); |
3893 | 0 | dlist_delete(&change->node); |
3894 | 0 | ReorderBufferFreeChange(rb, change, false); |
3895 | |
|
3896 | 0 | spilled++; |
3897 | 0 | } |
3898 | | |
3899 | | /* Update the memory counter */ |
3900 | 0 | ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, size); |
3901 | | |
3902 | | /* update the statistics iff we have spilled anything */ |
3903 | 0 | if (spilled) |
3904 | 0 | { |
3905 | 0 | rb->spillCount += 1; |
3906 | 0 | rb->spillBytes += size; |
3907 | | |
3908 | | /* don't consider already serialized transactions */ |
3909 | 0 | rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1; |
3910 | | |
3911 | | /* update the decoding stats */ |
3912 | 0 | UpdateDecodingStats((LogicalDecodingContext *) rb->private_data); |
3913 | 0 | } |
3914 | |
|
3915 | 0 | Assert(spilled == txn->nentries_mem); |
3916 | 0 | Assert(dlist_is_empty(&txn->changes)); |
3917 | 0 | txn->nentries_mem = 0; |
3918 | 0 | txn->txn_flags |= RBTXN_IS_SERIALIZED; |
3919 | |
|
3920 | 0 | if (fd != -1) |
3921 | 0 | CloseTransientFile(fd); |
3922 | 0 | } |
3923 | | |
3924 | | /* |
3925 | | * Serialize individual change to disk. |
3926 | | */ |
3927 | | static void |
3928 | | ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn, |
3929 | | int fd, ReorderBufferChange *change) |
3930 | 0 | { |
3931 | 0 | ReorderBufferDiskChange *ondisk; |
3932 | 0 | Size sz = sizeof(ReorderBufferDiskChange); |
3933 | |
|
3934 | 0 | ReorderBufferSerializeReserve(rb, sz); |
3935 | |
|
3936 | 0 | ondisk = (ReorderBufferDiskChange *) rb->outbuf; |
3937 | 0 | memcpy(&ondisk->change, change, sizeof(ReorderBufferChange)); |
3938 | |
|
3939 | 0 | switch (change->action) |
3940 | 0 | { |
3941 | | /* fall through these, they're all similar enough */ |
3942 | 0 | case REORDER_BUFFER_CHANGE_INSERT: |
3943 | 0 | case REORDER_BUFFER_CHANGE_UPDATE: |
3944 | 0 | case REORDER_BUFFER_CHANGE_DELETE: |
3945 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: |
3946 | 0 | { |
3947 | 0 | char *data; |
3948 | 0 | HeapTuple oldtup, |
3949 | 0 | newtup; |
3950 | 0 | Size oldlen = 0; |
3951 | 0 | Size newlen = 0; |
3952 | |
|
3953 | 0 | oldtup = change->data.tp.oldtuple; |
3954 | 0 | newtup = change->data.tp.newtuple; |
3955 | |
|
3956 | 0 | if (oldtup) |
3957 | 0 | { |
3958 | 0 | sz += sizeof(HeapTupleData); |
3959 | 0 | oldlen = oldtup->t_len; |
3960 | 0 | sz += oldlen; |
3961 | 0 | } |
3962 | |
|
3963 | 0 | if (newtup) |
3964 | 0 | { |
3965 | 0 | sz += sizeof(HeapTupleData); |
3966 | 0 | newlen = newtup->t_len; |
3967 | 0 | sz += newlen; |
3968 | 0 | } |
3969 | | |
3970 | | /* make sure we have enough space */ |
3971 | 0 | ReorderBufferSerializeReserve(rb, sz); |
3972 | |
|
3973 | 0 | data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); |
3974 | | /* might have been reallocated above */ |
3975 | 0 | ondisk = (ReorderBufferDiskChange *) rb->outbuf; |
3976 | |
|
3977 | 0 | if (oldlen) |
3978 | 0 | { |
3979 | 0 | memcpy(data, oldtup, sizeof(HeapTupleData)); |
3980 | 0 | data += sizeof(HeapTupleData); |
3981 | |
|
3982 | 0 | memcpy(data, oldtup->t_data, oldlen); |
3983 | 0 | data += oldlen; |
3984 | 0 | } |
3985 | |
|
3986 | 0 | if (newlen) |
3987 | 0 | { |
3988 | 0 | memcpy(data, newtup, sizeof(HeapTupleData)); |
3989 | 0 | data += sizeof(HeapTupleData); |
3990 | |
|
3991 | 0 | memcpy(data, newtup->t_data, newlen); |
3992 | 0 | data += newlen; |
3993 | 0 | } |
3994 | 0 | break; |
3995 | 0 | } |
3996 | 0 | case REORDER_BUFFER_CHANGE_MESSAGE: |
3997 | 0 | { |
3998 | 0 | char *data; |
3999 | 0 | Size prefix_size = strlen(change->data.msg.prefix) + 1; |
4000 | |
|
4001 | 0 | sz += prefix_size + change->data.msg.message_size + |
4002 | 0 | sizeof(Size) + sizeof(Size); |
4003 | 0 | ReorderBufferSerializeReserve(rb, sz); |
4004 | |
|
4005 | 0 | data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); |
4006 | | |
4007 | | /* might have been reallocated above */ |
4008 | 0 | ondisk = (ReorderBufferDiskChange *) rb->outbuf; |
4009 | | |
4010 | | /* write the prefix including the size */ |
4011 | 0 | memcpy(data, &prefix_size, sizeof(Size)); |
4012 | 0 | data += sizeof(Size); |
4013 | 0 | memcpy(data, change->data.msg.prefix, |
4014 | 0 | prefix_size); |
4015 | 0 | data += prefix_size; |
4016 | | |
4017 | | /* write the message including the size */ |
4018 | 0 | memcpy(data, &change->data.msg.message_size, sizeof(Size)); |
4019 | 0 | data += sizeof(Size); |
4020 | 0 | memcpy(data, change->data.msg.message, |
4021 | 0 | change->data.msg.message_size); |
4022 | 0 | data += change->data.msg.message_size; |
4023 | |
|
4024 | 0 | break; |
4025 | 0 | } |
4026 | 0 | case REORDER_BUFFER_CHANGE_INVALIDATION: |
4027 | 0 | { |
4028 | 0 | char *data; |
4029 | 0 | Size inval_size = sizeof(SharedInvalidationMessage) * |
4030 | 0 | change->data.inval.ninvalidations; |
4031 | |
|
4032 | 0 | sz += inval_size; |
4033 | |
|
4034 | 0 | ReorderBufferSerializeReserve(rb, sz); |
4035 | 0 | data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); |
4036 | | |
4037 | | /* might have been reallocated above */ |
4038 | 0 | ondisk = (ReorderBufferDiskChange *) rb->outbuf; |
4039 | 0 | memcpy(data, change->data.inval.invalidations, inval_size); |
4040 | 0 | data += inval_size; |
4041 | |
|
4042 | 0 | break; |
4043 | 0 | } |
4044 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: |
4045 | 0 | { |
4046 | 0 | Snapshot snap; |
4047 | 0 | char *data; |
4048 | |
|
4049 | 0 | snap = change->data.snapshot; |
4050 | |
|
4051 | 0 | sz += sizeof(SnapshotData) + |
4052 | 0 | sizeof(TransactionId) * snap->xcnt + |
4053 | 0 | sizeof(TransactionId) * snap->subxcnt; |
4054 | | |
4055 | | /* make sure we have enough space */ |
4056 | 0 | ReorderBufferSerializeReserve(rb, sz); |
4057 | 0 | data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); |
4058 | | /* might have been reallocated above */ |
4059 | 0 | ondisk = (ReorderBufferDiskChange *) rb->outbuf; |
4060 | |
|
4061 | 0 | memcpy(data, snap, sizeof(SnapshotData)); |
4062 | 0 | data += sizeof(SnapshotData); |
4063 | |
|
4064 | 0 | if (snap->xcnt) |
4065 | 0 | { |
4066 | 0 | memcpy(data, snap->xip, |
4067 | 0 | sizeof(TransactionId) * snap->xcnt); |
4068 | 0 | data += sizeof(TransactionId) * snap->xcnt; |
4069 | 0 | } |
4070 | |
|
4071 | 0 | if (snap->subxcnt) |
4072 | 0 | { |
4073 | 0 | memcpy(data, snap->subxip, |
4074 | 0 | sizeof(TransactionId) * snap->subxcnt); |
4075 | 0 | data += sizeof(TransactionId) * snap->subxcnt; |
4076 | 0 | } |
4077 | 0 | break; |
4078 | 0 | } |
4079 | 0 | case REORDER_BUFFER_CHANGE_TRUNCATE: |
4080 | 0 | { |
4081 | 0 | Size size; |
4082 | 0 | char *data; |
4083 | | |
4084 | | /* account for the OIDs of truncated relations */ |
4085 | 0 | size = sizeof(Oid) * change->data.truncate.nrelids; |
4086 | 0 | sz += size; |
4087 | | |
4088 | | /* make sure we have enough space */ |
4089 | 0 | ReorderBufferSerializeReserve(rb, sz); |
4090 | |
|
4091 | 0 | data = ((char *) rb->outbuf) + sizeof(ReorderBufferDiskChange); |
4092 | | /* might have been reallocated above */ |
4093 | 0 | ondisk = (ReorderBufferDiskChange *) rb->outbuf; |
4094 | |
|
4095 | 0 | memcpy(data, change->data.truncate.relids, size); |
4096 | 0 | data += size; |
4097 | |
|
4098 | 0 | break; |
4099 | 0 | } |
4100 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: |
4101 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: |
4102 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: |
4103 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: |
4104 | | /* ReorderBufferChange contains everything important */ |
4105 | 0 | break; |
4106 | 0 | } |
4107 | | |
4108 | 0 | ondisk->size = sz; |
4109 | |
|
4110 | 0 | errno = 0; |
4111 | 0 | pgstat_report_wait_start(WAIT_EVENT_REORDER_BUFFER_WRITE); |
4112 | 0 | if (write(fd, rb->outbuf, ondisk->size) != ondisk->size) |
4113 | 0 | { |
4114 | 0 | int save_errno = errno; |
4115 | |
|
4116 | 0 | CloseTransientFile(fd); |
4117 | | |
4118 | | /* if write didn't set errno, assume problem is no disk space */ |
4119 | 0 | errno = save_errno ? save_errno : ENOSPC; |
4120 | 0 | ereport(ERROR, |
4121 | 0 | (errcode_for_file_access(), |
4122 | 0 | errmsg("could not write to data file for XID %u: %m", |
4123 | 0 | txn->xid))); |
4124 | 0 | } |
4125 | 0 | pgstat_report_wait_end(); |
4126 | | |
4127 | | /* |
4128 | | * Keep the transaction's final_lsn up to date with each change we send to |
4129 | | * disk, so that ReorderBufferRestoreCleanup works correctly. (We used to |
4130 | | * only do this on commit and abort records, but that doesn't work if a |
4131 | | * system crash leaves a transaction without its abort record). |
4132 | | * |
4133 | | * Make sure not to move it backwards. |
4134 | | */ |
4135 | 0 | if (txn->final_lsn < change->lsn) |
4136 | 0 | txn->final_lsn = change->lsn; |
4137 | |
|
4138 | 0 | Assert(ondisk->change.action == change->action); |
4139 | 0 | } |
4140 | | |
4141 | | /* Returns true, if the output plugin supports streaming, false, otherwise. */ |
4142 | | static inline bool |
4143 | | ReorderBufferCanStream(ReorderBuffer *rb) |
4144 | 0 | { |
4145 | 0 | LogicalDecodingContext *ctx = rb->private_data; |
4146 | |
|
4147 | 0 | return ctx->streaming; |
4148 | 0 | } |
4149 | | |
4150 | | /* Returns true, if the streaming can be started now, false, otherwise. */ |
4151 | | static inline bool |
4152 | | ReorderBufferCanStartStreaming(ReorderBuffer *rb) |
4153 | 0 | { |
4154 | 0 | LogicalDecodingContext *ctx = rb->private_data; |
4155 | 0 | SnapBuild *builder = ctx->snapshot_builder; |
4156 | | |
4157 | | /* We can't start streaming unless a consistent state is reached. */ |
4158 | 0 | if (SnapBuildCurrentState(builder) < SNAPBUILD_CONSISTENT) |
4159 | 0 | return false; |
4160 | | |
4161 | | /* |
4162 | | * We can't start streaming immediately even if the streaming is enabled |
4163 | | * because we previously decoded this transaction and now just are |
4164 | | * restarting. |
4165 | | */ |
4166 | 0 | if (ReorderBufferCanStream(rb) && |
4167 | 0 | !SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr)) |
4168 | 0 | return true; |
4169 | | |
4170 | 0 | return false; |
4171 | 0 | } |
4172 | | |
4173 | | /* |
4174 | | * Send data of a large transaction (and its subtransactions) to the |
4175 | | * output plugin, but using the stream API. |
4176 | | */ |
4177 | | static void |
4178 | | ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn) |
4179 | 0 | { |
4180 | 0 | Snapshot snapshot_now; |
4181 | 0 | CommandId command_id; |
4182 | 0 | Size stream_bytes; |
4183 | 0 | bool txn_is_streamed; |
4184 | | |
4185 | | /* We can never reach here for a subtransaction. */ |
4186 | 0 | Assert(rbtxn_is_toptxn(txn)); |
4187 | | |
4188 | | /* |
4189 | | * We can't make any assumptions about base snapshot here, similar to what |
4190 | | * ReorderBufferCommit() does. That relies on base_snapshot getting |
4191 | | * transferred from subxact in ReorderBufferCommitChild(), but that was |
4192 | | * not yet called as the transaction is in-progress. |
4193 | | * |
4194 | | * So just walk the subxacts and use the same logic here. But we only need |
4195 | | * to do that once, when the transaction is streamed for the first time. |
4196 | | * After that we need to reuse the snapshot from the previous run. |
4197 | | * |
4198 | | * Unlike DecodeCommit which adds xids of all the subtransactions in |
4199 | | * snapshot's xip array via SnapBuildCommitTxn, we can't do that here but |
4200 | | * we do add them to subxip array instead via ReorderBufferCopySnap. This |
4201 | | * allows the catalog changes made in subtransactions decoded till now to |
4202 | | * be visible. |
4203 | | */ |
4204 | 0 | if (txn->snapshot_now == NULL) |
4205 | 0 | { |
4206 | 0 | dlist_iter subxact_i; |
4207 | | |
4208 | | /* make sure this transaction is streamed for the first time */ |
4209 | 0 | Assert(!rbtxn_is_streamed(txn)); |
4210 | | |
4211 | | /* at the beginning we should have invalid command ID */ |
4212 | 0 | Assert(txn->command_id == InvalidCommandId); |
4213 | |
|
4214 | 0 | dlist_foreach(subxact_i, &txn->subtxns) |
4215 | 0 | { |
4216 | 0 | ReorderBufferTXN *subtxn; |
4217 | |
|
4218 | 0 | subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur); |
4219 | 0 | ReorderBufferTransferSnapToParent(txn, subtxn); |
4220 | 0 | } |
4221 | | |
4222 | | /* |
4223 | | * If this transaction has no snapshot, it didn't make any changes to |
4224 | | * the database till now, so there's nothing to decode. |
4225 | | */ |
4226 | 0 | if (txn->base_snapshot == NULL) |
4227 | 0 | { |
4228 | 0 | Assert(txn->ninvalidations == 0); |
4229 | 0 | return; |
4230 | 0 | } |
4231 | | |
4232 | 0 | command_id = FirstCommandId; |
4233 | 0 | snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot, |
4234 | 0 | txn, command_id); |
4235 | 0 | } |
4236 | 0 | else |
4237 | 0 | { |
4238 | | /* the transaction must have been already streamed */ |
4239 | 0 | Assert(rbtxn_is_streamed(txn)); |
4240 | | |
4241 | | /* |
4242 | | * Nah, we already have snapshot from the previous streaming run. We |
4243 | | * assume new subxacts can't move the LSN backwards, and so can't beat |
4244 | | * the LSN condition in the previous branch (so no need to walk |
4245 | | * through subxacts again). In fact, we must not do that as we may be |
4246 | | * using snapshot half-way through the subxact. |
4247 | | */ |
4248 | 0 | command_id = txn->command_id; |
4249 | | |
4250 | | /* |
4251 | | * We can't use txn->snapshot_now directly because after the last |
4252 | | * streaming run, we might have got some new sub-transactions. So we |
4253 | | * need to add them to the snapshot. |
4254 | | */ |
4255 | 0 | snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now, |
4256 | 0 | txn, command_id); |
4257 | | |
4258 | | /* Free the previously copied snapshot. */ |
4259 | 0 | Assert(txn->snapshot_now->copied); |
4260 | 0 | ReorderBufferFreeSnap(rb, txn->snapshot_now); |
4261 | 0 | txn->snapshot_now = NULL; |
4262 | 0 | } |
4263 | | |
4264 | | /* |
4265 | | * Remember this information to be used later to update stats. We can't |
4266 | | * update the stats here as an error while processing the changes would |
4267 | | * lead to the accumulation of stats even though we haven't streamed all |
4268 | | * the changes. |
4269 | | */ |
4270 | 0 | txn_is_streamed = rbtxn_is_streamed(txn); |
4271 | 0 | stream_bytes = txn->total_size; |
4272 | | |
4273 | | /* Process and send the changes to output plugin. */ |
4274 | 0 | ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now, |
4275 | 0 | command_id, true); |
4276 | |
|
4277 | 0 | rb->streamCount += 1; |
4278 | 0 | rb->streamBytes += stream_bytes; |
4279 | | |
4280 | | /* Don't consider already streamed transaction. */ |
4281 | 0 | rb->streamTxns += (txn_is_streamed) ? 0 : 1; |
4282 | | |
4283 | | /* update the decoding stats */ |
4284 | 0 | UpdateDecodingStats((LogicalDecodingContext *) rb->private_data); |
4285 | |
|
4286 | 0 | Assert(dlist_is_empty(&txn->changes)); |
4287 | 0 | Assert(txn->nentries == 0); |
4288 | 0 | Assert(txn->nentries_mem == 0); |
4289 | 0 | } |
4290 | | |
4291 | | /* |
4292 | | * Size of a change in memory. |
4293 | | */ |
4294 | | static Size |
4295 | | ReorderBufferChangeSize(ReorderBufferChange *change) |
4296 | 0 | { |
4297 | 0 | Size sz = sizeof(ReorderBufferChange); |
4298 | |
|
4299 | 0 | switch (change->action) |
4300 | 0 | { |
4301 | | /* fall through these, they're all similar enough */ |
4302 | 0 | case REORDER_BUFFER_CHANGE_INSERT: |
4303 | 0 | case REORDER_BUFFER_CHANGE_UPDATE: |
4304 | 0 | case REORDER_BUFFER_CHANGE_DELETE: |
4305 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: |
4306 | 0 | { |
4307 | 0 | HeapTuple oldtup, |
4308 | 0 | newtup; |
4309 | 0 | Size oldlen = 0; |
4310 | 0 | Size newlen = 0; |
4311 | |
|
4312 | 0 | oldtup = change->data.tp.oldtuple; |
4313 | 0 | newtup = change->data.tp.newtuple; |
4314 | |
|
4315 | 0 | if (oldtup) |
4316 | 0 | { |
4317 | 0 | sz += sizeof(HeapTupleData); |
4318 | 0 | oldlen = oldtup->t_len; |
4319 | 0 | sz += oldlen; |
4320 | 0 | } |
4321 | |
|
4322 | 0 | if (newtup) |
4323 | 0 | { |
4324 | 0 | sz += sizeof(HeapTupleData); |
4325 | 0 | newlen = newtup->t_len; |
4326 | 0 | sz += newlen; |
4327 | 0 | } |
4328 | |
|
4329 | 0 | break; |
4330 | 0 | } |
4331 | 0 | case REORDER_BUFFER_CHANGE_MESSAGE: |
4332 | 0 | { |
4333 | 0 | Size prefix_size = strlen(change->data.msg.prefix) + 1; |
4334 | |
|
4335 | 0 | sz += prefix_size + change->data.msg.message_size + |
4336 | 0 | sizeof(Size) + sizeof(Size); |
4337 | |
|
4338 | 0 | break; |
4339 | 0 | } |
4340 | 0 | case REORDER_BUFFER_CHANGE_INVALIDATION: |
4341 | 0 | { |
4342 | 0 | sz += sizeof(SharedInvalidationMessage) * |
4343 | 0 | change->data.inval.ninvalidations; |
4344 | 0 | break; |
4345 | 0 | } |
4346 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: |
4347 | 0 | { |
4348 | 0 | Snapshot snap; |
4349 | |
|
4350 | 0 | snap = change->data.snapshot; |
4351 | |
|
4352 | 0 | sz += sizeof(SnapshotData) + |
4353 | 0 | sizeof(TransactionId) * snap->xcnt + |
4354 | 0 | sizeof(TransactionId) * snap->subxcnt; |
4355 | |
|
4356 | 0 | break; |
4357 | 0 | } |
4358 | 0 | case REORDER_BUFFER_CHANGE_TRUNCATE: |
4359 | 0 | { |
4360 | 0 | sz += sizeof(Oid) * change->data.truncate.nrelids; |
4361 | |
|
4362 | 0 | break; |
4363 | 0 | } |
4364 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: |
4365 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: |
4366 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: |
4367 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: |
4368 | | /* ReorderBufferChange contains everything important */ |
4369 | 0 | break; |
4370 | 0 | } |
4371 | | |
4372 | 0 | return sz; |
4373 | 0 | } |
4374 | | |
4375 | | |
4376 | | /* |
4377 | | * Restore a number of changes spilled to disk back into memory. |
4378 | | */ |
4379 | | static Size |
4380 | | ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn, |
4381 | | TXNEntryFile *file, XLogSegNo *segno) |
4382 | 0 | { |
4383 | 0 | Size restored = 0; |
4384 | 0 | XLogSegNo last_segno; |
4385 | 0 | dlist_mutable_iter cleanup_iter; |
4386 | 0 | File *fd = &file->vfd; |
4387 | |
|
4388 | 0 | Assert(txn->first_lsn != InvalidXLogRecPtr); |
4389 | 0 | Assert(txn->final_lsn != InvalidXLogRecPtr); |
4390 | | |
4391 | | /* free current entries, so we have memory for more */ |
4392 | 0 | dlist_foreach_modify(cleanup_iter, &txn->changes) |
4393 | 0 | { |
4394 | 0 | ReorderBufferChange *cleanup = |
4395 | 0 | dlist_container(ReorderBufferChange, node, cleanup_iter.cur); |
4396 | |
|
4397 | 0 | dlist_delete(&cleanup->node); |
4398 | 0 | ReorderBufferFreeChange(rb, cleanup, true); |
4399 | 0 | } |
4400 | 0 | txn->nentries_mem = 0; |
4401 | 0 | Assert(dlist_is_empty(&txn->changes)); |
4402 | |
|
4403 | 0 | XLByteToSeg(txn->final_lsn, last_segno, wal_segment_size); |
4404 | |
|
4405 | 0 | while (restored < max_changes_in_memory && *segno <= last_segno) |
4406 | 0 | { |
4407 | 0 | int readBytes; |
4408 | 0 | ReorderBufferDiskChange *ondisk; |
4409 | |
|
4410 | 0 | CHECK_FOR_INTERRUPTS(); |
4411 | |
|
4412 | 0 | if (*fd == -1) |
4413 | 0 | { |
4414 | 0 | char path[MAXPGPATH]; |
4415 | | |
4416 | | /* first time in */ |
4417 | 0 | if (*segno == 0) |
4418 | 0 | XLByteToSeg(txn->first_lsn, *segno, wal_segment_size); |
4419 | |
|
4420 | 0 | Assert(*segno != 0 || dlist_is_empty(&txn->changes)); |
4421 | | |
4422 | | /* |
4423 | | * No need to care about TLIs here, only used during a single run, |
4424 | | * so each LSN only maps to a specific WAL record. |
4425 | | */ |
4426 | 0 | ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, |
4427 | 0 | *segno); |
4428 | |
|
4429 | 0 | *fd = PathNameOpenFile(path, O_RDONLY | PG_BINARY); |
4430 | | |
4431 | | /* No harm in resetting the offset even in case of failure */ |
4432 | 0 | file->curOffset = 0; |
4433 | |
|
4434 | 0 | if (*fd < 0 && errno == ENOENT) |
4435 | 0 | { |
4436 | 0 | *fd = -1; |
4437 | 0 | (*segno)++; |
4438 | 0 | continue; |
4439 | 0 | } |
4440 | 0 | else if (*fd < 0) |
4441 | 0 | ereport(ERROR, |
4442 | 0 | (errcode_for_file_access(), |
4443 | 0 | errmsg("could not open file \"%s\": %m", |
4444 | 0 | path))); |
4445 | 0 | } |
4446 | | |
4447 | | /* |
4448 | | * Read the statically sized part of a change which has information |
4449 | | * about the total size. If we couldn't read a record, we're at the |
4450 | | * end of this file. |
4451 | | */ |
4452 | 0 | ReorderBufferSerializeReserve(rb, sizeof(ReorderBufferDiskChange)); |
4453 | 0 | readBytes = FileRead(file->vfd, rb->outbuf, |
4454 | 0 | sizeof(ReorderBufferDiskChange), |
4455 | 0 | file->curOffset, WAIT_EVENT_REORDER_BUFFER_READ); |
4456 | | |
4457 | | /* eof */ |
4458 | 0 | if (readBytes == 0) |
4459 | 0 | { |
4460 | 0 | FileClose(*fd); |
4461 | 0 | *fd = -1; |
4462 | 0 | (*segno)++; |
4463 | 0 | continue; |
4464 | 0 | } |
4465 | 0 | else if (readBytes < 0) |
4466 | 0 | ereport(ERROR, |
4467 | 0 | (errcode_for_file_access(), |
4468 | 0 | errmsg("could not read from reorderbuffer spill file: %m"))); |
4469 | 0 | else if (readBytes != sizeof(ReorderBufferDiskChange)) |
4470 | 0 | ereport(ERROR, |
4471 | 0 | (errcode_for_file_access(), |
4472 | 0 | errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes", |
4473 | 0 | readBytes, |
4474 | 0 | (uint32) sizeof(ReorderBufferDiskChange)))); |
4475 | | |
4476 | 0 | file->curOffset += readBytes; |
4477 | |
|
4478 | 0 | ondisk = (ReorderBufferDiskChange *) rb->outbuf; |
4479 | |
|
4480 | 0 | ReorderBufferSerializeReserve(rb, |
4481 | 0 | sizeof(ReorderBufferDiskChange) + ondisk->size); |
4482 | 0 | ondisk = (ReorderBufferDiskChange *) rb->outbuf; |
4483 | |
|
4484 | 0 | readBytes = FileRead(file->vfd, |
4485 | 0 | rb->outbuf + sizeof(ReorderBufferDiskChange), |
4486 | 0 | ondisk->size - sizeof(ReorderBufferDiskChange), |
4487 | 0 | file->curOffset, |
4488 | 0 | WAIT_EVENT_REORDER_BUFFER_READ); |
4489 | |
|
4490 | 0 | if (readBytes < 0) |
4491 | 0 | ereport(ERROR, |
4492 | 0 | (errcode_for_file_access(), |
4493 | 0 | errmsg("could not read from reorderbuffer spill file: %m"))); |
4494 | 0 | else if (readBytes != ondisk->size - sizeof(ReorderBufferDiskChange)) |
4495 | 0 | ereport(ERROR, |
4496 | 0 | (errcode_for_file_access(), |
4497 | 0 | errmsg("could not read from reorderbuffer spill file: read %d instead of %u bytes", |
4498 | 0 | readBytes, |
4499 | 0 | (uint32) (ondisk->size - sizeof(ReorderBufferDiskChange))))); |
4500 | | |
4501 | 0 | file->curOffset += readBytes; |
4502 | | |
4503 | | /* |
4504 | | * ok, read a full change from disk, now restore it into proper |
4505 | | * in-memory format |
4506 | | */ |
4507 | 0 | ReorderBufferRestoreChange(rb, txn, rb->outbuf); |
4508 | 0 | restored++; |
4509 | 0 | } |
4510 | | |
4511 | 0 | return restored; |
4512 | 0 | } |
4513 | | |
4514 | | /* |
4515 | | * Convert change from its on-disk format to in-memory format and queue it onto |
4516 | | * the TXN's ->changes list. |
4517 | | * |
4518 | | * Note: although "data" is declared char*, at entry it points to a |
4519 | | * maxalign'd buffer, making it safe in most of this function to assume |
4520 | | * that the pointed-to data is suitably aligned for direct access. |
4521 | | */ |
4522 | | static void |
4523 | | ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn, |
4524 | | char *data) |
4525 | 0 | { |
4526 | 0 | ReorderBufferDiskChange *ondisk; |
4527 | 0 | ReorderBufferChange *change; |
4528 | |
|
4529 | 0 | ondisk = (ReorderBufferDiskChange *) data; |
4530 | |
|
4531 | 0 | change = ReorderBufferAllocChange(rb); |
4532 | | |
4533 | | /* copy static part */ |
4534 | 0 | memcpy(change, &ondisk->change, sizeof(ReorderBufferChange)); |
4535 | |
|
4536 | 0 | data += sizeof(ReorderBufferDiskChange); |
4537 | | |
4538 | | /* restore individual stuff */ |
4539 | 0 | switch (change->action) |
4540 | 0 | { |
4541 | | /* fall through these, they're all similar enough */ |
4542 | 0 | case REORDER_BUFFER_CHANGE_INSERT: |
4543 | 0 | case REORDER_BUFFER_CHANGE_UPDATE: |
4544 | 0 | case REORDER_BUFFER_CHANGE_DELETE: |
4545 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT: |
4546 | 0 | if (change->data.tp.oldtuple) |
4547 | 0 | { |
4548 | 0 | uint32 tuplelen = ((HeapTuple) data)->t_len; |
4549 | |
|
4550 | 0 | change->data.tp.oldtuple = |
4551 | 0 | ReorderBufferAllocTupleBuf(rb, tuplelen - SizeofHeapTupleHeader); |
4552 | | |
4553 | | /* restore ->tuple */ |
4554 | 0 | memcpy(change->data.tp.oldtuple, data, |
4555 | 0 | sizeof(HeapTupleData)); |
4556 | 0 | data += sizeof(HeapTupleData); |
4557 | | |
4558 | | /* reset t_data pointer into the new tuplebuf */ |
4559 | 0 | change->data.tp.oldtuple->t_data = |
4560 | 0 | (HeapTupleHeader) ((char *) change->data.tp.oldtuple + HEAPTUPLESIZE); |
4561 | | |
4562 | | /* restore tuple data itself */ |
4563 | 0 | memcpy(change->data.tp.oldtuple->t_data, data, tuplelen); |
4564 | 0 | data += tuplelen; |
4565 | 0 | } |
4566 | |
|
4567 | 0 | if (change->data.tp.newtuple) |
4568 | 0 | { |
4569 | | /* here, data might not be suitably aligned! */ |
4570 | 0 | uint32 tuplelen; |
4571 | |
|
4572 | 0 | memcpy(&tuplelen, data + offsetof(HeapTupleData, t_len), |
4573 | 0 | sizeof(uint32)); |
4574 | |
|
4575 | 0 | change->data.tp.newtuple = |
4576 | 0 | ReorderBufferAllocTupleBuf(rb, tuplelen - SizeofHeapTupleHeader); |
4577 | | |
4578 | | /* restore ->tuple */ |
4579 | 0 | memcpy(change->data.tp.newtuple, data, |
4580 | 0 | sizeof(HeapTupleData)); |
4581 | 0 | data += sizeof(HeapTupleData); |
4582 | | |
4583 | | /* reset t_data pointer into the new tuplebuf */ |
4584 | 0 | change->data.tp.newtuple->t_data = |
4585 | 0 | (HeapTupleHeader) ((char *) change->data.tp.newtuple + HEAPTUPLESIZE); |
4586 | | |
4587 | | /* restore tuple data itself */ |
4588 | 0 | memcpy(change->data.tp.newtuple->t_data, data, tuplelen); |
4589 | 0 | data += tuplelen; |
4590 | 0 | } |
4591 | |
|
4592 | 0 | break; |
4593 | 0 | case REORDER_BUFFER_CHANGE_MESSAGE: |
4594 | 0 | { |
4595 | 0 | Size prefix_size; |
4596 | | |
4597 | | /* read prefix */ |
4598 | 0 | memcpy(&prefix_size, data, sizeof(Size)); |
4599 | 0 | data += sizeof(Size); |
4600 | 0 | change->data.msg.prefix = MemoryContextAlloc(rb->context, |
4601 | 0 | prefix_size); |
4602 | 0 | memcpy(change->data.msg.prefix, data, prefix_size); |
4603 | 0 | Assert(change->data.msg.prefix[prefix_size - 1] == '\0'); |
4604 | 0 | data += prefix_size; |
4605 | | |
4606 | | /* read the message */ |
4607 | 0 | memcpy(&change->data.msg.message_size, data, sizeof(Size)); |
4608 | 0 | data += sizeof(Size); |
4609 | 0 | change->data.msg.message = MemoryContextAlloc(rb->context, |
4610 | 0 | change->data.msg.message_size); |
4611 | 0 | memcpy(change->data.msg.message, data, |
4612 | 0 | change->data.msg.message_size); |
4613 | 0 | data += change->data.msg.message_size; |
4614 | |
|
4615 | 0 | break; |
4616 | 0 | } |
4617 | 0 | case REORDER_BUFFER_CHANGE_INVALIDATION: |
4618 | 0 | { |
4619 | 0 | Size inval_size = sizeof(SharedInvalidationMessage) * |
4620 | 0 | change->data.inval.ninvalidations; |
4621 | |
|
4622 | 0 | change->data.inval.invalidations = |
4623 | 0 | MemoryContextAlloc(rb->context, inval_size); |
4624 | | |
4625 | | /* read the message */ |
4626 | 0 | memcpy(change->data.inval.invalidations, data, inval_size); |
4627 | |
|
4628 | 0 | break; |
4629 | 0 | } |
4630 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT: |
4631 | 0 | { |
4632 | 0 | Snapshot oldsnap; |
4633 | 0 | Snapshot newsnap; |
4634 | 0 | Size size; |
4635 | |
|
4636 | 0 | oldsnap = (Snapshot) data; |
4637 | |
|
4638 | 0 | size = sizeof(SnapshotData) + |
4639 | 0 | sizeof(TransactionId) * oldsnap->xcnt + |
4640 | 0 | sizeof(TransactionId) * (oldsnap->subxcnt + 0); |
4641 | |
|
4642 | 0 | change->data.snapshot = MemoryContextAllocZero(rb->context, size); |
4643 | |
|
4644 | 0 | newsnap = change->data.snapshot; |
4645 | |
|
4646 | 0 | memcpy(newsnap, data, size); |
4647 | 0 | newsnap->xip = (TransactionId *) |
4648 | 0 | (((char *) newsnap) + sizeof(SnapshotData)); |
4649 | 0 | newsnap->subxip = newsnap->xip + newsnap->xcnt; |
4650 | 0 | newsnap->copied = true; |
4651 | 0 | break; |
4652 | 0 | } |
4653 | | /* the base struct contains all the data, easy peasy */ |
4654 | 0 | case REORDER_BUFFER_CHANGE_TRUNCATE: |
4655 | 0 | { |
4656 | 0 | Oid *relids; |
4657 | |
|
4658 | 0 | relids = ReorderBufferAllocRelids(rb, change->data.truncate.nrelids); |
4659 | 0 | memcpy(relids, data, change->data.truncate.nrelids * sizeof(Oid)); |
4660 | 0 | change->data.truncate.relids = relids; |
4661 | |
|
4662 | 0 | break; |
4663 | 0 | } |
4664 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM: |
4665 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_ABORT: |
4666 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID: |
4667 | 0 | case REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID: |
4668 | 0 | break; |
4669 | 0 | } |
4670 | | |
4671 | 0 | dlist_push_tail(&txn->changes, &change->node); |
4672 | 0 | txn->nentries_mem++; |
4673 | | |
4674 | | /* |
4675 | | * Update memory accounting for the restored change. We need to do this |
4676 | | * although we don't check the memory limit when restoring the changes in |
4677 | | * this branch (we only do that when initially queueing the changes after |
4678 | | * decoding), because we will release the changes later, and that will |
4679 | | * update the accounting too (subtracting the size from the counters). And |
4680 | | * we don't want to underflow there. |
4681 | | */ |
4682 | 0 | ReorderBufferChangeMemoryUpdate(rb, change, NULL, true, |
4683 | 0 | ReorderBufferChangeSize(change)); |
4684 | 0 | } |
4685 | | |
4686 | | /* |
4687 | | * Remove all on-disk stored for the passed in transaction. |
4688 | | */ |
4689 | | static void |
4690 | | ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn) |
4691 | 0 | { |
4692 | 0 | XLogSegNo first; |
4693 | 0 | XLogSegNo cur; |
4694 | 0 | XLogSegNo last; |
4695 | |
|
4696 | 0 | Assert(txn->first_lsn != InvalidXLogRecPtr); |
4697 | 0 | Assert(txn->final_lsn != InvalidXLogRecPtr); |
4698 | |
|
4699 | 0 | XLByteToSeg(txn->first_lsn, first, wal_segment_size); |
4700 | 0 | XLByteToSeg(txn->final_lsn, last, wal_segment_size); |
4701 | | |
4702 | | /* iterate over all possible filenames, and delete them */ |
4703 | 0 | for (cur = first; cur <= last; cur++) |
4704 | 0 | { |
4705 | 0 | char path[MAXPGPATH]; |
4706 | |
|
4707 | 0 | ReorderBufferSerializedPath(path, MyReplicationSlot, txn->xid, cur); |
4708 | 0 | if (unlink(path) != 0 && errno != ENOENT) |
4709 | 0 | ereport(ERROR, |
4710 | 0 | (errcode_for_file_access(), |
4711 | 0 | errmsg("could not remove file \"%s\": %m", path))); |
4712 | 0 | } |
4713 | 0 | } |
4714 | | |
4715 | | /* |
4716 | | * Remove any leftover serialized reorder buffers from a slot directory after a |
4717 | | * prior crash or decoding session exit. |
4718 | | */ |
4719 | | static void |
4720 | | ReorderBufferCleanupSerializedTXNs(const char *slotname) |
4721 | 0 | { |
4722 | 0 | DIR *spill_dir; |
4723 | 0 | struct dirent *spill_de; |
4724 | 0 | struct stat statbuf; |
4725 | 0 | char path[MAXPGPATH * 2 + sizeof(PG_REPLSLOT_DIR)]; |
4726 | |
|
4727 | 0 | sprintf(path, "%s/%s", PG_REPLSLOT_DIR, slotname); |
4728 | | |
4729 | | /* we're only handling directories here, skip if it's not ours */ |
4730 | 0 | if (lstat(path, &statbuf) == 0 && !S_ISDIR(statbuf.st_mode)) |
4731 | 0 | return; |
4732 | | |
4733 | 0 | spill_dir = AllocateDir(path); |
4734 | 0 | while ((spill_de = ReadDirExtended(spill_dir, path, INFO)) != NULL) |
4735 | 0 | { |
4736 | | /* only look at names that can be ours */ |
4737 | 0 | if (strncmp(spill_de->d_name, "xid", 3) == 0) |
4738 | 0 | { |
4739 | 0 | snprintf(path, sizeof(path), |
4740 | 0 | "%s/%s/%s", PG_REPLSLOT_DIR, slotname, |
4741 | 0 | spill_de->d_name); |
4742 | |
|
4743 | 0 | if (unlink(path) != 0) |
4744 | 0 | ereport(ERROR, |
4745 | 0 | (errcode_for_file_access(), |
4746 | 0 | errmsg("could not remove file \"%s\" during removal of %s/%s/xid*: %m", |
4747 | 0 | path, PG_REPLSLOT_DIR, slotname))); |
4748 | 0 | } |
4749 | 0 | } |
4750 | 0 | FreeDir(spill_dir); |
4751 | 0 | } |
4752 | | |
4753 | | /* |
4754 | | * Given a replication slot, transaction ID and segment number, fill in the |
4755 | | * corresponding spill file into 'path', which is a caller-owned buffer of size |
4756 | | * at least MAXPGPATH. |
4757 | | */ |
4758 | | static void |
4759 | | ReorderBufferSerializedPath(char *path, ReplicationSlot *slot, TransactionId xid, |
4760 | | XLogSegNo segno) |
4761 | 0 | { |
4762 | 0 | XLogRecPtr recptr; |
4763 | |
|
4764 | 0 | XLogSegNoOffsetToRecPtr(segno, 0, wal_segment_size, recptr); |
4765 | |
|
4766 | 0 | snprintf(path, MAXPGPATH, "%s/%s/xid-%u-lsn-%X-%X.spill", |
4767 | 0 | PG_REPLSLOT_DIR, |
4768 | 0 | NameStr(MyReplicationSlot->data.name), |
4769 | 0 | xid, LSN_FORMAT_ARGS(recptr)); |
4770 | 0 | } |
4771 | | |
4772 | | /* |
4773 | | * Delete all data spilled to disk after we've restarted/crashed. It will be |
4774 | | * recreated when the respective slots are reused. |
4775 | | */ |
4776 | | void |
4777 | | StartupReorderBuffer(void) |
4778 | 0 | { |
4779 | 0 | DIR *logical_dir; |
4780 | 0 | struct dirent *logical_de; |
4781 | |
|
4782 | 0 | logical_dir = AllocateDir(PG_REPLSLOT_DIR); |
4783 | 0 | while ((logical_de = ReadDir(logical_dir, PG_REPLSLOT_DIR)) != NULL) |
4784 | 0 | { |
4785 | 0 | if (strcmp(logical_de->d_name, ".") == 0 || |
4786 | 0 | strcmp(logical_de->d_name, "..") == 0) |
4787 | 0 | continue; |
4788 | | |
4789 | | /* if it cannot be a slot, skip the directory */ |
4790 | 0 | if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2)) |
4791 | 0 | continue; |
4792 | | |
4793 | | /* |
4794 | | * ok, has to be a surviving logical slot, iterate and delete |
4795 | | * everything starting with xid-* |
4796 | | */ |
4797 | 0 | ReorderBufferCleanupSerializedTXNs(logical_de->d_name); |
4798 | 0 | } |
4799 | 0 | FreeDir(logical_dir); |
4800 | 0 | } |
4801 | | |
4802 | | /* --------------------------------------- |
4803 | | * toast reassembly support |
4804 | | * --------------------------------------- |
4805 | | */ |
4806 | | |
4807 | | /* |
4808 | | * Initialize per tuple toast reconstruction support. |
4809 | | */ |
4810 | | static void |
4811 | | ReorderBufferToastInitHash(ReorderBuffer *rb, ReorderBufferTXN *txn) |
4812 | 0 | { |
4813 | 0 | HASHCTL hash_ctl; |
4814 | |
|
4815 | 0 | Assert(txn->toast_hash == NULL); |
4816 | |
|
4817 | 0 | hash_ctl.keysize = sizeof(Oid); |
4818 | 0 | hash_ctl.entrysize = sizeof(ReorderBufferToastEnt); |
4819 | 0 | hash_ctl.hcxt = rb->context; |
4820 | 0 | txn->toast_hash = hash_create("ReorderBufferToastHash", 5, &hash_ctl, |
4821 | 0 | HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); |
4822 | 0 | } |
4823 | | |
4824 | | /* |
4825 | | * Per toast-chunk handling for toast reconstruction |
4826 | | * |
4827 | | * Appends a toast chunk so we can reconstruct it when the tuple "owning" the |
4828 | | * toasted Datum comes along. |
4829 | | */ |
4830 | | static void |
4831 | | ReorderBufferToastAppendChunk(ReorderBuffer *rb, ReorderBufferTXN *txn, |
4832 | | Relation relation, ReorderBufferChange *change) |
4833 | 0 | { |
4834 | 0 | ReorderBufferToastEnt *ent; |
4835 | 0 | HeapTuple newtup; |
4836 | 0 | bool found; |
4837 | 0 | int32 chunksize; |
4838 | 0 | bool isnull; |
4839 | 0 | Pointer chunk; |
4840 | 0 | TupleDesc desc = RelationGetDescr(relation); |
4841 | 0 | Oid chunk_id; |
4842 | 0 | int32 chunk_seq; |
4843 | |
|
4844 | 0 | if (txn->toast_hash == NULL) |
4845 | 0 | ReorderBufferToastInitHash(rb, txn); |
4846 | |
|
4847 | 0 | Assert(IsToastRelation(relation)); |
4848 | |
|
4849 | 0 | newtup = change->data.tp.newtuple; |
4850 | 0 | chunk_id = DatumGetObjectId(fastgetattr(newtup, 1, desc, &isnull)); |
4851 | 0 | Assert(!isnull); |
4852 | 0 | chunk_seq = DatumGetInt32(fastgetattr(newtup, 2, desc, &isnull)); |
4853 | 0 | Assert(!isnull); |
4854 | |
|
4855 | 0 | ent = (ReorderBufferToastEnt *) |
4856 | 0 | hash_search(txn->toast_hash, &chunk_id, HASH_ENTER, &found); |
4857 | |
|
4858 | 0 | if (!found) |
4859 | 0 | { |
4860 | 0 | Assert(ent->chunk_id == chunk_id); |
4861 | 0 | ent->num_chunks = 0; |
4862 | 0 | ent->last_chunk_seq = 0; |
4863 | 0 | ent->size = 0; |
4864 | 0 | ent->reconstructed = NULL; |
4865 | 0 | dlist_init(&ent->chunks); |
4866 | |
|
4867 | 0 | if (chunk_seq != 0) |
4868 | 0 | elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq 0", |
4869 | 0 | chunk_seq, chunk_id); |
4870 | 0 | } |
4871 | 0 | else if (found && chunk_seq != ent->last_chunk_seq + 1) |
4872 | 0 | elog(ERROR, "got sequence entry %d for toast chunk %u instead of seq %d", |
4873 | 0 | chunk_seq, chunk_id, ent->last_chunk_seq + 1); |
4874 | | |
4875 | 0 | chunk = DatumGetPointer(fastgetattr(newtup, 3, desc, &isnull)); |
4876 | 0 | Assert(!isnull); |
4877 | | |
4878 | | /* calculate size so we can allocate the right size at once later */ |
4879 | 0 | if (!VARATT_IS_EXTENDED(chunk)) |
4880 | 0 | chunksize = VARSIZE(chunk) - VARHDRSZ; |
4881 | 0 | else if (VARATT_IS_SHORT(chunk)) |
4882 | | /* could happen due to heap_form_tuple doing its thing */ |
4883 | 0 | chunksize = VARSIZE_SHORT(chunk) - VARHDRSZ_SHORT; |
4884 | 0 | else |
4885 | 0 | elog(ERROR, "unexpected type of toast chunk"); |
4886 | | |
4887 | 0 | ent->size += chunksize; |
4888 | 0 | ent->last_chunk_seq = chunk_seq; |
4889 | 0 | ent->num_chunks++; |
4890 | 0 | dlist_push_tail(&ent->chunks, &change->node); |
4891 | 0 | } |
4892 | | |
4893 | | /* |
4894 | | * Rejigger change->newtuple to point to in-memory toast tuples instead of |
4895 | | * on-disk toast tuples that may no longer exist (think DROP TABLE or VACUUM). |
4896 | | * |
4897 | | * We cannot replace unchanged toast tuples though, so those will still point |
4898 | | * to on-disk toast data. |
4899 | | * |
4900 | | * While updating the existing change with detoasted tuple data, we need to |
4901 | | * update the memory accounting info, because the change size will differ. |
4902 | | * Otherwise the accounting may get out of sync, triggering serialization |
4903 | | * at unexpected times. |
4904 | | * |
4905 | | * We simply subtract size of the change before rejiggering the tuple, and |
4906 | | * then add the new size. This makes it look like the change was removed |
4907 | | * and then added back, except it only tweaks the accounting info. |
4908 | | * |
4909 | | * In particular it can't trigger serialization, which would be pointless |
4910 | | * anyway as it happens during commit processing right before handing |
4911 | | * the change to the output plugin. |
4912 | | */ |
4913 | | static void |
4914 | | ReorderBufferToastReplace(ReorderBuffer *rb, ReorderBufferTXN *txn, |
4915 | | Relation relation, ReorderBufferChange *change) |
4916 | 0 | { |
4917 | 0 | TupleDesc desc; |
4918 | 0 | int natt; |
4919 | 0 | Datum *attrs; |
4920 | 0 | bool *isnull; |
4921 | 0 | bool *free; |
4922 | 0 | HeapTuple tmphtup; |
4923 | 0 | Relation toast_rel; |
4924 | 0 | TupleDesc toast_desc; |
4925 | 0 | MemoryContext oldcontext; |
4926 | 0 | HeapTuple newtup; |
4927 | 0 | Size old_size; |
4928 | | |
4929 | | /* no toast tuples changed */ |
4930 | 0 | if (txn->toast_hash == NULL) |
4931 | 0 | return; |
4932 | | |
4933 | | /* |
4934 | | * We're going to modify the size of the change. So, to make sure the |
4935 | | * accounting is correct we record the current change size and then after |
4936 | | * re-computing the change we'll subtract the recorded size and then |
4937 | | * re-add the new change size at the end. We don't immediately subtract |
4938 | | * the old size because if there is any error before we add the new size, |
4939 | | * we will release the changes and that will update the accounting info |
4940 | | * (subtracting the size from the counters). And we don't want to |
4941 | | * underflow there. |
4942 | | */ |
4943 | 0 | old_size = ReorderBufferChangeSize(change); |
4944 | |
|
4945 | 0 | oldcontext = MemoryContextSwitchTo(rb->context); |
4946 | | |
4947 | | /* we should only have toast tuples in an INSERT or UPDATE */ |
4948 | 0 | Assert(change->data.tp.newtuple); |
4949 | |
|
4950 | 0 | desc = RelationGetDescr(relation); |
4951 | |
|
4952 | 0 | toast_rel = RelationIdGetRelation(relation->rd_rel->reltoastrelid); |
4953 | 0 | if (!RelationIsValid(toast_rel)) |
4954 | 0 | elog(ERROR, "could not open toast relation with OID %u (base relation \"%s\")", |
4955 | 0 | relation->rd_rel->reltoastrelid, RelationGetRelationName(relation)); |
4956 | | |
4957 | 0 | toast_desc = RelationGetDescr(toast_rel); |
4958 | | |
4959 | | /* should we allocate from stack instead? */ |
4960 | 0 | attrs = palloc0(sizeof(Datum) * desc->natts); |
4961 | 0 | isnull = palloc0(sizeof(bool) * desc->natts); |
4962 | 0 | free = palloc0(sizeof(bool) * desc->natts); |
4963 | |
|
4964 | 0 | newtup = change->data.tp.newtuple; |
4965 | |
|
4966 | 0 | heap_deform_tuple(newtup, desc, attrs, isnull); |
4967 | |
|
4968 | 0 | for (natt = 0; natt < desc->natts; natt++) |
4969 | 0 | { |
4970 | 0 | Form_pg_attribute attr = TupleDescAttr(desc, natt); |
4971 | 0 | ReorderBufferToastEnt *ent; |
4972 | 0 | struct varlena *varlena; |
4973 | | |
4974 | | /* va_rawsize is the size of the original datum -- including header */ |
4975 | 0 | struct varatt_external toast_pointer; |
4976 | 0 | struct varatt_indirect redirect_pointer; |
4977 | 0 | struct varlena *new_datum = NULL; |
4978 | 0 | struct varlena *reconstructed; |
4979 | 0 | dlist_iter it; |
4980 | 0 | Size data_done = 0; |
4981 | | |
4982 | | /* system columns aren't toasted */ |
4983 | 0 | if (attr->attnum < 0) |
4984 | 0 | continue; |
4985 | | |
4986 | 0 | if (attr->attisdropped) |
4987 | 0 | continue; |
4988 | | |
4989 | | /* not a varlena datatype */ |
4990 | 0 | if (attr->attlen != -1) |
4991 | 0 | continue; |
4992 | | |
4993 | | /* no data */ |
4994 | 0 | if (isnull[natt]) |
4995 | 0 | continue; |
4996 | | |
4997 | | /* ok, we know we have a toast datum */ |
4998 | 0 | varlena = (struct varlena *) DatumGetPointer(attrs[natt]); |
4999 | | |
5000 | | /* no need to do anything if the tuple isn't external */ |
5001 | 0 | if (!VARATT_IS_EXTERNAL(varlena)) |
5002 | 0 | continue; |
5003 | | |
5004 | 0 | VARATT_EXTERNAL_GET_POINTER(toast_pointer, varlena); |
5005 | | |
5006 | | /* |
5007 | | * Check whether the toast tuple changed, replace if so. |
5008 | | */ |
5009 | 0 | ent = (ReorderBufferToastEnt *) |
5010 | 0 | hash_search(txn->toast_hash, |
5011 | 0 | &toast_pointer.va_valueid, |
5012 | 0 | HASH_FIND, |
5013 | 0 | NULL); |
5014 | 0 | if (ent == NULL) |
5015 | 0 | continue; |
5016 | | |
5017 | 0 | new_datum = |
5018 | 0 | (struct varlena *) palloc0(INDIRECT_POINTER_SIZE); |
5019 | |
|
5020 | 0 | free[natt] = true; |
5021 | |
|
5022 | 0 | reconstructed = palloc0(toast_pointer.va_rawsize); |
5023 | |
|
5024 | 0 | ent->reconstructed = reconstructed; |
5025 | | |
5026 | | /* stitch toast tuple back together from its parts */ |
5027 | 0 | dlist_foreach(it, &ent->chunks) |
5028 | 0 | { |
5029 | 0 | bool cisnull; |
5030 | 0 | ReorderBufferChange *cchange; |
5031 | 0 | HeapTuple ctup; |
5032 | 0 | Pointer chunk; |
5033 | |
|
5034 | 0 | cchange = dlist_container(ReorderBufferChange, node, it.cur); |
5035 | 0 | ctup = cchange->data.tp.newtuple; |
5036 | 0 | chunk = DatumGetPointer(fastgetattr(ctup, 3, toast_desc, &cisnull)); |
5037 | |
|
5038 | 0 | Assert(!cisnull); |
5039 | 0 | Assert(!VARATT_IS_EXTERNAL(chunk)); |
5040 | 0 | Assert(!VARATT_IS_SHORT(chunk)); |
5041 | |
|
5042 | 0 | memcpy(VARDATA(reconstructed) + data_done, |
5043 | 0 | VARDATA(chunk), |
5044 | 0 | VARSIZE(chunk) - VARHDRSZ); |
5045 | 0 | data_done += VARSIZE(chunk) - VARHDRSZ; |
5046 | 0 | } |
5047 | 0 | Assert(data_done == VARATT_EXTERNAL_GET_EXTSIZE(toast_pointer)); |
5048 | | |
5049 | | /* make sure its marked as compressed or not */ |
5050 | 0 | if (VARATT_EXTERNAL_IS_COMPRESSED(toast_pointer)) |
5051 | 0 | SET_VARSIZE_COMPRESSED(reconstructed, data_done + VARHDRSZ); |
5052 | 0 | else |
5053 | 0 | SET_VARSIZE(reconstructed, data_done + VARHDRSZ); |
5054 | |
|
5055 | 0 | memset(&redirect_pointer, 0, sizeof(redirect_pointer)); |
5056 | 0 | redirect_pointer.pointer = reconstructed; |
5057 | |
|
5058 | 0 | SET_VARTAG_EXTERNAL(new_datum, VARTAG_INDIRECT); |
5059 | 0 | memcpy(VARDATA_EXTERNAL(new_datum), &redirect_pointer, |
5060 | 0 | sizeof(redirect_pointer)); |
5061 | |
|
5062 | 0 | attrs[natt] = PointerGetDatum(new_datum); |
5063 | 0 | } |
5064 | | |
5065 | | /* |
5066 | | * Build tuple in separate memory & copy tuple back into the tuplebuf |
5067 | | * passed to the output plugin. We can't directly heap_fill_tuple() into |
5068 | | * the tuplebuf because attrs[] will point back into the current content. |
5069 | | */ |
5070 | 0 | tmphtup = heap_form_tuple(desc, attrs, isnull); |
5071 | 0 | Assert(newtup->t_len <= MaxHeapTupleSize); |
5072 | 0 | Assert(newtup->t_data == (HeapTupleHeader) ((char *) newtup + HEAPTUPLESIZE)); |
5073 | |
|
5074 | 0 | memcpy(newtup->t_data, tmphtup->t_data, tmphtup->t_len); |
5075 | 0 | newtup->t_len = tmphtup->t_len; |
5076 | | |
5077 | | /* |
5078 | | * free resources we won't further need, more persistent stuff will be |
5079 | | * free'd in ReorderBufferToastReset(). |
5080 | | */ |
5081 | 0 | RelationClose(toast_rel); |
5082 | 0 | pfree(tmphtup); |
5083 | 0 | for (natt = 0; natt < desc->natts; natt++) |
5084 | 0 | { |
5085 | 0 | if (free[natt]) |
5086 | 0 | pfree(DatumGetPointer(attrs[natt])); |
5087 | 0 | } |
5088 | 0 | pfree(attrs); |
5089 | 0 | pfree(free); |
5090 | 0 | pfree(isnull); |
5091 | |
|
5092 | 0 | MemoryContextSwitchTo(oldcontext); |
5093 | | |
5094 | | /* subtract the old change size */ |
5095 | 0 | ReorderBufferChangeMemoryUpdate(rb, change, NULL, false, old_size); |
5096 | | /* now add the change back, with the correct size */ |
5097 | 0 | ReorderBufferChangeMemoryUpdate(rb, change, NULL, true, |
5098 | 0 | ReorderBufferChangeSize(change)); |
5099 | 0 | } |
5100 | | |
5101 | | /* |
5102 | | * Free all resources allocated for toast reconstruction. |
5103 | | */ |
5104 | | static void |
5105 | | ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn) |
5106 | 0 | { |
5107 | 0 | HASH_SEQ_STATUS hstat; |
5108 | 0 | ReorderBufferToastEnt *ent; |
5109 | |
|
5110 | 0 | if (txn->toast_hash == NULL) |
5111 | 0 | return; |
5112 | | |
5113 | | /* sequentially walk over the hash and free everything */ |
5114 | 0 | hash_seq_init(&hstat, txn->toast_hash); |
5115 | 0 | while ((ent = (ReorderBufferToastEnt *) hash_seq_search(&hstat)) != NULL) |
5116 | 0 | { |
5117 | 0 | dlist_mutable_iter it; |
5118 | |
|
5119 | 0 | if (ent->reconstructed != NULL) |
5120 | 0 | pfree(ent->reconstructed); |
5121 | |
|
5122 | 0 | dlist_foreach_modify(it, &ent->chunks) |
5123 | 0 | { |
5124 | 0 | ReorderBufferChange *change = |
5125 | 0 | dlist_container(ReorderBufferChange, node, it.cur); |
5126 | |
|
5127 | 0 | dlist_delete(&change->node); |
5128 | 0 | ReorderBufferFreeChange(rb, change, true); |
5129 | 0 | } |
5130 | 0 | } |
5131 | |
|
5132 | 0 | hash_destroy(txn->toast_hash); |
5133 | 0 | txn->toast_hash = NULL; |
5134 | 0 | } |
5135 | | |
5136 | | |
5137 | | /* --------------------------------------- |
5138 | | * Visibility support for logical decoding |
5139 | | * |
5140 | | * |
5141 | | * Lookup actual cmin/cmax values when using decoding snapshot. We can't |
5142 | | * always rely on stored cmin/cmax values because of two scenarios: |
5143 | | * |
5144 | | * * A tuple got changed multiple times during a single transaction and thus |
5145 | | * has got a combo CID. Combo CIDs are only valid for the duration of a |
5146 | | * single transaction. |
5147 | | * * A tuple with a cmin but no cmax (and thus no combo CID) got |
5148 | | * deleted/updated in another transaction than the one which created it |
5149 | | * which we are looking at right now. As only one of cmin, cmax or combo CID |
5150 | | * is actually stored in the heap we don't have access to the value we |
5151 | | * need anymore. |
5152 | | * |
5153 | | * To resolve those problems we have a per-transaction hash of (cmin, |
5154 | | * cmax) tuples keyed by (relfilelocator, ctid) which contains the actual |
5155 | | * (cmin, cmax) values. That also takes care of combo CIDs by simply |
5156 | | * not caring about them at all. As we have the real cmin/cmax values |
5157 | | * combo CIDs aren't interesting. |
5158 | | * |
5159 | | * As we only care about catalog tuples here the overhead of this |
5160 | | * hashtable should be acceptable. |
5161 | | * |
5162 | | * Heap rewrites complicate this a bit, check rewriteheap.c for |
5163 | | * details. |
5164 | | * ------------------------------------------------------------------------- |
5165 | | */ |
5166 | | |
5167 | | /* struct for sorting mapping files by LSN efficiently */ |
5168 | | typedef struct RewriteMappingFile |
5169 | | { |
5170 | | XLogRecPtr lsn; |
5171 | | char fname[MAXPGPATH]; |
5172 | | } RewriteMappingFile; |
5173 | | |
5174 | | #ifdef NOT_USED |
5175 | | static void |
5176 | | DisplayMapping(HTAB *tuplecid_data) |
5177 | | { |
5178 | | HASH_SEQ_STATUS hstat; |
5179 | | ReorderBufferTupleCidEnt *ent; |
5180 | | |
5181 | | hash_seq_init(&hstat, tuplecid_data); |
5182 | | while ((ent = (ReorderBufferTupleCidEnt *) hash_seq_search(&hstat)) != NULL) |
5183 | | { |
5184 | | elog(DEBUG3, "mapping: node: %u/%u/%u tid: %u/%u cmin: %u, cmax: %u", |
5185 | | ent->key.rlocator.dbOid, |
5186 | | ent->key.rlocator.spcOid, |
5187 | | ent->key.rlocator.relNumber, |
5188 | | ItemPointerGetBlockNumber(&ent->key.tid), |
5189 | | ItemPointerGetOffsetNumber(&ent->key.tid), |
5190 | | ent->cmin, |
5191 | | ent->cmax |
5192 | | ); |
5193 | | } |
5194 | | } |
5195 | | #endif |
5196 | | |
5197 | | /* |
5198 | | * Apply a single mapping file to tuplecid_data. |
5199 | | * |
5200 | | * The mapping file has to have been verified to be a) committed b) for our |
5201 | | * transaction c) applied in LSN order. |
5202 | | */ |
5203 | | static void |
5204 | | ApplyLogicalMappingFile(HTAB *tuplecid_data, Oid relid, const char *fname) |
5205 | 0 | { |
5206 | 0 | char path[MAXPGPATH]; |
5207 | 0 | int fd; |
5208 | 0 | int readBytes; |
5209 | 0 | LogicalRewriteMappingData map; |
5210 | |
|
5211 | 0 | sprintf(path, "%s/%s", PG_LOGICAL_MAPPINGS_DIR, fname); |
5212 | 0 | fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); |
5213 | 0 | if (fd < 0) |
5214 | 0 | ereport(ERROR, |
5215 | 0 | (errcode_for_file_access(), |
5216 | 0 | errmsg("could not open file \"%s\": %m", path))); |
5217 | | |
5218 | 0 | while (true) |
5219 | 0 | { |
5220 | 0 | ReorderBufferTupleCidKey key; |
5221 | 0 | ReorderBufferTupleCidEnt *ent; |
5222 | 0 | ReorderBufferTupleCidEnt *new_ent; |
5223 | 0 | bool found; |
5224 | | |
5225 | | /* be careful about padding */ |
5226 | 0 | memset(&key, 0, sizeof(ReorderBufferTupleCidKey)); |
5227 | | |
5228 | | /* read all mappings till the end of the file */ |
5229 | 0 | pgstat_report_wait_start(WAIT_EVENT_REORDER_LOGICAL_MAPPING_READ); |
5230 | 0 | readBytes = read(fd, &map, sizeof(LogicalRewriteMappingData)); |
5231 | 0 | pgstat_report_wait_end(); |
5232 | |
|
5233 | 0 | if (readBytes < 0) |
5234 | 0 | ereport(ERROR, |
5235 | 0 | (errcode_for_file_access(), |
5236 | 0 | errmsg("could not read file \"%s\": %m", |
5237 | 0 | path))); |
5238 | 0 | else if (readBytes == 0) /* EOF */ |
5239 | 0 | break; |
5240 | 0 | else if (readBytes != sizeof(LogicalRewriteMappingData)) |
5241 | 0 | ereport(ERROR, |
5242 | 0 | (errcode_for_file_access(), |
5243 | 0 | errmsg("could not read from file \"%s\": read %d instead of %d bytes", |
5244 | 0 | path, readBytes, |
5245 | 0 | (int32) sizeof(LogicalRewriteMappingData)))); |
5246 | | |
5247 | 0 | key.rlocator = map.old_locator; |
5248 | 0 | ItemPointerCopy(&map.old_tid, |
5249 | 0 | &key.tid); |
5250 | | |
5251 | |
|
5252 | 0 | ent = (ReorderBufferTupleCidEnt *) |
5253 | 0 | hash_search(tuplecid_data, &key, HASH_FIND, NULL); |
5254 | | |
5255 | | /* no existing mapping, no need to update */ |
5256 | 0 | if (!ent) |
5257 | 0 | continue; |
5258 | | |
5259 | 0 | key.rlocator = map.new_locator; |
5260 | 0 | ItemPointerCopy(&map.new_tid, |
5261 | 0 | &key.tid); |
5262 | |
|
5263 | 0 | new_ent = (ReorderBufferTupleCidEnt *) |
5264 | 0 | hash_search(tuplecid_data, &key, HASH_ENTER, &found); |
5265 | |
|
5266 | 0 | if (found) |
5267 | 0 | { |
5268 | | /* |
5269 | | * Make sure the existing mapping makes sense. We sometime update |
5270 | | * old records that did not yet have a cmax (e.g. pg_class' own |
5271 | | * entry while rewriting it) during rewrites, so allow that. |
5272 | | */ |
5273 | 0 | Assert(ent->cmin == InvalidCommandId || ent->cmin == new_ent->cmin); |
5274 | 0 | Assert(ent->cmax == InvalidCommandId || ent->cmax == new_ent->cmax); |
5275 | 0 | } |
5276 | 0 | else |
5277 | 0 | { |
5278 | | /* update mapping */ |
5279 | 0 | new_ent->cmin = ent->cmin; |
5280 | 0 | new_ent->cmax = ent->cmax; |
5281 | 0 | new_ent->combocid = ent->combocid; |
5282 | 0 | } |
5283 | 0 | } |
5284 | | |
5285 | 0 | if (CloseTransientFile(fd) != 0) |
5286 | 0 | ereport(ERROR, |
5287 | 0 | (errcode_for_file_access(), |
5288 | 0 | errmsg("could not close file \"%s\": %m", path))); |
5289 | 0 | } |
5290 | | |
5291 | | |
5292 | | /* |
5293 | | * Check whether the TransactionId 'xid' is in the pre-sorted array 'xip'. |
5294 | | */ |
5295 | | static bool |
5296 | | TransactionIdInArray(TransactionId xid, TransactionId *xip, Size num) |
5297 | 0 | { |
5298 | 0 | return bsearch(&xid, xip, num, |
5299 | 0 | sizeof(TransactionId), xidComparator) != NULL; |
5300 | 0 | } |
5301 | | |
5302 | | /* |
5303 | | * list_sort() comparator for sorting RewriteMappingFiles in LSN order. |
5304 | | */ |
5305 | | static int |
5306 | | file_sort_by_lsn(const ListCell *a_p, const ListCell *b_p) |
5307 | 0 | { |
5308 | 0 | RewriteMappingFile *a = (RewriteMappingFile *) lfirst(a_p); |
5309 | 0 | RewriteMappingFile *b = (RewriteMappingFile *) lfirst(b_p); |
5310 | |
|
5311 | 0 | return pg_cmp_u64(a->lsn, b->lsn); |
5312 | 0 | } |
5313 | | |
5314 | | /* |
5315 | | * Apply any existing logical remapping files if there are any targeted at our |
5316 | | * transaction for relid. |
5317 | | */ |
5318 | | static void |
5319 | | UpdateLogicalMappings(HTAB *tuplecid_data, Oid relid, Snapshot snapshot) |
5320 | 0 | { |
5321 | 0 | DIR *mapping_dir; |
5322 | 0 | struct dirent *mapping_de; |
5323 | 0 | List *files = NIL; |
5324 | 0 | ListCell *file; |
5325 | 0 | Oid dboid = IsSharedRelation(relid) ? InvalidOid : MyDatabaseId; |
5326 | |
|
5327 | 0 | mapping_dir = AllocateDir(PG_LOGICAL_MAPPINGS_DIR); |
5328 | 0 | while ((mapping_de = ReadDir(mapping_dir, PG_LOGICAL_MAPPINGS_DIR)) != NULL) |
5329 | 0 | { |
5330 | 0 | Oid f_dboid; |
5331 | 0 | Oid f_relid; |
5332 | 0 | TransactionId f_mapped_xid; |
5333 | 0 | TransactionId f_create_xid; |
5334 | 0 | XLogRecPtr f_lsn; |
5335 | 0 | uint32 f_hi, |
5336 | 0 | f_lo; |
5337 | 0 | RewriteMappingFile *f; |
5338 | |
|
5339 | 0 | if (strcmp(mapping_de->d_name, ".") == 0 || |
5340 | 0 | strcmp(mapping_de->d_name, "..") == 0) |
5341 | 0 | continue; |
5342 | | |
5343 | | /* Ignore files that aren't ours */ |
5344 | 0 | if (strncmp(mapping_de->d_name, "map-", 4) != 0) |
5345 | 0 | continue; |
5346 | | |
5347 | 0 | if (sscanf(mapping_de->d_name, LOGICAL_REWRITE_FORMAT, |
5348 | 0 | &f_dboid, &f_relid, &f_hi, &f_lo, |
5349 | 0 | &f_mapped_xid, &f_create_xid) != 6) |
5350 | 0 | elog(ERROR, "could not parse filename \"%s\"", mapping_de->d_name); |
5351 | | |
5352 | 0 | f_lsn = ((uint64) f_hi) << 32 | f_lo; |
5353 | | |
5354 | | /* mapping for another database */ |
5355 | 0 | if (f_dboid != dboid) |
5356 | 0 | continue; |
5357 | | |
5358 | | /* mapping for another relation */ |
5359 | 0 | if (f_relid != relid) |
5360 | 0 | continue; |
5361 | | |
5362 | | /* did the creating transaction abort? */ |
5363 | 0 | if (!TransactionIdDidCommit(f_create_xid)) |
5364 | 0 | continue; |
5365 | | |
5366 | | /* not for our transaction */ |
5367 | 0 | if (!TransactionIdInArray(f_mapped_xid, snapshot->subxip, snapshot->subxcnt)) |
5368 | 0 | continue; |
5369 | | |
5370 | | /* ok, relevant, queue for apply */ |
5371 | 0 | f = palloc(sizeof(RewriteMappingFile)); |
5372 | 0 | f->lsn = f_lsn; |
5373 | 0 | strcpy(f->fname, mapping_de->d_name); |
5374 | 0 | files = lappend(files, f); |
5375 | 0 | } |
5376 | 0 | FreeDir(mapping_dir); |
5377 | | |
5378 | | /* sort files so we apply them in LSN order */ |
5379 | 0 | list_sort(files, file_sort_by_lsn); |
5380 | |
|
5381 | 0 | foreach(file, files) |
5382 | 0 | { |
5383 | 0 | RewriteMappingFile *f = (RewriteMappingFile *) lfirst(file); |
5384 | |
|
5385 | 0 | elog(DEBUG1, "applying mapping: \"%s\" in %u", f->fname, |
5386 | 0 | snapshot->subxip[0]); |
5387 | 0 | ApplyLogicalMappingFile(tuplecid_data, relid, f->fname); |
5388 | 0 | pfree(f); |
5389 | 0 | } |
5390 | 0 | } |
5391 | | |
5392 | | /* |
5393 | | * Lookup cmin/cmax of a tuple, during logical decoding where we can't rely on |
5394 | | * combo CIDs. |
5395 | | */ |
5396 | | bool |
5397 | | ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data, |
5398 | | Snapshot snapshot, |
5399 | | HeapTuple htup, Buffer buffer, |
5400 | | CommandId *cmin, CommandId *cmax) |
5401 | 0 | { |
5402 | 0 | ReorderBufferTupleCidKey key; |
5403 | 0 | ReorderBufferTupleCidEnt *ent; |
5404 | 0 | ForkNumber forkno; |
5405 | 0 | BlockNumber blockno; |
5406 | 0 | bool updated_mapping = false; |
5407 | | |
5408 | | /* |
5409 | | * Return unresolved if tuplecid_data is not valid. That's because when |
5410 | | * streaming in-progress transactions we may run into tuples with the CID |
5411 | | * before actually decoding them. Think e.g. about INSERT followed by |
5412 | | * TRUNCATE, where the TRUNCATE may not be decoded yet when applying the |
5413 | | * INSERT. So in such cases, we assume the CID is from the future |
5414 | | * command. |
5415 | | */ |
5416 | 0 | if (tuplecid_data == NULL) |
5417 | 0 | return false; |
5418 | | |
5419 | | /* be careful about padding */ |
5420 | 0 | memset(&key, 0, sizeof(key)); |
5421 | |
|
5422 | 0 | Assert(!BufferIsLocal(buffer)); |
5423 | | |
5424 | | /* |
5425 | | * get relfilelocator from the buffer, no convenient way to access it |
5426 | | * other than that. |
5427 | | */ |
5428 | 0 | BufferGetTag(buffer, &key.rlocator, &forkno, &blockno); |
5429 | | |
5430 | | /* tuples can only be in the main fork */ |
5431 | 0 | Assert(forkno == MAIN_FORKNUM); |
5432 | 0 | Assert(blockno == ItemPointerGetBlockNumber(&htup->t_self)); |
5433 | |
|
5434 | 0 | ItemPointerCopy(&htup->t_self, |
5435 | 0 | &key.tid); |
5436 | |
|
5437 | 0 | restart: |
5438 | 0 | ent = (ReorderBufferTupleCidEnt *) |
5439 | 0 | hash_search(tuplecid_data, &key, HASH_FIND, NULL); |
5440 | | |
5441 | | /* |
5442 | | * failed to find a mapping, check whether the table was rewritten and |
5443 | | * apply mapping if so, but only do that once - there can be no new |
5444 | | * mappings while we are in here since we have to hold a lock on the |
5445 | | * relation. |
5446 | | */ |
5447 | 0 | if (ent == NULL && !updated_mapping) |
5448 | 0 | { |
5449 | 0 | UpdateLogicalMappings(tuplecid_data, htup->t_tableOid, snapshot); |
5450 | | /* now check but don't update for a mapping again */ |
5451 | 0 | updated_mapping = true; |
5452 | 0 | goto restart; |
5453 | 0 | } |
5454 | 0 | else if (ent == NULL) |
5455 | 0 | return false; |
5456 | | |
5457 | 0 | if (cmin) |
5458 | 0 | *cmin = ent->cmin; |
5459 | 0 | if (cmax) |
5460 | 0 | *cmax = ent->cmax; |
5461 | 0 | return true; |
5462 | 0 | } |
5463 | | |
5464 | | /* |
5465 | | * Count invalidation messages of specified transaction. |
5466 | | * |
5467 | | * Returns number of messages, and msgs is set to the pointer of the linked |
5468 | | * list for the messages. |
5469 | | */ |
5470 | | uint32 |
5471 | | ReorderBufferGetInvalidations(ReorderBuffer *rb, TransactionId xid, |
5472 | | SharedInvalidationMessage **msgs) |
5473 | 0 | { |
5474 | 0 | ReorderBufferTXN *txn; |
5475 | |
|
5476 | 0 | txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr, |
5477 | 0 | false); |
5478 | |
|
5479 | 0 | if (txn == NULL) |
5480 | 0 | return 0; |
5481 | | |
5482 | 0 | *msgs = txn->invalidations; |
5483 | |
|
5484 | 0 | return txn->ninvalidations; |
5485 | 0 | } |