/src/postgres/src/backend/access/transam/xlogreader.c
Line | Count | Source (jump to first uncovered line) |
1 | | /*------------------------------------------------------------------------- |
2 | | * |
3 | | * xlogreader.c |
4 | | * Generic XLog reading facility |
5 | | * |
6 | | * Portions Copyright (c) 2013-2025, PostgreSQL Global Development Group |
7 | | * |
8 | | * IDENTIFICATION |
9 | | * src/backend/access/transam/xlogreader.c |
10 | | * |
11 | | * NOTES |
12 | | * See xlogreader.h for more notes on this facility. |
13 | | * |
14 | | * This file is compiled as both front-end and backend code, so it |
15 | | * may not use ereport, server-defined static variables, etc. |
16 | | *------------------------------------------------------------------------- |
17 | | */ |
18 | | #include "postgres.h" |
19 | | |
20 | | #include <unistd.h> |
21 | | #ifdef USE_LZ4 |
22 | | #include <lz4.h> |
23 | | #endif |
24 | | #ifdef USE_ZSTD |
25 | | #include <zstd.h> |
26 | | #endif |
27 | | |
28 | | #include "access/transam.h" |
29 | | #include "access/xlog_internal.h" |
30 | | #include "access/xlogreader.h" |
31 | | #include "access/xlogrecord.h" |
32 | | #include "catalog/pg_control.h" |
33 | | #include "common/pg_lzcompress.h" |
34 | | #include "replication/origin.h" |
35 | | |
36 | | #ifndef FRONTEND |
37 | | #include "pgstat.h" |
38 | | #include "storage/bufmgr.h" |
39 | | #else |
40 | | #include "common/logging.h" |
41 | | #endif |
42 | | |
43 | | static void report_invalid_record(XLogReaderState *state, const char *fmt,...) |
44 | | pg_attribute_printf(2, 3); |
45 | | static void allocate_recordbuf(XLogReaderState *state, uint32 reclength); |
46 | | static int ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, |
47 | | int reqLen); |
48 | | static void XLogReaderInvalReadState(XLogReaderState *state); |
49 | | static XLogPageReadResult XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking); |
50 | | static bool ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, |
51 | | XLogRecPtr PrevRecPtr, XLogRecord *record, bool randAccess); |
52 | | static bool ValidXLogRecord(XLogReaderState *state, XLogRecord *record, |
53 | | XLogRecPtr recptr); |
54 | | static void ResetDecoder(XLogReaderState *state); |
55 | | static void WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, |
56 | | int segsize, const char *waldir); |
57 | | |
58 | | /* size of the buffer allocated for error message. */ |
59 | 0 | #define MAX_ERRORMSG_LEN 1000 |
60 | | |
61 | | /* |
62 | | * Default size; large enough that typical users of XLogReader won't often need |
63 | | * to use the 'oversized' memory allocation code path. |
64 | | */ |
65 | 0 | #define DEFAULT_DECODE_BUFFER_SIZE (64 * 1024) |
66 | | |
67 | | /* |
68 | | * Construct a string in state->errormsg_buf explaining what's wrong with |
69 | | * the current record being read. |
70 | | */ |
71 | | static void |
72 | | report_invalid_record(XLogReaderState *state, const char *fmt,...) |
73 | 0 | { |
74 | 0 | va_list args; |
75 | |
|
76 | 0 | fmt = _(fmt); |
77 | |
|
78 | 0 | va_start(args, fmt); |
79 | 0 | vsnprintf(state->errormsg_buf, MAX_ERRORMSG_LEN, fmt, args); |
80 | 0 | va_end(args); |
81 | |
|
82 | 0 | state->errormsg_deferred = true; |
83 | 0 | } |
84 | | |
85 | | /* |
86 | | * Set the size of the decoding buffer. A pointer to a caller supplied memory |
87 | | * region may also be passed in, in which case non-oversized records will be |
88 | | * decoded there. |
89 | | */ |
90 | | void |
91 | | XLogReaderSetDecodeBuffer(XLogReaderState *state, void *buffer, size_t size) |
92 | 0 | { |
93 | 0 | Assert(state->decode_buffer == NULL); |
94 | |
|
95 | 0 | state->decode_buffer = buffer; |
96 | 0 | state->decode_buffer_size = size; |
97 | 0 | state->decode_buffer_tail = buffer; |
98 | 0 | state->decode_buffer_head = buffer; |
99 | 0 | } |
100 | | |
101 | | /* |
102 | | * Allocate and initialize a new XLogReader. |
103 | | * |
104 | | * Returns NULL if the xlogreader couldn't be allocated. |
105 | | */ |
106 | | XLogReaderState * |
107 | | XLogReaderAllocate(int wal_segment_size, const char *waldir, |
108 | | XLogReaderRoutine *routine, void *private_data) |
109 | 0 | { |
110 | 0 | XLogReaderState *state; |
111 | |
|
112 | 0 | state = (XLogReaderState *) |
113 | 0 | palloc_extended(sizeof(XLogReaderState), |
114 | 0 | MCXT_ALLOC_NO_OOM | MCXT_ALLOC_ZERO); |
115 | 0 | if (!state) |
116 | 0 | return NULL; |
117 | | |
118 | | /* initialize caller-provided support functions */ |
119 | 0 | state->routine = *routine; |
120 | | |
121 | | /* |
122 | | * Permanently allocate readBuf. We do it this way, rather than just |
123 | | * making a static array, for two reasons: (1) no need to waste the |
124 | | * storage in most instantiations of the backend; (2) a static char array |
125 | | * isn't guaranteed to have any particular alignment, whereas |
126 | | * palloc_extended() will provide MAXALIGN'd storage. |
127 | | */ |
128 | 0 | state->readBuf = (char *) palloc_extended(XLOG_BLCKSZ, |
129 | 0 | MCXT_ALLOC_NO_OOM); |
130 | 0 | if (!state->readBuf) |
131 | 0 | { |
132 | 0 | pfree(state); |
133 | 0 | return NULL; |
134 | 0 | } |
135 | | |
136 | | /* Initialize segment info. */ |
137 | 0 | WALOpenSegmentInit(&state->seg, &state->segcxt, wal_segment_size, |
138 | 0 | waldir); |
139 | | |
140 | | /* system_identifier initialized to zeroes above */ |
141 | 0 | state->private_data = private_data; |
142 | | /* ReadRecPtr, EndRecPtr and readLen initialized to zeroes above */ |
143 | 0 | state->errormsg_buf = palloc_extended(MAX_ERRORMSG_LEN + 1, |
144 | 0 | MCXT_ALLOC_NO_OOM); |
145 | 0 | if (!state->errormsg_buf) |
146 | 0 | { |
147 | 0 | pfree(state->readBuf); |
148 | 0 | pfree(state); |
149 | 0 | return NULL; |
150 | 0 | } |
151 | 0 | state->errormsg_buf[0] = '\0'; |
152 | | |
153 | | /* |
154 | | * Allocate an initial readRecordBuf of minimal size, which can later be |
155 | | * enlarged if necessary. |
156 | | */ |
157 | 0 | allocate_recordbuf(state, 0); |
158 | 0 | return state; |
159 | 0 | } |
160 | | |
161 | | void |
162 | | XLogReaderFree(XLogReaderState *state) |
163 | 0 | { |
164 | 0 | if (state->seg.ws_file != -1) |
165 | 0 | state->routine.segment_close(state); |
166 | |
|
167 | 0 | if (state->decode_buffer && state->free_decode_buffer) |
168 | 0 | pfree(state->decode_buffer); |
169 | |
|
170 | 0 | pfree(state->errormsg_buf); |
171 | 0 | if (state->readRecordBuf) |
172 | 0 | pfree(state->readRecordBuf); |
173 | 0 | pfree(state->readBuf); |
174 | 0 | pfree(state); |
175 | 0 | } |
176 | | |
177 | | /* |
178 | | * Allocate readRecordBuf to fit a record of at least the given length. |
179 | | * |
180 | | * readRecordBufSize is set to the new buffer size. |
181 | | * |
182 | | * To avoid useless small increases, round its size to a multiple of |
183 | | * XLOG_BLCKSZ, and make sure it's at least 5*Max(BLCKSZ, XLOG_BLCKSZ) to start |
184 | | * with. (That is enough for all "normal" records, but very large commit or |
185 | | * abort records might need more space.) |
186 | | * |
187 | | * Note: This routine should *never* be called for xl_tot_len until the header |
188 | | * of the record has been fully validated. |
189 | | */ |
190 | | static void |
191 | | allocate_recordbuf(XLogReaderState *state, uint32 reclength) |
192 | 0 | { |
193 | 0 | uint32 newSize = reclength; |
194 | |
|
195 | 0 | newSize += XLOG_BLCKSZ - (newSize % XLOG_BLCKSZ); |
196 | 0 | newSize = Max(newSize, 5 * Max(BLCKSZ, XLOG_BLCKSZ)); |
197 | |
|
198 | 0 | if (state->readRecordBuf) |
199 | 0 | pfree(state->readRecordBuf); |
200 | 0 | state->readRecordBuf = (char *) palloc(newSize); |
201 | 0 | state->readRecordBufSize = newSize; |
202 | 0 | } |
203 | | |
204 | | /* |
205 | | * Initialize the passed segment structs. |
206 | | */ |
207 | | static void |
208 | | WALOpenSegmentInit(WALOpenSegment *seg, WALSegmentContext *segcxt, |
209 | | int segsize, const char *waldir) |
210 | 0 | { |
211 | 0 | seg->ws_file = -1; |
212 | 0 | seg->ws_segno = 0; |
213 | 0 | seg->ws_tli = 0; |
214 | |
|
215 | 0 | segcxt->ws_segsize = segsize; |
216 | 0 | if (waldir) |
217 | 0 | snprintf(segcxt->ws_dir, MAXPGPATH, "%s", waldir); |
218 | 0 | } |
219 | | |
220 | | /* |
221 | | * Begin reading WAL at 'RecPtr'. |
222 | | * |
223 | | * 'RecPtr' should point to the beginning of a valid WAL record. Pointing at |
224 | | * the beginning of a page is also OK, if there is a new record right after |
225 | | * the page header, i.e. not a continuation. |
226 | | * |
227 | | * This does not make any attempt to read the WAL yet, and hence cannot fail. |
228 | | * If the starting address is not correct, the first call to XLogReadRecord() |
229 | | * will error out. |
230 | | */ |
231 | | void |
232 | | XLogBeginRead(XLogReaderState *state, XLogRecPtr RecPtr) |
233 | 0 | { |
234 | 0 | Assert(!XLogRecPtrIsInvalid(RecPtr)); |
235 | |
|
236 | 0 | ResetDecoder(state); |
237 | | |
238 | | /* Begin at the passed-in record pointer. */ |
239 | 0 | state->EndRecPtr = RecPtr; |
240 | 0 | state->NextRecPtr = RecPtr; |
241 | 0 | state->ReadRecPtr = InvalidXLogRecPtr; |
242 | 0 | state->DecodeRecPtr = InvalidXLogRecPtr; |
243 | 0 | } |
244 | | |
/*
 * Release the last record that was returned by XLogNextRecord(), if any, to
 * free up space.  Returns the LSN past the end of the record, or
 * InvalidXLogRecPtr if no record was outstanding.
 */
