Coverage Report

Created: 2025-06-13 06:06

/src/postgres/src/backend/access/gin/ginxlog.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * ginxlog.c
4
 *    WAL replay logic for inverted index.
5
 *
6
 *
7
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8
 * Portions Copyright (c) 1994, Regents of the University of California
9
 *
10
 * IDENTIFICATION
11
 *       src/backend/access/gin/ginxlog.c
12
 *-------------------------------------------------------------------------
13
 */
14
#include "postgres.h"
15
16
#include "access/bufmask.h"
17
#include "access/gin_private.h"
18
#include "access/ginxlog.h"
19
#include "access/xlogutils.h"
20
#include "utils/memutils.h"
21
22
/*
 * Scratch memory context for the redo routines in this file.  It is created
 * by gin_xlog_startup(), deleted by gin_xlog_cleanup(), and gin_redo()
 * switches into it before dispatching a record and resets it afterwards.
 */
static MemoryContext opCtx;   /* working memory for operations */
23
24
static void
25
ginRedoClearIncompleteSplit(XLogReaderState *record, uint8 block_id)
26
0
{
27
0
  XLogRecPtr  lsn = record->EndRecPtr;
28
0
  Buffer    buffer;
29
0
  Page    page;
30
31
0
  if (XLogReadBufferForRedo(record, block_id, &buffer) == BLK_NEEDS_REDO)
32
0
  {
33
0
    page = (Page) BufferGetPage(buffer);
34
0
    GinPageGetOpaque(page)->flags &= ~GIN_INCOMPLETE_SPLIT;
35
36
0
    PageSetLSN(page, lsn);
37
0
    MarkBufferDirty(buffer);
38
0
  }
39
0
  if (BufferIsValid(buffer))
40
0
    UnlockReleaseBuffer(buffer);
41
0
}
42
43
static void
44
ginRedoCreatePTree(XLogReaderState *record)
45
0
{
46
0
  XLogRecPtr  lsn = record->EndRecPtr;
47
0
  ginxlogCreatePostingTree *data = (ginxlogCreatePostingTree *) XLogRecGetData(record);
48
0
  char     *ptr;
49
0
  Buffer    buffer;
50
0
  Page    page;
51
52
0
  buffer = XLogInitBufferForRedo(record, 0);
53
0
  page = (Page) BufferGetPage(buffer);
54
55
0
  GinInitBuffer(buffer, GIN_DATA | GIN_LEAF | GIN_COMPRESSED);
56
57
0
  ptr = XLogRecGetData(record) + sizeof(ginxlogCreatePostingTree);
58
59
  /* Place page data */
60
0
  memcpy(GinDataLeafPageGetPostingList(page), ptr, data->size);
61
62
0
  GinDataPageSetDataSize(page, data->size);
63
64
0
  PageSetLSN(page, lsn);
65
66
0
  MarkBufferDirty(buffer);
67
0
  UnlockReleaseBuffer(buffer);
68
0
}
69
70
static void
71
ginRedoInsertEntry(Buffer buffer, bool isLeaf, BlockNumber rightblkno, void *rdata)
72
0
{
73
0
  Page    page = BufferGetPage(buffer);
74
0
  ginxlogInsertEntry *data = (ginxlogInsertEntry *) rdata;
75
0
  OffsetNumber offset = data->offset;
76
0
  IndexTuple  itup;
77
78
0
  if (rightblkno != InvalidBlockNumber)
79
0
  {
80
    /* update link to right page after split */
81
0
    Assert(!GinPageIsLeaf(page));
82
0
    Assert(offset >= FirstOffsetNumber && offset <= PageGetMaxOffsetNumber(page));
83
0
    itup = (IndexTuple) PageGetItem(page, PageGetItemId(page, offset));
84
0
    GinSetDownlink(itup, rightblkno);
85
0
  }
86
87
0
  if (data->isDelete)
88
0
  {
89
0
    Assert(GinPageIsLeaf(page));
90
0
    Assert(offset >= FirstOffsetNumber && offset <= PageGetMaxOffsetNumber(page));
91
0
    PageIndexTupleDelete(page, offset);
92
0
  }
93
94
0
  itup = &data->tuple;
95
96
0
  if (PageAddItem(page, (Item) itup, IndexTupleSize(itup), offset, false, false) == InvalidOffsetNumber)
97
0
  {
98
0
    RelFileLocator locator;
99
0
    ForkNumber  forknum;
100
0
    BlockNumber blknum;
101
102
0
    BufferGetTag(buffer, &locator, &forknum, &blknum);
103
0
    elog(ERROR, "failed to add item to index page in %u/%u/%u",
104
0
       locator.spcOid, locator.dbOid, locator.relNumber);
105
0
  }
106
0
}
107
108
/*
109
 * Redo recompression of posting list.  Doing all the changes in-place is not
110
 * always possible, because it might require more space than we've on the page.
111
 * Instead, once modification is required we copy unprocessed tail of the page
112
 * into separately allocated chunk of memory for further reading original
113
 * versions of segments.  Thanks to that we don't bother about moving page data
114
 * in-place.
115
 */
