Coverage Report

Created: 2025-06-13 06:06

/src/postgres/src/backend/access/spgist/spgxlog.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * spgxlog.c
4
 *    WAL replay logic for SP-GiST
5
 *
6
 *
7
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8
 * Portions Copyright (c) 1994, Regents of the University of California
9
 *
10
 * IDENTIFICATION
11
 *       src/backend/access/spgist/spgxlog.c
12
 *
13
 *-------------------------------------------------------------------------
14
 */
15
#include "postgres.h"
16
17
#include "access/bufmask.h"
18
#include "access/spgist_private.h"
19
#include "access/spgxlog.h"
20
#include "access/xlogutils.h"
21
#include "storage/standby.h"
22
#include "utils/memutils.h"
23
24
25
/*
 * Scratch memory context for SP-GiST WAL replay.  It is created by
 * spg_xlog_startup(), made current and then reset by spg_redo() around
 * every record, and destroyed by spg_xlog_cleanup().
 */
static MemoryContext opCtx;   /* working memory for operations */
26
27
28
/*
29
 * Prepare a dummy SpGistState, with just the minimum info needed for replay.
30
 *
31
 * At present, all we need is enough info to support spgFormDeadTuple(),
32
 * plus the isBuild flag.
33
 */
34
static void
35
fillFakeState(SpGistState *state, spgxlogState stateSrc)
36
0
{
37
0
  memset(state, 0, sizeof(*state));
38
39
0
  state->redirectXid = stateSrc.redirectXid;
40
0
  state->isBuild = stateSrc.isBuild;
41
0
  state->deadTupleStorage = palloc0(SGDTSIZE);
42
0
}
43
44
/*
45
 * Add a leaf tuple, or replace an existing placeholder tuple.  This is used
46
 * to replay SpGistPageAddNewItem() operations.  If the offset points at an
47
 * existing tuple, it had better be a placeholder tuple.
48
 */
49
static void
50
addOrReplaceTuple(Page page, Item tuple, int size, OffsetNumber offset)
51
0
{
52
0
  if (offset <= PageGetMaxOffsetNumber(page))
53
0
  {
54
0
    SpGistDeadTuple dt = (SpGistDeadTuple) PageGetItem(page,
55
0
                               PageGetItemId(page, offset));
56
57
0
    if (dt->tupstate != SPGIST_PLACEHOLDER)
58
0
      elog(ERROR, "SPGiST tuple to be replaced is not a placeholder");
59
60
0
    Assert(SpGistPageGetOpaque(page)->nPlaceholder > 0);
61
0
    SpGistPageGetOpaque(page)->nPlaceholder--;
62
63
0
    PageIndexTupleDelete(page, offset);
64
0
  }
65
66
0
  Assert(offset <= PageGetMaxOffsetNumber(page) + 1);
67
68
0
  if (PageAddItem(page, tuple, size, offset, false, false) != offset)
69
0
    elog(ERROR, "failed to add item of size %u to SPGiST index page",
70
0
       size);
71
0
}
72
73
static void
74
spgRedoAddLeaf(XLogReaderState *record)
75
0
{
76
0
  XLogRecPtr  lsn = record->EndRecPtr;
77
0
  char     *ptr = XLogRecGetData(record);
78
0
  spgxlogAddLeaf *xldata = (spgxlogAddLeaf *) ptr;
79
0
  char     *leafTuple;
80
0
  SpGistLeafTupleData leafTupleHdr;
81
0
  Buffer    buffer;
82
0
  Page    page;
83
0
  XLogRedoAction action;
84
85
0
  ptr += sizeof(spgxlogAddLeaf);
86
0
  leafTuple = ptr;
87
  /* the leaf tuple is unaligned, so make a copy to access its header */
88
0
  memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
89
90
  /*
91
   * In normal operation we would have both current and parent pages locked
92
   * simultaneously; but in WAL replay it should be safe to update the leaf
93
   * page before updating the parent.
94
   */
95
0
  if (xldata->newPage)
96
0
  {
97
0
    buffer = XLogInitBufferForRedo(record, 0);
98
0
    SpGistInitBuffer(buffer,
99
0
             SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
100
0
    action = BLK_NEEDS_REDO;
101
0
  }
102
0
  else
103
0
    action = XLogReadBufferForRedo(record, 0, &buffer);
104
105
0
  if (action == BLK_NEEDS_REDO)
106
0
  {
107
0
    page = BufferGetPage(buffer);
108
109
    /* insert new tuple */
110
0
    if (xldata->offnumLeaf != xldata->offnumHeadLeaf)
111
0
    {
112
      /* normal cases, tuple was added by SpGistPageAddNewItem */
113
0
      addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
114
0
                xldata->offnumLeaf);
115
116
      /* update head tuple's chain link if needed */
117
0
      if (xldata->offnumHeadLeaf != InvalidOffsetNumber)
118
0
      {
119
0
        SpGistLeafTuple head;
120
121
0
        head = (SpGistLeafTuple) PageGetItem(page,
122
0
                           PageGetItemId(page, xldata->offnumHeadLeaf));
123
0
        Assert(SGLT_GET_NEXTOFFSET(head) == SGLT_GET_NEXTOFFSET(&leafTupleHdr));
124
0
        SGLT_SET_NEXTOFFSET(head, xldata->offnumLeaf);
125
0
      }
126
0
    }
127
0
    else
128
0
    {
129
      /* replacing a DEAD tuple */
130
0
      PageIndexTupleDelete(page, xldata->offnumLeaf);
131
0
      if (PageAddItem(page,
132
0
              (Item) leafTuple, leafTupleHdr.size,
133
0
              xldata->offnumLeaf, false, false) != xldata->offnumLeaf)
134
0
        elog(ERROR, "failed to add item of size %u to SPGiST index page",
135
0
           leafTupleHdr.size);
136
0
    }
137
138
0
    PageSetLSN(page, lsn);
139
0
    MarkBufferDirty(buffer);
140
0
  }
141
0
  if (BufferIsValid(buffer))
142
0
    UnlockReleaseBuffer(buffer);
143
144
  /* update parent downlink if necessary */
145
0
  if (xldata->offnumParent != InvalidOffsetNumber)
146
0
  {
147
0
    if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
148
0
    {
149
0
      SpGistInnerTuple tuple;
150
0
      BlockNumber blknoLeaf;
151
152
0
      XLogRecGetBlockTag(record, 0, NULL, NULL, &blknoLeaf);
153
154
0
      page = BufferGetPage(buffer);
155
156
0
      tuple = (SpGistInnerTuple) PageGetItem(page,
157
0
                           PageGetItemId(page, xldata->offnumParent));
158
159
0
      spgUpdateNodeLink(tuple, xldata->nodeI,
160
0
                blknoLeaf, xldata->offnumLeaf);
161
162
0
      PageSetLSN(page, lsn);
163
0
      MarkBufferDirty(buffer);
164
0
    }
165
0
    if (BufferIsValid(buffer))
166
0
      UnlockReleaseBuffer(buffer);
167
0
  }
168
0
}
169
170
static void
171
spgRedoMoveLeafs(XLogReaderState *record)
172
0
{
173
0
  XLogRecPtr  lsn = record->EndRecPtr;
174
0
  char     *ptr = XLogRecGetData(record);
175
0
  spgxlogMoveLeafs *xldata = (spgxlogMoveLeafs *) ptr;
176
0
  SpGistState state;
177
0
  OffsetNumber *toDelete;
178
0
  OffsetNumber *toInsert;
179
0
  int     nInsert;
180
0
  Buffer    buffer;
181
0
  Page    page;
182
0
  XLogRedoAction action;
183
0
  BlockNumber blknoDst;
184
185
0
  XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoDst);