XLogRecPtr
XLogReleasePreviousRecord(XLogReaderState *state)
{
	DecodedXLogRecord *record;
	XLogRecPtr	next_lsn;

	/* Nothing to release if no record is currently exposed to the caller. */
	if (!state->record)
		return InvalidXLogRecPtr;

	/*
	 * Remove it from the decoded record queue.  It must be the oldest item
	 * decoded, decode_queue_head.
	 */
	record = state->record;
	next_lsn = record->next_lsn;
	Assert(record == state->decode_queue_head);
	state->record = NULL;
	state->decode_queue_head = record->next;

	/* It might also be the newest item decoded, decode_queue_tail. */
	if (state->decode_queue_tail == record)
		state->decode_queue_tail = NULL;

	/* Release the space. */
	if (unlikely(record->oversized))
	{
		/* It's not in the decode buffer, so free it to release space. */
		pfree(record);
	}
	else
	{
		/* It must be the head (oldest) record in the decode buffer. */
		Assert(state->decode_buffer_head == (char *) record);

		/*
		 * We need to update head to point to the next record that is in the
		 * decode buffer, if any, being careful to skip oversized ones
		 * (they're not in the decode buffer).
		 */
		record = record->next;
		while (unlikely(record && record->oversized))
			record = record->next;

		if (record)
		{
			/* Adjust head to release space up to the next record. */
			state->decode_buffer_head = (char *) record;
		}
		else
		{
			/*
			 * Otherwise we might as well just reset head and tail to the
			 * start of the buffer space, because we're empty.  This means
			 * we'll keep overwriting the same piece of memory if we're not
			 * doing any prefetching.
			 */
			state->decode_buffer_head = state->decode_buffer;
			state->decode_buffer_tail = state->decode_buffer;
		}
	}

	return next_lsn;
}
312 | | |
/*
 * Attempt to read an XLOG record.
 *
 * XLogBeginRead() or XLogFindNextRecord() and then XLogReadAhead() must be
 * called before the first call to XLogNextRecord().  This function returns
 * records and errors that were put into an internal queue by XLogReadAhead().
 *
 * On success, a record is returned.  On failure NULL is returned; *errormsg
 * is set to the deferred error message if one was recorded, else NULL.
 *
 * The returned record (or *errormsg) points to an internal buffer that's
 * valid until the next call to XLogNextRecord.
 */
DecodedXLogRecord *
XLogNextRecord(XLogReaderState *state, char **errormsg)
{
	/* Release the last record returned by XLogNextRecord(). */
	XLogReleasePreviousRecord(state);

	if (state->decode_queue_head == NULL)
	{
		/* Queue empty: report any deferred error, consuming it. */
		*errormsg = NULL;
		if (state->errormsg_deferred)
		{
			/* Only expose the buffer if it actually holds a message. */
			if (state->errormsg_buf[0] != '\0')
				*errormsg = state->errormsg_buf;
			state->errormsg_deferred = false;
		}

		/*
		 * state->EndRecPtr is expected to have been set by the last call to
		 * XLogBeginRead() or XLogNextRecord(), and is the location of the
		 * error.
		 */
		Assert(!XLogRecPtrIsInvalid(state->EndRecPtr));

		return NULL;
	}

	/*
	 * Record this as the most recent record returned, so that we'll release
	 * it next time.  This also exposes it to the traditional
	 * XLogRecXXX(xlogreader) macros, which work with the decoder rather than
	 * the record for historical reasons.
	 */
	state->record = state->decode_queue_head;

	/*
	 * Update the pointers to the beginning and one-past-the-end of this
	 * record, again for the benefit of historical code that expected the
	 * decoder to track this rather than accessing these fields of the record
	 * itself.
	 */
	state->ReadRecPtr = state->record->lsn;
	state->EndRecPtr = state->record->next_lsn;

	*errormsg = NULL;

	return state->record;
}
372 | | |
373 | | /* |
374 | | * Attempt to read an XLOG record. |
375 | | * |
376 | | * XLogBeginRead() or XLogFindNextRecord() must be called before the first call |
377 | | * to XLogReadRecord(). |
378 | | * |
379 | | * If the page_read callback fails to read the requested data, NULL is |
380 | | * returned. The callback is expected to have reported the error; errormsg |
381 | | * is set to NULL. |
382 | | * |
383 | | * If the reading fails for some other reason, NULL is also returned, and |
384 | | * *errormsg is set to a string with details of the failure. |
385 | | * |
386 | | * The returned pointer (or *errormsg) points to an internal buffer that's |
387 | | * valid until the next call to XLogReadRecord. |
388 | | */ |
389 | | XLogRecord * |
390 | | XLogReadRecord(XLogReaderState *state, char **errormsg) |
391 | 0 | { |
392 | 0 | DecodedXLogRecord *decoded; |
393 | | |
394 | | /* |
395 | | * Release last returned record, if there is one. We need to do this so |
396 | | * that we can check for empty decode queue accurately. |
397 | | */ |
398 | 0 | XLogReleasePreviousRecord(state); |
399 | | |
400 | | /* |
401 | | * Call XLogReadAhead() in blocking mode to make sure there is something |
402 | | * in the queue, though we don't use the result. |
403 | | */ |
404 | 0 | if (!XLogReaderHasQueuedRecordOrError(state)) |
405 | 0 | XLogReadAhead(state, false /* nonblocking */ ); |
406 | | |
407 | | /* Consume the head record or error. */ |
408 | 0 | decoded = XLogNextRecord(state, errormsg); |
409 | 0 | if (decoded) |
410 | 0 | { |
411 | | /* |
412 | | * This function returns a pointer to the record's header, not the |
413 | | * actual decoded record. The caller will access the decoded record |
414 | | * through the XLogRecGetXXX() macros, which reach the decoded |
415 | | * recorded as xlogreader->record. |
416 | | */ |
417 | 0 | Assert(state->record == decoded); |
418 | 0 | return &decoded->header; |
419 | 0 | } |
420 | | |
421 | 0 | return NULL; |
422 | 0 | } |
423 | | |
/*
 * Allocate space for a decoded record.  The only member of the returned
 * object that is initialized is the 'oversized' flag, indicating that the
 * decoded record wouldn't fit in the decode buffer and must eventually be
 * freed explicitly.
 *
 * The caller is responsible for adjusting decode_buffer_tail with the real
 * size after successfully decoding a record into this space.  This way, if
 * decoding fails, then there is nothing to undo unless the 'oversized' flag
 * was set and pfree() must be called.
 *
 * Return NULL if there is no space in the decode buffer and allow_oversized
 * is false, or if memory allocation fails for an oversized buffer.
 */