116
static void
117
ginRedoRecompress(Page page, ginxlogRecompressDataLeaf *data)
118
0
{
119
0
  int     actionno;
120
0
  int     segno;
121
0
  GinPostingList *oldseg;
122
0
  Pointer   segmentend;
123
0
  char     *walbuf;
124
0
  int     totalsize;
125
0
  Pointer   tailCopy = NULL;
126
0
  Pointer   writePtr;
127
0
  Pointer   segptr;
128
129
  /*
130
   * If the page is in pre-9.4 format, convert to new format first.
131
   */
132
0
  if (!GinPageIsCompressed(page))
133
0
  {
134
0
    ItemPointer uncompressed = (ItemPointer) GinDataPageGetData(page);
135
0
    int     nuncompressed = GinPageGetOpaque(page)->maxoff;
136
0
    int     npacked;
137
138
    /*
139
     * Empty leaf pages are deleted as part of vacuum, but leftmost and
140
     * rightmost pages are never deleted.  So, pg_upgrade'd from pre-9.4
141
     * instances might contain empty leaf pages, and we need to handle
142
     * them correctly.
143
     */
144
0
    if (nuncompressed > 0)
145
0
    {
146
0
      GinPostingList *plist;
147
148
0
      plist = ginCompressPostingList(uncompressed, nuncompressed,
149
0
                       BLCKSZ, &npacked);
150
0
      totalsize = SizeOfGinPostingList(plist);
151
152
0
      Assert(npacked == nuncompressed);
153
154
0
      memcpy(GinDataLeafPageGetPostingList(page), plist, totalsize);
155
0
    }
156
0
    else
157
0
    {
158
0
      totalsize = 0;
159
0
    }
160
161
0
    GinDataPageSetDataSize(page, totalsize);
162
0
    GinPageSetCompressed(page);
163
0
    GinPageGetOpaque(page)->maxoff = InvalidOffsetNumber;
164
0
  }
165
166
0
  oldseg = GinDataLeafPageGetPostingList(page);
167
0
  writePtr = (Pointer) oldseg;
168
0
  segmentend = (Pointer) oldseg + GinDataLeafPageGetPostingListSize(page);
169
0
  segno = 0;
170
171
0
  walbuf = ((char *) data) + sizeof(ginxlogRecompressDataLeaf);
172
0
  for (actionno = 0; actionno < data->nactions; actionno++)
173
0
  {
174
0
    uint8   a_segno = *((uint8 *) (walbuf++));
175
0
    uint8   a_action = *((uint8 *) (walbuf++));
176
0
    GinPostingList *newseg = NULL;
177
0
    int     newsegsize = 0;
178
0
    ItemPointerData *items = NULL;
179
0
    uint16    nitems = 0;
180
0
    ItemPointerData *olditems;
181
0
    int     nolditems;
182
0
    ItemPointerData *newitems;
183
0
    int     nnewitems;
184
0
    int     segsize;
185
186
    /* Extract all the information we need from the WAL record */
187
0
    if (a_action == GIN_SEGMENT_INSERT ||
188
0
      a_action == GIN_SEGMENT_REPLACE)
189
0
    {
190
0
      newseg = (GinPostingList *) walbuf;
191
0
      newsegsize = SizeOfGinPostingList(newseg);
192
0
      walbuf += SHORTALIGN(newsegsize);
193
0
    }
194
195
0
    if (a_action == GIN_SEGMENT_ADDITEMS)
196
0
    {
197
0
      memcpy(&nitems, walbuf, sizeof(uint16));
198
0
      walbuf += sizeof(uint16);
199
0
      items = (ItemPointerData *) walbuf;
200
0
      walbuf += nitems * sizeof(ItemPointerData);
201
0
    }
202
203
    /* Skip to the segment that this action concerns */
204
0
    Assert(segno <= a_segno);
205
0
    while (segno < a_segno)
206
0
    {
207
      /*
208
       * Once modification is started and page tail is copied, we've to
209
       * copy unmodified segments.
210
       */
211
0
      segsize = SizeOfGinPostingList(oldseg);
212
0
      if (tailCopy)
213
0
      {
214
0
        Assert(writePtr + segsize < PageGetSpecialPointer(page));
215
0
        memcpy(writePtr, (Pointer) oldseg, segsize);
216
0
      }
217
0
      writePtr += segsize;
218
0
      oldseg = GinNextPostingListSegment(oldseg);
219
0
      segno++;
220
0
    }
221
222
    /*
223
     * ADDITEMS action is handled like REPLACE, but the new segment to
224
     * replace the old one is reconstructed using the old segment from
225
     * disk and the new items from the WAL record.
226
     */
227
0
    if (a_action == GIN_SEGMENT_ADDITEMS)
228
0
    {
229
0
      int     npacked;
230
231
0
      olditems = ginPostingListDecode(oldseg, &nolditems);
232
233
0
      newitems = ginMergeItemPointers(items, nitems,
234
0
                      olditems, nolditems,
235
0
                      &nnewitems);
236
0
      Assert(nnewitems == nolditems + nitems);
237
238
0
      newseg = ginCompressPostingList(newitems, nnewitems,
239
0
                      BLCKSZ, &npacked);
240
0
      Assert(npacked == nnewitems);
241
242
0
      newsegsize = SizeOfGinPostingList(newseg);
243
0
      a_action = GIN_SEGMENT_REPLACE;
244
0
    }
245
246
0
    segptr = (Pointer) oldseg;
247
0
    if (segptr != segmentend)
248
0
      segsize = SizeOfGinPostingList(oldseg);
249
0
    else
250
0
    {
251
      /*
252
       * Positioned after the last existing segment. Only INSERTs
253
       * expected here.
254
       */
255
0
      Assert(a_action == GIN_SEGMENT_INSERT);
256
0
      segsize = 0;
257
0
    }
258
259
    /*
260
     * We're about to start modification of the page.  So, copy tail of
261
     * the page if it's not done already.
262
     */
263
0
    if (!tailCopy && segptr != segmentend)
264
0
    {
265
0
      int     tailSize = segmentend - segptr;
266
267
0
      tailCopy = (Pointer) palloc(tailSize);
268
0
      memcpy(tailCopy, segptr, tailSize);
269
0
      segptr = tailCopy;
270
0
      oldseg = (GinPostingList *) segptr;
271
0
      segmentend = segptr + tailSize;
272
0
    }
273
274
0
    switch (a_action)
275
0
    {
276
0
      case GIN_SEGMENT_DELETE:
277
0
        segptr += segsize;
278
0
        segno++;
279
0
        break;
280
281
0
      case GIN_SEGMENT_INSERT:
282
        /* copy the new segment in place */
283
0
        Assert(writePtr + newsegsize <= PageGetSpecialPointer(page));
284
0
        memcpy(writePtr, newseg, newsegsize);
285
0
        writePtr += newsegsize;
286
0
        break;
287
288
0
      case GIN_SEGMENT_REPLACE:
289
        /* copy the new version of segment in place */
290
0
        Assert(writePtr + newsegsize <= PageGetSpecialPointer(page));
291
0
        memcpy(writePtr, newseg, newsegsize);
292
0
        writePtr += newsegsize;
293
0
        segptr += segsize;
294
0
        segno++;
295
0
        break;
296
297
0
      default:
298
0
        elog(ERROR, "unexpected GIN leaf action: %u", a_action);
299
0
    }
300
0
    oldseg = (GinPostingList *) segptr;
301
0
  }
302
303
  /* Copy the rest of unmodified segments if any. */
304
0
  segptr = (Pointer) oldseg;
305
0
  if (segptr != segmentend && tailCopy)
306
0
  {
307
0
    int     restSize = segmentend - segptr;
308
309
0
    Assert(writePtr + restSize <= PageGetSpecialPointer(page));
310
0
    memcpy(writePtr, segptr, restSize);
311
0
    writePtr += restSize;
312
0
  }
313
314
0
  totalsize = writePtr - (Pointer) GinDataLeafPageGetPostingList(page);
315
0
  GinDataPageSetDataSize(page, totalsize);
316
0
}
317
318
static void
319
ginRedoInsertData(Buffer buffer, bool isLeaf, BlockNumber rightblkno, void *rdata)
320
0
{
321
0
  Page    page = BufferGetPage(buffer);
322
323
0
  if (isLeaf)
324
0
  {
325
0
    ginxlogRecompressDataLeaf *data = (ginxlogRecompressDataLeaf *) rdata;
326
327
0
    Assert(GinPageIsLeaf(page));
328
329
0
    ginRedoRecompress(page, data);
330
0
  }
331
0
  else
332
0
  {
333
0
    ginxlogInsertDataInternal *data = (ginxlogInsertDataInternal *) rdata;
334
0
    PostingItem *oldpitem;
335
336
0
    Assert(!GinPageIsLeaf(page));
337
338
    /* update link to right page after split */
339
0
    oldpitem = GinDataPageGetPostingItem(page, data->offset);
340
0
    PostingItemSetBlockNumber(oldpitem, rightblkno);
341
342
0
    GinDataPageAddPostingItem(page, &data->newitem, data->offset);
343
0
  }
344
0
}
345
346
static void
347
ginRedoInsert(XLogReaderState *record)
348
0
{
349
0
  XLogRecPtr  lsn = record->EndRecPtr;
350
0
  ginxlogInsert *data = (ginxlogInsert *) XLogRecGetData(record);
351
0
  Buffer    buffer;
352
#ifdef NOT_USED
353
  BlockNumber leftChildBlkno = InvalidBlockNumber;
354
#endif
355
0
  BlockNumber rightChildBlkno = InvalidBlockNumber;
356
0
  bool    isLeaf = (data->flags & GIN_INSERT_ISLEAF) != 0;
357
358
  /*
359
   * First clear incomplete-split flag on child page if this finishes a
360
   * split.
361
   */
362
0
  if (!isLeaf)
363
0
  {
364
0
    char     *payload = XLogRecGetData(record) + sizeof(ginxlogInsert);
365
366
#ifdef NOT_USED
367
    leftChildBlkno = BlockIdGetBlockNumber((BlockId) payload);
368
#endif
369
0
    payload += sizeof(BlockIdData);
370
0
    rightChildBlkno = BlockIdGetBlockNumber((BlockId) payload);
371
0
    payload += sizeof(BlockIdData);
372
373
0
    ginRedoClearIncompleteSplit(record, 1);
374
0
  }
375
376
0
  if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
377
0
  {
378
0
    Page    page = BufferGetPage(buffer);
379
0
    Size    len;
380
0
    char     *payload = XLogRecGetBlockData(record, 0, &len);
381
382
    /* How to insert the payload is tree-type specific */
383
0
    if (data->flags & GIN_INSERT_ISDATA)
384
0
    {
385
0
      Assert(GinPageIsData(page));
386
0
      ginRedoInsertData(buffer, isLeaf, rightChildBlkno, payload);
387
0
    }
388
0
    else
389
0
    {
390
0
      Assert(!GinPageIsData(page));
391
0
      ginRedoInsertEntry(buffer, isLeaf, rightChildBlkno, payload);
392
0
    }
393
394
0
    PageSetLSN(page, lsn);
395
0
    MarkBufferDirty(buffer);
396
0
  }
397
0
  if (BufferIsValid(buffer))
398
0
    UnlockReleaseBuffer(buffer);
399
0
}
400
401
static void
402
ginRedoSplit(XLogReaderState *record)
403
0
{
404
0
  ginxlogSplit *data = (ginxlogSplit *) XLogRecGetData(record);
405
0
  Buffer    lbuffer,
406
0
        rbuffer,
407
0
        rootbuf;
408
0
  bool    isLeaf = (data->flags & GIN_INSERT_ISLEAF) != 0;
409
0
  bool    isRoot = (data->flags & GIN_SPLIT_ROOT) != 0;
410
411
  /*
412
   * First clear incomplete-split flag on child page if this finishes a
413
   * split
414
   */
415
0
  if (!isLeaf)
416
0
    ginRedoClearIncompleteSplit(record, 3);
417
418
0
  if (XLogReadBufferForRedo(record, 0, &lbuffer) != BLK_RESTORED)
419
0
    elog(ERROR, "GIN split record did not contain a full-page image of left page");
420
421
0
  if (XLogReadBufferForRedo(record, 1, &rbuffer) != BLK_RESTORED)
422
0
    elog(ERROR, "GIN split record did not contain a full-page image of right page");
423
424
0
  if (isRoot)
425
0
  {
426
0
    if (XLogReadBufferForRedo(record, 2, &rootbuf) != BLK_RESTORED)
427
0
      elog(ERROR, "GIN split record did not contain a full-page image of root page");
428
0
    UnlockReleaseBuffer(rootbuf);
429
0
  }
430
431
0
  UnlockReleaseBuffer(rbuffer);
432
0
  UnlockReleaseBuffer(lbuffer);
433
0
}
434
435
/*
436
 * VACUUM_PAGE record contains simply a full image of the page, similar to
437
 * an XLOG_FPI record.
438
 */
