Coverage Report

Created: 2025-10-09 06:07

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/postgres/src/backend/access/gin/ginbtree.c
Line
Count
Source
1
/*-------------------------------------------------------------------------
2
 *
3
 * ginbtree.c
4
 *    page utilities routines for the postgres inverted index access method.
5
 *
6
 *
7
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8
 * Portions Copyright (c) 1994, Regents of the University of California
9
 *
10
 * IDENTIFICATION
11
 *      src/backend/access/gin/ginbtree.c
12
 *-------------------------------------------------------------------------
13
 */
14
15
#include "postgres.h"
16
17
#include "access/gin_private.h"
18
#include "access/ginxlog.h"
19
#include "access/xloginsert.h"
20
#include "miscadmin.h"
21
#include "storage/predicate.h"
22
#include "utils/injection_point.h"
23
#include "utils/memutils.h"
24
#include "utils/rel.h"
25
26
static void ginFindParents(GinBtree btree, GinBtreeStack *stack);
27
static bool ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
28
               void *insertdata, BlockNumber updateblkno,
29
               Buffer childbuf, GinStatsData *buildStats);
30
static void ginFinishSplit(GinBtree btree, GinBtreeStack *stack,
31
               bool freestack, GinStatsData *buildStats);
32
static void ginFinishOldSplit(GinBtree btree, GinBtreeStack *stack,
33
                GinStatsData *buildStats, int access);
34
35
/*
36
 * Lock buffer by needed method for search.
37
 */
38
int
39
ginTraverseLock(Buffer buffer, bool searchMode)
40
0
{
41
0
  Page    page;
42
0
  int     access = GIN_SHARE;
43
44
0
  LockBuffer(buffer, GIN_SHARE);
45
0
  page = BufferGetPage(buffer);
46
0
  if (GinPageIsLeaf(page))
47
0
  {
48
0
    if (searchMode == false)
49
0
    {
50
      /* we should relock our page */
51
0
      LockBuffer(buffer, GIN_UNLOCK);
52
0
      LockBuffer(buffer, GIN_EXCLUSIVE);
53
54
      /* But root can become non-leaf during relock */
55
0
      if (!GinPageIsLeaf(page))
56
0
      {
57
        /* restore old lock type (very rare) */
58
0
        LockBuffer(buffer, GIN_UNLOCK);
59
0
        LockBuffer(buffer, GIN_SHARE);
60
0
      }
61
0
      else
62
0
        access = GIN_EXCLUSIVE;
63
0
    }
64
0
  }
65
66
0
  return access;
67
0
}
68
69
/*
70
 * Descend the tree to the leaf page that contains or would contain the key
71
 * we're searching for. The key should already be filled in 'btree', in
72
 * tree-type specific manner. If btree->fullScan is true, descends to the
73
 * leftmost leaf page.
74
 *
75
 * If 'searchmode' is false, on return stack->buffer is exclusively locked,
76
 * and the stack represents the full path to the root. Otherwise stack->buffer
77
 * is share-locked, and stack->parent is NULL.
78
 *
79
 * If 'rootConflictCheck' is true, tree root is checked for serialization
80
 * conflict.
81
 */
82
GinBtreeStack *
83
ginFindLeafPage(GinBtree btree, bool searchMode,
84
        bool rootConflictCheck)