static DecodedXLogRecord *
XLogReadRecordAlloc(XLogReaderState *state, size_t xl_tot_len, bool allow_oversized)
{
	size_t		required_space = DecodeXLogRecordRequiredSpace(xl_tot_len);
	DecodedXLogRecord *decoded = NULL;

	/* Allocate a circular decode buffer if we don't have one already. */
	if (unlikely(state->decode_buffer == NULL))
	{
		/* Size 0 means the caller never set one; use the default. */
		if (state->decode_buffer_size == 0)
			state->decode_buffer_size = DEFAULT_DECODE_BUFFER_SIZE;
		state->decode_buffer = palloc(state->decode_buffer_size);
		state->decode_buffer_head = state->decode_buffer;
		state->decode_buffer_tail = state->decode_buffer;
		/* We own this buffer, so XLogReaderFree() must pfree it. */
		state->free_decode_buffer = true;
	}

	/* Try to allocate space in the circular decode buffer. */
	if (state->decode_buffer_tail >= state->decode_buffer_head)
	{
		/* Empty, or tail is to the right of head. */
		if (required_space <=
			state->decode_buffer_size -
			(state->decode_buffer_tail - state->decode_buffer))
		{
			/*-
			 * There is space between tail and end.  ('<=' is OK here: the
			 * record may run right up to the end of the buffer.)
			 *
			 * +-----+--------------------+-----+
			 * |     |////////////////////|here!|
			 * +-----+--------------------+-----+
			 *       ^                    ^
			 *       |                    |
			 *       h                    t
			 */
			decoded = (DecodedXLogRecord *) state->decode_buffer_tail;
			decoded->oversized = false;
			return decoded;
		}
		else if (required_space <
				 state->decode_buffer_head - state->decode_buffer)
		{
			/*-
			 * There is space between start and head.  (Strict '<' so the
			 * tail can't advance to equal head while data remains; the
			 * 'tail >= head' test above treats head == tail as empty.)
			 *
			 * +-----+--------------------+-----+
			 * |here!|////////////////////|     |
			 * +-----+--------------------+-----+
			 * ^                          ^
			 * |                          |
			 * h                          t
			 */
			decoded = (DecodedXLogRecord *) state->decode_buffer;
			decoded->oversized = false;
			return decoded;
		}
	}
	else
	{
		/* Tail is to the left of head. */
		if (required_space <
			state->decode_buffer_head - state->decode_buffer_tail)
		{
			/*-
			 * There is space between tail and head.  (Strict '<' for the
			 * same reason as above: tail must not catch up with head.)
			 *
			 * +-----+--------------------+-----+
			 * |/////|here!               |/////|
			 * +-----+--------------------+-----+
			 *       ^                    ^
			 *       |                    |
			 *       t                    h
			 */
			decoded = (DecodedXLogRecord *) state->decode_buffer_tail;
			decoded->oversized = false;
			return decoded;
		}
	}

	/* Not enough space in the decode buffer.  Are we allowed to allocate? */
	if (allow_oversized)
	{
		/* Oversized records live outside the circular buffer. */
		decoded = palloc(required_space);
		decoded->oversized = true;
		return decoded;
	}

	return NULL;
}
527 | | |
528 | | static XLogPageReadResult |
529 | | XLogDecodeNextRecord(XLogReaderState *state, bool nonblocking) |
530 | 0 | { |
531 | 0 | XLogRecPtr RecPtr; |
532 | 0 | XLogRecord *record; |
533 | 0 | XLogRecPtr targetPagePtr; |
534 | 0 | bool randAccess; |
535 | 0 | uint32 len, |
536 | 0 | total_len; |
537 | 0 | uint32 targetRecOff; |
538 | 0 | uint32 pageHeaderSize; |
539 | 0 | bool assembled; |
540 | 0 | bool gotheader; |
541 | 0 | int readOff; |
542 | 0 | DecodedXLogRecord *decoded; |
543 | 0 | char *errormsg; /* not used */ |
544 | | |
545 | | /* |
546 | | * randAccess indicates whether to verify the previous-record pointer of |
547 | | * the record we're reading. We only do this if we're reading |
548 | | * sequentially, which is what we initially assume. |
549 | | */ |
550 | 0 | randAccess = false; |
551 | | |
552 | | /* reset error state */ |
553 | 0 | state->errormsg_buf[0] = '\0'; |
554 | 0 | decoded = NULL; |
555 | |
|
556 | 0 | state->abortedRecPtr = InvalidXLogRecPtr; |
557 | 0 | state->missingContrecPtr = InvalidXLogRecPtr; |
558 | |
|
559 | 0 | RecPtr = state->NextRecPtr; |
560 | |
|
561 | 0 | if (state->DecodeRecPtr != InvalidXLogRecPtr) |
562 | 0 | { |
563 | | /* read the record after the one we just read */ |
564 | | |
565 | | /* |
566 | | * NextRecPtr is pointing to end+1 of the previous WAL record. If |
567 | | * we're at a page boundary, no more records can fit on the current |
568 | | * page. We must skip over the page header, but we can't do that until |
569 | | * we've read in the page, since the header size is variable. |
570 | | */ |
571 | 0 | } |
572 | 0 | else |
573 | 0 | { |
574 | | /* |
575 | | * Caller supplied a position to start at. |
576 | | * |
577 | | * In this case, NextRecPtr should already be pointing either to a |
578 | | * valid record starting position or alternatively to the beginning of |
579 | | * a page. See the header comments for XLogBeginRead. |
580 | | */ |
581 | 0 | Assert(RecPtr % XLOG_BLCKSZ == 0 || XRecOffIsValid(RecPtr)); |
582 | 0 | randAccess = true; |
583 | 0 | } |
584 | |
|
585 | 0 | restart: |
586 | 0 | state->nonblocking = nonblocking; |
587 | 0 | state->currRecPtr = RecPtr; |
588 | 0 | assembled = false; |
589 | |
|
590 | 0 | targetPagePtr = RecPtr - (RecPtr % XLOG_BLCKSZ); |
591 | 0 | targetRecOff = RecPtr % XLOG_BLCKSZ; |
592 | | |
593 | | /* |
594 | | * Read the page containing the record into state->readBuf. Request enough |
595 | | * byte to cover the whole record header, or at least the part of it that |
596 | | * fits on the same page. |
597 | | */ |
598 | 0 | readOff = ReadPageInternal(state, targetPagePtr, |
599 | 0 | Min(targetRecOff + SizeOfXLogRecord, XLOG_BLCKSZ)); |
600 | 0 | if (readOff == XLREAD_WOULDBLOCK) |
601 | 0 | return XLREAD_WOULDBLOCK; |
602 | 0 | else if (readOff < 0) |
603 | 0 | goto err; |
604 | | |
605 | | /* |
606 | | * ReadPageInternal always returns at least the page header, so we can |
607 | | * examine it now. |
608 | | */ |
609 | 0 | pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); |
610 | 0 | if (targetRecOff == 0) |
611 | 0 | { |
612 | | /* |
613 | | * At page start, so skip over page header. |
614 | | */ |
615 | 0 | RecPtr += pageHeaderSize; |
616 | 0 | targetRecOff = pageHeaderSize; |
617 | 0 | } |
618 | 0 | else if (targetRecOff < pageHeaderSize) |
619 | 0 | { |
620 | 0 | report_invalid_record(state, "invalid record offset at %X/%X: expected at least %u, got %u", |
621 | 0 | LSN_FORMAT_ARGS(RecPtr), |
622 | 0 | pageHeaderSize, targetRecOff); |
623 | 0 | goto err; |
624 | 0 | } |
625 | | |
626 | 0 | if ((((XLogPageHeader) state->readBuf)->xlp_info & XLP_FIRST_IS_CONTRECORD) && |
627 | 0 | targetRecOff == pageHeaderSize) |
628 | 0 | { |
629 | 0 | report_invalid_record(state, "contrecord is requested by %X/%X", |
630 | 0 | LSN_FORMAT_ARGS(RecPtr)); |
631 | 0 | goto err; |
632 | 0 | } |
633 | | |
634 | | /* ReadPageInternal has verified the page header */ |
635 | 0 | Assert(pageHeaderSize <= readOff); |
636 | | |
637 | | /* |
638 | | * Read the record length. |
639 | | * |
640 | | * NB: Even though we use an XLogRecord pointer here, the whole record |
641 | | * header might not fit on this page. xl_tot_len is the first field of the |
642 | | * struct, so it must be on this page (the records are MAXALIGNed), but we |
643 | | * cannot access any other fields until we've verified that we got the |
644 | | * whole header. |
645 | | */ |
646 | 0 | record = (XLogRecord *) (state->readBuf + RecPtr % XLOG_BLCKSZ); |
647 | 0 | total_len = record->xl_tot_len; |
648 | | |
649 | | /* |
650 | | * If the whole record header is on this page, validate it immediately. |
651 | | * Otherwise do just a basic sanity check on xl_tot_len, and validate the |
652 | | * rest of the header after reading it from the next page. The xl_tot_len |
653 | | * check is necessary here to ensure that we enter the "Need to reassemble |
654 | | * record" code path below; otherwise we might fail to apply |
655 | | * ValidXLogRecordHeader at all. |
656 | | */ |
657 | 0 | if (targetRecOff <= XLOG_BLCKSZ - SizeOfXLogRecord) |
658 | 0 | { |
659 | 0 | if (!ValidXLogRecordHeader(state, RecPtr, state->DecodeRecPtr, record, |
660 | 0 | randAccess)) |
661 | 0 | goto err; |
662 | 0 | gotheader = true; |
663 | 0 | } |
664 | 0 | else |
665 | 0 | { |
666 | | /* There may be no next page if it's too small. */ |
667 | 0 | if (total_len < SizeOfXLogRecord) |
668 | 0 | { |
669 | 0 | report_invalid_record(state, |
670 | 0 | "invalid record length at %X/%X: expected at least %u, got %u", |
671 | 0 | LSN_FORMAT_ARGS(RecPtr), |
672 | 0 | (uint32) SizeOfXLogRecord, total_len); |
673 | 0 | goto err; |
674 | 0 | } |
675 | | /* We'll validate the header once we have the next page. */ |
676 | 0 | gotheader = false; |
677 | 0 | } |
678 | | |
679 | | /* |
680 | | * Try to find space to decode this record, if we can do so without |
681 | | * calling palloc. If we can't, we'll try again below after we've |
682 | | * validated that total_len isn't garbage bytes from a recycled WAL page. |
683 | | */ |
684 | 0 | decoded = XLogReadRecordAlloc(state, |
685 | 0 | total_len, |
686 | 0 | false /* allow_oversized */ ); |
687 | 0 | if (decoded == NULL && nonblocking) |
688 | 0 | { |
689 | | /* |
690 | | * There is no space in the circular decode buffer, and the caller is |
691 | | * only reading ahead. The caller should consume existing records to |
692 | | * make space. |
693 | | */ |
694 | 0 | return XLREAD_WOULDBLOCK; |
695 | 0 | } |
696 | | |
697 | 0 | len = XLOG_BLCKSZ - RecPtr % XLOG_BLCKSZ; |
698 | 0 | if (total_len > len) |
699 | 0 | { |
700 | | /* Need to reassemble record */ |
701 | 0 | char *contdata; |
702 | 0 | XLogPageHeader pageHeader; |
703 | 0 | char *buffer; |
704 | 0 | uint32 gotlen; |
705 | |
|
706 | 0 | assembled = true; |
707 | | |
708 | | /* |
709 | | * We always have space for a couple of pages, enough to validate a |
710 | | * boundary-spanning record header. |
711 | | */ |
712 | 0 | Assert(state->readRecordBufSize >= XLOG_BLCKSZ * 2); |
713 | 0 | Assert(state->readRecordBufSize >= len); |
714 | | |
715 | | /* Copy the first fragment of the record from the first page. */ |
716 | 0 | memcpy(state->readRecordBuf, |
717 | 0 | state->readBuf + RecPtr % XLOG_BLCKSZ, len); |
718 | 0 | buffer = state->readRecordBuf + len; |
719 | 0 | gotlen = len; |
720 | |
|
721 | 0 | do |
722 | 0 | { |
723 | | /* Calculate pointer to beginning of next page */ |
724 | 0 | targetPagePtr += XLOG_BLCKSZ; |
725 | | |
726 | | /* Wait for the next page to become available */ |
727 | 0 | readOff = ReadPageInternal(state, targetPagePtr, |
728 | 0 | Min(total_len - gotlen + SizeOfXLogShortPHD, |
729 | 0 | XLOG_BLCKSZ)); |
730 | |
|
731 | 0 | if (readOff == XLREAD_WOULDBLOCK) |
732 | 0 | return XLREAD_WOULDBLOCK; |
733 | 0 | else if (readOff < 0) |
734 | 0 | goto err; |
735 | | |
736 | 0 | Assert(SizeOfXLogShortPHD <= readOff); |
737 | |
|
738 | 0 | pageHeader = (XLogPageHeader) state->readBuf; |
739 | | |
740 | | /* |
741 | | * If we were expecting a continuation record and got an |
742 | | * "overwrite contrecord" flag, that means the continuation record |
743 | | * was overwritten with a different record. Restart the read by |
744 | | * assuming the address to read is the location where we found |
745 | | * this flag; but keep track of the LSN of the record we were |
746 | | * reading, for later verification. |
747 | | */ |
748 | 0 | if (pageHeader->xlp_info & XLP_FIRST_IS_OVERWRITE_CONTRECORD) |
749 | 0 | { |
750 | 0 | state->overwrittenRecPtr = RecPtr; |
751 | 0 | RecPtr = targetPagePtr; |
752 | 0 | goto restart; |
753 | 0 | } |
754 | | |
755 | | /* Check that the continuation on next page looks valid */ |
756 | 0 | if (!(pageHeader->xlp_info & XLP_FIRST_IS_CONTRECORD)) |
757 | 0 | { |
758 | 0 | report_invalid_record(state, |
759 | 0 | "there is no contrecord flag at %X/%X", |
760 | 0 | LSN_FORMAT_ARGS(RecPtr)); |
761 | 0 | goto err; |
762 | 0 | } |
763 | | |
764 | | /* |
765 | | * Cross-check that xlp_rem_len agrees with how much of the record |
766 | | * we expect there to be left. |
767 | | */ |
768 | 0 | if (pageHeader->xlp_rem_len == 0 || |
769 | 0 | total_len != (pageHeader->xlp_rem_len + gotlen)) |
770 | 0 | { |
771 | 0 | report_invalid_record(state, |
772 | 0 | "invalid contrecord length %u (expected %lld) at %X/%X", |
773 | 0 | pageHeader->xlp_rem_len, |
774 | 0 | ((long long) total_len) - gotlen, |
775 | 0 | LSN_FORMAT_ARGS(RecPtr)); |
776 | 0 | goto err; |
777 | 0 | } |
778 | | |
779 | | /* Append the continuation from this page to the buffer */ |
780 | 0 | pageHeaderSize = XLogPageHeaderSize(pageHeader); |
781 | |
|
782 | 0 | if (readOff < pageHeaderSize) |
783 | 0 | readOff = ReadPageInternal(state, targetPagePtr, |
784 | 0 | pageHeaderSize); |
785 | |
|
786 | 0 | Assert(pageHeaderSize <= readOff); |
787 | |
|
788 | 0 | contdata = (char *) state->readBuf + pageHeaderSize; |
789 | 0 | len = XLOG_BLCKSZ - pageHeaderSize; |
790 | 0 | if (pageHeader->xlp_rem_len < len) |
791 | 0 | len = pageHeader->xlp_rem_len; |
792 | |
|
793 | 0 | if (readOff < pageHeaderSize + len) |
794 | 0 | readOff = ReadPageInternal(state, targetPagePtr, |
795 | 0 | pageHeaderSize + len); |
796 | |
|
797 | 0 | memcpy(buffer, contdata, len); |
798 | 0 | buffer += len; |
799 | 0 | gotlen += len; |
800 | | |
801 | | /* If we just reassembled the record header, validate it. */ |
802 | 0 | if (!gotheader) |
803 | 0 | { |
804 | 0 | record = (XLogRecord *) state->readRecordBuf; |
805 | 0 | if (!ValidXLogRecordHeader(state, RecPtr, state->DecodeRecPtr, |
806 | 0 | record, randAccess)) |
807 | 0 | goto err; |
808 | 0 | gotheader = true; |
809 | 0 | } |
810 | | |
811 | | /* |
812 | | * We might need a bigger buffer. We have validated the record |
813 | | * header, in the case that it split over a page boundary. We've |
814 | | * also cross-checked total_len against xlp_rem_len on the second |
815 | | * page, and verified xlp_pageaddr on both. |
816 | | */ |
817 | 0 | if (total_len > state->readRecordBufSize) |
818 | 0 | { |
819 | 0 | char save_copy[XLOG_BLCKSZ * 2]; |
820 | | |
821 | | /* |
822 | | * Save and restore the data we already had. It can't be more |
823 | | * than two pages. |
824 | | */ |
825 | 0 | Assert(gotlen <= lengthof(save_copy)); |
826 | 0 | Assert(gotlen <= state->readRecordBufSize); |
827 | 0 | memcpy(save_copy, state->readRecordBuf, gotlen); |
828 | 0 | allocate_recordbuf(state, total_len); |
829 | 0 | memcpy(state->readRecordBuf, save_copy, gotlen); |
830 | 0 | buffer = state->readRecordBuf + gotlen; |
831 | 0 | } |
832 | 0 | } while (gotlen < total_len); |
833 | 0 | Assert(gotheader); |
834 | |
|
835 | 0 | record = (XLogRecord *) state->readRecordBuf; |
836 | 0 | if (!ValidXLogRecord(state, record, RecPtr)) |
837 | 0 | goto err; |
838 | | |
839 | 0 | pageHeaderSize = XLogPageHeaderSize((XLogPageHeader) state->readBuf); |
840 | 0 | state->DecodeRecPtr = RecPtr; |
841 | 0 | state->NextRecPtr = targetPagePtr + pageHeaderSize |
842 | 0 | + MAXALIGN(pageHeader->xlp_rem_len); |
843 | 0 | } |
844 | 0 | else |
845 | 0 | { |
846 | | /* Wait for the record data to become available */ |
847 | 0 | readOff = ReadPageInternal(state, targetPagePtr, |
848 | 0 | Min(targetRecOff + total_len, XLOG_BLCKSZ)); |
849 | 0 | if (readOff == XLREAD_WOULDBLOCK) |
850 | 0 | return XLREAD_WOULDBLOCK; |
851 | 0 | else if (readOff < 0) |
852 | 0 | goto err; |
853 | | |
854 | | /* Record does not cross a page boundary */ |
855 | 0 | if (!ValidXLogRecord(state, record, RecPtr)) |
856 | 0 | goto err; |
857 | | |
858 | 0 | state->NextRecPtr = RecPtr + MAXALIGN(total_len); |
859 | |
|
860 | 0 | state->DecodeRecPtr = RecPtr; |
861 | 0 | } |
862 | | |
863 | | /* |
864 | | * Special processing if it's an XLOG SWITCH record |
865 | | */ |
866 | 0 | if (record->xl_rmid == RM_XLOG_ID && |
867 | 0 | (record->xl_info & ~XLR_INFO_MASK) == XLOG_SWITCH) |
868 | 0 | { |
869 | | /* Pretend it extends to end of segment */ |
870 | 0 | state->NextRecPtr += state->segcxt.ws_segsize - 1; |
871 | 0 | state->NextRecPtr -= XLogSegmentOffset(state->NextRecPtr, state->segcxt.ws_segsize); |
872 | 0 | } |
873 | | |
874 | | /* |
875 | | * If we got here without a DecodedXLogRecord, it means we needed to |
876 | | * validate total_len before trusting it, but by now we've done that. |
877 | | */ |
878 | 0 | if (decoded == NULL) |
879 | 0 | { |
880 | 0 | Assert(!nonblocking); |
881 | 0 | decoded = XLogReadRecordAlloc(state, |
882 | 0 | total_len, |
883 | 0 | true /* allow_oversized */ ); |
884 | | /* allocation should always happen under allow_oversized */ |
885 | 0 | Assert(decoded != NULL); |
886 | 0 | } |
887 | |
|
888 | 0 | if (DecodeXLogRecord(state, decoded, record, RecPtr, &errormsg)) |
889 | 0 | { |
890 | | /* Record the location of the next record. */ |
891 | 0 | decoded->next_lsn = state->NextRecPtr; |
892 | | |
893 | | /* |
894 | | * If it's in the decode buffer, mark the decode buffer space as |
895 | | * occupied. |
896 | | */ |
897 | 0 | if (!decoded->oversized) |
898 | 0 | { |
899 | | /* The new decode buffer head must be MAXALIGNed. */ |
900 | 0 | Assert(decoded->size == MAXALIGN(decoded->size)); |
901 | 0 | if ((char *) decoded == state->decode_buffer) |
902 | 0 | state->decode_buffer_tail = state->decode_buffer + decoded->size; |
903 | 0 | else |
904 | 0 | state->decode_buffer_tail += decoded->size; |
905 | 0 | } |
906 | | |
907 | | /* Insert it into the queue of decoded records. */ |
908 | 0 | Assert(state->decode_queue_tail != decoded); |
909 | 0 | if (state->decode_queue_tail) |
910 | 0 | state->decode_queue_tail->next = decoded; |
911 | 0 | state->decode_queue_tail = decoded; |
912 | 0 | if (!state->decode_queue_head) |
913 | 0 | state->decode_queue_head = decoded; |
914 | 0 | return XLREAD_SUCCESS; |
915 | 0 | } |
916 | | |
917 | 0 | err: |
918 | 0 | if (assembled) |
919 | 0 | { |
920 | | /* |
921 | | * We get here when a record that spans multiple pages needs to be |
922 | | * assembled, but something went wrong -- perhaps a contrecord piece |
923 | | * was lost. If caller is WAL replay, it will know where the aborted |
924 | | * record was and where to direct followup WAL to be written, marking |
925 | | * the next piece with XLP_FIRST_IS_OVERWRITE_CONTRECORD, which will |
926 | | * in turn signal downstream WAL consumers that the broken WAL record |
927 | | * is to be ignored. |
928 | | */ |
929 | 0 | state->abortedRecPtr = RecPtr; |
930 | 0 | state->missingContrecPtr = targetPagePtr; |
931 | | |
932 | | /* |
933 | | * If we got here without reporting an error, make sure an error is |
934 | | * queued so that XLogPrefetcherReadRecord() doesn't bring us back a |
935 | | * second time and clobber the above state. |
936 | | */ |
937 | 0 | state->errormsg_deferred = true; |
938 | 0 | } |
939 | |
|
940 | 0 | if (decoded && decoded->oversized) |
941 | 0 | pfree(decoded); |
942 | | |
943 | | /* |
944 | | * Invalidate the read state. We might read from a different source after |
945 | | * failure. |
946 | | */ |
947 | 0 | XLogReaderInvalReadState(state); |
948 | | |
949 | | /* |
950 | | * If an error was written to errormsg_buf, it'll be returned to the |
951 | | * caller of XLogReadRecord() after all successfully decoded records from |
952 | | * the read queue. |
953 | | */ |
954 | |
|
955 | 0 | return XLREAD_FAIL; |
956 | 0 | } |
957 | | |
958 | | /* |
959 | | * Try to decode the next available record, and return it. The record will |
960 | | * also be returned to XLogNextRecord(), which must be called to 'consume' |
961 | | * each record. |
962 | | * |
963 | | * If nonblocking is true, may return NULL due to lack of data or WAL decoding |
964 | | * space. |
965 | | */ |
966 | | DecodedXLogRecord * |
967 | | XLogReadAhead(XLogReaderState *state, bool nonblocking) |
968 | 0 | { |
969 | 0 | XLogPageReadResult result; |
970 | |
|
971 | 0 | if (state->errormsg_deferred) |
972 | 0 | return NULL; |
973 | | |
974 | 0 | result = XLogDecodeNextRecord(state, nonblocking); |
975 | 0 | if (result == XLREAD_SUCCESS) |
976 | 0 | { |
977 | 0 | Assert(state->decode_queue_tail != NULL); |
978 | 0 | return state->decode_queue_tail; |
979 | 0 | } |
980 | | |
981 | 0 | return NULL; |
982 | 0 | } |
983 | | |
/*
 * Read a single xlog page including at least [pageptr, reqLen] of valid data
 * via the page_read() callback.
 *
 * Returns the number of bytes read on success (at least reqLen, at most
 * XLOG_BLCKSZ).
 *
 * Returns XLREAD_FAIL if the required page cannot be read for some
 * reason; errormsg_buf is set in that case (unless the error occurs in the
 * page_read callback).
 *
 * Returns XLREAD_WOULDBLOCK if the requested data can't be read without
 * waiting.  This can be returned only if the installed page_read callback
 * respects the state->nonblocking flag, and cannot read the requested data
 * immediately.
 *
 * We fetch the page from a reader-local cache if we know we have the required
 * data and if there hasn't been any error since caching the data.
 */