439
static void
440
ginRedoVacuumPage(XLogReaderState *record)
441
0
{
442
0
  Buffer    buffer;
443
444
0
  if (XLogReadBufferForRedo(record, 0, &buffer) != BLK_RESTORED)
445
0
  {
446
0
    elog(ERROR, "replay of gin entry tree page vacuum did not restore the page");
447
0
  }
448
0
  UnlockReleaseBuffer(buffer);
449
0
}
450
451
static void
452
ginRedoVacuumDataLeafPage(XLogReaderState *record)
453
0
{
454
0
  XLogRecPtr  lsn = record->EndRecPtr;
455
0
  Buffer    buffer;
456
457
0
  if (XLogReadBufferForRedo(record, 0, &buffer) == BLK_NEEDS_REDO)
458
0
  {
459
0
    Page    page = BufferGetPage(buffer);
460
0
    Size    len;
461
0
    ginxlogVacuumDataLeafPage *xlrec;
462
463
0
    xlrec = (ginxlogVacuumDataLeafPage *) XLogRecGetBlockData(record, 0, &len);
464
465
0
    Assert(GinPageIsLeaf(page));
466
0
    Assert(GinPageIsData(page));
467
468
0
    ginRedoRecompress(page, &xlrec->data);
469
0
    PageSetLSN(page, lsn);
470
0
    MarkBufferDirty(buffer);
471
0
  }
472
0
  if (BufferIsValid(buffer))
473
0
    UnlockReleaseBuffer(buffer);
474
0
}
475
476
static void
477
ginRedoDeletePage(XLogReaderState *record)
478
0
{
479
0
  XLogRecPtr  lsn = record->EndRecPtr;
480
0
  ginxlogDeletePage *data = (ginxlogDeletePage *) XLogRecGetData(record);
481
0
  Buffer    dbuffer;
482
0
  Buffer    pbuffer;
483
0
  Buffer    lbuffer;
484
0
  Page    page;
485
486
  /*
487
   * Lock left page first in order to prevent possible deadlock with
488
   * ginStepRight().
489
   */
490
0
  if (XLogReadBufferForRedo(record, 2, &lbuffer) == BLK_NEEDS_REDO)
491
0
  {
492
0
    page = BufferGetPage(lbuffer);
493
0
    Assert(GinPageIsData(page));
494
0
    GinPageGetOpaque(page)->rightlink = data->rightLink;
495
0
    PageSetLSN(page, lsn);
496
0
    MarkBufferDirty(lbuffer);
497
0
  }
498
499
0
  if (XLogReadBufferForRedo(record, 0, &dbuffer) == BLK_NEEDS_REDO)
500
0
  {
501
0
    page = BufferGetPage(dbuffer);
502
0
    Assert(GinPageIsData(page));
503
0
    GinPageSetDeleted(page);
504
0
    GinPageSetDeleteXid(page, data->deleteXid);
505
0
    PageSetLSN(page, lsn);
506
0
    MarkBufferDirty(dbuffer);
507
0
  }
508
509
0
  if (XLogReadBufferForRedo(record, 1, &pbuffer) == BLK_NEEDS_REDO)
510
0
  {
511
0
    page = BufferGetPage(pbuffer);
512
0
    Assert(GinPageIsData(page));
513
0
    Assert(!GinPageIsLeaf(page));
514
0
    GinPageDeletePostingItem(page, data->parentOffset);
515
0
    PageSetLSN(page, lsn);
516
0
    MarkBufferDirty(pbuffer);
517
0
  }
518
519
0
  if (BufferIsValid(lbuffer))
520
0
    UnlockReleaseBuffer(lbuffer);
521
0
  if (BufferIsValid(pbuffer))
522
0
    UnlockReleaseBuffer(pbuffer);
523
0
  if (BufferIsValid(dbuffer))
524
0
    UnlockReleaseBuffer(dbuffer);
525
0
}
526
527
static void
528
ginRedoUpdateMetapage(XLogReaderState *record)
529
0
{
530
0
  XLogRecPtr  lsn = record->EndRecPtr;
531
0
  ginxlogUpdateMeta *data = (ginxlogUpdateMeta *) XLogRecGetData(record);
532
0
  Buffer    metabuffer;
533
0
  Page    metapage;
534
0
  Buffer    buffer;
535
536
  /*
537
   * Restore the metapage. This is essentially the same as a full-page
538
   * image, so restore the metapage unconditionally without looking at the
539
   * LSN, to avoid torn page hazards.
540
   */
541
0
  metabuffer = XLogInitBufferForRedo(record, 0);
542
0
  Assert(BufferGetBlockNumber(metabuffer) == GIN_METAPAGE_BLKNO);
543
0
  metapage = BufferGetPage(metabuffer);
544
545
0
  GinInitMetabuffer(metabuffer);
546
0
  memcpy(GinPageGetMeta(metapage), &data->metadata, sizeof(GinMetaPageData));
547
0
  PageSetLSN(metapage, lsn);
548
0
  MarkBufferDirty(metabuffer);
549
550
0
  if (data->ntuples > 0)
551
0
  {
552
    /*
553
     * insert into tail page
554
     */
555
0
    if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
556
0
    {
557
0
      Page    page = BufferGetPage(buffer);
558
0
      OffsetNumber off;
559
0
      int     i;
560
0
      Size    tupsize;
561
0
      char     *payload;
562
0
      IndexTuple  tuples;
563
0
      Size    totaltupsize;
564
565
0
      payload = XLogRecGetBlockData(record, 1, &totaltupsize);
566
0
      tuples = (IndexTuple) payload;
567
568
0
      if (PageIsEmpty(page))
569
0
        off = FirstOffsetNumber;
570
0
      else
571
0
        off = OffsetNumberNext(PageGetMaxOffsetNumber(page));
572
573
0
      for (i = 0; i < data->ntuples; i++)
574
0
      {
575
0
        tupsize = IndexTupleSize(tuples);
576
577
0
        if (PageAddItem(page, (Item) tuples, tupsize, off,
578
0
                false, false) == InvalidOffsetNumber)
579
0
          elog(ERROR, "failed to add item to index page");
580
581
0
        tuples = (IndexTuple) (((char *) tuples) + tupsize);
582
583
0
        off++;
584
0
      }
585
0
      Assert(payload + totaltupsize == (char *) tuples);
586
587
      /*
588
       * Increase counter of heap tuples
589
       */
590
0
      GinPageGetOpaque(page)->maxoff++;
591
592
0
      PageSetLSN(page, lsn);
593
0
      MarkBufferDirty(buffer);
594
0
    }
595
0
    if (BufferIsValid(buffer))
596
0
      UnlockReleaseBuffer(buffer);
597
0
  }
598
0
  else if (data->prevTail != InvalidBlockNumber)
599
0
  {
600
    /*
601
     * New tail
602
     */
603
0
    if (XLogReadBufferForRedo(record, 1, &buffer) == BLK_NEEDS_REDO)
604
0
    {
605
0
      Page    page = BufferGetPage(buffer);
606
607
0
      GinPageGetOpaque(page)->rightlink = data->newRightlink;
608
609
0
      PageSetLSN(page, lsn);
610
0
      MarkBufferDirty(buffer);
611
0
    }
612
0
    if (BufferIsValid(buffer))
613
0
      UnlockReleaseBuffer(buffer);
614
0
  }
615
616
0
  UnlockReleaseBuffer(metabuffer);
617
0
}
618
619
static void
620
ginRedoInsertListPage(XLogReaderState *record)
621
0
{
622
0
  XLogRecPtr  lsn = record->EndRecPtr;
623
0
  ginxlogInsertListPage *data = (ginxlogInsertListPage *) XLogRecGetData(record);
624
0
  Buffer    buffer;
625
0
  Page    page;
626
0
  OffsetNumber l,
627
0
        off = FirstOffsetNumber;
628
0
  int     i,
629
0
        tupsize;
630
0
  char     *payload;
631
0
  IndexTuple  tuples;
632
0
  Size    totaltupsize;
633
634
  /* We always re-initialize the page. */
635
0
  buffer = XLogInitBufferForRedo(record, 0);
636
0
  page = BufferGetPage(buffer);
637
638
0
  GinInitBuffer(buffer, GIN_LIST);
639
0
  GinPageGetOpaque(page)->rightlink = data->rightlink;
640
0
  if (data->rightlink == InvalidBlockNumber)
641
0
  {
642
    /* tail of sublist */
643
0
    GinPageSetFullRow(page);
644
0
    GinPageGetOpaque(page)->maxoff = 1;
645
0
  }
646
0
  else
647
0
  {
648
0
    GinPageGetOpaque(page)->maxoff = 0;
649
0
  }
650
651
0
  payload = XLogRecGetBlockData(record, 0, &totaltupsize);
652
653
0
  tuples = (IndexTuple) payload;
654
0
  for (i = 0; i < data->ntuples; i++)
655
0
  {
656
0
    tupsize = IndexTupleSize(tuples);
657
658
0
    l = PageAddItem(page, (Item) tuples, tupsize, off, false, false);
659
660
0
    if (l == InvalidOffsetNumber)
661
0
      elog(ERROR, "failed to add item to index page");
662
663
0
    tuples = (IndexTuple) (((char *) tuples) + tupsize);
664
0
    off++;
665
0
  }
666
0
  Assert((char *) tuples == payload + totaltupsize);
667
668
0
  PageSetLSN(page, lsn);
669
0
  MarkBufferDirty(buffer);
670
671
0
  UnlockReleaseBuffer(buffer);
672
0
}
673
674
static void
675
ginRedoDeleteListPages(XLogReaderState *record)
676
0
{
677
0
  XLogRecPtr  lsn = record->EndRecPtr;
678
0
  ginxlogDeleteListPages *data = (ginxlogDeleteListPages *) XLogRecGetData(record);
679
0
  Buffer    metabuffer;
680
0
  Page    metapage;
681
0
  int     i;
682
683
0
  metabuffer = XLogInitBufferForRedo(record, 0);
684
0
  Assert(BufferGetBlockNumber(metabuffer) == GIN_METAPAGE_BLKNO);
685
0
  metapage = BufferGetPage(metabuffer);
686
687
0
  GinInitMetabuffer(metabuffer);
688
689
0
  memcpy(GinPageGetMeta(metapage), &data->metadata, sizeof(GinMetaPageData));
690
0
  PageSetLSN(metapage, lsn);
691
0
  MarkBufferDirty(metabuffer);
692
693
  /*
694
   * In normal operation, shiftList() takes exclusive lock on all the
695
   * pages-to-be-deleted simultaneously.  During replay, however, it should
696
   * be all right to lock them one at a time.  This is dependent on the fact
697
   * that we are deleting pages from the head of the list, and that readers
698
   * share-lock the next page before releasing the one they are on. So we
699
   * cannot get past a reader that is on, or due to visit, any page we are
700
   * going to delete.  New incoming readers will block behind our metapage
701
   * lock and then see a fully updated page list.
702
   *
703
   * No full-page images are taken of the deleted pages. Instead, they are
704
   * re-initialized as empty, deleted pages. Their right-links don't need to
705
   * be preserved, because no new readers can see the pages, as explained
706
   * above.
707
   */
708
0
  for (i = 0; i < data->ndeleted; i++)
709
0
  {
710
0
    Buffer    buffer;
711
0
    Page    page;
712
713
0
    buffer = XLogInitBufferForRedo(record, i + 1);
714
0
    page = BufferGetPage(buffer);
715
0
    GinInitBuffer(buffer, GIN_DELETED);
716
717
0
    PageSetLSN(page, lsn);
718
0
    MarkBufferDirty(buffer);
719
720
0
    UnlockReleaseBuffer(buffer);
721
0
  }
722
0
  UnlockReleaseBuffer(metabuffer);
723
0
}
724
725
void
726
gin_redo(XLogReaderState *record)
727
0
{
728
0
  uint8   info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
729
0
  MemoryContext oldCtx;
730
731
  /*
732
   * GIN indexes do not require any conflict processing. NB: If we ever
733
   * implement a similar optimization as we have in b-tree, and remove
734
   * killed tuples outside VACUUM, we'll need to handle that here.
735
   */
736
737
0
  oldCtx = MemoryContextSwitchTo(opCtx);
738
0
  switch (info)
739
0
  {
740
0
    case XLOG_GIN_CREATE_PTREE:
741
0
      ginRedoCreatePTree(record);
742
0
      break;
743
0
    case XLOG_GIN_INSERT:
744
0
      ginRedoInsert(record);
745
0
      break;
746
0
    case XLOG_GIN_SPLIT:
747
0
      ginRedoSplit(record);
748
0
      break;
749
0
    case XLOG_GIN_VACUUM_PAGE:
750
0
      ginRedoVacuumPage(record);
751
0
      break;
752
0
    case XLOG_GIN_VACUUM_DATA_LEAF_PAGE:
753
0
      ginRedoVacuumDataLeafPage(record);
754
0
      break;
755
0
    case XLOG_GIN_DELETE_PAGE:
756
0
      ginRedoDeletePage(record);
757
0
      break;
758
0
    case XLOG_GIN_UPDATE_META_PAGE:
759
0
      ginRedoUpdateMetapage(record);
760
0
      break;
761
0
    case XLOG_GIN_INSERT_LISTPAGE:
762
0
      ginRedoInsertListPage(record);
763
0
      break;
764
0
    case XLOG_GIN_DELETE_LISTPAGE:
765
0
      ginRedoDeleteListPages(record);
766
0
      break;
767
0
    default:
768
0
      elog(PANIC, "gin_redo: unknown op code %u", info);
769
0
  }
770
0
  MemoryContextSwitchTo(oldCtx);
771
0
  MemoryContextReset(opCtx);
772
0
}
773
774
void
775
gin_xlog_startup(void)
776
0
{
777
0
  opCtx = AllocSetContextCreate(CurrentMemoryContext,
778
0
                  "GIN recovery temporary context",
779
0
                  ALLOCSET_DEFAULT_SIZES);
780
0
}
781
782
void
783
gin_xlog_cleanup(void)
784
0
{
785
0
  MemoryContextDelete(opCtx);
786
0
  opCtx = NULL;
787
0
}
788
789
/*
790
 * Mask a GIN page before running consistency checks on it.
791
 */
792
void
793
gin_mask(char *pagedata, BlockNumber blkno)
794
0
{
795
0
  Page    page = (Page) pagedata;
796
0
  PageHeader  pagehdr = (PageHeader) page;
797
0
  GinPageOpaque opaque;
798
799
0
  mask_page_lsn_and_checksum(page);
800
0
  opaque = GinPageGetOpaque(page);
801
802
0
  mask_page_hint_bits(page);
803
804
  /*
805
   * For a GIN_DELETED page, the page is initialized to empty.  Hence, mask
806
   * the whole page content.  For other pages, mask the hole if pd_lower
807
   * appears to have been set correctly.
808
   */
809
0
  if (opaque->flags & GIN_DELETED)
810
0
    mask_page_content(page);
811
0
  else if (pagehdr->pd_lower > SizeOfPageHeaderData)
812
0
    mask_unused_space(page);
813
0
}