85
0
{
86
0
  GinBtreeStack *stack;
87
88
0
  stack = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
89
0
  stack->blkno = btree->rootBlkno;
90
0
  stack->buffer = ReadBuffer(btree->index, btree->rootBlkno);
91
0
  stack->parent = NULL;
92
0
  stack->predictNumber = 1;
93
94
0
  if (rootConflictCheck)
95
0
    CheckForSerializableConflictIn(btree->index, NULL, btree->rootBlkno);
96
97
0
  for (;;)
98
0
  {
99
0
    Page    page;
100
0
    BlockNumber child;
101
0
    int     access;
102
103
0
    stack->off = InvalidOffsetNumber;
104
105
0
    page = BufferGetPage(stack->buffer);
106
107
0
    access = ginTraverseLock(stack->buffer, searchMode);
108
109
    /*
110
     * If we're going to modify the tree, finish any incomplete splits we
111
     * encounter on the way.
112
     */
113
0
    if (!searchMode && GinPageIsIncompleteSplit(page))
114
0
      ginFinishOldSplit(btree, stack, NULL, access);
115
116
    /*
117
     * ok, page is correctly locked, we should check to move right ..,
118
     * root never has a right link, so small optimization
119
     */
120
0
    while (btree->fullScan == false && stack->blkno != btree->rootBlkno &&
121
0
         btree->isMoveRight(btree, page))
122
0
    {
123
0
      BlockNumber rightlink = GinPageGetOpaque(page)->rightlink;
124
125
0
      if (rightlink == InvalidBlockNumber)
126
        /* rightmost page */
127
0
        break;
128
129
0
      stack->buffer = ginStepRight(stack->buffer, btree->index, access);
130
0
      stack->blkno = rightlink;
131
0
      page = BufferGetPage(stack->buffer);
132
133
0
      if (!searchMode && GinPageIsIncompleteSplit(page))
134
0
        ginFinishOldSplit(btree, stack, NULL, access);
135
0
    }
136
137
0
    if (GinPageIsLeaf(page)) /* we found, return locked page */
138
0
      return stack;
139
140
    /* now we have correct buffer, try to find child */
141
0
    child = btree->findChildPage(btree, stack);
142
143
0
    LockBuffer(stack->buffer, GIN_UNLOCK);
144
0
    Assert(child != InvalidBlockNumber);
145
0
    Assert(stack->blkno != child);
146
147
0
    if (searchMode)
148
0
    {
149
      /* in search mode we may forget path to leaf */
150
0
      stack->blkno = child;
151
0
      stack->buffer = ReleaseAndReadBuffer(stack->buffer, btree->index, stack->blkno);
152
0
    }
153
0
    else
154
0
    {
155
0
      GinBtreeStack *ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
156
157
0
      ptr->parent = stack;
158
0
      stack = ptr;
159
0
      stack->blkno = child;
160
0
      stack->buffer = ReadBuffer(btree->index, stack->blkno);
161
0
      stack->predictNumber = 1;
162
0
    }
163
0
  }
164
0
}
165
166
/*
167
 * Step right from current page.
168
 *
169
 * The next page is locked first, before releasing the current page. This is
170
 * crucial to prevent concurrent VACUUM from deleting a page that we are about
171
 * to step to. (The lock-coupling isn't strictly necessary when we are
172
 * traversing the tree to find an insert location, because page deletion grabs
173
 * a cleanup lock on the root to prevent any concurrent inserts. See Page
174
 * deletion section in the README. But there's no harm in doing it always.)
175
 */
176
Buffer
177
ginStepRight(Buffer buffer, Relation index, int lockmode)
178
0
{
179
0
  Buffer    nextbuffer;
180
0
  Page    page = BufferGetPage(buffer);
181
0
  bool    isLeaf = GinPageIsLeaf(page);
182
0
  bool    isData = GinPageIsData(page);
183
0
  BlockNumber blkno = GinPageGetOpaque(page)->rightlink;
184
185
0
  nextbuffer = ReadBuffer(index, blkno);
186
0
  LockBuffer(nextbuffer, lockmode);
187
0
  UnlockReleaseBuffer(buffer);
188
189
  /* Sanity check that the page we stepped to is of similar kind. */
190
0
  page = BufferGetPage(nextbuffer);
191
0
  if (isLeaf != GinPageIsLeaf(page) || isData != GinPageIsData(page))
192
0
    elog(ERROR, "right sibling of GIN page is of different type");
193
194
0
  return nextbuffer;
195
0
}
196
197
void
198
freeGinBtreeStack(GinBtreeStack *stack)
199
0
{
200
0
  while (stack)
201
0
  {
202
0
    GinBtreeStack *tmp = stack->parent;
203
204
0
    if (stack->buffer != InvalidBuffer)
205
0
      ReleaseBuffer(stack->buffer);
206
207
0
    pfree(stack);
208
0
    stack = tmp;
209
0
  }
210
0
}
211
212
/*
213
 * Try to find parent for current stack position. Returns correct parent and
214
 * child's offset in stack->parent. The root page is never released, to
215
 * prevent conflict with vacuum process.
216
 */