static int
ReadPageInternal(XLogReaderState *state, XLogRecPtr pageptr, int reqLen)
{
	int			readLen;
	uint32		targetPageOff;
	XLogSegNo	targetSegNo;
	XLogPageHeader hdr;

	/* Callers must pass a page-aligned pointer. */
	Assert((pageptr % XLOG_BLCKSZ) == 0);

	XLByteToSeg(pageptr, targetSegNo, state->segcxt.ws_segsize);
	targetPageOff = XLogSegmentOffset(pageptr, state->segcxt.ws_segsize);

	/* check whether we have all the requested data already */
	if (targetSegNo == state->seg.ws_segno &&
		targetPageOff == state->segoff && reqLen <= state->readLen)
		return state->readLen;

	/*
	 * Invalidate contents of internal buffer before read attempt.  Just set
	 * the length to 0, rather than a full XLogReaderInvalReadState(), so we
	 * don't forget the segment we last successfully read.
	 */
	state->readLen = 0;

	/*
	 * Data is not in our buffer.
	 *
	 * Every time we actually read the segment, even if we looked at parts of
	 * it before, we need to do verification as the page_read callback might
	 * now be rereading data from a different source.
	 *
	 * Whenever switching to a new WAL segment, we read the first page of the
	 * file and validate its header, even if that's not where the target
	 * record is.  This is so that we can check the additional identification
	 * info that is present in the first page's "long" header.  (When
	 * targetPageOff == 0 the target page *is* the first page, so the normal
	 * read below covers it and this extra read is skipped.)
	 */
	if (targetSegNo != state->seg.ws_segno && targetPageOff != 0)
	{
		XLogRecPtr	targetSegmentPtr = pageptr - targetPageOff;

		readLen = state->routine.page_read(state, targetSegmentPtr, XLOG_BLCKSZ,
										   state->currRecPtr,
										   state->readBuf);
		if (readLen == XLREAD_WOULDBLOCK)
			return XLREAD_WOULDBLOCK;
		else if (readLen < 0)
			goto err;

		/* we can be sure to have enough WAL available, we scrolled back */
		Assert(readLen == XLOG_BLCKSZ);

		if (!XLogReaderValidatePageHeader(state, targetSegmentPtr,
										  state->readBuf))
			goto err;
	}

	/*
	 * First, read the requested data length, but at least a short page header
	 * so that we can validate it.
	 */
	readLen = state->routine.page_read(state, pageptr, Max(reqLen, SizeOfXLogShortPHD),
									   state->currRecPtr,
									   state->readBuf);
	if (readLen == XLREAD_WOULDBLOCK)
		return XLREAD_WOULDBLOCK;
	else if (readLen < 0)
		goto err;

	Assert(readLen <= XLOG_BLCKSZ);

	/* Do we have enough data to check the header length? */
	if (readLen <= SizeOfXLogShortPHD)
		goto err;

	Assert(readLen >= reqLen);

	hdr = (XLogPageHeader) state->readBuf;

	/* still not enough: a "long" header is bigger than the short one */
	if (readLen < XLogPageHeaderSize(hdr))
	{
		readLen = state->routine.page_read(state, pageptr, XLogPageHeaderSize(hdr),
										   state->currRecPtr,
										   state->readBuf);
		if (readLen == XLREAD_WOULDBLOCK)
			return XLREAD_WOULDBLOCK;
		else if (readLen < 0)
			goto err;
	}

	/*
	 * Now that we know we have the full header, validate it.
	 */
	if (!XLogReaderValidatePageHeader(state, pageptr, (char *) hdr))
		goto err;

	/* update read state information */
	state->seg.ws_segno = targetSegNo;
	state->segoff = targetPageOff;
	state->readLen = readLen;

	return readLen;

err:
	/* Forget the cached page entirely; errormsg_buf may already be set. */
	XLogReaderInvalReadState(state);

	return XLREAD_FAIL;
}
1109 | | |
1110 | | /* |
1111 | | * Invalidate the xlogreader's read state to force a re-read. |
1112 | | */ |
1113 | | static void |
1114 | | XLogReaderInvalReadState(XLogReaderState *state) |
1115 | 0 | { |
1116 | 0 | state->seg.ws_segno = 0; |
1117 | 0 | state->segoff = 0; |
1118 | 0 | state->readLen = 0; |
1119 | 0 | } |
1120 | | |
1121 | | /* |
1122 | | * Validate an XLOG record header. |
1123 | | * |
1124 | | * This is just a convenience subroutine to avoid duplicated code in |
1125 | | * XLogReadRecord. It's not intended for use from anywhere else. |
1126 | | */ |
1127 | | static bool |
1128 | | ValidXLogRecordHeader(XLogReaderState *state, XLogRecPtr RecPtr, |
1129 | | XLogRecPtr PrevRecPtr, XLogRecord *record, |
1130 | | bool randAccess) |
1131 | 0 | { |
1132 | 0 | if (record->xl_tot_len < SizeOfXLogRecord) |
1133 | 0 | { |
1134 | 0 | report_invalid_record(state, |
1135 | 0 | "invalid record length at %X/%X: expected at least %u, got %u", |
1136 | 0 | LSN_FORMAT_ARGS(RecPtr), |
1137 | 0 | (uint32) SizeOfXLogRecord, record->xl_tot_len); |
1138 | 0 | return false; |
1139 | 0 | } |
1140 | 0 | if (!RmgrIdIsValid(record->xl_rmid)) |
1141 | 0 | { |
1142 | 0 | report_invalid_record(state, |
1143 | 0 | "invalid resource manager ID %u at %X/%X", |
1144 | 0 | record->xl_rmid, LSN_FORMAT_ARGS(RecPtr)); |
1145 | 0 | return false; |
1146 | 0 | } |
1147 | 0 | if (randAccess) |
1148 | 0 | { |
1149 | | /* |
1150 | | * We can't exactly verify the prev-link, but surely it should be less |
1151 | | * than the record's own address. |
1152 | | */ |
1153 | 0 | if (!(record->xl_prev < RecPtr)) |
1154 | 0 | { |
1155 | 0 | report_invalid_record(state, |
1156 | 0 | "record with incorrect prev-link %X/%X at %X/%X", |
1157 | 0 | LSN_FORMAT_ARGS(record->xl_prev), |
1158 | 0 | LSN_FORMAT_ARGS(RecPtr)); |
1159 | 0 | return false; |
1160 | 0 | } |
1161 | 0 | } |
1162 | 0 | else |
1163 | 0 | { |
1164 | | /* |
1165 | | * Record's prev-link should exactly match our previous location. This |
1166 | | * check guards against torn WAL pages where a stale but valid-looking |
1167 | | * WAL record starts on a sector boundary. |
1168 | | */ |
1169 | 0 | if (record->xl_prev != PrevRecPtr) |
1170 | 0 | { |
1171 | 0 | report_invalid_record(state, |
1172 | 0 | "record with incorrect prev-link %X/%X at %X/%X", |
1173 | 0 | LSN_FORMAT_ARGS(record->xl_prev), |
1174 | 0 | LSN_FORMAT_ARGS(RecPtr)); |
1175 | 0 | return false; |
1176 | 0 | } |
1177 | 0 | } |
1178 | | |
1179 | 0 | return true; |
1180 | 0 | } |
1181 | | |
1182 | | |
1183 | | /* |
1184 | | * CRC-check an XLOG record. We do not believe the contents of an XLOG |
1185 | | * record (other than to the minimal extent of computing the amount of |
1186 | | * data to read in) until we've checked the CRCs. |
1187 | | * |
1188 | | * We assume all of the record (that is, xl_tot_len bytes) has been read |
1189 | | * into memory at *record. Also, ValidXLogRecordHeader() has accepted the |
1190 | | * record's header, which means in particular that xl_tot_len is at least |
1191 | | * SizeOfXLogRecord. |
1192 | | */ |
1193 | | static bool |
1194 | | ValidXLogRecord(XLogReaderState *state, XLogRecord *record, XLogRecPtr recptr) |
1195 | 0 | { |
1196 | 0 | pg_crc32c crc; |
1197 | |
|
1198 | 0 | Assert(record->xl_tot_len >= SizeOfXLogRecord); |
1199 | | |
1200 | | /* Calculate the CRC */ |
1201 | 0 | INIT_CRC32C(crc); |
1202 | 0 | COMP_CRC32C(crc, ((char *) record) + SizeOfXLogRecord, record->xl_tot_len - SizeOfXLogRecord); |
1203 | | /* include the record header last */ |
1204 | 0 | COMP_CRC32C(crc, (char *) record, offsetof(XLogRecord, xl_crc)); |
1205 | 0 | FIN_CRC32C(crc); |
1206 | |
|
1207 | 0 | if (!EQ_CRC32C(record->xl_crc, crc)) |
1208 | 0 | { |
1209 | 0 | report_invalid_record(state, |
1210 | 0 | "incorrect resource manager data checksum in record at %X/%X", |
1211 | 0 | LSN_FORMAT_ARGS(recptr)); |
1212 | 0 | return false; |
1213 | 0 | } |
1214 | | |
1215 | 0 | return true; |
1216 | 0 | } |
1217 | | |
/*
 * Validate a page header.
 *
 * Check if 'phdr' is valid as the header of the XLog page at position
 * 'recptr'.  Returns true if all checks pass; on failure, an error message
 * is left in state->errormsg_buf and false is returned.
 *
 * As a side effect on success, remembers recptr/TLI in latestPagePtr and
 * latestPageTLI for the timeline monotonicity check on later pages.
 */