186
187
0
  fillFakeState(&state, xldata->stateSrc);
188
189
0
  nInsert = xldata->replaceDead ? 1 : xldata->nMoves + 1;
190
191
0
  ptr += SizeOfSpgxlogMoveLeafs;
192
0
  toDelete = (OffsetNumber *) ptr;
193
0
  ptr += sizeof(OffsetNumber) * xldata->nMoves;
194
0
  toInsert = (OffsetNumber *) ptr;
195
0
  ptr += sizeof(OffsetNumber) * nInsert;
196
197
  /* now ptr points to the list of leaf tuples */
198
199
  /*
200
   * In normal operation we would have all three pages (source, dest, and
201
   * parent) locked simultaneously; but in WAL replay it should be safe to
202
   * update them one at a time, as long as we do it in the right order.
203
   */
204
205
  /* Insert tuples on the dest page (do first, so redirect is valid) */
206
0
  if (xldata->newPage)
207
0
  {
208
0
    buffer = XLogInitBufferForRedo(record, 1);
209
0
    SpGistInitBuffer(buffer,
210
0
             SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
211
0
    action = BLK_NEEDS_REDO;
212
0
  }
213
0
  else
214
0
    action = XLogReadBufferForRedo(record, 1, &buffer);
215
216
0
  if (action == BLK_NEEDS_REDO)
217
0
  {
218
0
    int     i;
219
220
0
    page = BufferGetPage(buffer);
221
222
0
    for (i = 0; i < nInsert; i++)
223
0
    {
224
0
      char     *leafTuple;
225
0
      SpGistLeafTupleData leafTupleHdr;
226
227
      /*
228
       * the tuples are not aligned, so must copy to access the size
229
       * field.
230
       */
231
0
      leafTuple = ptr;
232
0
      memcpy(&leafTupleHdr, leafTuple,
233
0
           sizeof(SpGistLeafTupleData));
234
235
0
      addOrReplaceTuple(page, (Item) leafTuple,
236
0
                leafTupleHdr.size, toInsert[i]);
237
0
      ptr += leafTupleHdr.size;
238
0
    }
239
240
0
    PageSetLSN(page, lsn);
241
0
    MarkBufferDirty(buffer);
242
0
  }
243
0
  if (BufferIsValid(buffer))
244
0
    UnlockReleaseBuffer(buffer);
245
246
  /* Delete tuples from the source page, inserting a redirection pointer */
247
0
  if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
248
0
  {
249
0
    page = BufferGetPage(buffer);
250
251
0
    spgPageIndexMultiDelete(&state, page, toDelete, xldata->nMoves,
252
0
                state.isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
253
0
                SPGIST_PLACEHOLDER,
254
0
                blknoDst,
255
0
                toInsert[nInsert - 1]);
256
257
0
    PageSetLSN(page, lsn);
258
0
    MarkBufferDirty(buffer);
259
0
  }
260
0
  if (BufferIsValid(buffer))
261
0
    UnlockReleaseBuffer(buffer);
262
263
  /* And update the parent downlink */
264
0
  if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
265
0
  {
266
0
    SpGistInnerTuple tuple;
267
268
0
    page = BufferGetPage(buffer);
269
270
0
    tuple = (SpGistInnerTuple) PageGetItem(page,
271
0
                         PageGetItemId(page, xldata->offnumParent));
272
273
0
    spgUpdateNodeLink(tuple, xldata->nodeI,
274
0
              blknoDst, toInsert[nInsert - 1]);
275
276
0
    PageSetLSN(page, lsn);
277
0
    MarkBufferDirty(buffer);
278
0
  }
279
0
  if (BufferIsValid(buffer))
280
0
    UnlockReleaseBuffer(buffer);
281
0
}
282
283
static void
284
spgRedoAddNode(XLogReaderState *record)
285
0
{
286
0
  XLogRecPtr  lsn = record->EndRecPtr;
287
0
  char     *ptr = XLogRecGetData(record);
288
0
  spgxlogAddNode *xldata = (spgxlogAddNode *) ptr;
289
0
  char     *innerTuple;
290
0
  SpGistInnerTupleData innerTupleHdr;
291
0
  SpGistState state;
292
0
  Buffer    buffer;
293
0
  Page    page;
294
0
  XLogRedoAction action;
295
296
0
  ptr += sizeof(spgxlogAddNode);
297
0
  innerTuple = ptr;
298
  /* the tuple is unaligned, so make a copy to access its header */
299
0
  memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
300
301
0
  fillFakeState(&state, xldata->stateSrc);
302
303
0
  if (!XLogRecHasBlockRef(record, 1))
304
0
  {
305
    /* update in place */
306
0
    Assert(xldata->parentBlk == -1);
307
0
    if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
308
0
    {
309
0
      page = BufferGetPage(buffer);
310
311
0
      PageIndexTupleDelete(page, xldata->offnum);
312
0
      if (PageAddItem(page, (Item) innerTuple, innerTupleHdr.size,
313
0
              xldata->offnum,
314
0
              false, false) != xldata->offnum)
315
0
        elog(ERROR, "failed to add item of size %u to SPGiST index page",
316
0
           innerTupleHdr.size);
317
318
0
      PageSetLSN(page, lsn);
319
0
      MarkBufferDirty(buffer);
320
0
    }
321
0
    if (BufferIsValid(buffer))
322
0
      UnlockReleaseBuffer(buffer);
323
0
  }
324
0
  else
325
0
  {
326
0
    BlockNumber blkno;
327
0
    BlockNumber blknoNew;
328
329
0
    XLogRecGetBlockTag(record, 0, NULL, NULL, &blkno);
330
0
    XLogRecGetBlockTag(record, 1, NULL, NULL, &blknoNew);
331
332
    /*
333
     * In normal operation we would have all three pages (source, dest,
334
     * and parent) locked simultaneously; but in WAL replay it should be
335
     * safe to update them one at a time, as long as we do it in the right
336
     * order. We must insert the new tuple before replacing the old tuple
337
     * with the redirect tuple.
338
     */
339
340
    /* Install new tuple first so redirect is valid */
341
0
    if (xldata->newPage)
342
0
    {
343
      /* AddNode is not used for nulls pages */
344
0
      buffer = XLogInitBufferForRedo(record, 1);
345
0
      SpGistInitBuffer(buffer, 0);
346
0
      action = BLK_NEEDS_REDO;
347
0
    }
348
0
    else
349
0
      action = XLogReadBufferForRedo(record, 1, &buffer);
350
0
    if (action == BLK_NEEDS_REDO)
351
0
    {
352
0
      page = BufferGetPage(buffer);
353
354
0
      addOrReplaceTuple(page, (Item) innerTuple,
355
0
                innerTupleHdr.size, xldata->offnumNew);
356
357
      /*
358
       * If parent is in this same page, update it now.
359
       */
360
0
      if (xldata->parentBlk == 1)
361
0
      {
362
0
        SpGistInnerTuple parentTuple;
363
364
0
        parentTuple = (SpGistInnerTuple) PageGetItem(page,
365
0
                               PageGetItemId(page, xldata->offnumParent));
366
367
0
        spgUpdateNodeLink(parentTuple, xldata->nodeI,
368
0
                  blknoNew, xldata->offnumNew);
369
0
      }
370
0
      PageSetLSN(page, lsn);
371
0
      MarkBufferDirty(buffer);
372
0
    }
373
0
    if (BufferIsValid(buffer))
374
0
      UnlockReleaseBuffer(buffer);
375
376
    /* Delete old tuple, replacing it with redirect or placeholder tuple */
377
0
    if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
378
0
    {
379
0
      SpGistDeadTuple dt;
380
381
0
      page = BufferGetPage(buffer);
382
383
0
      if (state.isBuild)
384
0
        dt = spgFormDeadTuple(&state, SPGIST_PLACEHOLDER,
385
0
                    InvalidBlockNumber,
386
0
                    InvalidOffsetNumber);
387
0
      else
388
0
        dt = spgFormDeadTuple(&state, SPGIST_REDIRECT,
389
0
                    blknoNew,
390
0
                    xldata->offnumNew);
391
392
0
      PageIndexTupleDelete(page, xldata->offnum);
393
0
      if (PageAddItem(page, (Item) dt, dt->size,
394
0
              xldata->offnum,
395
0
              false, false) != xldata->offnum)
396
0
        elog(ERROR, "failed to add item of size %u to SPGiST index page",
397
0
           dt->size);
398
399
0
      if (state.isBuild)
400
0
        SpGistPageGetOpaque(page)->nPlaceholder++;
401
0
      else
402
0
        SpGistPageGetOpaque(page)->nRedirection++;
403
404
      /*
405
       * If parent is in this same page, update it now.
406
       */
407
0
      if (xldata->parentBlk == 0)
408
0
      {
409
0
        SpGistInnerTuple parentTuple;
410
411
0
        parentTuple = (SpGistInnerTuple) PageGetItem(page,
412
0
                               PageGetItemId(page, xldata->offnumParent));
413
414
0
        spgUpdateNodeLink(parentTuple, xldata->nodeI,
415
0
                  blknoNew, xldata->offnumNew);
416
0
      }
417
0
      PageSetLSN(page, lsn);
418
0
      MarkBufferDirty(buffer);
419
0
    }
420
0
    if (BufferIsValid(buffer))
421
0
      UnlockReleaseBuffer(buffer);
422
423
    /*
424
     * Update parent downlink (if we didn't do it as part of the source or
425
     * destination page update already).
426
     */
427
0
    if (xldata->parentBlk == 2)
428
0
    {
429
0
      if (XLogReadBufferForRedo(record, 2, &buffer) == BLK_NEEDS_REDO)
430
0
      {
431
0
        SpGistInnerTuple parentTuple;
432
433
0
        page = BufferGetPage(buffer);
434
435
0
        parentTuple = (SpGistInnerTuple) PageGetItem(page,
436
0
                               PageGetItemId(page, xldata->offnumParent));
437
438
0
        spgUpdateNodeLink(parentTuple, xldata->nodeI,
439
0
                  blknoNew, xldata->offnumNew);
440
441
0
        PageSetLSN(page, lsn);
442
0
        MarkBufferDirty(buffer);
443
0
      }
444
0
      if (BufferIsValid(buffer))
445
0
        UnlockReleaseBuffer(buffer);
446
0
    }
447
0
  }
448
0
}
449
450
static void
451
spgRedoSplitTuple(XLogReaderState *record)
452
0
{
453
0
  XLogRecPtr  lsn = record->EndRecPtr;
454
0
  char     *ptr = XLogRecGetData(record);
455
0
  spgxlogSplitTuple *xldata = (spgxlogSplitTuple *) ptr;
456
0
  char     *prefixTuple;
457
0
  SpGistInnerTupleData prefixTupleHdr;
458
0
  char     *postfixTuple;
459
0
  SpGistInnerTupleData postfixTupleHdr;
460
0
  Buffer    buffer;
461
0
  Page    page;
462
0
  XLogRedoAction action;
463
464
0
  ptr += sizeof(spgxlogSplitTuple);
465
0
  prefixTuple = ptr;
466
  /* the prefix tuple is unaligned, so make a copy to access its header */
467
0
  memcpy(&prefixTupleHdr, prefixTuple, sizeof(SpGistInnerTupleData));
468
0
  ptr += prefixTupleHdr.size;
469
0
  postfixTuple = ptr;
470
  /* postfix tuple is also unaligned */
471
0
  memcpy(&postfixTupleHdr, postfixTuple, sizeof(SpGistInnerTupleData));
472
473
  /*
474
   * In normal operation we would have both pages locked simultaneously; but
475
   * in WAL replay it should be safe to update them one at a time, as long
476
   * as we do it in the right order.
477
   */
478
479
  /* insert postfix tuple first to avoid dangling link */
480
0
  if (!xldata->postfixBlkSame)
481
0
  {
482
0
    if (xldata->newPage)
483
0
    {
484
0
      buffer = XLogInitBufferForRedo(record, 1);
485
      /* SplitTuple is not used for nulls pages */
486
0
      SpGistInitBuffer(buffer, 0);
487
0
      action = BLK_NEEDS_REDO;
488
0
    }
489
0
    else
490
0
      action = XLogReadBufferForRedo(record, 1, &buffer);
491
0
    if (action == BLK_NEEDS_REDO)
492
0
    {
493
0
      page = BufferGetPage(buffer);
494
495
0
      addOrReplaceTuple(page, (Item) postfixTuple,
496
0
                postfixTupleHdr.size, xldata->offnumPostfix);
497
498
0
      PageSetLSN(page, lsn);
499
0
      MarkBufferDirty(buffer);
500
0
    }
501
0
    if (BufferIsValid(buffer))
502
0
      UnlockReleaseBuffer(buffer);
503
0
  }
504
505
  /* now handle the original page */
506
0
  if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
507
0
  {
508
0
    page = BufferGetPage(buffer);
509
510
0
    PageIndexTupleDelete(page, xldata->offnumPrefix);
511
0
    if (PageAddItem(page, (Item) prefixTuple, prefixTupleHdr.size,
512
0
            xldata->offnumPrefix, false, false) != xldata->offnumPrefix)
513
0
      elog(ERROR, "failed to add item of size %u to SPGiST index page",
514
0
         prefixTupleHdr.size);
515
516
0
    if (xldata->postfixBlkSame)
517
0
      addOrReplaceTuple(page, (Item) postfixTuple,
518
0
                postfixTupleHdr.size,
519
0
                xldata->offnumPostfix);
520
521
0
    PageSetLSN(page, lsn);
522
0
    MarkBufferDirty(buffer);
523
0
  }
524
0
  if (BufferIsValid(buffer))
525
0
    UnlockReleaseBuffer(buffer);
526
0
}
527
528
static void
529
spgRedoPickSplit(XLogReaderState *record)
530
0
{
531
0
  XLogRecPtr  lsn = record->EndRecPtr;
532
0
  char     *ptr = XLogRecGetData(record);
533
0
  spgxlogPickSplit *xldata = (spgxlogPickSplit *) ptr;
534
0
  char     *innerTuple;
535
0
  SpGistInnerTupleData innerTupleHdr;
536
0
  SpGistState state;
537
0
  OffsetNumber *toDelete;
538
0
  OffsetNumber *toInsert;
539
0
  uint8    *leafPageSelect;
540
0
  Buffer    srcBuffer;
541
0
  Buffer    destBuffer;
542
0
  Buffer    innerBuffer;
543
0
  Page    srcPage;
544
0
  Page    destPage;
545
0
  Page    page;
546
0
  int     i;
547
0
  BlockNumber blknoInner;
548
0
  XLogRedoAction action;
549
550
0
  XLogRecGetBlockTag(record, 2, NULL, NULL, &blknoInner);
551
552
0
  fillFakeState(&state, xldata->stateSrc);
553
554
0
  ptr += SizeOfSpgxlogPickSplit;
555
0
  toDelete = (OffsetNumber *) ptr;
556
0
  ptr += sizeof(OffsetNumber) * xldata->nDelete;
557
0
  toInsert = (OffsetNumber *) ptr;
558
0
  ptr += sizeof(OffsetNumber) * xldata->nInsert;
559
0
  leafPageSelect = (uint8 *) ptr;
560
0
  ptr += sizeof(uint8) * xldata->nInsert;
561
562
0
  innerTuple = ptr;
563
  /* the inner tuple is unaligned, so make a copy to access its header */
564
0
  memcpy(&innerTupleHdr, innerTuple, sizeof(SpGistInnerTupleData));
565
0
  ptr += innerTupleHdr.size;
566
567
  /* now ptr points to the list of leaf tuples */
568
569
0
  if (xldata->isRootSplit)
570
0
  {
571
    /* when splitting root, we touch it only in the guise of new inner */
572
0
    srcBuffer = InvalidBuffer;
573
0
    srcPage = NULL;
574
0
  }
575
0
  else if (xldata->initSrc)
576
0
  {
577
    /* just re-init the source page */
578
0
    srcBuffer = XLogInitBufferForRedo(record, 0);
579
0
    srcPage = (Page) BufferGetPage(srcBuffer);
580
581
0
    SpGistInitBuffer(srcBuffer,
582
0
             SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
583
    /* don't update LSN etc till we're done with it */
584
0
  }
585
0
  else
586
0
  {
587
    /*
588
     * Delete the specified tuples from source page.  (In case we're in
589
     * Hot Standby, we need to hold lock on the page till we're done
590
     * inserting leaf tuples and the new inner tuple, else the added
591
     * redirect tuple will be a dangling link.)
592
     */
593
0
    srcPage = NULL;
594
0
    if (XLogReadBufferForRedo(record, 0, &srcBuffer) == BLK_NEEDS_REDO)
595
0
    {
596
0
      srcPage = BufferGetPage(srcBuffer);
597
598
      /*
599
       * We have it a bit easier here than in doPickSplit(), because we
600
       * know the inner tuple's location already, so we can inject the
601
       * correct redirection tuple now.
602
       */
603
0
      if (!state.isBuild)
604
0
        spgPageIndexMultiDelete(&state, srcPage,
605
0
                    toDelete, xldata->nDelete,
606
0
                    SPGIST_REDIRECT,
607
0
                    SPGIST_PLACEHOLDER,
608
0
                    blknoInner,
609
0
                    xldata->offnumInner);
610
0
      else
611
0
        spgPageIndexMultiDelete(&state, srcPage,
612
0
                    toDelete, xldata->nDelete,
613
0
                    SPGIST_PLACEHOLDER,
614
0
                    SPGIST_PLACEHOLDER,
615
0
                    InvalidBlockNumber,
616
0
                    InvalidOffsetNumber);
617
618
      /* don't update LSN etc till we're done with it */
619
0
    }
620
0
  }
621
622
  /* try to access dest page if any */
623
0
  if (!XLogRecHasBlockRef(record, 1))
624
0
  {
625
0
    destBuffer = InvalidBuffer;
626
0
    destPage = NULL;
627
0
  }
628
0
  else if (xldata->initDest)
629
0
  {
630
    /* just re-init the dest page */
631
0
    destBuffer = XLogInitBufferForRedo(record, 1);
632
0
    destPage = (Page) BufferGetPage(destBuffer);
633
634
0
    SpGistInitBuffer(destBuffer,
635
0
             SPGIST_LEAF | (xldata->storesNulls ? SPGIST_NULLS : 0));
636
    /* don't update LSN etc till we're done with it */
637
0
  }
638
0
  else
639
0
  {
640
    /*
641
     * We could probably release the page lock immediately in the
642
     * full-page-image case, but for safety let's hold it till later.
643
     */
644
0
    if (XLogReadBufferForRedo(record, 1, &destBuffer) == BLK_NEEDS_REDO)
645
0
      destPage = (Page) BufferGetPage(destBuffer);
646
0
    else
647
0
      destPage = NULL; /* don't do any page updates */
648
0
  }
649
650
  /* restore leaf tuples to src and/or dest page */
651
0
  for (i = 0; i < xldata->nInsert; i++)
652
0
  {
653
0
    char     *leafTuple;
654
0
    SpGistLeafTupleData leafTupleHdr;
655
656
    /* the tuples are not aligned, so must copy to access the size field. */
657
0
    leafTuple = ptr;
658
0
    memcpy(&leafTupleHdr, leafTuple, sizeof(SpGistLeafTupleData));
659
0
    ptr += leafTupleHdr.size;
660
661
0
    page = leafPageSelect[i] ? destPage : srcPage;
662
0
    if (page == NULL)
663
0
      continue;     /* no need to touch this page */
664
665
0
    addOrReplaceTuple(page, (Item) leafTuple, leafTupleHdr.size,
666
0
              toInsert[i]);
667
0
  }
668
669
  /* Now update src and dest page LSNs if needed */
670
0
  if (srcPage != NULL)
671
0
  {
672
0
    PageSetLSN(srcPage, lsn);
673
0
    MarkBufferDirty(srcBuffer);
674
0
  }
675
0
  if (destPage != NULL)
676
0
  {
677
0
    PageSetLSN(destPage, lsn);
678
0
    MarkBufferDirty(destBuffer);
679
0
  }
680
681
  /* restore new inner tuple */
682
0
  if (xldata->initInner)
683
0
  {
684
0
    innerBuffer = XLogInitBufferForRedo(record, 2);
685
0
    SpGistInitBuffer(innerBuffer, (xldata->storesNulls ? SPGIST_NULLS : 0));
686
0
    action = BLK_NEEDS_REDO;
687
0
  }
688
0
  else
689
0
    action = XLogReadBufferForRedo(record, 2, &innerBuffer);
690
691
0
  if (action == BLK_NEEDS_REDO)
692
0
  {
693
0
    page = BufferGetPage(innerBuffer);
694
695
0
    addOrReplaceTuple(page, (Item) innerTuple, innerTupleHdr.size,
696
0
              xldata->offnumInner);
697
698
    /* if inner is also parent, update link while we're here */
699
0
    if (xldata->innerIsParent)
700
0
    {
701
0
      SpGistInnerTuple parent;
702
703
0
      parent = (SpGistInnerTuple) PageGetItem(page,
704
0
                          PageGetItemId(page, xldata->offnumParent));
705
0
      spgUpdateNodeLink(parent, xldata->nodeI,
706
0
                blknoInner, xldata->offnumInner);
707
0
    }
708
709
0
    PageSetLSN(page, lsn);
710
0
    MarkBufferDirty(innerBuffer);
711
0
  }
712
0
  if (BufferIsValid(innerBuffer))
713
0
    UnlockReleaseBuffer(innerBuffer);
714
715
  /*
716
   * Now we can release the leaf-page locks.  It's okay to do this before
717
   * updating the parent downlink.
718
   */
719
0
  if (BufferIsValid(srcBuffer))
720
0
    UnlockReleaseBuffer(srcBuffer);
721
0
  if (BufferIsValid(destBuffer))
722
0
    UnlockReleaseBuffer(destBuffer);
723
724
  /* update parent downlink, unless we did it above */
725
0
  if (XLogRecHasBlockRef(record, 3))
726
0
  {
727
0
    Buffer    parentBuffer;
728
729
0
    if (XLogReadBufferForRedo(record, 3, &parentBuffer) == BLK_NEEDS_REDO)
730
0
    {
731
0
      SpGistInnerTuple parent;
732
733
0
      page = BufferGetPage(parentBuffer);
734
735
0
      parent = (SpGistInnerTuple) PageGetItem(page,
736
0
                          PageGetItemId(page, xldata->offnumParent));
737
0
      spgUpdateNodeLink(parent, xldata->nodeI,
738
0
                blknoInner, xldata->offnumInner);
739
740
0
      PageSetLSN(page, lsn);
741
0
      MarkBufferDirty(parentBuffer);
742
0
    }
743
0
    if (BufferIsValid(parentBuffer))
744
0
      UnlockReleaseBuffer(parentBuffer);
745
0
  }
746
0
  else
747
0
    Assert(xldata->innerIsParent || xldata->isRootSplit);
748
0
}
749
750
static void
751
spgRedoVacuumLeaf(XLogReaderState *record)
752
0
{
753
0
  XLogRecPtr  lsn = record->EndRecPtr;
754
0
  char     *ptr = XLogRecGetData(record);
755
0
  spgxlogVacuumLeaf *xldata = (spgxlogVacuumLeaf *) ptr;
756
0
  OffsetNumber *toDead;
757
0
  OffsetNumber *toPlaceholder;
758
0
  OffsetNumber *moveSrc;
759
0
  OffsetNumber *moveDest;
760
0
  OffsetNumber *chainSrc;
761
0
  OffsetNumber *chainDest;
762
0
  SpGistState state;
763
0
  Buffer    buffer;
764
0
  Page    page;
765
0
  int     i;
766
767
0
  fillFakeState(&state, xldata->stateSrc);
768
769
0
  ptr += SizeOfSpgxlogVacuumLeaf;
770
0
  toDead = (OffsetNumber *) ptr;
771
0
  ptr += sizeof(OffsetNumber) * xldata->nDead;
772
0
  toPlaceholder = (OffsetNumber *) ptr;
773
0
  ptr += sizeof(OffsetNumber) * xldata->nPlaceholder;
774
0
  moveSrc = (OffsetNumber *) ptr;
775
0
  ptr += sizeof(OffsetNumber) * xldata->nMove;
776
0
  moveDest = (OffsetNumber *) ptr;
777
0
  ptr += sizeof(OffsetNumber) * xldata->nMove;
778
0
  chainSrc = (OffsetNumber *) ptr;
779
0
  ptr += sizeof(OffsetNumber) * xldata->nChain;
780
0
  chainDest = (OffsetNumber *) ptr;
781
782
0
  if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
783
0
  {
784
0
    page = BufferGetPage(buffer);
785
786
0
    spgPageIndexMultiDelete(&state, page,
787
0
                toDead, xldata->nDead,
788
0
                SPGIST_DEAD, SPGIST_DEAD,
789
0
                InvalidBlockNumber,
790
0
                InvalidOffsetNumber);
791
792
0
    spgPageIndexMultiDelete(&state, page,
793
0
                toPlaceholder, xldata->nPlaceholder,
794
0
                SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
795
0
                InvalidBlockNumber,
796
0
                InvalidOffsetNumber);
797
798
    /* see comments in vacuumLeafPage() */
799
0
    for (i = 0; i < xldata->nMove; i++)
800
0
    {
801
0
      ItemId    idSrc = PageGetItemId(page, moveSrc[i]);
802
0
      ItemId    idDest = PageGetItemId(page, moveDest[i]);
803
0
      ItemIdData  tmp;
804
805
0
      tmp = *idSrc;
806
0
      *idSrc = *idDest;
807
0
      *idDest = tmp;
808
0
    }
809
810
0
    spgPageIndexMultiDelete(&state, page,
811
0
                moveSrc, xldata->nMove,
812
0
                SPGIST_PLACEHOLDER, SPGIST_PLACEHOLDER,
813
0
                InvalidBlockNumber,
814
0
                InvalidOffsetNumber);
815
816
0
    for (i = 0; i < xldata->nChain; i++)
817
0
    {
818
0
      SpGistLeafTuple lt;
819
820
0
      lt = (SpGistLeafTuple) PageGetItem(page,
821
0
                         PageGetItemId(page, chainSrc[i]));
822
0
      Assert(lt->tupstate == SPGIST_LIVE);
823
0
      SGLT_SET_NEXTOFFSET(lt, chainDest[i]);
824
0
    }
825
826
0
    PageSetLSN(page, lsn);
827
0
    MarkBufferDirty(buffer);
828
0
  }
829
0
  if (BufferIsValid(buffer))
830
0
    UnlockReleaseBuffer(buffer);
831
0
}
832
833
static void
834
spgRedoVacuumRoot(XLogReaderState *record)
835
0
{
836
0
  XLogRecPtr  lsn = record->EndRecPtr;
837
0
  char     *ptr = XLogRecGetData(record);
838
0
  spgxlogVacuumRoot *xldata = (spgxlogVacuumRoot *) ptr;
839
0
  OffsetNumber *toDelete;
840
0
  Buffer    buffer;
841
0
  Page    page;
842
843
0
  toDelete = xldata->offsets;
844
845
0
  if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
846
0
  {
847
0
    page = BufferGetPage(buffer);
848
849
    /* The tuple numbers are in order */
850
0
    PageIndexMultiDelete(page, toDelete, xldata->nDelete);
851
852
0
    PageSetLSN(page, lsn);
853
0
    MarkBufferDirty(buffer);
854
0
  }
855
0
  if (BufferIsValid(buffer))
856
0
    UnlockReleaseBuffer(buffer);
857
0
}
858
859
static void
860
spgRedoVacuumRedirect(XLogReaderState *record)
861
0
{
862
0
  XLogRecPtr  lsn = record->EndRecPtr;
863
0
  char     *ptr = XLogRecGetData(record);
864
0
  spgxlogVacuumRedirect *xldata = (spgxlogVacuumRedirect *) ptr;
865
0
  OffsetNumber *itemToPlaceholder;
866
0
  Buffer    buffer;
867
868
0
  itemToPlaceholder = xldata->offsets;
869
870
  /*
871
   * If any redirection tuples are being removed, make sure there are no
872
   * live Hot Standby transactions that might need to see them.
873
   */
874
0
  if (InHotStandby)
875
0
  {
876
0
    RelFileLocator locator;
877
878
0
    XLogRecGetBlockTag(record, 0, &locator, NULL, NULL);
879
0
    ResolveRecoveryConflictWithSnapshot(xldata->snapshotConflictHorizon,
880
0
                      xldata->isCatalogRel,
881
0
                      locator);
882
0
  }
883
884
0
  if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
885
0
  {
886
0
    Page    page = BufferGetPage(buffer);
887
0
    SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
888
0
    int     i;
889
890
    /* Convert redirect pointers to plain placeholders */
891
0
    for (i = 0; i < xldata->nToPlaceholder; i++)
892
0
    {
893
0
      SpGistDeadTuple dt;
894
895
0
      dt = (SpGistDeadTuple) PageGetItem(page,
896
0
                         PageGetItemId(page, itemToPlaceholder[i]));
897
0
      Assert(dt->tupstate == SPGIST_REDIRECT);
898
0
      dt->tupstate = SPGIST_PLACEHOLDER;
899
0
      ItemPointerSetInvalid(&dt->pointer);
900
0
    }
901
902
0
    Assert(opaque->nRedirection >= xldata->nToPlaceholder);
903
0
    opaque->nRedirection -= xldata->nToPlaceholder;
904
0
    opaque->nPlaceholder += xldata->nToPlaceholder;
905
906
    /* Remove placeholder tuples at end of page */
907
0
    if (xldata->firstPlaceholder != InvalidOffsetNumber)
908
0
    {
909
0
      int     max = PageGetMaxOffsetNumber(page);
910
0
      OffsetNumber *toDelete;
911
912
0
      toDelete = palloc(sizeof(OffsetNumber) * max);
913
914
0
      for (i = xldata->firstPlaceholder; i <= max; i++)
915
0
        toDelete[i - xldata->firstPlaceholder] = i;
916
917
0
      i = max - xldata->firstPlaceholder + 1;
918
0
      Assert(opaque->nPlaceholder >= i);
919
0
      opaque->nPlaceholder -= i;
920
921
      /* The array is sorted, so can use PageIndexMultiDelete */
922
0
      PageIndexMultiDelete(page, toDelete, i);
923
924
0
      pfree(toDelete);
925
0
    }
926
927
0
    PageSetLSN(page, lsn);
928
0
    MarkBufferDirty(buffer);
929
0
  }
930
0
  if (BufferIsValid(buffer))
931
0
    UnlockReleaseBuffer(buffer);
932
0
}
933
934
void
935
spg_redo(XLogReaderState *record)
936
0
{
937
0
  uint8   info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
938
0
  MemoryContext oldCxt;
939
940
0
  oldCxt = MemoryContextSwitchTo(opCtx);
941
0
  switch (info)
942
0
  {
943
0
    case XLOG_SPGIST_ADD_LEAF:
944
0
      spgRedoAddLeaf(record);
945
0
      break;
946
0
    case XLOG_SPGIST_MOVE_LEAFS:
947
0
      spgRedoMoveLeafs(record);
948
0
      break;
949
0
    case XLOG_SPGIST_ADD_NODE:
950
0
      spgRedoAddNode(record);
951
0
      break;
952
0
    case XLOG_SPGIST_SPLIT_TUPLE:
953
0
      spgRedoSplitTuple(record);
954
0
      break;
955
0
    case XLOG_SPGIST_PICKSPLIT:
956
0
      spgRedoPickSplit(record);
957
0
      break;
958
0
    case XLOG_SPGIST_VACUUM_LEAF:
959
0
      spgRedoVacuumLeaf(record);
960
0
      break;
961
0
    case XLOG_SPGIST_VACUUM_ROOT:
962
0
      spgRedoVacuumRoot(record);
963
0
      break;
964
0
    case XLOG_SPGIST_VACUUM_REDIRECT:
965
0
      spgRedoVacuumRedirect(record);
966
0
      break;
967
0
    default:
968
0
      elog(PANIC, "spg_redo: unknown op code %u", info);
969
0
  }
970
971
0
  MemoryContextSwitchTo(oldCxt);
972
0
  MemoryContextReset(opCtx);
973
0
}
974
975
void
976
spg_xlog_startup(void)
977
0
{
978
0
  opCtx = AllocSetContextCreate(CurrentMemoryContext,
979
0
                  "SP-GiST temporary context",
980
0
                  ALLOCSET_DEFAULT_SIZES);
981
0
}
982
983
void
984
spg_xlog_cleanup(void)
985
0
{
986
0
  MemoryContextDelete(opCtx);
987
0
  opCtx = NULL;
988
0
}
989
990
/*
991
 * Mask a SpGist page before performing consistency checks on it.
992
 */
993
void
994
spg_mask(char *pagedata, BlockNumber blkno)
995
0
{
996
0
  Page    page = (Page) pagedata;
997
0
  PageHeader  pagehdr = (PageHeader) page;
998
999
0
  mask_page_lsn_and_checksum(page);
1000
1001
0
  mask_page_hint_bits(page);
1002
1003
  /*
1004
   * Mask the unused space, but only if the page's pd_lower appears to have
1005
   * been set correctly.
1006
   */
1007
0
  if (pagehdr->pd_lower >= SizeOfPageHeaderData)
1008
0
    mask_unused_space(page);
1009
0
}