217
static void
218
ginFindParents(GinBtree btree, GinBtreeStack *stack)
219
0
{
220
0
  Page    page;
221
0
  Buffer    buffer;
222
0
  BlockNumber blkno,
223
0
        leftmostBlkno;
224
0
  OffsetNumber offset;
225
0
  GinBtreeStack *root;
226
0
  GinBtreeStack *ptr;
227
228
  /*
229
   * Unwind the stack all the way up to the root, leaving only the root
230
   * item.
231
   *
232
   * Be careful not to release the pin on the root page! The pin on root
233
   * page is required to lock out concurrent vacuums on the tree.
234
   */
235
0
  root = stack->parent;
236
0
  while (root->parent)
237
0
  {
238
0
    ReleaseBuffer(root->buffer);
239
0
    root = root->parent;
240
0
  }
241
242
0
  Assert(root->blkno == btree->rootBlkno);
243
0
  Assert(BufferGetBlockNumber(root->buffer) == btree->rootBlkno);
244
0
  root->off = InvalidOffsetNumber;
245
246
0
  blkno = root->blkno;
247
0
  buffer = root->buffer;
248
249
0
  ptr = (GinBtreeStack *) palloc(sizeof(GinBtreeStack));
250
251
0
  for (;;)
252
0
  {
253
0
    LockBuffer(buffer, GIN_EXCLUSIVE);
254
0
    page = BufferGetPage(buffer);
255
0
    if (GinPageIsLeaf(page))
256
0
      elog(ERROR, "Lost path");
257
258
0
    if (GinPageIsIncompleteSplit(page))
259
0
    {
260
0
      Assert(blkno != btree->rootBlkno);
261
0
      ptr->blkno = blkno;
262
0
      ptr->buffer = buffer;
263
264
      /*
265
       * parent may be wrong, but if so, the ginFinishSplit call will
266
       * recurse to call ginFindParents again to fix it.
267
       */
268
0
      ptr->parent = root;
269
0
      ptr->off = InvalidOffsetNumber;
270
271
0
      ginFinishOldSplit(btree, ptr, NULL, GIN_EXCLUSIVE);
272
0
    }
273
274
0
    leftmostBlkno = btree->getLeftMostChild(btree, page);
275
276
0
    while ((offset = btree->findChildPtr(btree, page, stack->blkno, InvalidOffsetNumber)) == InvalidOffsetNumber)
277
0
    {
278
0
      blkno = GinPageGetOpaque(page)->rightlink;
279
0
      if (blkno == InvalidBlockNumber)
280
0
      {
281
        /* Link not present in this level */
282
0
        LockBuffer(buffer, GIN_UNLOCK);
283
        /* Do not release pin on the root buffer */
284
0
        if (buffer != root->buffer)
285
0
          ReleaseBuffer(buffer);
286
0
        break;
287
0
      }
288
0
      buffer = ginStepRight(buffer, btree->index, GIN_EXCLUSIVE);
289
0
      page = BufferGetPage(buffer);
290
291
      /* finish any incomplete splits, as above */
292
0
      if (GinPageIsIncompleteSplit(page))
293
0
      {
294
0
        Assert(blkno != btree->rootBlkno);
295
0
        ptr->blkno = blkno;
296
0
        ptr->buffer = buffer;
297
0
        ptr->parent = root;
298
0
        ptr->off = InvalidOffsetNumber;
299
300
0
        ginFinishOldSplit(btree, ptr, NULL, GIN_EXCLUSIVE);
301
0
      }
302
0
    }
303
304
0
    if (blkno != InvalidBlockNumber)
305
0
    {
306
0
      ptr->blkno = blkno;
307
0
      ptr->buffer = buffer;
308
0
      ptr->parent = root; /* it may be wrong, but in next call we will
309
                 * correct */
310
0
      ptr->off = offset;
311
0
      stack->parent = ptr;
312
0
      return;
313
0
    }
314
315
    /* Descend down to next level */
316
0
    blkno = leftmostBlkno;
317
0
    buffer = ReadBuffer(btree->index, blkno);
318
0
  }
319
0
}
320
321
/*
322
 * Insert a new item to a page.
323
 *
324
 * Returns true if the insertion was finished. On false, the page was split and
325
 * the parent needs to be updated. (A root split returns true as it doesn't
326
 * need any further action by the caller to complete.)
327
 *
328
 * When inserting a downlink to an internal page, 'childbuf' contains the
329
 * child page that was split. Its GIN_INCOMPLETE_SPLIT flag will be cleared
330
 * atomically with the insert. Also, the existing item at offset stack->off
331
 * in the target page is updated to point to updateblkno.
332
 *
333
 * stack->buffer is locked on entry, and is kept locked.
334
 * Likewise for childbuf, if given.
335
 */