bool
XLogReaderValidatePageHeader(XLogReaderState *state, XLogRecPtr recptr,
							 char *phdr)
{
	XLogSegNo	segno;
	int32		offset;
	XLogPageHeader hdr = (XLogPageHeader) phdr;

	/* Caller must pass a page-aligned address. */
	Assert((recptr % XLOG_BLCKSZ) == 0);

	XLByteToSeg(recptr, segno, state->segcxt.ws_segsize);
	offset = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);

	/* The page magic identifies the WAL format version. */
	if (hdr->xlp_magic != XLOG_PAGE_MAGIC)
	{
		char		fname[MAXFNAMELEN];

		XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);

		report_invalid_record(state,
							  "invalid magic number %04X in WAL segment %s, LSN %X/%X, offset %u",
							  hdr->xlp_magic,
							  fname,
							  LSN_FORMAT_ARGS(recptr),
							  offset);
		return false;
	}

	/* No flag bits outside the defined set may be set. */
	if ((hdr->xlp_info & ~XLP_ALL_FLAGS) != 0)
	{
		char		fname[MAXFNAMELEN];

		XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);

		report_invalid_record(state,
							  "invalid info bits %04X in WAL segment %s, LSN %X/%X, offset %u",
							  hdr->xlp_info,
							  fname,
							  LSN_FORMAT_ARGS(recptr),
							  offset);
		return false;
	}

	/*
	 * A "long" header (first page of a segment) carries extra identification
	 * fields; cross-check them against what this reader expects.
	 */
	if (hdr->xlp_info & XLP_LONG_HEADER)
	{
		XLogLongPageHeader longhdr = (XLogLongPageHeader) hdr;

		/* sysid check is skipped if the reader has no expected identifier */
		if (state->system_identifier &&
			longhdr->xlp_sysid != state->system_identifier)
		{
			report_invalid_record(state,
								  "WAL file is from different database system: WAL file database system identifier is %" PRIu64 ", pg_control database system identifier is %" PRIu64,
								  longhdr->xlp_sysid,
								  state->system_identifier);
			return false;
		}
		else if (longhdr->xlp_seg_size != state->segcxt.ws_segsize)
		{
			report_invalid_record(state,
								  "WAL file is from different database system: incorrect segment size in page header");
			return false;
		}
		else if (longhdr->xlp_xlog_blcksz != XLOG_BLCKSZ)
		{
			report_invalid_record(state,
								  "WAL file is from different database system: incorrect XLOG_BLCKSZ in page header");
			return false;
		}
	}
	else if (offset == 0)
	{
		char		fname[MAXFNAMELEN];

		XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);

		/* hmm, first page of file doesn't have a long header? */
		report_invalid_record(state,
							  "invalid info bits %04X in WAL segment %s, LSN %X/%X, offset %u",
							  hdr->xlp_info,
							  fname,
							  LSN_FORMAT_ARGS(recptr),
							  offset);
		return false;
	}

	/*
	 * Check that the address on the page agrees with what we expected.  This
	 * check typically fails when an old WAL segment is recycled, and hasn't
	 * yet been overwritten with new data.
	 */
	if (hdr->xlp_pageaddr != recptr)
	{
		char		fname[MAXFNAMELEN];

		XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);

		report_invalid_record(state,
							  "unexpected pageaddr %X/%X in WAL segment %s, LSN %X/%X, offset %u",
							  LSN_FORMAT_ARGS(hdr->xlp_pageaddr),
							  fname,
							  LSN_FORMAT_ARGS(recptr),
							  offset);
		return false;
	}

	/*
	 * Since child timelines are always assigned a TLI greater than their
	 * immediate parent's TLI, we should never see TLI go backwards across
	 * successive pages of a consistent WAL sequence.
	 *
	 * Sometimes we re-read a segment that's already been (partially) read. So
	 * we only verify TLIs for pages that are later than the last remembered
	 * LSN.
	 */
	if (recptr > state->latestPagePtr)
	{
		if (hdr->xlp_tli < state->latestPageTLI)
		{
			char		fname[MAXFNAMELEN];

			XLogFileName(fname, state->seg.ws_tli, segno, state->segcxt.ws_segsize);

			report_invalid_record(state,
								  "out-of-sequence timeline ID %u (after %u) in WAL segment %s, LSN %X/%X, offset %u",
								  hdr->xlp_tli,
								  state->latestPageTLI,
								  fname,
								  LSN_FORMAT_ARGS(recptr),
								  offset);
			return false;
		}
	}
	/* Remember this page as the latest one validated. */
	state->latestPagePtr = recptr;
	state->latestPageTLI = hdr->xlp_tli;

	return true;
}
1361 | | |
1362 | | /* |
1363 | | * Forget about an error produced by XLogReaderValidatePageHeader(). |
1364 | | */ |
1365 | | void |
1366 | | XLogReaderResetError(XLogReaderState *state) |
1367 | 0 | { |
1368 | 0 | state->errormsg_buf[0] = '\0'; |
1369 | 0 | state->errormsg_deferred = false; |
1370 | 0 | } |
1371 | | |
/*
 * Find the first record with an lsn >= RecPtr.
 *
 * This is different from XLogBeginRead() in that RecPtr doesn't need to point
 * to a valid record boundary.  Useful for checking whether RecPtr is a valid
 * xlog address for reading, and to find the first valid address after some
 * address when dumping records for debugging purposes.
 *
 * This positions the reader, like XLogBeginRead(), so that the next call to
 * XLogReadRecord() will read the next valid record.
 *
 * Returns the LSN of the found record, or InvalidXLogRecPtr on failure.
 */
