/src/postgres/src/backend/replication/logical/snapbuild.c
Line | Count | Source (jump to first uncovered line) |
1 | | /*------------------------------------------------------------------------- |
2 | | * |
3 | | * snapbuild.c |
4 | | * |
5 | | * Infrastructure for building historic catalog snapshots based on contents |
6 | | * of the WAL, for the purpose of decoding heapam.c style values in the |
7 | | * WAL. |
8 | | * |
9 | | * NOTES: |
10 | | * |
11 | | * We build snapshots which can *only* be used to read catalog contents and we |
12 | | * do so by reading and interpreting the WAL stream. The aim is to build a |
13 | | * snapshot that behaves the same as a freshly taken MVCC snapshot would have |
14 | | * at the time the XLogRecord was generated. |
15 | | * |
16 | | * To build the snapshots we reuse the infrastructure built for Hot |
17 | | * Standby. The in-memory snapshots we build look different than HS' because |
18 | | * we have different needs. To successfully decode data from the WAL we only |
19 | | * need to access catalog tables and (sys|rel|cat)cache, not the actual user |
20 | | * tables since the data we decode is wholly contained in the WAL |
21 | | * records. Also, our snapshots need to be different in comparison to normal |
22 | | * MVCC ones because in contrast to those we cannot fully rely on the clog and |
23 | | * pg_subtrans for information about committed transactions because they might |
24 | | * commit in the future from the POV of the WAL entry we're currently |
25 | | * decoding. This definition has the advantage that we only need to prevent |
26 | | * removal of catalog rows, while normal tables' rows can still be |
27 | | * removed. This is achieved by using the replication slot mechanism. |
28 | | * |
29 | | * As the percentage of transactions modifying the catalog normally is fairly |
30 | | * small in comparison to ones only manipulating user data, we keep track of |
31 | | * the committed catalog modifying ones inside [xmin, xmax) instead of keeping |
32 | | * track of all running transactions like it's done in a normal snapshot. Note |
33 | | * that we're generally only looking at transactions that have acquired an |
34 | | * xid. That is we keep a list of transactions between snapshot->(xmin, xmax) |
35 | | * that we consider committed, everything else is considered aborted/in |
36 | | * progress. That also allows us not to care about subtransactions before they |
37 | | * have committed which means this module, in contrast to HS, doesn't have to |
38 | | * care about suboverflowed subtransactions and similar. |
39 | | * |
40 | | * One complexity of doing this is that to e.g. handle mixed DDL/DML |
41 | | * transactions we need Snapshots that see intermediate versions of the |
42 | | * catalog in a transaction. During normal operation this is achieved by using |
43 | | * CommandIds/cmin/cmax. The problem with that however is that for space |
44 | | * efficiency reasons, the cmin and cmax are not included in WAL records. We |
45 | | * cannot read the cmin/cmax from the tuple itself, either, because it is |
46 | | * reset on crash recovery. Even if we could, we could not decode combocids |
47 | | * which are only tracked in the original backend's memory. To work around |
48 | | * that, heapam writes an extra WAL record (XLOG_HEAP2_NEW_CID) every time a |
49 | | * catalog row is modified, which includes the cmin and cmax of the |
50 | | * tuple. During decoding, we insert the ctid->(cmin,cmax) mappings into the |
51 | | * reorder buffer, and use them at visibility checks instead of the cmin/cmax |
52 | | * on the tuple itself. Check the reorderbuffer.c's comment above |
53 | | * ResolveCminCmaxDuringDecoding() for details. |
54 | | * |
55 | | * To facilitate all this we need our own visibility routine, as the normal |
56 | | * ones are optimized for different usecases. |
57 | | * |
58 | | * To replace the normal catalog snapshots with decoding ones use the |
59 | | * SetupHistoricSnapshot() and TeardownHistoricSnapshot() functions. |
60 | | * |
61 | | * |
62 | | * |
63 | | * The snapbuild machinery is starting up in several stages, as illustrated |
64 | | * by the following graph describing the SnapBuild->state transitions: |
65 | | * |
66 | | * +-------------------------+ |
67 | | * +----| START |-------------+ |
68 | | * | +-------------------------+ | |
69 | | * | | | |
70 | | * | | | |
71 | | * | running_xacts #1 | |
72 | | * | | | |
73 | | * | | | |
74 | | * | v | |
75 | | * | +-------------------------+ v |
76 | | * | | BUILDING_SNAPSHOT |------------>| |
77 | | * | +-------------------------+ | |
78 | | * | | | |
79 | | * | | | |
80 | | * | running_xacts #2, xacts from #1 finished | |
81 | | * | | | |
82 | | * | | | |
83 | | * | v | |
84 | | * | +-------------------------+ v |
85 | | * | | FULL_SNAPSHOT |------------>| |
86 | | * | +-------------------------+ | |
87 | | * | | | |
88 | | * running_xacts | saved snapshot |
89 | | * with zero xacts | at running_xacts's lsn |
90 | | * | | | |
91 | | * | running_xacts with xacts from #2 finished | |
92 | | * | | | |
93 | | * | v | |
94 | | * | +-------------------------+ | |
95 | | * +--->|SNAPBUILD_CONSISTENT |<------------+ |
96 | | * +-------------------------+ |
97 | | * |
98 | | * Initially the machinery is in the START stage. When an xl_running_xacts |
99 | | * record is read that is sufficiently new (above the safe xmin horizon), |
100 | | * there's a state transition. If there were no running xacts when the |
101 | | * xl_running_xacts record was generated, we'll directly go into CONSISTENT |
102 | | * state, otherwise we'll switch to the BUILDING_SNAPSHOT state. Having a full |
103 | | * snapshot means that all transactions that start henceforth can be decoded |
104 | | * in their entirety, but transactions that started previously can't. In |
105 | | * FULL_SNAPSHOT we'll switch into CONSISTENT once all those previously |
106 | | * running transactions have committed or aborted. |
107 | | * |
108 | | * Only transactions that commit after CONSISTENT state has been reached will |
109 | | * be replayed, even though they might have started while still in |
110 | | * FULL_SNAPSHOT. That ensures that we'll reach a point where no previous |
111 | | * changes have been exported, but all the following ones will be. That point |
112 | | * is a convenient point to initialize replication from, which is why we |
113 | | * export a snapshot at that point, which *can* be used to read normal data. |
114 | | * |
115 | | * Copyright (c) 2012-2025, PostgreSQL Global Development Group |
116 | | * |
117 | | * IDENTIFICATION |
118 | | * src/backend/replication/logical/snapbuild.c |
119 | | * |
120 | | *------------------------------------------------------------------------- |
121 | | */ |
122 | | |
123 | | #include "postgres.h" |
124 | | |
125 | | #include <sys/stat.h> |
126 | | #include <unistd.h> |
127 | | |
128 | | #include "access/heapam_xlog.h" |
129 | | #include "access/transam.h" |
130 | | #include "access/xact.h" |
131 | | #include "common/file_utils.h" |
132 | | #include "miscadmin.h" |
133 | | #include "pgstat.h" |
134 | | #include "replication/logical.h" |
135 | | #include "replication/reorderbuffer.h" |
136 | | #include "replication/snapbuild.h" |
137 | | #include "replication/snapbuild_internal.h" |
138 | | #include "storage/fd.h" |
139 | | #include "storage/lmgr.h" |
140 | | #include "storage/proc.h" |
141 | | #include "storage/procarray.h" |
142 | | #include "storage/standby.h" |
143 | | #include "utils/builtins.h" |
144 | | #include "utils/memutils.h" |
145 | | #include "utils/snapmgr.h" |
146 | | #include "utils/snapshot.h" |
147 | | /* |
148 | | * Starting a transaction -- which we need to do while exporting a snapshot -- |
149 | | * removes knowledge about the previously used resowner, so we save it here. |
150 | | */ |
151 | | static ResourceOwner SavedResourceOwnerDuringExport = NULL; |
152 | | static bool ExportInProgress = false; |
153 | | |
154 | | /* ->committed and ->catchange manipulation */ |
155 | | static void SnapBuildPurgeOlderTxn(SnapBuild *builder); |
156 | | |
157 | | /* snapshot building/manipulation/distribution functions */ |
158 | | static Snapshot SnapBuildBuildSnapshot(SnapBuild *builder); |
159 | | |
160 | | static void SnapBuildFreeSnapshot(Snapshot snap); |
161 | | |
162 | | static void SnapBuildSnapIncRefcount(Snapshot snap); |
163 | | |
164 | | static void SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid); |
165 | | |
166 | | static inline bool SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid, |
167 | | uint32 xinfo); |
168 | | |
169 | | /* xlog reading helper functions for SnapBuildProcessRunningXacts */ |
170 | | static bool SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running); |
171 | | static void SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff); |
172 | | |
173 | | /* serialization functions */ |
174 | | static void SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn); |
175 | | static bool SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn); |
176 | | static void SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path); |
177 | | |
178 | | /* |
179 | | * Allocate a new snapshot builder. |
180 | | * |
181 | | * xmin_horizon is the xid >= which we can be sure no catalog rows have been |
182 | | * removed, start_lsn is the LSN >= we want to replay commits. |
183 | | */ |
184 | | SnapBuild * |
185 | | AllocateSnapshotBuilder(ReorderBuffer *reorder, |
186 | | TransactionId xmin_horizon, |
187 | | XLogRecPtr start_lsn, |
188 | | bool need_full_snapshot, |
189 | | bool in_slot_creation, |
190 | | XLogRecPtr two_phase_at) |
191 | 0 | { |
192 | 0 | MemoryContext context; |
193 | 0 | MemoryContext oldcontext; |
194 | 0 | SnapBuild *builder; |
195 | | |
196 | | /* allocate memory in own context, to have better accountability */ |
197 | 0 | context = AllocSetContextCreate(CurrentMemoryContext, |
198 | 0 | "snapshot builder context", |
199 | 0 | ALLOCSET_DEFAULT_SIZES); |
200 | 0 | oldcontext = MemoryContextSwitchTo(context); |
201 | |
|
202 | 0 | builder = palloc0(sizeof(SnapBuild)); |
203 | |
|
204 | 0 | builder->state = SNAPBUILD_START; |
205 | 0 | builder->context = context; |
206 | 0 | builder->reorder = reorder; |
207 | | /* Other struct members initialized by zeroing via palloc0 above */ |
208 | |
|
209 | 0 | builder->committed.xcnt = 0; |
210 | 0 | builder->committed.xcnt_space = 128; /* arbitrary number */ |
211 | 0 | builder->committed.xip = |
212 | 0 | palloc0(builder->committed.xcnt_space * sizeof(TransactionId)); |
213 | 0 | builder->committed.includes_all_transactions = true; |
214 | |
|
215 | 0 | builder->catchange.xcnt = 0; |
216 | 0 | builder->catchange.xip = NULL; |
217 | |
|
218 | 0 | builder->initial_xmin_horizon = xmin_horizon; |
219 | 0 | builder->start_decoding_at = start_lsn; |
220 | 0 | builder->in_slot_creation = in_slot_creation; |
221 | 0 | builder->building_full_snapshot = need_full_snapshot; |
222 | 0 | builder->two_phase_at = two_phase_at; |
223 | |
|
224 | 0 | MemoryContextSwitchTo(oldcontext); |
225 | |
|
226 | 0 | return builder; |
227 | 0 | } |
228 | | |
229 | | /* |
230 | | * Free a snapshot builder. |
231 | | */ |
232 | | void |
233 | | FreeSnapshotBuilder(SnapBuild *builder) |
234 | 0 | { |
235 | 0 | MemoryContext context = builder->context; |
236 | | |
237 | | /* free snapshot explicitly, that contains some error checking */ |
238 | 0 | if (builder->snapshot != NULL) |
239 | 0 | { |
240 | 0 | SnapBuildSnapDecRefcount(builder->snapshot); |
241 | 0 | builder->snapshot = NULL; |
242 | 0 | } |
243 | | |
244 | | /* other resources are deallocated via memory context reset */ |
245 | 0 | MemoryContextDelete(context); |
246 | 0 | } |
247 | | |
248 | | /* |
249 | | * Free an unreferenced snapshot that has previously been built by us. |
250 | | */ |
251 | | static void |
252 | | SnapBuildFreeSnapshot(Snapshot snap) |
253 | 0 | { |
254 | | /* make sure we don't get passed an external snapshot */ |
255 | 0 | Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC); |
256 | | |
257 | | /* make sure nobody modified our snapshot */ |
258 | 0 | Assert(snap->curcid == FirstCommandId); |
259 | 0 | Assert(!snap->suboverflowed); |
260 | 0 | Assert(!snap->takenDuringRecovery); |
261 | 0 | Assert(snap->regd_count == 0); |
262 | | |
263 | | /* slightly more likely, so it's checked even without c-asserts */ |
264 | 0 | if (snap->copied) |
265 | 0 | elog(ERROR, "cannot free a copied snapshot"); |
266 | | |
267 | 0 | if (snap->active_count) |
268 | 0 | elog(ERROR, "cannot free an active snapshot"); |
269 | | |
270 | 0 | pfree(snap); |
271 | 0 | } |
272 | | |
273 | | /* |
274 | | * In which state of snapshot building are we? |
275 | | */ |
276 | | SnapBuildState |
277 | | SnapBuildCurrentState(SnapBuild *builder) |
278 | 0 | { |
279 | 0 | return builder->state; |
280 | 0 | } |
281 | | |
282 | | /* |
283 | | * Return the LSN at which the two-phase decoding was first enabled. |
284 | | */ |
285 | | XLogRecPtr |
286 | | SnapBuildGetTwoPhaseAt(SnapBuild *builder) |
287 | 0 | { |
288 | 0 | return builder->two_phase_at; |
289 | 0 | } |
290 | | |
291 | | /* |
292 | | * Set the LSN at which two-phase decoding is enabled. |
293 | | */ |
294 | | void |
295 | | SnapBuildSetTwoPhaseAt(SnapBuild *builder, XLogRecPtr ptr) |
296 | 0 | { |
297 | 0 | builder->two_phase_at = ptr; |
298 | 0 | } |
299 | | |
300 | | /* |
301 | | * Should the contents of transaction ending at 'ptr' be decoded? |
302 | | */ |
303 | | bool |
304 | | SnapBuildXactNeedsSkip(SnapBuild *builder, XLogRecPtr ptr) |
305 | 0 | { |
306 | 0 | return ptr < builder->start_decoding_at; |
307 | 0 | } |
308 | | |
309 | | /* |
310 | | * Increase refcount of a snapshot. |
311 | | * |
312 | | * This is used when handing out a snapshot to some external resource or when |
313 | | * adding a Snapshot as builder->snapshot. |
314 | | */ |
315 | | static void |
316 | | SnapBuildSnapIncRefcount(Snapshot snap) |
317 | 0 | { |
318 | 0 | snap->active_count++; |
319 | 0 | } |
320 | | |
321 | | /* |
322 | | * Decrease refcount of a snapshot and free if the refcount reaches zero. |
323 | | * |
324 | | * Externally visible, so that external resources that have been handed an |
325 | | * IncRef'ed Snapshot can adjust its refcount easily. |
326 | | */ |
327 | | void |
328 | | SnapBuildSnapDecRefcount(Snapshot snap) |
329 | 0 | { |
330 | | /* make sure we don't get passed an external snapshot */ |
331 | 0 | Assert(snap->snapshot_type == SNAPSHOT_HISTORIC_MVCC); |
332 | | |
333 | | /* make sure nobody modified our snapshot */ |
334 | 0 | Assert(snap->curcid == FirstCommandId); |
335 | 0 | Assert(!snap->suboverflowed); |
336 | 0 | Assert(!snap->takenDuringRecovery); |
337 | |
|
338 | 0 | Assert(snap->regd_count == 0); |
339 | |
|
340 | 0 | Assert(snap->active_count > 0); |
341 | | |
342 | | /* slightly more likely, so it's checked even without casserts */ |
343 | 0 | if (snap->copied) |
344 | 0 | elog(ERROR, "cannot free a copied snapshot"); |
345 | | |
346 | 0 | snap->active_count--; |
347 | 0 | if (snap->active_count == 0) |
348 | 0 | SnapBuildFreeSnapshot(snap); |
349 | 0 | } |
350 | | |
351 | | /* |
352 | | * Build a new snapshot, based on currently committed catalog-modifying |
353 | | * transactions. |
354 | | * |
355 | | * In-progress transactions with catalog access are *not* allowed to modify |
356 | | * these snapshots; they have to copy them and fill in appropriate ->curcid |
357 | | * and ->subxip/subxcnt values. |
358 | | */ |
359 | | static Snapshot |
360 | | SnapBuildBuildSnapshot(SnapBuild *builder) |
361 | 0 | { |
362 | 0 | Snapshot snapshot; |
363 | 0 | Size ssize; |
364 | |
|
365 | 0 | Assert(builder->state >= SNAPBUILD_FULL_SNAPSHOT); |
366 | |
|
367 | 0 | ssize = sizeof(SnapshotData) |
368 | 0 | + sizeof(TransactionId) * builder->committed.xcnt |
369 | 0 | + sizeof(TransactionId) * 1 /* toplevel xid */ ; |
370 | |
|
371 | 0 | snapshot = MemoryContextAllocZero(builder->context, ssize); |
372 | |
|
373 | 0 | snapshot->snapshot_type = SNAPSHOT_HISTORIC_MVCC; |
374 | | |
375 | | /* |
376 | | * We misuse the original meaning of SnapshotData's xip and subxip fields |
377 | | * to make the more fitting for our needs. |
378 | | * |
379 | | * In the 'xip' array we store transactions that have to be treated as |
380 | | * committed. Since we will only ever look at tuples from transactions |
381 | | * that have modified the catalog it's more efficient to store those few |
382 | | * that exist between xmin and xmax (frequently there are none). |
383 | | * |
384 | | * Snapshots that are used in transactions that have modified the catalog |
385 | | * also use the 'subxip' array to store their toplevel xid and all the |
386 | | * subtransaction xids so we can recognize when we need to treat rows as |
387 | | * visible that are not in xip but still need to be visible. Subxip only |
388 | | * gets filled when the transaction is copied into the context of a |
389 | | * catalog modifying transaction since we otherwise share a snapshot |
390 | | * between transactions. As long as a txn hasn't modified the catalog it |
391 | | * doesn't need to treat any uncommitted rows as visible, so there is no |
392 | | * need for those xids. |
393 | | * |
394 | | * Both arrays are qsort'ed so that we can use bsearch() on them. |
395 | | */ |
396 | 0 | Assert(TransactionIdIsNormal(builder->xmin)); |
397 | 0 | Assert(TransactionIdIsNormal(builder->xmax)); |
398 | |
|
399 | 0 | snapshot->xmin = builder->xmin; |
400 | 0 | snapshot->xmax = builder->xmax; |
401 | | |
402 | | /* store all transactions to be treated as committed by this snapshot */ |
403 | 0 | snapshot->xip = |
404 | 0 | (TransactionId *) ((char *) snapshot + sizeof(SnapshotData)); |
405 | 0 | snapshot->xcnt = builder->committed.xcnt; |
406 | 0 | memcpy(snapshot->xip, |
407 | 0 | builder->committed.xip, |
408 | 0 | builder->committed.xcnt * sizeof(TransactionId)); |
409 | | |
410 | | /* sort so we can bsearch() */ |
411 | 0 | qsort(snapshot->xip, snapshot->xcnt, sizeof(TransactionId), xidComparator); |
412 | | |
413 | | /* |
414 | | * Initially, subxip is empty, i.e. it's a snapshot to be used by |
415 | | * transactions that don't modify the catalog. Will be filled by |
416 | | * ReorderBufferCopySnap() if necessary. |
417 | | */ |
418 | 0 | snapshot->subxcnt = 0; |
419 | 0 | snapshot->subxip = NULL; |
420 | |
|
421 | 0 | snapshot->suboverflowed = false; |
422 | 0 | snapshot->takenDuringRecovery = false; |
423 | 0 | snapshot->copied = false; |
424 | 0 | snapshot->curcid = FirstCommandId; |
425 | 0 | snapshot->active_count = 0; |
426 | 0 | snapshot->regd_count = 0; |
427 | 0 | snapshot->snapXactCompletionCount = 0; |
428 | |
|
429 | 0 | return snapshot; |
430 | 0 | } |
431 | | |
432 | | /* |
433 | | * Build the initial slot snapshot and convert it to a normal snapshot that |
434 | | * is understood by HeapTupleSatisfiesMVCC. |
435 | | * |
436 | | * The snapshot will be usable directly in current transaction or exported |
437 | | * for loading in different transaction. |
438 | | */ |
439 | | Snapshot |
440 | | SnapBuildInitialSnapshot(SnapBuild *builder) |
441 | 0 | { |
442 | 0 | Snapshot snap; |
443 | 0 | TransactionId xid; |
444 | 0 | TransactionId safeXid; |
445 | 0 | TransactionId *newxip; |
446 | 0 | int newxcnt = 0; |
447 | |
|
448 | 0 | Assert(XactIsoLevel == XACT_REPEATABLE_READ); |
449 | 0 | Assert(builder->building_full_snapshot); |
450 | | |
451 | | /* don't allow older snapshots */ |
452 | 0 | InvalidateCatalogSnapshot(); /* about to overwrite MyProc->xmin */ |
453 | 0 | if (HaveRegisteredOrActiveSnapshot()) |
454 | 0 | elog(ERROR, "cannot build an initial slot snapshot when snapshots exist"); |
455 | 0 | Assert(!HistoricSnapshotActive()); |
456 | |
|
457 | 0 | if (builder->state != SNAPBUILD_CONSISTENT) |
458 | 0 | elog(ERROR, "cannot build an initial slot snapshot before reaching a consistent state"); |
459 | | |
460 | 0 | if (!builder->committed.includes_all_transactions) |
461 | 0 | elog(ERROR, "cannot build an initial slot snapshot, not all transactions are monitored anymore"); |
462 | | |
463 | | /* so we don't overwrite the existing value */ |
464 | 0 | if (TransactionIdIsValid(MyProc->xmin)) |
465 | 0 | elog(ERROR, "cannot build an initial slot snapshot when MyProc->xmin already is valid"); |
466 | | |
467 | 0 | snap = SnapBuildBuildSnapshot(builder); |
468 | | |
469 | | /* |
470 | | * We know that snap->xmin is alive, enforced by the logical xmin |
471 | | * mechanism. Due to that we can do this without locks, we're only |
472 | | * changing our own value. |
473 | | * |
474 | | * Building an initial snapshot is expensive and an unenforced xmin |
475 | | * horizon would have bad consequences, therefore always double-check that |
476 | | * the horizon is enforced. |
477 | | */ |
478 | 0 | LWLockAcquire(ProcArrayLock, LW_SHARED); |
479 | 0 | safeXid = GetOldestSafeDecodingTransactionId(false); |
480 | 0 | LWLockRelease(ProcArrayLock); |
481 | |
|
482 | 0 | if (TransactionIdFollows(safeXid, snap->xmin)) |
483 | 0 | elog(ERROR, "cannot build an initial slot snapshot as oldest safe xid %u follows snapshot's xmin %u", |
484 | 0 | safeXid, snap->xmin); |
485 | | |
486 | 0 | MyProc->xmin = snap->xmin; |
487 | | |
488 | | /* allocate in transaction context */ |
489 | 0 | newxip = (TransactionId *) |
490 | 0 | palloc(sizeof(TransactionId) * GetMaxSnapshotXidCount()); |
491 | | |
492 | | /* |
493 | | * snapbuild.c builds transactions in an "inverted" manner, which means it |
494 | | * stores committed transactions in ->xip, not ones in progress. Build a |
495 | | * classical snapshot by marking all non-committed transactions as |
496 | | * in-progress. This can be expensive. |
497 | | */ |
498 | 0 | for (xid = snap->xmin; NormalTransactionIdPrecedes(xid, snap->xmax);) |
499 | 0 | { |
500 | 0 | void *test; |
501 | | |
502 | | /* |
503 | | * Check whether transaction committed using the decoding snapshot |
504 | | * meaning of ->xip. |
505 | | */ |
506 | 0 | test = bsearch(&xid, snap->xip, snap->xcnt, |
507 | 0 | sizeof(TransactionId), xidComparator); |
508 | |
|
509 | 0 | if (test == NULL) |
510 | 0 | { |
511 | 0 | if (newxcnt >= GetMaxSnapshotXidCount()) |
512 | 0 | ereport(ERROR, |
513 | 0 | (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), |
514 | 0 | errmsg("initial slot snapshot too large"))); |
515 | | |
516 | 0 | newxip[newxcnt++] = xid; |
517 | 0 | } |
518 | | |
519 | 0 | TransactionIdAdvance(xid); |
520 | 0 | } |
521 | | |
522 | | /* adjust remaining snapshot fields as needed */ |
523 | 0 | snap->snapshot_type = SNAPSHOT_MVCC; |
524 | 0 | snap->xcnt = newxcnt; |
525 | 0 | snap->xip = newxip; |
526 | |
|
527 | 0 | return snap; |
528 | 0 | } |
529 | | |
530 | | /* |
531 | | * Export a snapshot so it can be set in another session with SET TRANSACTION |
532 | | * SNAPSHOT. |
533 | | * |
534 | | * For that we need to start a transaction in the current backend as the |
535 | | * importing side checks whether the source transaction is still open to make |
536 | | * sure the xmin horizon hasn't advanced since then. |
537 | | */ |
538 | | const char * |
539 | | SnapBuildExportSnapshot(SnapBuild *builder) |
540 | 0 | { |
541 | 0 | Snapshot snap; |
542 | 0 | char *snapname; |
543 | |
|
544 | 0 | if (IsTransactionOrTransactionBlock()) |
545 | 0 | elog(ERROR, "cannot export a snapshot from within a transaction"); |
546 | | |
547 | 0 | if (SavedResourceOwnerDuringExport) |
548 | 0 | elog(ERROR, "can only export one snapshot at a time"); |
549 | | |
550 | 0 | SavedResourceOwnerDuringExport = CurrentResourceOwner; |
551 | 0 | ExportInProgress = true; |
552 | |
|
553 | 0 | StartTransactionCommand(); |
554 | | |
555 | | /* There doesn't seem to a nice API to set these */ |
556 | 0 | XactIsoLevel = XACT_REPEATABLE_READ; |
557 | 0 | XactReadOnly = true; |
558 | |
|
559 | 0 | snap = SnapBuildInitialSnapshot(builder); |
560 | | |
561 | | /* |
562 | | * now that we've built a plain snapshot, make it active and use the |
563 | | * normal mechanisms for exporting it |
564 | | */ |
565 | 0 | snapname = ExportSnapshot(snap); |
566 | |
|
567 | 0 | ereport(LOG, |
568 | 0 | (errmsg_plural("exported logical decoding snapshot: \"%s\" with %u transaction ID", |
569 | 0 | "exported logical decoding snapshot: \"%s\" with %u transaction IDs", |
570 | 0 | snap->xcnt, |
571 | 0 | snapname, snap->xcnt))); |
572 | 0 | return snapname; |
573 | 0 | } |
574 | | |
575 | | /* |
576 | | * Ensure there is a snapshot and if not build one for current transaction. |
577 | | */ |
578 | | Snapshot |
579 | | SnapBuildGetOrBuildSnapshot(SnapBuild *builder) |
580 | 0 | { |
581 | 0 | Assert(builder->state == SNAPBUILD_CONSISTENT); |
582 | | |
583 | | /* only build a new snapshot if we don't have a prebuilt one */ |
584 | 0 | if (builder->snapshot == NULL) |
585 | 0 | { |
586 | 0 | builder->snapshot = SnapBuildBuildSnapshot(builder); |
587 | | /* increase refcount for the snapshot builder */ |
588 | 0 | SnapBuildSnapIncRefcount(builder->snapshot); |
589 | 0 | } |
590 | |
|
591 | 0 | return builder->snapshot; |
592 | 0 | } |
593 | | |
594 | | /* |
595 | | * Reset a previously SnapBuildExportSnapshot()'ed snapshot if there is |
596 | | * any. Aborts the previously started transaction and resets the resource |
597 | | * owner back to its original value. |
598 | | */ |
599 | | void |
600 | | SnapBuildClearExportedSnapshot(void) |
601 | 0 | { |
602 | 0 | ResourceOwner tmpResOwner; |
603 | | |
604 | | /* nothing exported, that is the usual case */ |
605 | 0 | if (!ExportInProgress) |
606 | 0 | return; |
607 | | |
608 | 0 | if (!IsTransactionState()) |
609 | 0 | elog(ERROR, "clearing exported snapshot in wrong transaction state"); |
610 | | |
611 | | /* |
612 | | * AbortCurrentTransaction() takes care of resetting the snapshot state, |
613 | | * so remember SavedResourceOwnerDuringExport. |
614 | | */ |
615 | 0 | tmpResOwner = SavedResourceOwnerDuringExport; |
616 | | |
617 | | /* make sure nothing could have ever happened */ |
618 | 0 | AbortCurrentTransaction(); |
619 | |
|
620 | 0 | CurrentResourceOwner = tmpResOwner; |
621 | 0 | } |
622 | | |
623 | | /* |
624 | | * Clear snapshot export state during transaction abort. |
625 | | */ |
626 | | void |
627 | | SnapBuildResetExportedSnapshotState(void) |
628 | 0 | { |
629 | 0 | SavedResourceOwnerDuringExport = NULL; |
630 | 0 | ExportInProgress = false; |
631 | 0 | } |
632 | | |
633 | | /* |
634 | | * Handle the effects of a single heap change, appropriate to the current state |
635 | | * of the snapshot builder and returns whether changes made at (xid, lsn) can |
636 | | * be decoded. |
637 | | */ |
638 | | bool |
639 | | SnapBuildProcessChange(SnapBuild *builder, TransactionId xid, XLogRecPtr lsn) |
640 | 0 | { |
641 | | /* |
642 | | * We can't handle data in transactions if we haven't built a snapshot |
643 | | * yet, so don't store them. |
644 | | */ |
645 | 0 | if (builder->state < SNAPBUILD_FULL_SNAPSHOT) |
646 | 0 | return false; |
647 | | |
648 | | /* |
649 | | * No point in keeping track of changes in transactions that we don't have |
650 | | * enough information about to decode. This means that they started before |
651 | | * we got into the SNAPBUILD_FULL_SNAPSHOT state. |
652 | | */ |
653 | 0 | if (builder->state < SNAPBUILD_CONSISTENT && |
654 | 0 | TransactionIdPrecedes(xid, builder->next_phase_at)) |
655 | 0 | return false; |
656 | | |
657 | | /* |
658 | | * If the reorderbuffer doesn't yet have a snapshot, add one now, it will |
659 | | * be needed to decode the change we're currently processing. |
660 | | */ |
661 | 0 | if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid)) |
662 | 0 | { |
663 | | /* only build a new snapshot if we don't have a prebuilt one */ |
664 | 0 | if (builder->snapshot == NULL) |
665 | 0 | { |
666 | 0 | builder->snapshot = SnapBuildBuildSnapshot(builder); |
667 | | /* increase refcount for the snapshot builder */ |
668 | 0 | SnapBuildSnapIncRefcount(builder->snapshot); |
669 | 0 | } |
670 | | |
671 | | /* |
672 | | * Increase refcount for the transaction we're handing the snapshot |
673 | | * out to. |
674 | | */ |
675 | 0 | SnapBuildSnapIncRefcount(builder->snapshot); |
676 | 0 | ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn, |
677 | 0 | builder->snapshot); |
678 | 0 | } |
679 | |
|
680 | 0 | return true; |
681 | 0 | } |
682 | | |
683 | | /* |
684 | | * Do CommandId/combo CID handling after reading an xl_heap_new_cid record. |
685 | | * This implies that a transaction has done some form of write to system |
686 | | * catalogs. |
687 | | */ |
688 | | void |
689 | | SnapBuildProcessNewCid(SnapBuild *builder, TransactionId xid, |
690 | | XLogRecPtr lsn, xl_heap_new_cid *xlrec) |
691 | 0 | { |
692 | 0 | CommandId cid; |
693 | | |
694 | | /* |
695 | | * we only log new_cid's if a catalog tuple was modified, so mark the |
696 | | * transaction as containing catalog modifications |
697 | | */ |
698 | 0 | ReorderBufferXidSetCatalogChanges(builder->reorder, xid, lsn); |
699 | |
|
700 | 0 | ReorderBufferAddNewTupleCids(builder->reorder, xlrec->top_xid, lsn, |
701 | 0 | xlrec->target_locator, xlrec->target_tid, |
702 | 0 | xlrec->cmin, xlrec->cmax, |
703 | 0 | xlrec->combocid); |
704 | | |
705 | | /* figure out new command id */ |
706 | 0 | if (xlrec->cmin != InvalidCommandId && |
707 | 0 | xlrec->cmax != InvalidCommandId) |
708 | 0 | cid = Max(xlrec->cmin, xlrec->cmax); |
709 | 0 | else if (xlrec->cmax != InvalidCommandId) |
710 | 0 | cid = xlrec->cmax; |
711 | 0 | else if (xlrec->cmin != InvalidCommandId) |
712 | 0 | cid = xlrec->cmin; |
713 | 0 | else |
714 | 0 | { |
715 | 0 | cid = InvalidCommandId; /* silence compiler */ |
716 | 0 | elog(ERROR, "xl_heap_new_cid record without a valid CommandId"); |
717 | 0 | } |
718 | | |
719 | 0 | ReorderBufferAddNewCommandId(builder->reorder, xid, lsn, cid + 1); |
720 | 0 | } |
721 | | |
722 | | /* |
723 | | * Add a new Snapshot and invalidation messages to all transactions we're |
724 | | * decoding that currently are in-progress so they can see new catalog contents |
725 | | * made by the transaction that just committed. This is necessary because those |
726 | | * in-progress transactions will use the new catalog's contents from here on |
727 | | * (at the very least everything they do needs to be compatible with newer |
728 | | * catalog contents). |
729 | | */ |
730 | | static void |
731 | | SnapBuildDistributeSnapshotAndInval(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid) |
732 | 0 | { |
733 | 0 | dlist_iter txn_i; |
734 | 0 | ReorderBufferTXN *txn; |
735 | | |
736 | | /* |
737 | | * Iterate through all toplevel transactions. This can include |
738 | | * subtransactions which we just don't yet know to be that, but that's |
739 | | * fine, they will just get an unnecessary snapshot and invalidations |
740 | | * queued. |
741 | | */ |
742 | 0 | dlist_foreach(txn_i, &builder->reorder->toplevel_by_lsn) |
743 | 0 | { |
744 | 0 | txn = dlist_container(ReorderBufferTXN, node, txn_i.cur); |
745 | |
|
746 | 0 | Assert(TransactionIdIsValid(txn->xid)); |
747 | | |
748 | | /* |
749 | | * If we don't have a base snapshot yet, there are no changes in this |
750 | | * transaction which in turn implies we don't yet need a snapshot at |
751 | | * all. We'll add a snapshot when the first change gets queued. |
752 | | * |
753 | | * Similarly, we don't need to add invalidations to a transaction |
754 | | * whose base snapshot is not yet set. Once a base snapshot is built, |
755 | | * it will include the xids of committed transactions that have |
756 | | * modified the catalog, thus reflecting the new catalog contents. The |
757 | | * existing catalog cache will have already been invalidated after |
758 | | * processing the invalidations in the transaction that modified |
759 | | * catalogs, ensuring that a fresh cache is constructed during |
760 | | * decoding. |
761 | | * |
762 | | * NB: This works correctly even for subtransactions because |
763 | | * ReorderBufferAssignChild() takes care to transfer the base snapshot |
764 | | * to the top-level transaction, and while iterating the changequeue |
765 | | * we'll get the change from the subtxn. |
766 | | */ |
767 | 0 | if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, txn->xid)) |
768 | 0 | continue; |
769 | | |
770 | | /* |
771 | | * We don't need to add snapshot or invalidations to prepared |
772 | | * transactions as they should not see the new catalog contents. |
773 | | */ |
774 | 0 | if (rbtxn_is_prepared(txn)) |
775 | 0 | continue; |
776 | | |
777 | 0 | elog(DEBUG2, "adding a new snapshot and invalidations to %u at %X/%08X", |
778 | 0 | txn->xid, LSN_FORMAT_ARGS(lsn)); |
779 | | |
780 | | /* |
781 | | * increase the snapshot's refcount for the transaction we are handing |
782 | | * it out to |
783 | | */ |
784 | 0 | SnapBuildSnapIncRefcount(builder->snapshot); |
785 | 0 | ReorderBufferAddSnapshot(builder->reorder, txn->xid, lsn, |
786 | 0 | builder->snapshot); |
787 | | |
788 | | /* |
789 | | * Add invalidation messages to the reorder buffer of in-progress |
790 | | * transactions except the current committed transaction, for which we |
791 | | * will execute invalidations at the end. |
792 | | * |
793 | | * It is required, otherwise, we will end up using the stale catcache |
794 | | * contents built by the current transaction even after its decoding, |
795 | | * which should have been invalidated due to concurrent catalog |
796 | | * changing transaction. |
797 | | * |
798 | | * Distribute only the invalidation messages generated by the current |
799 | | * committed transaction. Invalidation messages received from other |
800 | | * transactions would have already been propagated to the relevant |
801 | | * in-progress transactions. This transaction would have processed |
802 | | * those invalidations, ensuring that subsequent transactions observe |
803 | | * a consistent cache state. |
804 | | */ |
805 | 0 | if (txn->xid != xid) |
806 | 0 | { |
807 | 0 | uint32 ninvalidations; |
808 | 0 | SharedInvalidationMessage *msgs = NULL; |
809 | |
|
810 | 0 | ninvalidations = ReorderBufferGetInvalidations(builder->reorder, |
811 | 0 | xid, &msgs); |
812 | |
|
813 | 0 | if (ninvalidations > 0) |
814 | 0 | { |
815 | 0 | Assert(msgs != NULL); |
816 | |
|
817 | 0 | ReorderBufferAddDistributedInvalidations(builder->reorder, |
818 | 0 | txn->xid, lsn, |
819 | 0 | ninvalidations, msgs); |
820 | 0 | } |
821 | 0 | } |
822 | 0 | } |
823 | 0 | } |
824 | | |
825 | | /* |
826 | | * Keep track of a new catalog changing transaction that has committed. |
827 | | */ |
828 | | static void |
829 | | SnapBuildAddCommittedTxn(SnapBuild *builder, TransactionId xid) |
830 | 0 | { |
831 | 0 | Assert(TransactionIdIsValid(xid)); |
832 | |
|
833 | 0 | if (builder->committed.xcnt == builder->committed.xcnt_space) |
834 | 0 | { |
835 | 0 | builder->committed.xcnt_space = builder->committed.xcnt_space * 2 + 1; |
836 | |
|
837 | 0 | elog(DEBUG1, "increasing space for committed transactions to %u", |
838 | 0 | (uint32) builder->committed.xcnt_space); |
839 | | |
840 | 0 | builder->committed.xip = repalloc(builder->committed.xip, |
841 | 0 | builder->committed.xcnt_space * sizeof(TransactionId)); |
842 | 0 | } |
843 | | |
844 | | /* |
845 | | * TODO: It might make sense to keep the array sorted here instead of |
846 | | * doing it every time we build a new snapshot. On the other hand this |
847 | | * gets called repeatedly when a transaction with subtransactions commits. |
848 | | */ |
849 | 0 | builder->committed.xip[builder->committed.xcnt++] = xid; |
850 | 0 | } |
851 | | |
852 | | /* |
853 | | * Remove knowledge about transactions we treat as committed or containing catalog |
854 | | * changes that are smaller than ->xmin. Those won't ever get checked via |
855 | | * the ->committed or ->catchange array, respectively. The committed xids will |
856 | | * get checked via the clog machinery. |
857 | | * |
858 | | * We can ideally remove the transaction from catchange array once it is |
859 | | * finished (committed/aborted) but that could be costly as we need to maintain |
860 | | * the xids order in the array. |
861 | | */ |
862 | | static void |
863 | | SnapBuildPurgeOlderTxn(SnapBuild *builder) |
864 | 0 | { |
865 | 0 | int off; |
866 | 0 | TransactionId *workspace; |
867 | 0 | int surviving_xids = 0; |
868 | | |
869 | | /* not ready yet */ |
870 | 0 | if (!TransactionIdIsNormal(builder->xmin)) |
871 | 0 | return; |
872 | | |
873 | | /* TODO: Neater algorithm than just copying and iterating? */ |
874 | 0 | workspace = |
875 | 0 | MemoryContextAlloc(builder->context, |
876 | 0 | builder->committed.xcnt * sizeof(TransactionId)); |
877 | | |
878 | | /* copy xids that still are interesting to workspace */ |
879 | 0 | for (off = 0; off < builder->committed.xcnt; off++) |
880 | 0 | { |
881 | 0 | if (NormalTransactionIdPrecedes(builder->committed.xip[off], |
882 | 0 | builder->xmin)) |
883 | 0 | ; /* remove */ |
884 | 0 | else |
885 | 0 | workspace[surviving_xids++] = builder->committed.xip[off]; |
886 | 0 | } |
887 | | |
888 | | /* copy workspace back to persistent state */ |
889 | 0 | memcpy(builder->committed.xip, workspace, |
890 | 0 | surviving_xids * sizeof(TransactionId)); |
891 | |
|
892 | 0 | elog(DEBUG3, "purged committed transactions from %u to %u, xmin: %u, xmax: %u", |
893 | 0 | (uint32) builder->committed.xcnt, (uint32) surviving_xids, |
894 | 0 | builder->xmin, builder->xmax); |
895 | 0 | builder->committed.xcnt = surviving_xids; |
896 | |
|
897 | 0 | pfree(workspace); |
898 | | |
899 | | /* |
900 | | * Purge xids in ->catchange as well. The purged array must also be sorted |
901 | | * in xidComparator order. |
902 | | */ |
903 | 0 | if (builder->catchange.xcnt > 0) |
904 | 0 | { |
905 | | /* |
906 | | * Since catchange.xip is sorted, we find the lower bound of xids that |
907 | | * are still interesting. |
908 | | */ |
909 | 0 | for (off = 0; off < builder->catchange.xcnt; off++) |
910 | 0 | { |
911 | 0 | if (TransactionIdFollowsOrEquals(builder->catchange.xip[off], |
912 | 0 | builder->xmin)) |
913 | 0 | break; |
914 | 0 | } |
915 | |
|
916 | 0 | surviving_xids = builder->catchange.xcnt - off; |
917 | |
|
918 | 0 | if (surviving_xids > 0) |
919 | 0 | { |
920 | 0 | memmove(builder->catchange.xip, &(builder->catchange.xip[off]), |
921 | 0 | surviving_xids * sizeof(TransactionId)); |
922 | 0 | } |
923 | 0 | else |
924 | 0 | { |
925 | 0 | pfree(builder->catchange.xip); |
926 | 0 | builder->catchange.xip = NULL; |
927 | 0 | } |
928 | |
|
929 | 0 | elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u", |
930 | 0 | (uint32) builder->catchange.xcnt, (uint32) surviving_xids, |
931 | 0 | builder->xmin, builder->xmax); |
932 | 0 | builder->catchange.xcnt = surviving_xids; |
933 | 0 | } |
934 | 0 | } |
935 | | |
936 | | /* |
937 | | * Handle everything that needs to be done when a transaction commits |
938 | | */ |
939 | | void |
940 | | SnapBuildCommitTxn(SnapBuild *builder, XLogRecPtr lsn, TransactionId xid, |
941 | | int nsubxacts, TransactionId *subxacts, uint32 xinfo) |
942 | 0 | { |
943 | 0 | int nxact; |
944 | |
|
945 | 0 | bool needs_snapshot = false; |
946 | 0 | bool needs_timetravel = false; |
947 | 0 | bool sub_needs_timetravel = false; |
948 | |
|
949 | 0 | TransactionId xmax = xid; |
950 | | |
951 | | /* |
952 | | * Transactions preceding BUILDING_SNAPSHOT will neither be decoded, nor |
953 | | * will they be part of a snapshot. So we don't need to record anything. |
954 | | */ |
955 | 0 | if (builder->state == SNAPBUILD_START || |
956 | 0 | (builder->state == SNAPBUILD_BUILDING_SNAPSHOT && |
957 | 0 | TransactionIdPrecedes(xid, builder->next_phase_at))) |
958 | 0 | { |
959 | | /* ensure that only commits after this are getting replayed */ |
960 | 0 | if (builder->start_decoding_at <= lsn) |
961 | 0 | builder->start_decoding_at = lsn + 1; |
962 | 0 | return; |
963 | 0 | } |
964 | | |
965 | 0 | if (builder->state < SNAPBUILD_CONSISTENT) |
966 | 0 | { |
967 | | /* ensure that only commits after this are getting replayed */ |
968 | 0 | if (builder->start_decoding_at <= lsn) |
969 | 0 | builder->start_decoding_at = lsn + 1; |
970 | | |
971 | | /* |
972 | | * If building an exportable snapshot, force xid to be tracked, even |
973 | | * if the transaction didn't modify the catalog. |
974 | | */ |
975 | 0 | if (builder->building_full_snapshot) |
976 | 0 | { |
977 | 0 | needs_timetravel = true; |
978 | 0 | } |
979 | 0 | } |
980 | |
|
981 | 0 | for (nxact = 0; nxact < nsubxacts; nxact++) |
982 | 0 | { |
983 | 0 | TransactionId subxid = subxacts[nxact]; |
984 | | |
985 | | /* |
986 | | * Add subtransaction to base snapshot if catalog modifying, we don't |
987 | | * distinguish to toplevel transactions there. |
988 | | */ |
989 | 0 | if (SnapBuildXidHasCatalogChanges(builder, subxid, xinfo)) |
990 | 0 | { |
991 | 0 | sub_needs_timetravel = true; |
992 | 0 | needs_snapshot = true; |
993 | |
|
994 | 0 | elog(DEBUG1, "found subtransaction %u:%u with catalog changes", |
995 | 0 | xid, subxid); |
996 | | |
997 | 0 | SnapBuildAddCommittedTxn(builder, subxid); |
998 | |
|
999 | 0 | if (NormalTransactionIdFollows(subxid, xmax)) |
1000 | 0 | xmax = subxid; |
1001 | 0 | } |
1002 | | |
1003 | | /* |
1004 | | * If we're forcing timetravel we also need visibility information |
1005 | | * about subtransaction, so keep track of subtransaction's state, even |
1006 | | * if not catalog modifying. Don't need to distribute a snapshot in |
1007 | | * that case. |
1008 | | */ |
1009 | 0 | else if (needs_timetravel) |
1010 | 0 | { |
1011 | 0 | SnapBuildAddCommittedTxn(builder, subxid); |
1012 | 0 | if (NormalTransactionIdFollows(subxid, xmax)) |
1013 | 0 | xmax = subxid; |
1014 | 0 | } |
1015 | 0 | } |
1016 | | |
1017 | | /* if top-level modified catalog, it'll need a snapshot */ |
1018 | 0 | if (SnapBuildXidHasCatalogChanges(builder, xid, xinfo)) |
1019 | 0 | { |
1020 | 0 | elog(DEBUG2, "found top level transaction %u, with catalog changes", |
1021 | 0 | xid); |
1022 | 0 | needs_snapshot = true; |
1023 | 0 | needs_timetravel = true; |
1024 | 0 | SnapBuildAddCommittedTxn(builder, xid); |
1025 | 0 | } |
1026 | 0 | else if (sub_needs_timetravel) |
1027 | 0 | { |
1028 | | /* track toplevel txn as well, subxact alone isn't meaningful */ |
1029 | 0 | elog(DEBUG2, "forced transaction %u to do timetravel due to one of its subtransactions", |
1030 | 0 | xid); |
1031 | 0 | needs_timetravel = true; |
1032 | 0 | SnapBuildAddCommittedTxn(builder, xid); |
1033 | 0 | } |
1034 | 0 | else if (needs_timetravel) |
1035 | 0 | { |
1036 | 0 | elog(DEBUG2, "forced transaction %u to do timetravel", xid); |
1037 | | |
1038 | 0 | SnapBuildAddCommittedTxn(builder, xid); |
1039 | 0 | } |
1040 | | |
1041 | 0 | if (!needs_timetravel) |
1042 | 0 | { |
1043 | | /* record that we cannot export a general snapshot anymore */ |
1044 | 0 | builder->committed.includes_all_transactions = false; |
1045 | 0 | } |
1046 | |
|
1047 | 0 | Assert(!needs_snapshot || needs_timetravel); |
1048 | | |
1049 | | /* |
1050 | | * Adjust xmax of the snapshot builder, we only do that for committed, |
1051 | | * catalog modifying, transactions, everything else isn't interesting for |
1052 | | * us since we'll never look at the respective rows. |
1053 | | */ |
1054 | 0 | if (needs_timetravel && |
1055 | 0 | (!TransactionIdIsValid(builder->xmax) || |
1056 | 0 | TransactionIdFollowsOrEquals(xmax, builder->xmax))) |
1057 | 0 | { |
1058 | 0 | builder->xmax = xmax; |
1059 | 0 | TransactionIdAdvance(builder->xmax); |
1060 | 0 | } |
1061 | | |
1062 | | /* if there's any reason to build a historic snapshot, do so now */ |
1063 | 0 | if (needs_snapshot) |
1064 | 0 | { |
1065 | | /* |
1066 | | * If we haven't built a complete snapshot yet there's no need to hand |
1067 | | * it out, it wouldn't (and couldn't) be used anyway. |
1068 | | */ |
1069 | 0 | if (builder->state < SNAPBUILD_FULL_SNAPSHOT) |
1070 | 0 | return; |
1071 | | |
1072 | | /* |
1073 | | * Decrease the snapshot builder's refcount of the old snapshot, note |
1074 | | * that it still will be used if it has been handed out to the |
1075 | | * reorderbuffer earlier. |
1076 | | */ |
1077 | 0 | if (builder->snapshot) |
1078 | 0 | SnapBuildSnapDecRefcount(builder->snapshot); |
1079 | |
|
1080 | 0 | builder->snapshot = SnapBuildBuildSnapshot(builder); |
1081 | | |
1082 | | /* we might need to execute invalidations, add snapshot */ |
1083 | 0 | if (!ReorderBufferXidHasBaseSnapshot(builder->reorder, xid)) |
1084 | 0 | { |
1085 | 0 | SnapBuildSnapIncRefcount(builder->snapshot); |
1086 | 0 | ReorderBufferSetBaseSnapshot(builder->reorder, xid, lsn, |
1087 | 0 | builder->snapshot); |
1088 | 0 | } |
1089 | | |
1090 | | /* refcount of the snapshot builder for the new snapshot */ |
1091 | 0 | SnapBuildSnapIncRefcount(builder->snapshot); |
1092 | | |
1093 | | /* |
1094 | | * Add a new catalog snapshot and invalidations messages to all |
1095 | | * currently running transactions. |
1096 | | */ |
1097 | 0 | SnapBuildDistributeSnapshotAndInval(builder, lsn, xid); |
1098 | 0 | } |
1099 | 0 | } |
1100 | | |
1101 | | /* |
1102 | | * Check the reorder buffer and the snapshot to see if the given transaction has |
1103 | | * modified catalogs. |
1104 | | */ |
1105 | | static inline bool |
1106 | | SnapBuildXidHasCatalogChanges(SnapBuild *builder, TransactionId xid, |
1107 | | uint32 xinfo) |
1108 | 0 | { |
1109 | 0 | if (ReorderBufferXidHasCatalogChanges(builder->reorder, xid)) |
1110 | 0 | return true; |
1111 | | |
1112 | | /* |
1113 | | * The transactions that have changed catalogs must have invalidation |
1114 | | * info. |
1115 | | */ |
1116 | 0 | if (!(xinfo & XACT_XINFO_HAS_INVALS)) |
1117 | 0 | return false; |
1118 | | |
1119 | | /* Check the catchange XID array */ |
1120 | 0 | return ((builder->catchange.xcnt > 0) && |
1121 | 0 | (bsearch(&xid, builder->catchange.xip, builder->catchange.xcnt, |
1122 | 0 | sizeof(TransactionId), xidComparator) != NULL)); |
1123 | 0 | } |
1124 | | |
1125 | | /* ----------------------------------- |
1126 | | * Snapshot building functions dealing with xlog records |
1127 | | * ----------------------------------- |
1128 | | */ |
1129 | | |
1130 | | /* |
1131 | | * Process a running xacts record, and use its information to first build a |
1132 | | * historic snapshot and later to release resources that aren't needed |
1133 | | * anymore. |
1134 | | */ |
1135 | | void |
1136 | | SnapBuildProcessRunningXacts(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running) |
1137 | 0 | { |
1138 | 0 | ReorderBufferTXN *txn; |
1139 | 0 | TransactionId xmin; |
1140 | | |
1141 | | /* |
1142 | | * If we're not consistent yet, inspect the record to see whether it |
1143 | | * allows to get closer to being consistent. If we are consistent, dump |
1144 | | * our snapshot so others or we, after a restart, can use it. |
1145 | | */ |
1146 | 0 | if (builder->state < SNAPBUILD_CONSISTENT) |
1147 | 0 | { |
1148 | | /* returns false if there's no point in performing cleanup just yet */ |
1149 | 0 | if (!SnapBuildFindSnapshot(builder, lsn, running)) |
1150 | 0 | return; |
1151 | 0 | } |
1152 | 0 | else |
1153 | 0 | SnapBuildSerialize(builder, lsn); |
1154 | | |
1155 | | /* |
1156 | | * Update range of interesting xids based on the running xacts |
1157 | | * information. We don't increase ->xmax using it, because once we are in |
1158 | | * a consistent state we can do that ourselves and much more efficiently |
1159 | | * so, because we only need to do it for catalog transactions since we |
1160 | | * only ever look at those. |
1161 | | * |
1162 | | * NB: We only increase xmax when a catalog modifying transaction commits |
1163 | | * (see SnapBuildCommitTxn). Because of this, xmax can be lower than |
1164 | | * xmin, which looks odd but is correct and actually more efficient, since |
1165 | | * we hit fast paths in heapam_visibility.c. |
1166 | | */ |
1167 | 0 | builder->xmin = running->oldestRunningXid; |
1168 | | |
1169 | | /* Remove transactions we don't need to keep track off anymore */ |
1170 | 0 | SnapBuildPurgeOlderTxn(builder); |
1171 | | |
1172 | | /* |
1173 | | * Advance the xmin limit for the current replication slot, to allow |
1174 | | * vacuum to clean up the tuples this slot has been protecting. |
1175 | | * |
1176 | | * The reorderbuffer might have an xmin among the currently running |
1177 | | * snapshots; use it if so. If not, we need only consider the snapshots |
1178 | | * we'll produce later, which can't be less than the oldest running xid in |
1179 | | * the record we're reading now. |
1180 | | */ |
1181 | 0 | xmin = ReorderBufferGetOldestXmin(builder->reorder); |
1182 | 0 | if (xmin == InvalidTransactionId) |
1183 | 0 | xmin = running->oldestRunningXid; |
1184 | 0 | elog(DEBUG3, "xmin: %u, xmax: %u, oldest running: %u, oldest xmin: %u", |
1185 | 0 | builder->xmin, builder->xmax, running->oldestRunningXid, xmin); |
1186 | 0 | LogicalIncreaseXminForSlot(lsn, xmin); |
1187 | | |
1188 | | /* |
1189 | | * Also tell the slot where we can restart decoding from. We don't want to |
1190 | | * do that after every commit because changing that implies an fsync of |
1191 | | * the logical slot's state file, so we only do it every time we see a |
1192 | | * running xacts record. |
1193 | | * |
1194 | | * Do so by looking for the oldest in progress transaction (determined by |
1195 | | * the first LSN of any of its relevant records). Every transaction |
1196 | | * remembers the last location we stored the snapshot to disk before its |
1197 | | * beginning. That point is where we can restart from. |
1198 | | */ |
1199 | | |
1200 | | /* |
1201 | | * Can't know about a serialized snapshot's location if we're not |
1202 | | * consistent. |
1203 | | */ |
1204 | 0 | if (builder->state < SNAPBUILD_CONSISTENT) |
1205 | 0 | return; |
1206 | | |
1207 | 0 | txn = ReorderBufferGetOldestTXN(builder->reorder); |
1208 | | |
1209 | | /* |
1210 | | * oldest ongoing txn might have started when we didn't yet serialize |
1211 | | * anything because we hadn't reached a consistent state yet. |
1212 | | */ |
1213 | 0 | if (txn != NULL && txn->restart_decoding_lsn != InvalidXLogRecPtr) |
1214 | 0 | LogicalIncreaseRestartDecodingForSlot(lsn, txn->restart_decoding_lsn); |
1215 | | |
1216 | | /* |
1217 | | * No in-progress transaction, can reuse the last serialized snapshot if |
1218 | | * we have one. |
1219 | | */ |
1220 | 0 | else if (txn == NULL && |
1221 | 0 | builder->reorder->current_restart_decoding_lsn != InvalidXLogRecPtr && |
1222 | 0 | builder->last_serialized_snapshot != InvalidXLogRecPtr) |
1223 | 0 | LogicalIncreaseRestartDecodingForSlot(lsn, |
1224 | 0 | builder->last_serialized_snapshot); |
1225 | 0 | } |
1226 | | |
1227 | | |
1228 | | /* |
1229 | | * Build the start of a snapshot that's capable of decoding the catalog. |
1230 | | * |
1231 | | * Helper function for SnapBuildProcessRunningXacts() while we're not yet |
1232 | | * consistent. |
1233 | | * |
1234 | | * Returns true if there is a point in performing internal maintenance/cleanup |
1235 | | * using the xl_running_xacts record. |
1236 | | */ |
1237 | | static bool |
1238 | | SnapBuildFindSnapshot(SnapBuild *builder, XLogRecPtr lsn, xl_running_xacts *running) |
1239 | 0 | { |
1240 | | /* --- |
1241 | | * Build catalog decoding snapshot incrementally using information about |
1242 | | * the currently running transactions. There are several ways to do that: |
1243 | | * |
1244 | | * a) There were no running transactions when the xl_running_xacts record |
1245 | | * was inserted, jump to CONSISTENT immediately. We might find such a |
1246 | | * state while waiting on c)'s sub-states. |
1247 | | * |
1248 | | * b) This (in a previous run) or another decoding slot serialized a |
1249 | | * snapshot to disk that we can use. Can't use this method while finding |
1250 | | * the start point for decoding changes as the restart LSN would be an |
1251 | | * arbitrary LSN but we need to find the start point to extract changes |
1252 | | * where we won't see the data for partial transactions. Also, we cannot |
1253 | | * use this method when a slot needs a full snapshot for export or direct |
1254 | | * use, as that snapshot will only contain catalog modifying transactions. |
1255 | | * |
1256 | | * c) First incrementally build a snapshot for catalog tuples |
1257 | | * (BUILDING_SNAPSHOT), that requires all, already in-progress, |
1258 | | * transactions to finish. Every transaction starting after that |
1259 | | * (FULL_SNAPSHOT state), has enough information to be decoded. But |
1260 | | * for older running transactions no viable snapshot exists yet, so |
1261 | | * CONSISTENT will only be reached once all of those have finished. |
1262 | | * --- |
1263 | | */ |
1264 | | |
1265 | | /* |
1266 | | * xl_running_xacts record is older than what we can use, we might not |
1267 | | * have all necessary catalog rows anymore. |
1268 | | */ |
1269 | 0 | if (TransactionIdIsNormal(builder->initial_xmin_horizon) && |
1270 | 0 | NormalTransactionIdPrecedes(running->oldestRunningXid, |
1271 | 0 | builder->initial_xmin_horizon)) |
1272 | 0 | { |
1273 | 0 | ereport(DEBUG1, |
1274 | 0 | errmsg_internal("skipping snapshot at %X/%08X while building logical decoding snapshot, xmin horizon too low", |
1275 | 0 | LSN_FORMAT_ARGS(lsn)), |
1276 | 0 | errdetail_internal("initial xmin horizon of %u vs the snapshot's %u", |
1277 | 0 | builder->initial_xmin_horizon, running->oldestRunningXid)); |
1278 | | |
1279 | | |
1280 | 0 | SnapBuildWaitSnapshot(running, builder->initial_xmin_horizon); |
1281 | |
|
1282 | 0 | return true; |
1283 | 0 | } |
1284 | | |
1285 | | /* |
1286 | | * a) No transaction were running, we can jump to consistent. |
1287 | | * |
1288 | | * This is not affected by races around xl_running_xacts, because we can |
1289 | | * miss transaction commits, but currently not transactions starting. |
1290 | | * |
1291 | | * NB: We might have already started to incrementally assemble a snapshot, |
1292 | | * so we need to be careful to deal with that. |
1293 | | */ |
1294 | 0 | if (running->oldestRunningXid == running->nextXid) |
1295 | 0 | { |
1296 | 0 | if (builder->start_decoding_at == InvalidXLogRecPtr || |
1297 | 0 | builder->start_decoding_at <= lsn) |
1298 | | /* can decode everything after this */ |
1299 | 0 | builder->start_decoding_at = lsn + 1; |
1300 | | |
1301 | | /* As no transactions were running xmin/xmax can be trivially set. */ |
1302 | 0 | builder->xmin = running->nextXid; /* < are finished */ |
1303 | 0 | builder->xmax = running->nextXid; /* >= are running */ |
1304 | | |
1305 | | /* so we can safely use the faster comparisons */ |
1306 | 0 | Assert(TransactionIdIsNormal(builder->xmin)); |
1307 | 0 | Assert(TransactionIdIsNormal(builder->xmax)); |
1308 | |
|
1309 | 0 | builder->state = SNAPBUILD_CONSISTENT; |
1310 | 0 | builder->next_phase_at = InvalidTransactionId; |
1311 | |
|
1312 | 0 | ereport(LOG, |
1313 | 0 | errmsg("logical decoding found consistent point at %X/%08X", |
1314 | 0 | LSN_FORMAT_ARGS(lsn)), |
1315 | 0 | errdetail("There are no running transactions.")); |
1316 | | |
1317 | 0 | return false; |
1318 | 0 | } |
1319 | | |
1320 | | /* |
1321 | | * b) valid on disk state and while neither building full snapshot nor |
1322 | | * creating a slot. |
1323 | | */ |
1324 | 0 | else if (!builder->building_full_snapshot && |
1325 | 0 | !builder->in_slot_creation && |
1326 | 0 | SnapBuildRestore(builder, lsn)) |
1327 | 0 | { |
1328 | | /* there won't be any state to cleanup */ |
1329 | 0 | return false; |
1330 | 0 | } |
1331 | | |
1332 | | /* |
1333 | | * c) transition from START to BUILDING_SNAPSHOT. |
1334 | | * |
1335 | | * In START state, and a xl_running_xacts record with running xacts is |
1336 | | * encountered. In that case, switch to BUILDING_SNAPSHOT state, and |
1337 | | * record xl_running_xacts->nextXid. Once all running xacts have finished |
1338 | | * (i.e. they're all >= nextXid), we have a complete catalog snapshot. It |
1339 | | * might look that we could use xl_running_xacts's ->xids information to |
1340 | | * get there quicker, but that is problematic because transactions marked |
1341 | | * as running, might already have inserted their commit record - it's |
1342 | | * infeasible to change that with locking. |
1343 | | */ |
1344 | 0 | else if (builder->state == SNAPBUILD_START) |
1345 | 0 | { |
1346 | 0 | builder->state = SNAPBUILD_BUILDING_SNAPSHOT; |
1347 | 0 | builder->next_phase_at = running->nextXid; |
1348 | | |
1349 | | /* |
1350 | | * Start with an xmin/xmax that's correct for future, when all the |
1351 | | * currently running transactions have finished. We'll update both |
1352 | | * while waiting for the pending transactions to finish. |
1353 | | */ |
1354 | 0 | builder->xmin = running->nextXid; /* < are finished */ |
1355 | 0 | builder->xmax = running->nextXid; /* >= are running */ |
1356 | | |
1357 | | /* so we can safely use the faster comparisons */ |
1358 | 0 | Assert(TransactionIdIsNormal(builder->xmin)); |
1359 | 0 | Assert(TransactionIdIsNormal(builder->xmax)); |
1360 | |
|
1361 | 0 | ereport(LOG, |
1362 | 0 | errmsg("logical decoding found initial starting point at %X/%08X", |
1363 | 0 | LSN_FORMAT_ARGS(lsn)), |
1364 | 0 | errdetail("Waiting for transactions (approximately %d) older than %u to end.", |
1365 | 0 | running->xcnt, running->nextXid)); |
1366 | | |
1367 | 0 | SnapBuildWaitSnapshot(running, running->nextXid); |
1368 | 0 | } |
1369 | | |
1370 | | /* |
1371 | | * c) transition from BUILDING_SNAPSHOT to FULL_SNAPSHOT. |
1372 | | * |
1373 | | * In BUILDING_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid |
1374 | | * is >= than nextXid from when we switched to BUILDING_SNAPSHOT. This |
1375 | | * means all transactions starting afterwards have enough information to |
1376 | | * be decoded. Switch to FULL_SNAPSHOT. |
1377 | | */ |
1378 | 0 | else if (builder->state == SNAPBUILD_BUILDING_SNAPSHOT && |
1379 | 0 | TransactionIdPrecedesOrEquals(builder->next_phase_at, |
1380 | 0 | running->oldestRunningXid)) |
1381 | 0 | { |
1382 | 0 | builder->state = SNAPBUILD_FULL_SNAPSHOT; |
1383 | 0 | builder->next_phase_at = running->nextXid; |
1384 | |
|
1385 | 0 | ereport(LOG, |
1386 | 0 | errmsg("logical decoding found initial consistent point at %X/%08X", |
1387 | 0 | LSN_FORMAT_ARGS(lsn)), |
1388 | 0 | errdetail("Waiting for transactions (approximately %d) older than %u to end.", |
1389 | 0 | running->xcnt, running->nextXid)); |
1390 | | |
1391 | 0 | SnapBuildWaitSnapshot(running, running->nextXid); |
1392 | 0 | } |
1393 | | |
1394 | | /* |
1395 | | * c) transition from FULL_SNAPSHOT to CONSISTENT. |
1396 | | * |
1397 | | * In FULL_SNAPSHOT state, and this xl_running_xacts' oldestRunningXid is |
1398 | | * >= than nextXid from when we switched to FULL_SNAPSHOT. This means all |
1399 | | * transactions that are currently in progress have a catalog snapshot, |
1400 | | * and all their changes have been collected. Switch to CONSISTENT. |
1401 | | */ |
1402 | 0 | else if (builder->state == SNAPBUILD_FULL_SNAPSHOT && |
1403 | 0 | TransactionIdPrecedesOrEquals(builder->next_phase_at, |
1404 | 0 | running->oldestRunningXid)) |
1405 | 0 | { |
1406 | 0 | builder->state = SNAPBUILD_CONSISTENT; |
1407 | 0 | builder->next_phase_at = InvalidTransactionId; |
1408 | |
|
1409 | 0 | ereport(LOG, |
1410 | 0 | errmsg("logical decoding found consistent point at %X/%08X", |
1411 | 0 | LSN_FORMAT_ARGS(lsn)), |
1412 | 0 | errdetail("There are no old transactions anymore.")); |
1413 | 0 | } |
1414 | | |
1415 | | /* |
1416 | | * We already started to track running xacts and need to wait for all |
1417 | | * in-progress ones to finish. We fall through to the normal processing of |
1418 | | * records so incremental cleanup can be performed. |
1419 | | */ |
1420 | 0 | return true; |
1421 | 0 | } |
1422 | | |
1423 | | /* --- |
1424 | | * Iterate through xids in record, wait for all older than the cutoff to |
1425 | | * finish. Then, if possible, log a new xl_running_xacts record. |
1426 | | * |
1427 | | * This isn't required for the correctness of decoding, but to: |
1428 | | * a) allow isolationtester to notice that we're currently waiting for |
1429 | | * something. |
1430 | | * b) log a new xl_running_xacts record where it'd be helpful, without having |
1431 | | * to wait for bgwriter or checkpointer. |
1432 | | * --- |
1433 | | */ |
1434 | | static void |
1435 | | SnapBuildWaitSnapshot(xl_running_xacts *running, TransactionId cutoff) |
1436 | 0 | { |
1437 | 0 | int off; |
1438 | |
|
1439 | 0 | for (off = 0; off < running->xcnt; off++) |
1440 | 0 | { |
1441 | 0 | TransactionId xid = running->xids[off]; |
1442 | | |
1443 | | /* |
1444 | | * Upper layers should prevent that we ever need to wait on ourselves. |
1445 | | * Check anyway, since failing to do so would either result in an |
1446 | | * endless wait or an Assert() failure. |
1447 | | */ |
1448 | 0 | if (TransactionIdIsCurrentTransactionId(xid)) |
1449 | 0 | elog(ERROR, "waiting for ourselves"); |
1450 | | |
1451 | 0 | if (TransactionIdFollows(xid, cutoff)) |
1452 | 0 | continue; |
1453 | | |
1454 | 0 | XactLockTableWait(xid, NULL, NULL, XLTW_None); |
1455 | 0 | } |
1456 | | |
1457 | | /* |
1458 | | * All transactions we needed to finish finished - try to ensure there is |
1459 | | * another xl_running_xacts record in a timely manner, without having to |
1460 | | * wait for bgwriter or checkpointer to log one. During recovery we can't |
1461 | | * enforce that, so we'll have to wait. |
1462 | | */ |
1463 | 0 | if (!RecoveryInProgress()) |
1464 | 0 | { |
1465 | 0 | LogStandbySnapshot(); |
1466 | 0 | } |
1467 | 0 | } |
1468 | | |
1469 | | #define SnapBuildOnDiskConstantSize \ |
1470 | 0 | offsetof(SnapBuildOnDisk, builder) |
1471 | | #define SnapBuildOnDiskNotChecksummedSize \ |
1472 | | offsetof(SnapBuildOnDisk, version) |
1473 | | |
1474 | 0 | #define SNAPBUILD_MAGIC 0x51A1E001 |
1475 | 0 | #define SNAPBUILD_VERSION 6 |
1476 | | |
1477 | | /* |
1478 | | * Store/Load a snapshot from disk, depending on the snapshot builder's state. |
1479 | | * |
1480 | | * Supposed to be used by external (i.e. not snapbuild.c) code that just read |
1481 | | * a record that's a potential location for a serialized snapshot. |
1482 | | */ |
1483 | | void |
1484 | | SnapBuildSerializationPoint(SnapBuild *builder, XLogRecPtr lsn) |
1485 | 0 | { |
1486 | 0 | if (builder->state < SNAPBUILD_CONSISTENT) |
1487 | 0 | SnapBuildRestore(builder, lsn); |
1488 | 0 | else |
1489 | 0 | SnapBuildSerialize(builder, lsn); |
1490 | 0 | } |
1491 | | |
/*
 * Serialize the snapshot 'builder' at the location 'lsn' if it hasn't already
 * been done by another decoding process.
 *
 * Nothing happens unless the builder has reached SNAPBUILD_CONSISTENT.  The
 * file is named after 'lsn' ("%X-%X.snap") inside PG_LOGICAL_SNAPSHOTS_DIR.
 * It consists of a SnapBuildOnDisk header: magic, version, length, checksum,
 * and a copy of the SnapBuild struct with its pointer members cleared.  The
 * header is followed by the committed xids and then the catalog-modifying
 * xids.  A CRC-32C checksum covers everything past the header's
 * SnapBuildOnDiskNotChecksummedSize prefix, accumulated in the same order in
 * which SnapBuildRestoreSnapshot() reads the file back.
 *
 * The data is written to a pid-qualified temporary file, fsync'ed, and then
 * renamed into place.  If a file for 'lsn' already exists it is only
 * re-fsync'ed.  In both cases builder->last_serialized_snapshot becomes 'lsn'
 * and is passed to ReorderBufferSetRestartPoint().  I/O failures are raised
 * with ereport(ERROR).
 */