336
static bool
337
ginPlaceToPage(GinBtree btree, GinBtreeStack *stack,
338
         void *insertdata, BlockNumber updateblkno,
339
         Buffer childbuf, GinStatsData *buildStats)
340
0
{
341
0
  Page    page = BufferGetPage(stack->buffer);
342
0
  bool    result;
343
0
  GinPlaceToPageRC rc;
344
0
  uint16    xlflags = 0;
345
0
  Page    childpage = NULL;
346
0
  Page    newlpage = NULL,
347
0
        newrpage = NULL;
348
0
  void     *ptp_workspace = NULL;
349
0
  MemoryContext tmpCxt;
350
0
  MemoryContext oldCxt;
351
352
  /*
353
   * We do all the work of this function and its subfunctions in a temporary
354
   * memory context.  This avoids leakages and simplifies APIs, since some
355
   * subfunctions allocate storage that has to survive until we've finished
356
   * the WAL insertion.
357
   */
358
0
  tmpCxt = AllocSetContextCreate(CurrentMemoryContext,
359
0
                   "ginPlaceToPage temporary context",
360
0
                   ALLOCSET_DEFAULT_SIZES);
361
0
  oldCxt = MemoryContextSwitchTo(tmpCxt);
362
363
0
  if (GinPageIsData(page))
364
0
    xlflags |= GIN_INSERT_ISDATA;
365
0
  if (GinPageIsLeaf(page))
366
0
  {
367
0
    xlflags |= GIN_INSERT_ISLEAF;
368
0
    Assert(!BufferIsValid(childbuf));
369
0
    Assert(updateblkno == InvalidBlockNumber);
370
0
  }
371
0
  else
372
0
  {
373
0
    Assert(BufferIsValid(childbuf));
374
0
    Assert(updateblkno != InvalidBlockNumber);
375
0
    childpage = BufferGetPage(childbuf);
376
0
  }
377
378
  /*
379
   * See if the incoming tuple will fit on the page.  beginPlaceToPage will
380
   * decide if the page needs to be split, and will compute the split
381
   * contents if so.  See comments for beginPlaceToPage and execPlaceToPage
382
   * functions for more details of the API here.
383
   */
384
0
  rc = btree->beginPlaceToPage(btree, stack->buffer, stack,
385
0
                 insertdata, updateblkno,
386
0
                 &ptp_workspace,
387
0
                 &newlpage, &newrpage);
388
389
0
  if (rc == GPTP_NO_WORK)
390
0
  {
391
    /* Nothing to do */
392
0
    result = true;
393
0
  }
394
0
  else if (rc == GPTP_INSERT)
395
0
  {
396
    /* It will fit, perform the insertion */
397
0
    START_CRIT_SECTION();
398
399
0
    if (RelationNeedsWAL(btree->index) && !btree->isBuild)
400
0
      XLogBeginInsert();
401
402
    /*
403
     * Perform the page update, dirty and register stack->buffer, and
404
     * register any extra WAL data.
405
     */
406
0
    btree->execPlaceToPage(btree, stack->buffer, stack,
407
0
                 insertdata, updateblkno, ptp_workspace);
408
409
    /* An insert to an internal page finishes the split of the child. */
410
0
    if (BufferIsValid(childbuf))
411
0
    {
412
0
      GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
413
0
      MarkBufferDirty(childbuf);
414
0
      if (RelationNeedsWAL(btree->index) && !btree->isBuild)
415
0
        XLogRegisterBuffer(1, childbuf, REGBUF_STANDARD);
416
0
    }
417
418
0
    if (RelationNeedsWAL(btree->index) && !btree->isBuild)
419
0
    {
420
0
      XLogRecPtr  recptr;
421
0
      ginxlogInsert xlrec;
422
0
      BlockIdData childblknos[2];
423
424
0
      xlrec.flags = xlflags;
425
426
0
      XLogRegisterData(&xlrec, sizeof(ginxlogInsert));
427
428
      /*
429
       * Log information about child if this was an insertion of a
430
       * downlink.
431
       */
432
0
      if (BufferIsValid(childbuf))
433
0
      {
434
0
        BlockIdSet(&childblknos[0], BufferGetBlockNumber(childbuf));
435
0
        BlockIdSet(&childblknos[1], GinPageGetOpaque(childpage)->rightlink);
436
0
        XLogRegisterData(childblknos,
437
0
                 sizeof(BlockIdData) * 2);
438
0
      }
439
440
0
      recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_INSERT);
441
0
      PageSetLSN(page, recptr);
442
0
      if (BufferIsValid(childbuf))
443
0
        PageSetLSN(childpage, recptr);
444
0
    }
445
446
0
    END_CRIT_SECTION();
447
448
    /* Insertion is complete. */
449
0
    result = true;
450
0
  }