XLogRecPtr
XLogFindNextRecord(XLogReaderState *state, XLogRecPtr RecPtr)
{
	XLogRecPtr	tmpRecPtr;
	XLogRecPtr	found = InvalidXLogRecPtr;
	XLogPageHeader header;
	char	   *errormsg;

	Assert(!XLogRecPtrIsInvalid(RecPtr));

	/* Make sure ReadPageInternal() can't return XLREAD_WOULDBLOCK. */
	state->nonblocking = false;

	/*
	 * skip over potential continuation data, keeping in mind that it may span
	 * multiple pages
	 */
	tmpRecPtr = RecPtr;
	while (true)
	{
		XLogRecPtr	targetPagePtr;
		int			targetRecOff;
		uint32		pageHeaderSize;
		int			readLen;

		/*
		 * Compute targetRecOff. It should typically be equal or greater than
		 * short page-header since a valid record can't start anywhere before
		 * that, except when caller has explicitly specified the offset that
		 * falls somewhere there or when we are skipping multi-page
		 * continuation record. It doesn't matter though because
		 * ReadPageInternal() is prepared to handle that and will read at
		 * least short page-header worth of data
		 */
		targetRecOff = tmpRecPtr % XLOG_BLCKSZ;

		/* scroll back to page boundary */
		targetPagePtr = tmpRecPtr - targetRecOff;

		/* Read the page containing the record */
		readLen = ReadPageInternal(state, targetPagePtr, targetRecOff);
		if (readLen < 0)
			goto err;

		header = (XLogPageHeader) state->readBuf;

		pageHeaderSize = XLogPageHeaderSize(header);

		/* make sure we have enough data for the page header */
		readLen = ReadPageInternal(state, targetPagePtr, pageHeaderSize);
		if (readLen < 0)
			goto err;

		/* skip over potential continuation data */
		if (header->xlp_info & XLP_FIRST_IS_CONTRECORD)
		{
			/*
			 * If the length of the remaining continuation data is more than
			 * what can fit in this page, the continuation record crosses over
			 * this page. Read the next page and try again. xlp_rem_len in the
			 * next page header will contain the remaining length of the
			 * continuation data
			 *
			 * Note that record headers are MAXALIGN'ed
			 */
			if (MAXALIGN(header->xlp_rem_len) >= (XLOG_BLCKSZ - pageHeaderSize))
				tmpRecPtr = targetPagePtr + XLOG_BLCKSZ;
			else
			{
				/*
				 * The previous continuation record ends in this page. Set
				 * tmpRecPtr to point to the first valid record
				 */
				tmpRecPtr = targetPagePtr + pageHeaderSize
					+ MAXALIGN(header->xlp_rem_len);
				break;
			}
		}
		else
		{
			/* No continuation; the first record starts right after the header. */
			tmpRecPtr = targetPagePtr + pageHeaderSize;
			break;
		}
	}

	/*
	 * we know now that tmpRecPtr is an address pointing to a valid XLogRecord
	 * because either we're at the first record after the beginning of a page
	 * or we just jumped over the remaining data of a continuation.
	 */
	XLogBeginRead(state, tmpRecPtr);
	while (XLogReadRecord(state, &errormsg) != NULL)
	{
		/* past the record we've found, break out */
		if (RecPtr <= state->ReadRecPtr)
		{
			/* Rewind the reader to the beginning of the last record. */
			found = state->ReadRecPtr;
			XLogBeginRead(state, found);
			return found;
		}
	}

err:
	/* Either a page read failed or we ran out of records without a match. */
	XLogReaderInvalReadState(state);

	return InvalidXLogRecPtr;
}
1491 | | |
/*
 * Helper function to ease writing of XLogReaderRoutine->page_read callbacks.
 * If this function is used, caller must supply a segment_open callback in
 * 'state', as that is used here.
 *
 * Read 'count' bytes into 'buf', starting at location 'startptr', from WAL
 * fetched from timeline 'tli'.
 *
 * Returns true if succeeded, false if an error occurs, in which case
 * 'errinfo' receives error details.
 *
 * Short reads are retried by looping; each pg_pread() may return fewer bytes
 * than requested without that being an error.
 */