static void
SnapBuildSerialize(SnapBuild *builder, XLogRecPtr lsn)
{
	Size		needed_length;
	SnapBuildOnDisk *ondisk = NULL;
	TransactionId *catchange_xip = NULL;
	MemoryContext old_ctx;
	size_t		catchange_xcnt;
	char	   *ondisk_c;
	int			fd;
	char		tmppath[MAXPGPATH];
	char		path[MAXPGPATH];
	int			ret;
	struct stat stat_buf;
	Size		sz;

	Assert(lsn != InvalidXLogRecPtr);
	Assert(builder->last_serialized_snapshot == InvalidXLogRecPtr ||
		   builder->last_serialized_snapshot <= lsn);

	/*
	 * no point in serializing if we cannot continue to work immediately after
	 * restoring the snapshot
	 */
	if (builder->state < SNAPBUILD_CONSISTENT)
		return;

	/* consistent snapshots have no next phase */
	Assert(builder->next_phase_at == InvalidTransactionId);

	/*
	 * We identify snapshots by the LSN they are valid for. We don't need to
	 * include timelines in the name as each LSN maps to exactly one timeline
	 * unless the user used pg_resetwal or similar. If a user did so, there's
	 * no hope continuing to decode anyway.
	 */
	sprintf(path, "%s/%X-%X.snap",
			PG_LOGICAL_SNAPSHOTS_DIR,
			LSN_FORMAT_ARGS(lsn));

	/*
	 * first check whether some other backend already has written the snapshot
	 * for this LSN. It's perfectly fine if there's none, so we accept ENOENT
	 * as a valid state. Everything else is an unexpected error.
	 */
	ret = stat(path, &stat_buf);

	if (ret != 0 && errno != ENOENT)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not stat file \"%s\": %m", path)));

	else if (ret == 0)
	{
		/*
		 * somebody else has already serialized to this point, don't overwrite
		 * but remember location, so we don't need to read old data again.
		 *
		 * To be sure it has been synced to disk after the rename() from the
		 * tempfile filename to the real filename, we just repeat the fsync.
		 * That ought to be cheap because in most scenarios it should already
		 * be safely on disk.
		 */
		fsync_fname(path, false);
		fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);

		builder->last_serialized_snapshot = lsn;
		/* nothing was allocated yet; 'out' only sets the restart point */
		goto out;
	}

	/*
	 * there is an obvious race condition here between the time we stat(2) the
	 * file and us writing the file. But we rename the file into place
	 * atomically and all files created need to contain the same data anyway,
	 * so this is perfectly fine, although a bit of a resource waste. Locking
	 * seems like pointless complication.
	 */
	elog(DEBUG1, "serializing snapshot to %s", path);

	/* to make sure only we will write to this tempfile, include pid */
	sprintf(tmppath, "%s/%X-%X.snap.%d.tmp",
			PG_LOGICAL_SNAPSHOTS_DIR,
			LSN_FORMAT_ARGS(lsn), MyProcPid);

	/*
	 * Unlink temporary file if it already exists, needs to have been before a
	 * crash/error since we won't enter this function twice from within a
	 * single decoding slot/backend and the temporary file contains the pid of
	 * the current process.
	 */
	if (unlink(tmppath) != 0 && errno != ENOENT)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not remove file \"%s\": %m", tmppath)));

	/*
	 * Allocate the serialization buffer (and, presumably, the array returned
	 * by ReorderBufferGetCatalogChangesXacts()) in the builder's context.
	 * Both are pfree'd at 'out' below.
	 */
	old_ctx = MemoryContextSwitchTo(builder->context);

	/* Get the catalog modifying transactions that are yet not committed */
	catchange_xip = ReorderBufferGetCatalogChangesXacts(builder->reorder);
	catchange_xcnt = dclist_count(&builder->reorder->catchange_txns);

	/* fixed header plus one TransactionId per committed/catchange xact */
	needed_length = sizeof(SnapBuildOnDisk) +
		sizeof(TransactionId) * (builder->committed.xcnt + catchange_xcnt);

	ondisk_c = palloc0(needed_length);
	ondisk = (SnapBuildOnDisk *) ondisk_c;
	ondisk->magic = SNAPBUILD_MAGIC;
	ondisk->version = SNAPBUILD_VERSION;
	ondisk->length = needed_length;
	/*
	 * Start the checksum with the header fields past the not-checksummed
	 * prefix (which presumably holds the checksum itself).
	 */
	INIT_CRC32C(ondisk->checksum);
	COMP_CRC32C(ondisk->checksum,
				((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize,
				SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize);
	/* ondisk_c now points at where the variable-length xid arrays go */
	ondisk_c += sizeof(SnapBuildOnDisk);

	memcpy(&ondisk->builder, builder, sizeof(SnapBuild));
	/* NULL-ify memory-only data */
	ondisk->builder.context = NULL;
	ondisk->builder.snapshot = NULL;
	ondisk->builder.reorder = NULL;
	ondisk->builder.committed.xip = NULL;
	ondisk->builder.catchange.xip = NULL;
	/* update catchange only on disk data */
	ondisk->builder.catchange.xcnt = catchange_xcnt;

	COMP_CRC32C(ondisk->checksum,
				&ondisk->builder,
				sizeof(SnapBuild));

	/* copy committed xacts */
	if (builder->committed.xcnt > 0)
	{
		sz = sizeof(TransactionId) * builder->committed.xcnt;
		memcpy(ondisk_c, builder->committed.xip, sz);
		COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
		ondisk_c += sz;
	}

	/* copy catalog modifying xacts */
	if (catchange_xcnt > 0)
	{
		sz = sizeof(TransactionId) * catchange_xcnt;
		memcpy(ondisk_c, catchange_xip, sz);
		COMP_CRC32C(ondisk->checksum, ondisk_c, sz);
		ondisk_c += sz;
	}

	FIN_CRC32C(ondisk->checksum);

	/* we have valid data now, open tempfile and write it there */
	fd = OpenTransientFile(tmppath,
						   O_CREAT | O_EXCL | O_WRONLY | PG_BINARY);
	if (fd < 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not open file \"%s\": %m", tmppath)));

	/* clear errno so a short write that doesn't set it can be detected */
	errno = 0;
	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_WRITE);
	if ((write(fd, ondisk, needed_length)) != needed_length)
	{
		int			save_errno = errno;

		CloseTransientFile(fd);

		/* if write didn't set errno, assume problem is no disk space */
		errno = save_errno ? save_errno : ENOSPC;
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not write to file \"%s\": %m", tmppath)));
	}
	pgstat_report_wait_end();

	/*
	 * fsync the file before renaming so that even if we crash after this we
	 * have either a fully valid file or nothing.
	 *
	 * It's safe to just ERROR on fsync() here because we'll retry the whole
	 * operation including the writes.
	 *
	 * TODO: Do the fsync() via checkpoints/restartpoints, doing it here has
	 * some noticeable overhead since it's performed synchronously during
	 * decoding?
	 */
	pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_SYNC);
	if (pg_fsync(fd) != 0)
	{
		int			save_errno = errno;

		CloseTransientFile(fd);
		errno = save_errno;
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not fsync file \"%s\": %m", tmppath)));
	}
	pgstat_report_wait_end();

	if (CloseTransientFile(fd) != 0)
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not close file \"%s\": %m", tmppath)));

	fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);

	/*
	 * We may overwrite the work from some other backend, but that's ok, our
	 * snapshot is valid as well, we'll just have done some superfluous work.
	 */
	if (rename(tmppath, path) != 0)
	{
		ereport(ERROR,
				(errcode_for_file_access(),
				 errmsg("could not rename file \"%s\" to \"%s\": %m",
						tmppath, path)));
	}

	/* make sure we persist */
	fsync_fname(path, false);
	fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true);

	/*
	 * Now there's no way we can lose the dumped state anymore, remember this
	 * as a serialization point.
	 */
	builder->last_serialized_snapshot = lsn;

	MemoryContextSwitchTo(old_ctx);