451
0
  else if (rc == GPTP_SPLIT)
452
0
  {
453
    /*
454
     * Didn't fit, need to split.  The split has been computed in newlpage
455
     * and newrpage, which are pointers to palloc'd pages, not associated
456
     * with buffers.  stack->buffer is not touched yet.
457
     */
458
0
    Buffer    rbuffer;
459
0
    BlockNumber savedRightLink;
460
0
    ginxlogSplit data;
461
0
    Buffer    lbuffer = InvalidBuffer;
462
0
    Page    newrootpg = NULL;
463
464
    /* Get a new index page to become the right page */
465
0
    rbuffer = GinNewBuffer(btree->index);
466
467
    /* During index build, count the new page */
468
0
    if (buildStats)
469
0
    {
470
0
      if (btree->isData)
471
0
        buildStats->nDataPages++;
472
0
      else
473
0
        buildStats->nEntryPages++;
474
0
    }
475
476
0
    savedRightLink = GinPageGetOpaque(page)->rightlink;
477
478
    /* Begin setting up WAL record */
479
0
    data.locator = btree->index->rd_locator;
480
0
    data.flags = xlflags;
481
0
    if (BufferIsValid(childbuf))
482
0
    {
483
0
      data.leftChildBlkno = BufferGetBlockNumber(childbuf);
484
0
      data.rightChildBlkno = GinPageGetOpaque(childpage)->rightlink;
485
0
    }
486
0
    else
487
0
      data.leftChildBlkno = data.rightChildBlkno = InvalidBlockNumber;
488
489
0
    if (stack->parent == NULL)
490
0
    {
491
      /*
492
       * splitting the root, so we need to allocate new left page and
493
       * place pointers to left and right page on root page.
494
       */
495
0
      lbuffer = GinNewBuffer(btree->index);
496
497
      /* During index build, count the new left page */
498
0
      if (buildStats)
499
0
      {
500
0
        if (btree->isData)
501
0
          buildStats->nDataPages++;
502
0
        else
503
0
          buildStats->nEntryPages++;
504
0
      }
505
506
0
      data.rrlink = InvalidBlockNumber;
507
0
      data.flags |= GIN_SPLIT_ROOT;
508
509
0
      GinPageGetOpaque(newrpage)->rightlink = InvalidBlockNumber;
510
0
      GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
511
512
      /*
513
       * Construct a new root page containing downlinks to the new left
514
       * and right pages.  (Do this in a temporary copy rather than
515
       * overwriting the original page directly, since we're not in the
516
       * critical section yet.)
517
       */
518
0
      newrootpg = PageGetTempPage(newrpage);
519
0
      GinInitPage(newrootpg, GinPageGetOpaque(newlpage)->flags & ~(GIN_LEAF | GIN_COMPRESSED), BLCKSZ);
520
521
0
      btree->fillRoot(btree, newrootpg,
522
0
              BufferGetBlockNumber(lbuffer), newlpage,
523
0
              BufferGetBlockNumber(rbuffer), newrpage);
524
525
0
      if (GinPageIsLeaf(BufferGetPage(stack->buffer)))
526
0
      {
527
528
0
        PredicateLockPageSplit(btree->index,
529
0
                     BufferGetBlockNumber(stack->buffer),
530
0
                     BufferGetBlockNumber(lbuffer));
531
532
0
        PredicateLockPageSplit(btree->index,
533
0
                     BufferGetBlockNumber(stack->buffer),
534
0
                     BufferGetBlockNumber(rbuffer));
535
0
      }
536
0
    }
537
0
    else
538
0
    {
539
      /* splitting a non-root page */
540
0
      data.rrlink = savedRightLink;
541
542
0
      GinPageGetOpaque(newrpage)->rightlink = savedRightLink;
543
0
      GinPageGetOpaque(newlpage)->flags |= GIN_INCOMPLETE_SPLIT;
544
0
      GinPageGetOpaque(newlpage)->rightlink = BufferGetBlockNumber(rbuffer);
545
546
0
      if (GinPageIsLeaf(BufferGetPage(stack->buffer)))
547
0
      {
548
549
0
        PredicateLockPageSplit(btree->index,
550
0
                     BufferGetBlockNumber(stack->buffer),
551
0
                     BufferGetBlockNumber(rbuffer));
552
0
      }
553
0
    }
554
555
    /*
556
     * OK, we have the new contents of the left page in a temporary copy
557
     * now (newlpage), and likewise for the new contents of the
558
     * newly-allocated right block. The original page is still unchanged.
559
     *
560
     * If this is a root split, we also have a temporary page containing
561
     * the new contents of the root.
562
     */
563
564
0
    START_CRIT_SECTION();
565
566
0
    MarkBufferDirty(rbuffer);
567
0
    MarkBufferDirty(stack->buffer);
568
569
    /*
570
     * Restore the temporary copies over the real buffers.
571
     */
572
0
    if (stack->parent == NULL)
573
0
    {
574
      /* Splitting the root, three pages to update */
575
0
      MarkBufferDirty(lbuffer);
576
0
      memcpy(page, newrootpg, BLCKSZ);
577
0
      memcpy(BufferGetPage(lbuffer), newlpage, BLCKSZ);
578
0
      memcpy(BufferGetPage(rbuffer), newrpage, BLCKSZ);
579
0
    }
580
0
    else
581
0
    {
582
      /* Normal split, only two pages to update */
583
0
      memcpy(page, newlpage, BLCKSZ);
584
0
      memcpy(BufferGetPage(rbuffer), newrpage, BLCKSZ);
585
0
    }
586
587
    /* We also clear childbuf's INCOMPLETE_SPLIT flag, if passed */
588
0
    if (BufferIsValid(childbuf))
589
0
    {
590
0
      GinPageGetOpaque(childpage)->flags &= ~GIN_INCOMPLETE_SPLIT;
591
0
      MarkBufferDirty(childbuf);
592
0
    }
593
594
    /* write WAL record */
595
0
    if (RelationNeedsWAL(btree->index) && !btree->isBuild)
596
0
    {
597
0
      XLogRecPtr  recptr;
598
599
0
      XLogBeginInsert();
600
601
      /*
602
       * We just take full page images of all the split pages. Splits
603
       * are uncommon enough that it's not worth complicating the code
604
       * to be more efficient.
605
       */
606
0
      if (stack->parent == NULL)
607
0
      {
608
0
        XLogRegisterBuffer(0, lbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
609
0
        XLogRegisterBuffer(1, rbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
610
0
        XLogRegisterBuffer(2, stack->buffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
611
0
      }
612
0
      else
613
0
      {
614
0
        XLogRegisterBuffer(0, stack->buffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
615
0
        XLogRegisterBuffer(1, rbuffer, REGBUF_FORCE_IMAGE | REGBUF_STANDARD);
616
0
      }
617
0
      if (BufferIsValid(childbuf))
618
0
        XLogRegisterBuffer(3, childbuf, REGBUF_STANDARD);
619
620
0
      XLogRegisterData(&data, sizeof(ginxlogSplit));
621
622
0
      recptr = XLogInsert(RM_GIN_ID, XLOG_GIN_SPLIT);
623
624
0
      PageSetLSN(page, recptr);
625
0
      PageSetLSN(BufferGetPage(rbuffer), recptr);
626
0
      if (stack->parent == NULL)
627
0
        PageSetLSN(BufferGetPage(lbuffer), recptr);
628
0
      if (BufferIsValid(childbuf))
629
0
        PageSetLSN(childpage, recptr);
630
0
    }
631
0
    END_CRIT_SECTION();
632
633
    /*
634
     * We can release the locks/pins on the new pages now, but keep
635
     * stack->buffer locked.  childbuf doesn't get unlocked either.
636
     */
637
0
    UnlockReleaseBuffer(rbuffer);
638
0
    if (stack->parent == NULL)
639
0
      UnlockReleaseBuffer(lbuffer);
640
641
    /*
642
     * If we split the root, we're done. Otherwise the split is not
643
     * complete until the downlink for the new page has been inserted to
644
     * the parent.
645
     */
646
0
    result = (stack->parent == NULL);
647
0
  }
648
0
  else
649
0
  {
650
0
    elog(ERROR, "invalid return code from GIN beginPlaceToPage method: %d", rc);
651
0
    result = false;      /* keep compiler quiet */
652
0
  }
653
654
  /* Clean up temp context */
655
0
  MemoryContextSwitchTo(oldCxt);
656
0
  MemoryContextDelete(tmpCxt);
657
658
0
  return result;
659
0
}
660
661
/*
662
 * Finish a split by inserting the downlink for the new page to parent.
663
 *
664
 * On entry, stack->buffer is exclusively locked.
665
 *
666
 * If freestack is true, all the buffers are released and unlocked as we
667
 * crawl up the tree, and 'stack' is freed. Otherwise stack->buffer is kept
668
 * locked, and stack is unmodified, except for possibly moving right to find
669
 * the correct parent of page.
670
 */
671
static void
672
ginFinishSplit(GinBtree btree, GinBtreeStack *stack, bool freestack,
673
         GinStatsData *buildStats)
674
0
{
675
0
  Page    page;
676
0
  bool    done;
677
0
  bool    first = true;
678
679
  /* this loop crawls up the stack until the insertion is complete */
680
0
  do
681
0
  {
682
0
    GinBtreeStack *parent = stack->parent;
683
0
    void     *insertdata;
684
0
    BlockNumber updateblkno;
685
686
#ifdef USE_INJECTION_POINTS
687
    if (GinPageIsLeaf(BufferGetPage(stack->buffer)))
688
      INJECTION_POINT("gin-leave-leaf-split-incomplete", NULL);
689
    else
690
      INJECTION_POINT("gin-leave-internal-split-incomplete", NULL);
691
#endif
692
693
    /* search parent to lock */
694
0
    LockBuffer(parent->buffer, GIN_EXCLUSIVE);
695
696
    /*
697
     * If the parent page was incompletely split, finish that split first,
698
     * then continue with the current one.
699
     *
700
     * Note: we have to finish *all* incomplete splits we encounter, even
701
     * if we have to move right. Otherwise we might choose as the target a
702
     * page that has no downlink in the parent, and splitting it further
703
     * would fail.
704
     */
705
0
    if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
706
0
      ginFinishOldSplit(btree, parent, buildStats, GIN_EXCLUSIVE);
707
708
    /* move right if it's needed */
709
0
    page = BufferGetPage(parent->buffer);
710
0
    while ((parent->off = btree->findChildPtr(btree, page, stack->blkno, parent->off)) == InvalidOffsetNumber)
711
0
    {
712
0
      if (GinPageRightMost(page))
713
0
      {
714
        /*
715
         * rightmost page, but we don't find parent, we should use
716
         * plain search...
717
         */
718
0
        LockBuffer(parent->buffer, GIN_UNLOCK);
719
0
        ginFindParents(btree, stack);
720
0
        parent = stack->parent;
721
0
        Assert(parent != NULL);
722
0
        break;
723
0
      }
724
725
0
      parent->buffer = ginStepRight(parent->buffer, btree->index, GIN_EXCLUSIVE);
726
0
      parent->blkno = BufferGetBlockNumber(parent->buffer);
727
0
      page = BufferGetPage(parent->buffer);
728
729
0
      if (GinPageIsIncompleteSplit(BufferGetPage(parent->buffer)))
730
0
        ginFinishOldSplit(btree, parent, buildStats, GIN_EXCLUSIVE);
731
0
    }
732
733
    /* insert the downlink */
734
0
    insertdata = btree->prepareDownlink(btree, stack->buffer);
735
0
    updateblkno = GinPageGetOpaque(BufferGetPage(stack->buffer))->rightlink;
736
0
    done = ginPlaceToPage(btree, parent,
737
0
                insertdata, updateblkno,
738
0
                stack->buffer, buildStats);
739
0
    pfree(insertdata);
740
741
    /*
742
     * If the caller requested to free the stack, unlock and release the
743
     * child buffer now. Otherwise keep it pinned and locked, but if we
744
     * have to recurse up the tree, we can unlock the upper pages, only
745
     * keeping the page at the bottom of the stack locked.
746
     */
747
0
    if (!first || freestack)
748
0
      LockBuffer(stack->buffer, GIN_UNLOCK);
749
0
    if (freestack)
750
0
    {
751
0
      ReleaseBuffer(stack->buffer);
752
0
      pfree(stack);
753
0
    }
754
0
    stack = parent;
755
756
0
    first = false;
757
0
  } while (!done);
758
759
  /* unlock the parent */
760
0
  LockBuffer(stack->buffer, GIN_UNLOCK);
761
762
0
  if (freestack)
763
0
    freeGinBtreeStack(stack);
764
0
}
765
766
/*
767
 * An entry point to ginFinishSplit() that is used when we stumble upon an
768
 * existing incompletely split page in the tree, as opposed to completing a
769
 * split that we just made ourselves. The difference is that stack->buffer may
770
 * be merely share-locked on entry, and will be upgraded to exclusive mode.
771
 *
772
 * Note: Upgrading the lock momentarily releases it. Doing that in a scan
773
 * would not be OK, because a concurrent VACUUM might delete the page while
774
 * we're not holding the lock. It's OK in an insert, though, because VACUUM
775
 * has a different mechanism that prevents it from running concurrently with
776
 * inserts. (Namely, it holds a cleanup lock on the root.)
777
 */
778
static void
779
ginFinishOldSplit(GinBtree btree, GinBtreeStack *stack, GinStatsData *buildStats, int access)
780
0
{
781
0
  INJECTION_POINT("gin-finish-incomplete-split", NULL);
782
0
  elog(DEBUG1, "finishing incomplete split of block %u in gin index \"%s\"",
783
0
     stack->blkno, RelationGetRelationName(btree->index));
784
785
0
  if (access == GIN_SHARE)
786
0
  {
787
0
    LockBuffer(stack->buffer, GIN_UNLOCK);
788
0
    LockBuffer(stack->buffer, GIN_EXCLUSIVE);
789
790
0
    if (!GinPageIsIncompleteSplit(BufferGetPage(stack->buffer)))
791
0
    {
792
      /*
793
       * Someone else already completed the split while we were not
794
       * holding the lock.
795
       */
796
0
      return;
797
0
    }
798
0
  }
799
800
0
  ginFinishSplit(btree, stack, false, buildStats);
801
0
}
802
803
/*
804
 * Insert a value to tree described by stack.
805
 *
806
 * The value to be inserted is given in 'insertdata'. Its format depends
807
 * on whether this is an entry or data tree, ginInsertValue just passes it
808
 * through to the tree-specific callback function.
809
 *
810
 * During an index build, buildStats is non-null and the counters it contains
811
 * are incremented as needed.
812
 *
813
 * NB: the passed-in stack is freed, as though by freeGinBtreeStack.
814
 */
815
void
816
ginInsertValue(GinBtree btree, GinBtreeStack *stack, void *insertdata,
817
         GinStatsData *buildStats)
818
0
{
819
0
  bool    done;
820
821
  /* If the leaf page was incompletely split, finish the split first */
822
0
  if (GinPageIsIncompleteSplit(BufferGetPage(stack->buffer)))
823
0
    ginFinishOldSplit(btree, stack, buildStats, GIN_EXCLUSIVE);
824
825
0
  done = ginPlaceToPage(btree, stack,
826
0
              insertdata, InvalidBlockNumber,
827
0
              InvalidBuffer, buildStats);
828
0
  if (done)
829
0
  {
830
0
    LockBuffer(stack->buffer, GIN_UNLOCK);
831
0
    freeGinBtreeStack(stack);
832
0
  }
833
0
  else
834
0
    ginFinishSplit(btree, stack, true, buildStats);
835
0
}