bool
WALRead(XLogReaderState *state,
		char *buf, XLogRecPtr startptr, Size count, TimeLineID tli,
		WALReadError *errinfo)
{
	char	   *p;
	XLogRecPtr	recptr;
	Size		nbytes;
#ifndef FRONTEND
	instr_time	io_start;
#endif

	p = buf;
	recptr = startptr;
	nbytes = count;

	while (nbytes > 0)
	{
		uint32		startoff;
		int			segbytes;
		int			readbytes;

		startoff = XLogSegmentOffset(recptr, state->segcxt.ws_segsize);

		/*
		 * If the data we want is not in a segment we have open, close what we
		 * have (if anything) and open the next one, using the caller's
		 * provided segment_open callback.
		 */
		if (state->seg.ws_file < 0 ||
			!XLByteInSeg(recptr, state->seg.ws_segno, state->segcxt.ws_segsize) ||
			tli != state->seg.ws_tli)
		{
			XLogSegNo	nextSegNo;

			if (state->seg.ws_file >= 0)
				state->routine.segment_close(state);

			XLByteToSeg(recptr, nextSegNo, state->segcxt.ws_segsize);
			state->routine.segment_open(state, nextSegNo, &tli);

			/* This shouldn't happen -- indicates a bug in segment_open */
			Assert(state->seg.ws_file >= 0);

			/* Update the current segment info. */
			state->seg.ws_tli = tli;
			state->seg.ws_segno = nextSegNo;
		}

		/* How many bytes are within this segment? */
		if (nbytes > (state->segcxt.ws_segsize - startoff))
			segbytes = state->segcxt.ws_segsize - startoff;
		else
			segbytes = nbytes;

#ifndef FRONTEND
		/* Measure I/O timing when reading segment (backend builds only) */
		io_start = pgstat_prepare_io_time(track_wal_io_timing);

		pgstat_report_wait_start(WAIT_EVENT_WAL_READ);
#endif

		/* Reset errno first; eases reporting non-errno-affecting errors */
		errno = 0;
		readbytes = pg_pread(state->seg.ws_file, p, segbytes, (off_t) startoff);

#ifndef FRONTEND
		pgstat_report_wait_end();

		pgstat_count_io_op_time(IOOBJECT_WAL, IOCONTEXT_NORMAL, IOOP_READ,
								io_start, 1, readbytes);
#endif

		/* readbytes <= 0 covers both errors and EOF (errno == 0 for EOF) */
		if (readbytes <= 0)
		{
			errinfo->wre_errno = errno;
			errinfo->wre_req = segbytes;
			errinfo->wre_read = readbytes;
			errinfo->wre_off = startoff;
			errinfo->wre_seg = state->seg;
			return false;
		}

		/* Update state for read */
		recptr += readbytes;
		nbytes -= readbytes;
		p += readbytes;
	}

	return true;
}
1594 | | |
1595 | | /* ---------------------------------------- |
1596 | | * Functions for decoding the data and block references in a record. |
1597 | | * ---------------------------------------- |
1598 | | */ |
1599 | | |
1600 | | /* |
1601 | | * Private function to reset the state, forgetting all decoded records, if we |
1602 | | * are asked to move to a new read position. |
1603 | | */ |
1604 | | static void |
1605 | | ResetDecoder(XLogReaderState *state) |
1606 | 0 | { |
1607 | 0 | DecodedXLogRecord *r; |
1608 | | |
1609 | | /* Reset the decoded record queue, freeing any oversized records. */ |
1610 | 0 | while ((r = state->decode_queue_head) != NULL) |
1611 | 0 | { |
1612 | 0 | state->decode_queue_head = r->next; |
1613 | 0 | if (r->oversized) |
1614 | 0 | pfree(r); |
1615 | 0 | } |
1616 | 0 | state->decode_queue_tail = NULL; |
1617 | 0 | state->decode_queue_head = NULL; |
1618 | 0 | state->record = NULL; |
1619 | | |
1620 | | /* Reset the decode buffer to empty. */ |
1621 | 0 | state->decode_buffer_tail = state->decode_buffer; |
1622 | 0 | state->decode_buffer_head = state->decode_buffer; |
1623 | | |
1624 | | /* Clear error state. */ |
1625 | 0 | state->errormsg_buf[0] = '\0'; |
1626 | 0 | state->errormsg_deferred = false; |
1627 | 0 | } |
1628 | | |
1629 | | /* |
1630 | | * Compute the maximum possible amount of padding that could be required to |
1631 | | * decode a record, given xl_tot_len from the record's header. This is the |
1632 | | * amount of output buffer space that we need to decode a record, though we |
1633 | | * might not finish up using it all. |
1634 | | * |
1635 | | * This computation is pessimistic and assumes the maximum possible number of |
1636 | | * blocks, due to lack of better information. |
1637 | | */ |
1638 | | size_t |
1639 | | DecodeXLogRecordRequiredSpace(size_t xl_tot_len) |
1640 | 0 | { |
1641 | 0 | size_t size = 0; |
1642 | | |
1643 | | /* Account for the fixed size part of the decoded record struct. */ |
1644 | 0 | size += offsetof(DecodedXLogRecord, blocks[0]); |
1645 | | /* Account for the flexible blocks array of maximum possible size. */ |
1646 | 0 | size += sizeof(DecodedBkpBlock) * (XLR_MAX_BLOCK_ID + 1); |
1647 | | /* Account for all the raw main and block data. */ |
1648 | 0 | size += xl_tot_len; |
1649 | | /* We might insert padding before main_data. */ |
1650 | 0 | size += (MAXIMUM_ALIGNOF - 1); |
1651 | | /* We might insert padding before each block's data. */ |
1652 | 0 | size += (MAXIMUM_ALIGNOF - 1) * (XLR_MAX_BLOCK_ID + 1); |
1653 | | /* We might insert padding at the end. */ |
1654 | 0 | size += (MAXIMUM_ALIGNOF - 1); |
1655 | |
|
1656 | 0 | return size; |
1657 | 0 | } |
1658 | | |
1659 | | /* |
1660 | | * Decode a record. "decoded" must point to a MAXALIGNed memory area that has |
1661 | | * space for at least DecodeXLogRecordRequiredSpace(record) bytes. On |
1662 | | * success, decoded->size contains the actual space occupied by the decoded |
1663 | | * record, which may turn out to be less. |
1664 | | * |
1665 | | * Only decoded->oversized member must be initialized already, and will not be |
1666 | | * modified. Other members will be initialized as required. |
1667 | | * |
1668 | | * On error, a human-readable error message is returned in *errormsg, and |
1669 | | * the return value is false. |
1670 | | */ |
1671 | | bool |
1672 | | DecodeXLogRecord(XLogReaderState *state, |
1673 | | DecodedXLogRecord *decoded, |
1674 | | XLogRecord *record, |
1675 | | XLogRecPtr lsn, |
1676 | | char **errormsg) |
1677 | 0 | { |
1678 | | /* |
1679 | | * read next _size bytes from record buffer, but check for overrun first. |
1680 | | */ |
1681 | 0 | #define COPY_HEADER_FIELD(_dst, _size) \ |
1682 | 0 | do { \ |
1683 | 0 | if (remaining < _size) \ |
1684 | 0 | goto shortdata_err; \ |
1685 | 0 | memcpy(_dst, ptr, _size); \ |
1686 | 0 | ptr += _size; \ |
1687 | 0 | remaining -= _size; \ |
1688 | 0 | } while(0) |
1689 | |
|
1690 | 0 | char *ptr; |
1691 | 0 | char *out; |
1692 | 0 | uint32 remaining; |
1693 | 0 | uint32 datatotal; |
1694 | 0 | RelFileLocator *rlocator = NULL; |
1695 | 0 | uint8 block_id; |
1696 | |
|
1697 | 0 | decoded->header = *record; |
1698 | 0 | decoded->lsn = lsn; |
1699 | 0 | decoded->next = NULL; |
1700 | 0 | decoded->record_origin = InvalidRepOriginId; |
1701 | 0 | decoded->toplevel_xid = InvalidTransactionId; |
1702 | 0 | decoded->main_data = NULL; |
1703 | 0 | decoded->main_data_len = 0; |
1704 | 0 | decoded->max_block_id = -1; |
1705 | 0 | ptr = (char *) record; |
1706 | 0 | ptr += SizeOfXLogRecord; |
1707 | 0 | remaining = record->xl_tot_len - SizeOfXLogRecord; |
1708 | | |
1709 | | /* Decode the headers */ |
1710 | 0 | datatotal = 0; |
1711 | 0 | while (remaining > datatotal) |
1712 | 0 | { |
1713 | 0 | COPY_HEADER_FIELD(&block_id, sizeof(uint8)); |
1714 | | |
1715 | 0 | if (block_id == XLR_BLOCK_ID_DATA_SHORT) |
1716 | 0 | { |
1717 | | /* XLogRecordDataHeaderShort */ |
1718 | 0 | uint8 main_data_len; |
1719 | |
|
1720 | 0 | COPY_HEADER_FIELD(&main_data_len, sizeof(uint8)); |
1721 | | |
1722 | 0 | decoded->main_data_len = main_data_len; |
1723 | 0 | datatotal += main_data_len; |
1724 | 0 | break; /* by convention, the main data fragment is |
1725 | | * always last */ |
1726 | 0 | } |
1727 | 0 | else if (block_id == XLR_BLOCK_ID_DATA_LONG) |
1728 | 0 | { |
1729 | | /* XLogRecordDataHeaderLong */ |
1730 | 0 | uint32 main_data_len; |
1731 | |
|
1732 | 0 | COPY_HEADER_FIELD(&main_data_len, sizeof(uint32)); |
1733 | 0 | decoded->main_data_len = main_data_len; |
1734 | 0 | datatotal += main_data_len; |
1735 | 0 | break; /* by convention, the main data fragment is |
1736 | | * always last */ |
1737 | 0 | } |
1738 | 0 | else if (block_id == XLR_BLOCK_ID_ORIGIN) |
1739 | 0 | { |
1740 | 0 | COPY_HEADER_FIELD(&decoded->record_origin, sizeof(RepOriginId)); |
1741 | 0 | } |
1742 | 0 | else if (block_id == XLR_BLOCK_ID_TOPLEVEL_XID) |
1743 | 0 | { |
1744 | 0 | COPY_HEADER_FIELD(&decoded->toplevel_xid, sizeof(TransactionId)); |
1745 | 0 | } |
1746 | 0 | else if (block_id <= XLR_MAX_BLOCK_ID) |
1747 | 0 | { |
1748 | | /* XLogRecordBlockHeader */ |
1749 | 0 | DecodedBkpBlock *blk; |
1750 | 0 | uint8 fork_flags; |
1751 | | |
1752 | | /* mark any intervening block IDs as not in use */ |
1753 | 0 | for (int i = decoded->max_block_id + 1; i < block_id; ++i) |
1754 | 0 | decoded->blocks[i].in_use = false; |
1755 | |
|
1756 | 0 | if (block_id <= decoded->max_block_id) |
1757 | 0 | { |
1758 | 0 | report_invalid_record(state, |
1759 | 0 | "out-of-order block_id %u at %X/%X", |
1760 | 0 | block_id, |
1761 | 0 | LSN_FORMAT_ARGS(state->ReadRecPtr)); |
1762 | 0 | goto err; |
1763 | 0 | } |
1764 | 0 | decoded->max_block_id = block_id; |
1765 | |
|
1766 | 0 | blk = &decoded->blocks[block_id]; |
1767 | 0 | blk->in_use = true; |
1768 | 0 | blk->apply_image = false; |
1769 | |
|
1770 | 0 | COPY_HEADER_FIELD(&fork_flags, sizeof(uint8)); |
1771 | 0 | blk->forknum = fork_flags & BKPBLOCK_FORK_MASK; |
1772 | 0 | blk->flags = fork_flags; |
1773 | 0 | blk->has_image = ((fork_flags & BKPBLOCK_HAS_IMAGE) != 0); |
1774 | 0 | blk->has_data = ((fork_flags & BKPBLOCK_HAS_DATA) != 0); |
1775 | |
|
1776 | 0 | blk->prefetch_buffer = InvalidBuffer; |
1777 | |
|
1778 | 0 | COPY_HEADER_FIELD(&blk->data_len, sizeof(uint16)); |
1779 | | /* cross-check that the HAS_DATA flag is set iff data_length > 0 */ |
1780 | 0 | if (blk->has_data && blk->data_len == 0) |
1781 | 0 | { |
1782 | 0 | report_invalid_record(state, |
1783 | 0 | "BKPBLOCK_HAS_DATA set, but no data included at %X/%X", |
1784 | 0 | LSN_FORMAT_ARGS(state->ReadRecPtr)); |
1785 | 0 | goto err; |
1786 | 0 | } |
1787 | 0 | if (!blk->has_data && blk->data_len != 0) |
1788 | 0 | { |
1789 | 0 | report_invalid_record(state, |
1790 | 0 | "BKPBLOCK_HAS_DATA not set, but data length is %u at %X/%X", |
1791 | 0 | (unsigned int) blk->data_len, |
1792 | 0 | LSN_FORMAT_ARGS(state->ReadRecPtr)); |
1793 | 0 | goto err; |
1794 | 0 | } |
1795 | 0 | datatotal += blk->data_len; |
1796 | |
|
1797 | 0 | if (blk->has_image) |
1798 | 0 | { |
1799 | 0 | COPY_HEADER_FIELD(&blk->bimg_len, sizeof(uint16)); |
1800 | 0 | COPY_HEADER_FIELD(&blk->hole_offset, sizeof(uint16)); |
1801 | 0 | COPY_HEADER_FIELD(&blk->bimg_info, sizeof(uint8)); |
1802 | | |
1803 | 0 | blk->apply_image = ((blk->bimg_info & BKPIMAGE_APPLY) != 0); |
1804 | |
|
1805 | 0 | if (BKPIMAGE_COMPRESSED(blk->bimg_info)) |
1806 | 0 | { |
1807 | 0 | if (blk->bimg_info & BKPIMAGE_HAS_HOLE) |
1808 | 0 | COPY_HEADER_FIELD(&blk->hole_length, sizeof(uint16)); |
1809 | 0 | else |
1810 | 0 | blk->hole_length = 0; |
1811 | 0 | } |
1812 | 0 | else |
1813 | 0 | blk->hole_length = BLCKSZ - blk->bimg_len; |
1814 | 0 | datatotal += blk->bimg_len; |
1815 | | |
1816 | | /* |
1817 | | * cross-check that hole_offset > 0, hole_length > 0 and |
1818 | | * bimg_len < BLCKSZ if the HAS_HOLE flag is set. |
1819 | | */ |
1820 | 0 | if ((blk->bimg_info & BKPIMAGE_HAS_HOLE) && |
1821 | 0 | (blk->hole_offset == 0 || |
1822 | 0 | blk->hole_length == 0 || |
1823 | 0 | blk->bimg_len == BLCKSZ)) |
1824 | 0 | { |
1825 | 0 | report_invalid_record(state, |
1826 | 0 | "BKPIMAGE_HAS_HOLE set, but hole offset %u length %u block image length %u at %X/%X", |
1827 | 0 | (unsigned int) blk->hole_offset, |
1828 | 0 | (unsigned int) blk->hole_length, |
1829 | 0 | (unsigned int) blk->bimg_len, |
1830 | 0 | LSN_FORMAT_ARGS(state->ReadRecPtr)); |
1831 | 0 | goto err; |
1832 | 0 | } |
1833 | | |
1834 | | /* |
1835 | | * cross-check that hole_offset == 0 and hole_length == 0 if |
1836 | | * the HAS_HOLE flag is not set. |
1837 | | */ |
1838 | 0 | if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) && |
1839 | 0 | (blk->hole_offset != 0 || blk->hole_length != 0)) |
1840 | 0 | { |
1841 | 0 | report_invalid_record(state, |
1842 | 0 | "BKPIMAGE_HAS_HOLE not set, but hole offset %u length %u at %X/%X", |
1843 | 0 | (unsigned int) blk->hole_offset, |
1844 | 0 | (unsigned int) blk->hole_length, |
1845 | 0 | LSN_FORMAT_ARGS(state->ReadRecPtr)); |
1846 | 0 | goto err; |
1847 | 0 | } |
1848 | | |
1849 | | /* |
1850 | | * Cross-check that bimg_len < BLCKSZ if it is compressed. |
1851 | | */ |
1852 | 0 | if (BKPIMAGE_COMPRESSED(blk->bimg_info) && |
1853 | 0 | blk->bimg_len == BLCKSZ) |
1854 | 0 | { |
1855 | 0 | report_invalid_record(state, |
1856 | 0 | "BKPIMAGE_COMPRESSED set, but block image length %u at %X/%X", |
1857 | 0 | (unsigned int) blk->bimg_len, |
1858 | 0 | LSN_FORMAT_ARGS(state->ReadRecPtr)); |
1859 | 0 | goto err; |
1860 | 0 | } |
1861 | | |
1862 | | /* |
1863 | | * cross-check that bimg_len = BLCKSZ if neither HAS_HOLE is |
1864 | | * set nor COMPRESSED(). |
1865 | | */ |
1866 | 0 | if (!(blk->bimg_info & BKPIMAGE_HAS_HOLE) && |
1867 | 0 | !BKPIMAGE_COMPRESSED(blk->bimg_info) && |
1868 | 0 | blk->bimg_len != BLCKSZ) |
1869 | 0 | { |
1870 | 0 | report_invalid_record(state, |
1871 | 0 | "neither BKPIMAGE_HAS_HOLE nor BKPIMAGE_COMPRESSED set, but block image length is %u at %X/%X", |
1872 | 0 | (unsigned int) blk->data_len, |
1873 | 0 | LSN_FORMAT_ARGS(state->ReadRecPtr)); |
1874 | 0 | goto err; |
1875 | 0 | } |
1876 | 0 | } |
1877 | 0 | if (!(fork_flags & BKPBLOCK_SAME_REL)) |
1878 | 0 | { |
1879 | 0 | COPY_HEADER_FIELD(&blk->rlocator, sizeof(RelFileLocator)); |
1880 | 0 | rlocator = &blk->rlocator; |
1881 | 0 | } |
1882 | 0 | else |
1883 | 0 | { |
1884 | 0 | if (rlocator == NULL) |
1885 | 0 | { |
1886 | 0 | report_invalid_record(state, |
1887 | 0 | "BKPBLOCK_SAME_REL set but no previous rel at %X/%X", |
1888 | 0 | LSN_FORMAT_ARGS(state->ReadRecPtr)); |
1889 | 0 | goto err; |
1890 | 0 | } |
1891 | | |
1892 | 0 | blk->rlocator = *rlocator; |
1893 | 0 | } |
1894 | 0 | COPY_HEADER_FIELD(&blk->blkno, sizeof(BlockNumber)); |
1895 | 0 | } |
1896 | 0 | else |
1897 | 0 | { |
1898 | 0 | report_invalid_record(state, |
1899 | 0 | "invalid block_id %u at %X/%X", |
1900 | 0 | block_id, LSN_FORMAT_ARGS(state->ReadRecPtr)); |
1901 | 0 | goto err; |
1902 | 0 | } |
1903 | 0 | } |
1904 | | |
1905 | 0 | if (remaining != datatotal) |
1906 | 0 | goto shortdata_err; |
1907 | | |
1908 | | /* |
1909 | | * Ok, we've parsed the fragment headers, and verified that the total |
1910 | | * length of the payload in the fragments is equal to the amount of data |
1911 | | * left. Copy the data of each fragment to contiguous space after the |
1912 | | * blocks array, inserting alignment padding before the data fragments so |
1913 | | * they can be cast to struct pointers by REDO routines. |
1914 | | */ |
1915 | 0 | out = ((char *) decoded) + |
1916 | 0 | offsetof(DecodedXLogRecord, blocks) + |
1917 | 0 | sizeof(decoded->blocks[0]) * (decoded->max_block_id + 1); |
1918 | | |
1919 | | /* block data first */ |
1920 | 0 | for (block_id = 0; block_id <= decoded->max_block_id; block_id++) |
1921 | 0 | { |
1922 | 0 | DecodedBkpBlock *blk = &decoded->blocks[block_id]; |
1923 | |
|
1924 | 0 | if (!blk->in_use) |
1925 | 0 | continue; |
1926 | | |
1927 | 0 | Assert(blk->has_image || !blk->apply_image); |
1928 | |
|
1929 | 0 | if (blk->has_image) |
1930 | 0 | { |
1931 | | /* no need to align image */ |
1932 | 0 | blk->bkp_image = out; |
1933 | 0 | memcpy(out, ptr, blk->bimg_len); |
1934 | 0 | ptr += blk->bimg_len; |
1935 | 0 | out += blk->bimg_len; |
1936 | 0 | } |
1937 | 0 | if (blk->has_data) |
1938 | 0 | { |
1939 | 0 | out = (char *) MAXALIGN(out); |
1940 | 0 | blk->data = out; |
1941 | 0 | memcpy(blk->data, ptr, blk->data_len); |
1942 | 0 | ptr += blk->data_len; |
1943 | 0 | out += blk->data_len; |
1944 | 0 | } |
1945 | 0 | } |
1946 | | |
1947 | | /* and finally, the main data */ |
1948 | 0 | if (decoded->main_data_len > 0) |
1949 | 0 | { |
1950 | 0 | out = (char *) MAXALIGN(out); |
1951 | 0 | decoded->main_data = out; |
1952 | 0 | memcpy(decoded->main_data, ptr, decoded->main_data_len); |
1953 | 0 | ptr += decoded->main_data_len; |
1954 | 0 | out += decoded->main_data_len; |
1955 | 0 | } |
1956 | | |
1957 | | /* Report the actual size we used. */ |
1958 | 0 | decoded->size = MAXALIGN(out - (char *) decoded); |
1959 | 0 | Assert(DecodeXLogRecordRequiredSpace(record->xl_tot_len) >= |
1960 | 0 | decoded->size); |
1961 | |
|
1962 | 0 | return true; |
1963 | | |
1964 | 0 | shortdata_err: |
1965 | 0 | report_invalid_record(state, |
1966 | 0 | "record with invalid length at %X/%X", |
1967 | 0 | LSN_FORMAT_ARGS(state->ReadRecPtr)); |
1968 | 0 | err: |
1969 | 0 | *errormsg = state->errormsg_buf; |
1970 | |
|
1971 | 0 | return false; |
1972 | 0 | } |
1973 | | |
1974 | | /* |
1975 | | * Returns information about the block that a block reference refers to. |
1976 | | * |
1977 | | * This is like XLogRecGetBlockTagExtended, except that the block reference |
1978 | | * must exist and there's no access to prefetch_buffer. |
1979 | | */ |
1980 | | void |
1981 | | XLogRecGetBlockTag(XLogReaderState *record, uint8 block_id, |
1982 | | RelFileLocator *rlocator, ForkNumber *forknum, |
1983 | | BlockNumber *blknum) |
1984 | 0 | { |
1985 | 0 | if (!XLogRecGetBlockTagExtended(record, block_id, rlocator, forknum, |
1986 | 0 | blknum, NULL)) |
1987 | 0 | { |
1988 | 0 | #ifndef FRONTEND |
1989 | 0 | elog(ERROR, "could not locate backup block with ID %d in WAL record", |
1990 | 0 | block_id); |
1991 | | #else |
1992 | | pg_fatal("could not locate backup block with ID %d in WAL record", |
1993 | | block_id); |
1994 | | #endif |
1995 | 0 | } |
1996 | 0 | } |
1997 | | |
1998 | | /* |
1999 | | * Returns information about the block that a block reference refers to, |
2000 | | * optionally including the buffer that the block may already be in. |
2001 | | * |
2002 | | * If the WAL record contains a block reference with the given ID, *rlocator, |
2003 | | * *forknum, *blknum and *prefetch_buffer are filled in (if not NULL), and |
2004 | | * returns true. Otherwise returns false. |
2005 | | */ |
2006 | | bool |
2007 | | XLogRecGetBlockTagExtended(XLogReaderState *record, uint8 block_id, |
2008 | | RelFileLocator *rlocator, ForkNumber *forknum, |
2009 | | BlockNumber *blknum, |
2010 | | Buffer *prefetch_buffer) |
2011 | 0 | { |
2012 | 0 | DecodedBkpBlock *bkpb; |
2013 | |
|
2014 | 0 | if (!XLogRecHasBlockRef(record, block_id)) |
2015 | 0 | return false; |
2016 | | |
2017 | 0 | bkpb = &record->record->blocks[block_id]; |
2018 | 0 | if (rlocator) |
2019 | 0 | *rlocator = bkpb->rlocator; |
2020 | 0 | if (forknum) |
2021 | 0 | *forknum = bkpb->forknum; |
2022 | 0 | if (blknum) |
2023 | 0 | *blknum = bkpb->blkno; |
2024 | 0 | if (prefetch_buffer) |
2025 | 0 | *prefetch_buffer = bkpb->prefetch_buffer; |
2026 | 0 | return true; |
2027 | 0 | } |
2028 | | |
2029 | | /* |
2030 | | * Returns the data associated with a block reference, or NULL if there is |
2031 | | * no data (e.g. because a full-page image was taken instead). The returned |
2032 | | * pointer points to a MAXALIGNed buffer. |
2033 | | */ |
2034 | | char * |
2035 | | XLogRecGetBlockData(XLogReaderState *record, uint8 block_id, Size *len) |
2036 | 0 | { |
2037 | 0 | DecodedBkpBlock *bkpb; |
2038 | |
|
2039 | 0 | if (block_id > record->record->max_block_id || |
2040 | 0 | !record->record->blocks[block_id].in_use) |
2041 | 0 | return NULL; |
2042 | | |
2043 | 0 | bkpb = &record->record->blocks[block_id]; |
2044 | |
|
2045 | 0 | if (!bkpb->has_data) |
2046 | 0 | { |
2047 | 0 | if (len) |
2048 | 0 | *len = 0; |
2049 | 0 | return NULL; |
2050 | 0 | } |
2051 | 0 | else |
2052 | 0 | { |
2053 | 0 | if (len) |
2054 | 0 | *len = bkpb->data_len; |
2055 | 0 | return bkpb->data; |
2056 | 0 | } |
2057 | 0 | } |
2058 | | |
/*
 * Restore a full-page image from a backup block attached to an XLOG record.
 *
 * "page" must point to a caller-supplied buffer of at least BLCKSZ bytes;
 * on success it receives the fully reconstructed page (decompressed if the
 * image was compressed, with any "hole" zero-filled).
 *
 * Returns true if a full-page image is restored, and false on failure with
 * an error to be consumed by the caller (via report_invalid_record, so the
 * message ends up in the reader state's error buffer).
 */