out:
	/* reached both after writing and when the file already existed */
	ReorderBufferSetRestartPoint(builder->reorder,
								 builder->last_serialized_snapshot);
	/* be tidy */
	if (ondisk)
		pfree(ondisk);
	if (catchange_xip)
		pfree(catchange_xip);
}
1733 | | |
1734 | | /* |
1735 | | * Restore the logical snapshot file contents to 'ondisk'. |
1736 | | * |
1737 | | * 'context' is the memory context where the catalog modifying/committed xid |
1738 | | * will live. |
1739 | | * If 'missing_ok' is true, will not throw an error if the file is not found. |
1740 | | */ |
1741 | | bool |
1742 | | SnapBuildRestoreSnapshot(SnapBuildOnDisk *ondisk, XLogRecPtr lsn, |
1743 | | MemoryContext context, bool missing_ok) |
1744 | 0 | { |
1745 | 0 | int fd; |
1746 | 0 | pg_crc32c checksum; |
1747 | 0 | Size sz; |
1748 | 0 | char path[MAXPGPATH]; |
1749 | |
|
1750 | 0 | sprintf(path, "%s/%X-%X.snap", |
1751 | 0 | PG_LOGICAL_SNAPSHOTS_DIR, |
1752 | 0 | LSN_FORMAT_ARGS(lsn)); |
1753 | |
|
1754 | 0 | fd = OpenTransientFile(path, O_RDONLY | PG_BINARY); |
1755 | |
|
1756 | 0 | if (fd < 0) |
1757 | 0 | { |
1758 | 0 | if (missing_ok && errno == ENOENT) |
1759 | 0 | return false; |
1760 | | |
1761 | 0 | ereport(ERROR, |
1762 | 0 | (errcode_for_file_access(), |
1763 | 0 | errmsg("could not open file \"%s\": %m", path))); |
1764 | 0 | } |
1765 | | |
1766 | | /* ---- |
1767 | | * Make sure the snapshot had been stored safely to disk, that's normally |
1768 | | * cheap. |
1769 | | * Note that we do not need PANIC here, nobody will be able to use the |
1770 | | * slot without fsyncing, and saving it won't succeed without an fsync() |
1771 | | * either... |
1772 | | * ---- |
1773 | | */ |
1774 | 0 | fsync_fname(path, false); |
1775 | 0 | fsync_fname(PG_LOGICAL_SNAPSHOTS_DIR, true); |
1776 | | |
1777 | | /* read statically sized portion of snapshot */ |
1778 | 0 | SnapBuildRestoreContents(fd, ondisk, SnapBuildOnDiskConstantSize, path); |
1779 | |
|
1780 | 0 | if (ondisk->magic != SNAPBUILD_MAGIC) |
1781 | 0 | ereport(ERROR, |
1782 | 0 | (errcode(ERRCODE_DATA_CORRUPTED), |
1783 | 0 | errmsg("snapbuild state file \"%s\" has wrong magic number: %u instead of %u", |
1784 | 0 | path, ondisk->magic, SNAPBUILD_MAGIC))); |
1785 | | |
1786 | 0 | if (ondisk->version != SNAPBUILD_VERSION) |
1787 | 0 | ereport(ERROR, |
1788 | 0 | (errcode(ERRCODE_DATA_CORRUPTED), |
1789 | 0 | errmsg("snapbuild state file \"%s\" has unsupported version: %u instead of %u", |
1790 | 0 | path, ondisk->version, SNAPBUILD_VERSION))); |
1791 | | |
1792 | 0 | INIT_CRC32C(checksum); |
1793 | 0 | COMP_CRC32C(checksum, |
1794 | 0 | ((char *) ondisk) + SnapBuildOnDiskNotChecksummedSize, |
1795 | 0 | SnapBuildOnDiskConstantSize - SnapBuildOnDiskNotChecksummedSize); |
1796 | | |
1797 | | /* read SnapBuild */ |
1798 | 0 | SnapBuildRestoreContents(fd, &ondisk->builder, sizeof(SnapBuild), path); |
1799 | 0 | COMP_CRC32C(checksum, &ondisk->builder, sizeof(SnapBuild)); |
1800 | | |
1801 | | /* restore committed xacts information */ |
1802 | 0 | if (ondisk->builder.committed.xcnt > 0) |
1803 | 0 | { |
1804 | 0 | sz = sizeof(TransactionId) * ondisk->builder.committed.xcnt; |
1805 | 0 | ondisk->builder.committed.xip = MemoryContextAllocZero(context, sz); |
1806 | 0 | SnapBuildRestoreContents(fd, ondisk->builder.committed.xip, sz, path); |
1807 | 0 | COMP_CRC32C(checksum, ondisk->builder.committed.xip, sz); |
1808 | 0 | } |
1809 | | |
1810 | | /* restore catalog modifying xacts information */ |
1811 | 0 | if (ondisk->builder.catchange.xcnt > 0) |
1812 | 0 | { |
1813 | 0 | sz = sizeof(TransactionId) * ondisk->builder.catchange.xcnt; |
1814 | 0 | ondisk->builder.catchange.xip = MemoryContextAllocZero(context, sz); |
1815 | 0 | SnapBuildRestoreContents(fd, ondisk->builder.catchange.xip, sz, path); |
1816 | 0 | COMP_CRC32C(checksum, ondisk->builder.catchange.xip, sz); |
1817 | 0 | } |
1818 | |
|
1819 | 0 | if (CloseTransientFile(fd) != 0) |
1820 | 0 | ereport(ERROR, |
1821 | 0 | (errcode_for_file_access(), |
1822 | 0 | errmsg("could not close file \"%s\": %m", path))); |
1823 | | |
1824 | 0 | FIN_CRC32C(checksum); |
1825 | | |
1826 | | /* verify checksum of what we've read */ |
1827 | 0 | if (!EQ_CRC32C(checksum, ondisk->checksum)) |
1828 | 0 | ereport(ERROR, |
1829 | 0 | (errcode(ERRCODE_DATA_CORRUPTED), |
1830 | 0 | errmsg("checksum mismatch for snapbuild state file \"%s\": is %u, should be %u", |
1831 | 0 | path, checksum, ondisk->checksum))); |
1832 | | |
1833 | 0 | return true; |
1834 | 0 | } |
1835 | | |
1836 | | /* |
1837 | | * Restore a snapshot into 'builder' if previously one has been stored at the |
1838 | | * location indicated by 'lsn'. Returns true if successful, false otherwise. |
1839 | | */ |
1840 | | static bool |
1841 | | SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn) |
1842 | 0 | { |
1843 | 0 | SnapBuildOnDisk ondisk; |
1844 | | |
1845 | | /* no point in loading a snapshot if we're already there */ |
1846 | 0 | if (builder->state == SNAPBUILD_CONSISTENT) |
1847 | 0 | return false; |
1848 | | |
1849 | | /* validate and restore the snapshot to 'ondisk' */ |
1850 | 0 | if (!SnapBuildRestoreSnapshot(&ondisk, lsn, builder->context, true)) |
1851 | 0 | return false; |
1852 | | |
1853 | | /* |
1854 | | * ok, we now have a sensible snapshot here, figure out if it has more |
1855 | | * information than we have. |
1856 | | */ |
1857 | | |
1858 | | /* |
1859 | | * We are only interested in consistent snapshots for now, comparing |
1860 | | * whether one incomplete snapshot is more "advanced" seems to be |
1861 | | * unnecessarily complex. |
1862 | | */ |
1863 | 0 | if (ondisk.builder.state < SNAPBUILD_CONSISTENT) |
1864 | 0 | goto snapshot_not_interesting; |
1865 | | |
1866 | | /* |
1867 | | * Don't use a snapshot that requires an xmin that we cannot guarantee to |
1868 | | * be available. |
1869 | | */ |
1870 | 0 | if (TransactionIdPrecedes(ondisk.builder.xmin, builder->initial_xmin_horizon)) |
1871 | 0 | goto snapshot_not_interesting; |
1872 | | |
1873 | | /* |
1874 | | * Consistent snapshots have no next phase. Reset next_phase_at as it is |
1875 | | * possible that an old value may remain. |
1876 | | */ |
1877 | 0 | Assert(ondisk.builder.next_phase_at == InvalidTransactionId); |
1878 | 0 | builder->next_phase_at = InvalidTransactionId; |
1879 | | |
1880 | | /* ok, we think the snapshot is sensible, copy over everything important */ |
1881 | 0 | builder->xmin = ondisk.builder.xmin; |
1882 | 0 | builder->xmax = ondisk.builder.xmax; |
1883 | 0 | builder->state = ondisk.builder.state; |
1884 | |
|
1885 | 0 | builder->committed.xcnt = ondisk.builder.committed.xcnt; |
1886 | | /* We only allocated/stored xcnt, not xcnt_space xids ! */ |
1887 | | /* don't overwrite preallocated xip, if we don't have anything here */ |
1888 | 0 | if (builder->committed.xcnt > 0) |
1889 | 0 | { |
1890 | 0 | pfree(builder->committed.xip); |
1891 | 0 | builder->committed.xcnt_space = ondisk.builder.committed.xcnt; |
1892 | 0 | builder->committed.xip = ondisk.builder.committed.xip; |
1893 | 0 | } |
1894 | 0 | ondisk.builder.committed.xip = NULL; |
1895 | | |
1896 | | /* set catalog modifying transactions */ |
1897 | 0 | if (builder->catchange.xip) |
1898 | 0 | pfree(builder->catchange.xip); |
1899 | 0 | builder->catchange.xcnt = ondisk.builder.catchange.xcnt; |
1900 | 0 | builder->catchange.xip = ondisk.builder.catchange.xip; |
1901 | 0 | ondisk.builder.catchange.xip = NULL; |
1902 | | |
1903 | | /* our snapshot is not interesting anymore, build a new one */ |
1904 | 0 | if (builder->snapshot != NULL) |
1905 | 0 | { |
1906 | 0 | SnapBuildSnapDecRefcount(builder->snapshot); |
1907 | 0 | } |
1908 | 0 | builder->snapshot = SnapBuildBuildSnapshot(builder); |
1909 | 0 | SnapBuildSnapIncRefcount(builder->snapshot); |
1910 | |
|
1911 | 0 | ReorderBufferSetRestartPoint(builder->reorder, lsn); |
1912 | |
|
1913 | 0 | Assert(builder->state == SNAPBUILD_CONSISTENT); |
1914 | |
|
1915 | 0 | ereport(LOG, |
1916 | 0 | errmsg("logical decoding found consistent point at %X/%08X", |
1917 | 0 | LSN_FORMAT_ARGS(lsn)), |
1918 | 0 | errdetail("Logical decoding will begin using saved snapshot.")); |
1919 | 0 | return true; |
1920 | | |
1921 | 0 | snapshot_not_interesting: |
1922 | 0 | if (ondisk.builder.committed.xip != NULL) |
1923 | 0 | pfree(ondisk.builder.committed.xip); |
1924 | 0 | if (ondisk.builder.catchange.xip != NULL) |
1925 | 0 | pfree(ondisk.builder.catchange.xip); |
1926 | 0 | return false; |
1927 | 0 | } |
1928 | | |
1929 | | /* |
1930 | | * Read the contents of the serialized snapshot to 'dest'. |
1931 | | */ |
1932 | | static void |
1933 | | SnapBuildRestoreContents(int fd, void *dest, Size size, const char *path) |
1934 | 0 | { |
1935 | 0 | int readBytes; |
1936 | |
|
1937 | 0 | pgstat_report_wait_start(WAIT_EVENT_SNAPBUILD_READ); |
1938 | 0 | readBytes = read(fd, dest, size); |
1939 | 0 | pgstat_report_wait_end(); |
1940 | 0 | if (readBytes != size) |
1941 | 0 | { |
1942 | 0 | int save_errno = errno; |
1943 | |
|
1944 | 0 | CloseTransientFile(fd); |
1945 | |
|
1946 | 0 | if (readBytes < 0) |
1947 | 0 | { |
1948 | 0 | errno = save_errno; |
1949 | 0 | ereport(ERROR, |
1950 | 0 | (errcode_for_file_access(), |
1951 | 0 | errmsg("could not read file \"%s\": %m", path))); |
1952 | 0 | } |
1953 | 0 | else |
1954 | 0 | ereport(ERROR, |
1955 | 0 | (errcode(ERRCODE_DATA_CORRUPTED), |
1956 | 0 | errmsg("could not read file \"%s\": read %d of %zu", |
1957 | 0 | path, readBytes, size))); |
1958 | 0 | } |
1959 | 0 | } |
1960 | | |
1961 | | /* |
1962 | | * Remove all serialized snapshots that are not required anymore because no |
1963 | | * slot can need them. This doesn't actually have to run during a checkpoint, |
1964 | | * but it's a convenient point to schedule this. |
1965 | | * |
1966 | | * NB: We run this during checkpoints even if logical decoding is disabled so |
1967 | | * we cleanup old slots at some point after it got disabled. |
1968 | | */ |
1969 | | void |
1970 | | CheckPointSnapBuild(void) |
1971 | 0 | { |
1972 | 0 | XLogRecPtr cutoff; |
1973 | 0 | XLogRecPtr redo; |
1974 | 0 | DIR *snap_dir; |
1975 | 0 | struct dirent *snap_de; |
1976 | 0 | char path[MAXPGPATH + sizeof(PG_LOGICAL_SNAPSHOTS_DIR)]; |
1977 | | |
1978 | | /* |
1979 | | * We start off with a minimum of the last redo pointer. No new |
1980 | | * replication slot will start before that, so that's a safe upper bound |
1981 | | * for removal. |
1982 | | */ |
1983 | 0 | redo = GetRedoRecPtr(); |
1984 | | |
1985 | | /* now check for the restart ptrs from existing slots */ |
1986 | 0 | cutoff = ReplicationSlotsComputeLogicalRestartLSN(); |
1987 | | |
1988 | | /* don't start earlier than the restart lsn */ |
1989 | 0 | if (redo < cutoff) |
1990 | 0 | cutoff = redo; |
1991 | |
|
1992 | 0 | snap_dir = AllocateDir(PG_LOGICAL_SNAPSHOTS_DIR); |
1993 | 0 | while ((snap_de = ReadDir(snap_dir, PG_LOGICAL_SNAPSHOTS_DIR)) != NULL) |
1994 | 0 | { |
1995 | 0 | uint32 hi; |
1996 | 0 | uint32 lo; |
1997 | 0 | XLogRecPtr lsn; |
1998 | 0 | PGFileType de_type; |
1999 | |
|
2000 | 0 | if (strcmp(snap_de->d_name, ".") == 0 || |
2001 | 0 | strcmp(snap_de->d_name, "..") == 0) |
2002 | 0 | continue; |
2003 | | |
2004 | 0 | snprintf(path, sizeof(path), "%s/%s", PG_LOGICAL_SNAPSHOTS_DIR, snap_de->d_name); |
2005 | 0 | de_type = get_dirent_type(path, snap_de, false, DEBUG1); |
2006 | |
|
2007 | 0 | if (de_type != PGFILETYPE_ERROR && de_type != PGFILETYPE_REG) |
2008 | 0 | { |
2009 | 0 | elog(DEBUG1, "only regular files expected: %s", path); |
2010 | 0 | continue; |
2011 | 0 | } |
2012 | | |
2013 | | /* |
2014 | | * temporary filenames from SnapBuildSerialize() include the LSN and |
2015 | | * everything but are postfixed by .$pid.tmp. We can just remove them |
2016 | | * the same as other files because there can be none that are |
2017 | | * currently being written that are older than cutoff. |
2018 | | * |
2019 | | * We just log a message if a file doesn't fit the pattern, it's |
2020 | | * probably some editors lock/state file or similar... |
2021 | | */ |
2022 | 0 | if (sscanf(snap_de->d_name, "%X-%X.snap", &hi, &lo) != 2) |
2023 | 0 | { |
2024 | 0 | ereport(LOG, |
2025 | 0 | (errmsg("could not parse file name \"%s\"", path))); |
2026 | 0 | continue; |
2027 | 0 | } |
2028 | | |
2029 | 0 | lsn = ((uint64) hi) << 32 | lo; |
2030 | | |
2031 | | /* check whether we still need it */ |
2032 | 0 | if (lsn < cutoff || cutoff == InvalidXLogRecPtr) |
2033 | 0 | { |
2034 | 0 | elog(DEBUG1, "removing snapbuild snapshot %s", path); |
2035 | | |
2036 | | /* |
2037 | | * It's not particularly harmful, though strange, if we can't |
2038 | | * remove the file here. Don't prevent the checkpoint from |
2039 | | * completing, that'd be a cure worse than the disease. |
2040 | | */ |
2041 | 0 | if (unlink(path) < 0) |
2042 | 0 | { |
2043 | 0 | ereport(LOG, |
2044 | 0 | (errcode_for_file_access(), |
2045 | 0 | errmsg("could not remove file \"%s\": %m", |
2046 | 0 | path))); |
2047 | 0 | continue; |
2048 | 0 | } |
2049 | 0 | } |
2050 | 0 | } |
2051 | 0 | FreeDir(snap_dir); |
2052 | 0 | } |
2053 | | |
2054 | | /* |
2055 | | * Check if a logical snapshot at the specified point has been serialized. |
2056 | | */ |
2057 | | bool |
2058 | | SnapBuildSnapshotExists(XLogRecPtr lsn) |
2059 | 0 | { |
2060 | 0 | char path[MAXPGPATH]; |
2061 | 0 | int ret; |
2062 | 0 | struct stat stat_buf; |
2063 | |
|
2064 | 0 | sprintf(path, "%s/%08X-%08X.snap", |
2065 | 0 | PG_LOGICAL_SNAPSHOTS_DIR, |
2066 | 0 | LSN_FORMAT_ARGS(lsn)); |
2067 | |
|
2068 | 0 | ret = stat(path, &stat_buf); |
2069 | |
|
2070 | 0 | if (ret != 0 && errno != ENOENT) |
2071 | 0 | ereport(ERROR, |
2072 | 0 | (errcode_for_file_access(), |
2073 | 0 | errmsg("could not stat file \"%s\": %m", path))); |
2074 | | |
2075 | 0 | return ret == 0; |
2076 | 0 | } |