bool
RestoreBlockImage(XLogReaderState *record, uint8 block_id, char *page)
{
	DecodedBkpBlock *bkpb;
	char	   *ptr;
	PGAlignedBlock tmp;		/* stack scratch target for decompression */

	/* Reject block IDs that are out of range or not used by this record. */
	if (block_id > record->record->max_block_id ||
		!record->record->blocks[block_id].in_use)
	{
		report_invalid_record(record,
							  "could not restore image at %X/%X with invalid block %d specified",
							  LSN_FORMAT_ARGS(record->ReadRecPtr),
							  block_id);
		return false;
	}
	/* The referenced block must actually carry a full-page image. */
	if (!record->record->blocks[block_id].has_image)
	{
		report_invalid_record(record, "could not restore image at %X/%X with invalid state, block %d",
							  LSN_FORMAT_ARGS(record->ReadRecPtr),
							  block_id);
		return false;
	}

	bkpb = &record->record->blocks[block_id];
	ptr = bkpb->bkp_image;

	if (BKPIMAGE_COMPRESSED(bkpb->bimg_info))
	{
		/* If a backup block image is compressed, decompress it */
		bool		decomp_success = true;

		/*
		 * Dispatch on the compression method recorded in bimg_info.  The
		 * decompressed size is BLCKSZ minus the hole, which is re-inserted
		 * below.  LZ4/zstd support depends on build-time options; if the
		 * record uses a method this build lacks, fail with a clear message.
		 */
		if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_PGLZ) != 0)
		{
			if (pglz_decompress(ptr, bkpb->bimg_len, tmp.data,
								BLCKSZ - bkpb->hole_length, true) < 0)
				decomp_success = false;
		}
		else if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_LZ4) != 0)
		{
#ifdef USE_LZ4
			if (LZ4_decompress_safe(ptr, tmp.data,
									bkpb->bimg_len, BLCKSZ - bkpb->hole_length) <= 0)
				decomp_success = false;
#else
			report_invalid_record(record, "could not restore image at %X/%X compressed with %s not supported by build, block %d",
								  LSN_FORMAT_ARGS(record->ReadRecPtr),
								  "LZ4",
								  block_id);
			return false;
#endif
		}
		else if ((bkpb->bimg_info & BKPIMAGE_COMPRESS_ZSTD) != 0)
		{
#ifdef USE_ZSTD
			size_t		decomp_result = ZSTD_decompress(tmp.data,
														BLCKSZ - bkpb->hole_length,
														ptr, bkpb->bimg_len);

			if (ZSTD_isError(decomp_result))
				decomp_success = false;
#else
			report_invalid_record(record, "could not restore image at %X/%X compressed with %s not supported by build, block %d",
								  LSN_FORMAT_ARGS(record->ReadRecPtr),
								  "zstd",
								  block_id);
			return false;
#endif
		}
		else
		{
			report_invalid_record(record, "could not restore image at %X/%X compressed with unknown method, block %d",
								  LSN_FORMAT_ARGS(record->ReadRecPtr),
								  block_id);
			return false;
		}

		if (!decomp_success)
		{
			report_invalid_record(record, "could not decompress image at %X/%X, block %d",
								  LSN_FORMAT_ARGS(record->ReadRecPtr),
								  block_id);
			return false;
		}

		/* From here on, read the image from the decompressed copy. */
		ptr = tmp.data;
	}

	/* generate page, taking into account hole if necessary */
	if (bkpb->hole_length == 0)
	{
		memcpy(page, ptr, BLCKSZ);
	}
	else
	{
		/* Copy the part before the hole, zero the hole, copy the rest. */
		memcpy(page, ptr, bkpb->hole_offset);
		/* must zero-fill the hole */
		MemSet(page + bkpb->hole_offset, 0, bkpb->hole_length);
		memcpy(page + (bkpb->hole_offset + bkpb->hole_length),
			   ptr + bkpb->hole_offset,
			   BLCKSZ - (bkpb->hole_offset + bkpb->hole_length));
	}

	return true;
}
2170 | | |
2171 | | #ifndef FRONTEND |
2172 | | |
2173 | | /* |
2174 | | * Extract the FullTransactionId from a WAL record. |
2175 | | */ |
2176 | | FullTransactionId |
2177 | | XLogRecGetFullXid(XLogReaderState *record) |
2178 | 0 | { |
2179 | | /* |
2180 | | * This function is only safe during replay, because it depends on the |
2181 | | * replay state. See AdvanceNextFullTransactionIdPastXid() for more. |
2182 | | */ |
2183 | 0 | Assert(AmStartupProcess() || !IsUnderPostmaster); |
2184 | |
|
2185 | 0 | return FullTransactionIdFromAllowableAt(TransamVariables->nextXid, |
2186 | 0 | XLogRecGetXid(record)); |
2187 | 0 | } |
2188 | | |
2189 | | #endif |