Coverage Report

Created: 2025-06-13 06:06

/src/postgres/src/backend/access/spgist/spgdoinsert.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * spgdoinsert.c
4
 *    implementation of insert algorithm
5
 *
6
 *
7
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8
 * Portions Copyright (c) 1994, Regents of the University of California
9
 *
10
 * IDENTIFICATION
11
 *      src/backend/access/spgist/spgdoinsert.c
12
 *
13
 *-------------------------------------------------------------------------
14
 */
15
16
#include "postgres.h"
17
18
#include "access/genam.h"
19
#include "access/spgist_private.h"
20
#include "access/spgxlog.h"
21
#include "access/xloginsert.h"
22
#include "common/int.h"
23
#include "common/pg_prng.h"
24
#include "miscadmin.h"
25
#include "storage/bufmgr.h"
26
#include "utils/rel.h"
27
28
29
/*
30
 * SPPageDesc tracks all info about a page we are inserting into.  In some
31
 * situations it actually identifies a tuple, or even a specific node within
32
 * an inner tuple.  But any of the fields can be invalid.  If the buffer
33
 * field is valid, it implies we hold pin and exclusive lock on that buffer.
34
 * page pointer should be valid exactly when buffer is.
35
 */
36
typedef struct SPPageDesc
37
{
38
  BlockNumber blkno;      /* block number, or InvalidBlockNumber */
39
  Buffer    buffer;     /* page's buffer number, or InvalidBuffer */
40
  Page    page;     /* pointer to page buffer, or NULL */
41
  OffsetNumber offnum;    /* offset of tuple, or InvalidOffsetNumber */
42
  int     node;     /* node number within inner tuple, or -1 */
43
} SPPageDesc;
44
45
46
/*
47
 * Set the item pointer in the nodeN'th entry in inner tuple tup.  This
48
 * is used to update the parent inner tuple's downlink after a move or
49
 * split operation.
50
 */
51
void
52
spgUpdateNodeLink(SpGistInnerTuple tup, int nodeN,
53
          BlockNumber blkno, OffsetNumber offset)
54
0
{
55
0
  int     i;
56
0
  SpGistNodeTuple node;
57
58
0
  SGITITERATE(tup, i, node)
59
0
  {
60
0
    if (i == nodeN)
61
0
    {
62
0
      ItemPointerSet(&node->t_tid, blkno, offset);
63
0
      return;
64
0
    }
65
0
  }
66
67
0
  elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
68
0
     nodeN);
69
0
}
70
71
/*
72
 * Form a new inner tuple containing one more node than the given one, with
73
 * the specified label datum, inserted at offset "offset" in the node array.
74
 * The new tuple's prefix is the same as the old one's.
75
 *
76
 * Note that the new node initially has an invalid downlink.  We'll find a
77
 * page to point it to later.
78
 */
79
static SpGistInnerTuple
80
addNode(SpGistState *state, SpGistInnerTuple tuple, Datum label, int offset)
81
0
{
82
0
  SpGistNodeTuple node,
83
0
         *nodes;
84
0
  int     i;
85
86
  /* if offset is negative, insert at end */
87
0
  if (offset < 0)
88
0
    offset = tuple->nNodes;
89
0
  else if (offset > tuple->nNodes)
90
0
    elog(ERROR, "invalid offset for adding node to SPGiST inner tuple");
91
92
0
  nodes = palloc(sizeof(SpGistNodeTuple) * (tuple->nNodes + 1));
93
0
  SGITITERATE(tuple, i, node)
94
0
  {
95
0
    if (i < offset)
96
0
      nodes[i] = node;
97
0
    else
98
0
      nodes[i + 1] = node;
99
0
  }
100
101
0
  nodes[offset] = spgFormNodeTuple(state, label, false);
102
103
0
  return spgFormInnerTuple(state,
104
0
               (tuple->prefixSize > 0),
105
0
               SGITDATUM(tuple, state),
106
0
               tuple->nNodes + 1,
107
0
               nodes);
108
0
}
109
110
/* qsort comparator for sorting OffsetNumbers */
111
static int
112
cmpOffsetNumbers(const void *a, const void *b)
113
0
{
114
0
  return pg_cmp_u16(*(const OffsetNumber *) a, *(const OffsetNumber *) b);
115
0
}
116
117
/*
118
 * Delete multiple tuples from an index page, preserving tuple offset numbers.
119
 *
120
 * The first tuple in the given list is replaced with a dead tuple of type
121
 * "firststate" (REDIRECT/DEAD/PLACEHOLDER); the remaining tuples are replaced
122
 * with dead tuples of type "reststate".  If either firststate or reststate
123
 * is REDIRECT, blkno/offnum specify where to link to.
124
 *
125
 * NB: this is used during WAL replay, so beware of trying to make it too
126
 * smart.  In particular, it shouldn't use "state" except for calling
127
 * spgFormDeadTuple().  This is also used in a critical section, so no
128
 * pallocs either!
129
 */
130
void
131
spgPageIndexMultiDelete(SpGistState *state, Page page,
132
            OffsetNumber *itemnos, int nitems,
133
            int firststate, int reststate,
134
            BlockNumber blkno, OffsetNumber offnum)
135
0
{
136
0
  OffsetNumber firstItem;
137
0
  OffsetNumber sortednos[MaxIndexTuplesPerPage];
138
0
  SpGistDeadTuple tuple = NULL;
139
0
  int     i;
140
141
0
  if (nitems == 0)
142
0
    return;         /* nothing to do */
143
144
  /*
145
   * For efficiency we want to use PageIndexMultiDelete, which requires the
146
   * targets to be listed in sorted order, so we have to sort the itemnos
147
   * array.  (This also greatly simplifies the math for reinserting the
148
   * replacement tuples.)  However, we must not scribble on the caller's
149
   * array, so we have to make a copy.
150
   */
151
0
  memcpy(sortednos, itemnos, sizeof(OffsetNumber) * nitems);
152
0
  if (nitems > 1)
153
0
    qsort(sortednos, nitems, sizeof(OffsetNumber), cmpOffsetNumbers);
154
155
0
  PageIndexMultiDelete(page, sortednos, nitems);
156
157
0
  firstItem = itemnos[0];
158
159
0
  for (i = 0; i < nitems; i++)
160
0
  {
161
0
    OffsetNumber itemno = sortednos[i];
162
0
    int     tupstate;
163
164
0
    tupstate = (itemno == firstItem) ? firststate : reststate;
165
0
    if (tuple == NULL || tuple->tupstate != tupstate)
166
0
      tuple = spgFormDeadTuple(state, tupstate, blkno, offnum);
167
168
0
    if (PageAddItem(page, (Item) tuple, tuple->size,
169
0
            itemno, false, false) != itemno)
170
0
      elog(ERROR, "failed to add item of size %u to SPGiST index page",
171
0
         tuple->size);
172
173
0
    if (tupstate == SPGIST_REDIRECT)
174
0
      SpGistPageGetOpaque(page)->nRedirection++;
175
0
    else if (tupstate == SPGIST_PLACEHOLDER)
176
0
      SpGistPageGetOpaque(page)->nPlaceholder++;
177
0
  }
178
0
}
179
180
/*
181
 * Update the parent inner tuple's downlink, and mark the parent buffer
182
 * dirty (this must be the last change to the parent page in the current
183
 * WAL action).
184
 */
185
static void
186
saveNodeLink(Relation index, SPPageDesc *parent,
187
       BlockNumber blkno, OffsetNumber offnum)
188
0
{
189
0
  SpGistInnerTuple innerTuple;
190
191
0
  innerTuple = (SpGistInnerTuple) PageGetItem(parent->page,
192
0
                        PageGetItemId(parent->page, parent->offnum));
193
194
0
  spgUpdateNodeLink(innerTuple, parent->node, blkno, offnum);
195
196
0
  MarkBufferDirty(parent->buffer);
197
0
}
198
199
/*
200
 * Add a leaf tuple to a leaf page where there is known to be room for it
201
 */
202
static void
203
addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
204
       SPPageDesc *current, SPPageDesc *parent, bool isNulls, bool isNew)
205
0
{
206
0
  spgxlogAddLeaf xlrec;
207
208
0
  xlrec.newPage = isNew;
209
0
  xlrec.storesNulls = isNulls;
210
211
  /* these will be filled below as needed */
212
0
  xlrec.offnumLeaf = InvalidOffsetNumber;
213
0
  xlrec.offnumHeadLeaf = InvalidOffsetNumber;
214
0
  xlrec.offnumParent = InvalidOffsetNumber;
215
0
  xlrec.nodeI = 0;
216
217
0
  START_CRIT_SECTION();
218
219
0
  if (current->offnum == InvalidOffsetNumber ||
220
0
    SpGistBlockIsRoot(current->blkno))
221
0
  {
222
    /* Tuple is not part of a chain */
223
0
    SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber);
224
0
    current->offnum = SpGistPageAddNewItem(state, current->page,
225
0
                         (Item) leafTuple, leafTuple->size,
226
0
                         NULL, false);
227
228
0
    xlrec.offnumLeaf = current->offnum;
229
230
    /* Must update parent's downlink if any */
231
0
    if (parent->buffer != InvalidBuffer)
232
0
    {
233
0
      xlrec.offnumParent = parent->offnum;
234
0
      xlrec.nodeI = parent->node;
235
236
0
      saveNodeLink(index, parent, current->blkno, current->offnum);
237
0
    }
238
0
  }
239
0
  else
240
0
  {
241
    /*
242
     * Tuple must be inserted into existing chain.  We mustn't change the
243
     * chain's head address, but we don't need to chase the entire chain
244
     * to put the tuple at the end; we can insert it second.
245
     *
246
     * Also, it's possible that the "chain" consists only of a DEAD tuple,
247
     * in which case we should replace the DEAD tuple in-place.
248
     */
249
0
    SpGistLeafTuple head;
250
0
    OffsetNumber offnum;
251
252
0
    head = (SpGistLeafTuple) PageGetItem(current->page,
253
0
                       PageGetItemId(current->page, current->offnum));
254
0
    if (head->tupstate == SPGIST_LIVE)
255
0
    {
256
0
      SGLT_SET_NEXTOFFSET(leafTuple, SGLT_GET_NEXTOFFSET(head));
257
0
      offnum = SpGistPageAddNewItem(state, current->page,
258
0
                      (Item) leafTuple, leafTuple->size,
259
0
                      NULL, false);
260
261
      /*
262
       * re-get head of list because it could have been moved on page,
263
       * and set new second element
264
       */
265
0
      head = (SpGistLeafTuple) PageGetItem(current->page,
266
0
                         PageGetItemId(current->page, current->offnum));
267
0
      SGLT_SET_NEXTOFFSET(head, offnum);
268
269
0
      xlrec.offnumLeaf = offnum;
270
0
      xlrec.offnumHeadLeaf = current->offnum;
271
0
    }
272
0
    else if (head->tupstate == SPGIST_DEAD)
273
0
    {
274
0
      SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber);
275
0
      PageIndexTupleDelete(current->page, current->offnum);
276
0
      if (PageAddItem(current->page,
277
0
              (Item) leafTuple, leafTuple->size,
278
0
              current->offnum, false, false) != current->offnum)
279
0
        elog(ERROR, "failed to add item of size %u to SPGiST index page",
280
0
           leafTuple->size);
281
282
      /* WAL replay distinguishes this case by equal offnums */
283
0
      xlrec.offnumLeaf = current->offnum;
284
0
      xlrec.offnumHeadLeaf = current->offnum;
285
0
    }
286
0
    else
287
0
      elog(ERROR, "unexpected SPGiST tuple state: %d", head->tupstate);
288
0
  }
289
290
0
  MarkBufferDirty(current->buffer);
291
292
0
  if (RelationNeedsWAL(index) && !state->isBuild)
293
0
  {
294
0
    XLogRecPtr  recptr;
295
0
    int     flags;
296
297
0
    XLogBeginInsert();
298
0
    XLogRegisterData(&xlrec, sizeof(xlrec));
299
0
    XLogRegisterData(leafTuple, leafTuple->size);
300
301
0
    flags = REGBUF_STANDARD;
302
0
    if (xlrec.newPage)
303
0
      flags |= REGBUF_WILL_INIT;
304
0
    XLogRegisterBuffer(0, current->buffer, flags);
305
0
    if (xlrec.offnumParent != InvalidOffsetNumber)
306
0
      XLogRegisterBuffer(1, parent->buffer, REGBUF_STANDARD);
307
308
0
    recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_LEAF);
309
310
0
    PageSetLSN(current->page, recptr);
311
312
    /* update parent only if we actually changed it */
313
0
    if (xlrec.offnumParent != InvalidOffsetNumber)
314
0
    {
315
0
      PageSetLSN(parent->page, recptr);
316
0
    }
317
0
  }
318
319
0
  END_CRIT_SECTION();
320
0
}
321
322
/*
323
 * Count the number and total size of leaf tuples in the chain starting at
324
 * current->offnum.  Return number into *nToSplit and total size as function
325
 * result.
326
 *
327
 * Klugy special case when considering the root page (i.e., root is a leaf
328
 * page, but we're about to split for the first time): return fake large
329
 * values to force spgdoinsert() to take the doPickSplit rather than
330
 * moveLeafs code path.  moveLeafs is not prepared to deal with root page.
331
 */
332
static int
333
checkSplitConditions(Relation index, SpGistState *state,
334
           SPPageDesc *current, int *nToSplit)
335
0
{
336
0
  int     i,
337
0
        n = 0,
338
0
        totalSize = 0;
339
340
0
  if (SpGistBlockIsRoot(current->blkno))
341
0
  {
342
    /* return impossible values to force split */
343
0
    *nToSplit = BLCKSZ;
344
0
    return BLCKSZ;
345
0
  }
346
347
0
  i = current->offnum;
348
0
  while (i != InvalidOffsetNumber)
349
0
  {
350
0
    SpGistLeafTuple it;
351
352
0
    Assert(i >= FirstOffsetNumber &&
353
0
         i <= PageGetMaxOffsetNumber(current->page));
354
0
    it = (SpGistLeafTuple) PageGetItem(current->page,
355
0
                       PageGetItemId(current->page, i));
356
0
    if (it->tupstate == SPGIST_LIVE)
357
0
    {
358
0
      n++;
359
0
      totalSize += it->size + sizeof(ItemIdData);
360
0
    }
361
0
    else if (it->tupstate == SPGIST_DEAD)
362
0
    {
363
      /* We could see a DEAD tuple as first/only chain item */
364
0
      Assert(i == current->offnum);
365
0
      Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
366
      /* Don't count it in result, because it won't go to other page */
367
0
    }
368
0
    else
369
0
      elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
370
371
0
    i = SGLT_GET_NEXTOFFSET(it);
372
0
  }
373
374
0
  *nToSplit = n;
375
376
0
  return totalSize;
377
0
}
378
379
/*
380
 * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
381
 * but the chain has to be moved because there's not enough room to add
382
 * newLeafTuple to its page.  We use this method when the chain contains
383
 * very little data so a split would be inefficient.  We are sure we can
384
 * fit the chain plus newLeafTuple on one other page.
385
 */
386
static void
387
moveLeafs(Relation index, SpGistState *state,
388
      SPPageDesc *current, SPPageDesc *parent,
389
      SpGistLeafTuple newLeafTuple, bool isNulls)
390
0
{
391
0
  int     i,
392
0
        nDelete,
393
0
        nInsert,
394
0
        size;
395
0
  Buffer    nbuf;
396
0
  Page    npage;
397
0
  OffsetNumber r = InvalidOffsetNumber,
398
0
        startOffset = InvalidOffsetNumber;
399
0
  bool    replaceDead = false;
400
0
  OffsetNumber *toDelete;
401
0
  OffsetNumber *toInsert;
402
0
  BlockNumber nblkno;
403
0
  spgxlogMoveLeafs xlrec;
404
0
  char     *leafdata,
405
0
         *leafptr;
406
407
  /* This doesn't work on root page */
408
0
  Assert(parent->buffer != InvalidBuffer);
409
0
  Assert(parent->buffer != current->buffer);
410
411
  /* Locate the tuples to be moved, and count up the space needed */
412
0
  i = PageGetMaxOffsetNumber(current->page);
413
0
  toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * i);
414
0
  toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * (i + 1));
415
416
0
  size = newLeafTuple->size + sizeof(ItemIdData);
417
418
0
  nDelete = 0;
419
0
  i = current->offnum;
420
0
  while (i != InvalidOffsetNumber)
421
0
  {
422
0
    SpGistLeafTuple it;
423
424
0
    Assert(i >= FirstOffsetNumber &&
425
0
         i <= PageGetMaxOffsetNumber(current->page));
426
0
    it = (SpGistLeafTuple) PageGetItem(current->page,
427
0
                       PageGetItemId(current->page, i));
428
429
0
    if (it->tupstate == SPGIST_LIVE)
430
0
    {
431
0
      toDelete[nDelete] = i;
432
0
      size += it->size + sizeof(ItemIdData);
433
0
      nDelete++;
434
0
    }
435
0
    else if (it->tupstate == SPGIST_DEAD)
436
0
    {
437
      /* We could see a DEAD tuple as first/only chain item */
438
0
      Assert(i == current->offnum);
439
0
      Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
440
      /* We don't want to move it, so don't count it in size */
441
0
      toDelete[nDelete] = i;
442
0
      nDelete++;
443
0
      replaceDead = true;
444
0
    }
445
0
    else
446
0
      elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
447
448
0
    i = SGLT_GET_NEXTOFFSET(it);
449
0
  }
450
451
  /* Find a leaf page that will hold them */
452
0
  nbuf = SpGistGetBuffer(index, GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
453
0
               size, &xlrec.newPage);
454
0
  npage = BufferGetPage(nbuf);
455
0
  nblkno = BufferGetBlockNumber(nbuf);
456
0
  Assert(nblkno != current->blkno);
457
458
0
  leafdata = leafptr = palloc(size);
459
460
0
  START_CRIT_SECTION();
461
462
  /* copy all the old tuples to new page, unless they're dead */
463
0
  nInsert = 0;
464
0
  if (!replaceDead)
465
0
  {
466
0
    for (i = 0; i < nDelete; i++)
467
0
    {
468
0
      SpGistLeafTuple it;
469
470
0
      it = (SpGistLeafTuple) PageGetItem(current->page,
471
0
                         PageGetItemId(current->page, toDelete[i]));
472
0
      Assert(it->tupstate == SPGIST_LIVE);
473
474
      /*
475
       * Update chain link (notice the chain order gets reversed, but we
476
       * don't care).  We're modifying the tuple on the source page
477
       * here, but it's okay since we're about to delete it.
478
       */
479
0
      SGLT_SET_NEXTOFFSET(it, r);
480
481
0
      r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
482
0
                   &startOffset, false);
483
484
0
      toInsert[nInsert] = r;
485
0
      nInsert++;
486
487
      /* save modified tuple into leafdata as well */
488
0
      memcpy(leafptr, it, it->size);
489
0
      leafptr += it->size;
490
0
    }
491
0
  }
492
493
  /* add the new tuple as well */
494
0
  SGLT_SET_NEXTOFFSET(newLeafTuple, r);
495
0
  r = SpGistPageAddNewItem(state, npage,
496
0
               (Item) newLeafTuple, newLeafTuple->size,
497
0
               &startOffset, false);
498
0
  toInsert[nInsert] = r;
499
0
  nInsert++;
500
0
  memcpy(leafptr, newLeafTuple, newLeafTuple->size);
501
0
  leafptr += newLeafTuple->size;
502
503
  /*
504
   * Now delete the old tuples, leaving a redirection pointer behind for the
505
   * first one, unless we're doing an index build; in which case there can't
506
   * be any concurrent scan so we need not provide a redirect.
507
   */
508
0
  spgPageIndexMultiDelete(state, current->page, toDelete, nDelete,
509
0
              state->isBuild ? SPGIST_PLACEHOLDER : SPGIST_REDIRECT,
510
0
              SPGIST_PLACEHOLDER,
511
0
              nblkno, r);
512
513
  /* Update parent's downlink and mark parent page dirty */
514
0
  saveNodeLink(index, parent, nblkno, r);
515
516
  /* Mark the leaf pages too */
517
0
  MarkBufferDirty(current->buffer);
518
0
  MarkBufferDirty(nbuf);
519
520
0
  if (RelationNeedsWAL(index) && !state->isBuild)
521
0
  {
522
0
    XLogRecPtr  recptr;
523
524
    /* prepare WAL info */
525
0
    STORE_STATE(state, xlrec.stateSrc);
526
527
0
    xlrec.nMoves = nDelete;
528
0
    xlrec.replaceDead = replaceDead;
529
0
    xlrec.storesNulls = isNulls;
530
531
0
    xlrec.offnumParent = parent->offnum;
532
0
    xlrec.nodeI = parent->node;
533
534
0
    XLogBeginInsert();
535
0
    XLogRegisterData(&xlrec, SizeOfSpgxlogMoveLeafs);
536
0
    XLogRegisterData(toDelete,
537
0
             sizeof(OffsetNumber) * nDelete);
538
0
    XLogRegisterData(toInsert,
539
0
             sizeof(OffsetNumber) * nInsert);
540
0
    XLogRegisterData(leafdata, leafptr - leafdata);
541
542
0
    XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
543
0
    XLogRegisterBuffer(1, nbuf, REGBUF_STANDARD | (xlrec.newPage ? REGBUF_WILL_INIT : 0));
544
0
    XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
545
546
0
    recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_MOVE_LEAFS);
547
548
0
    PageSetLSN(current->page, recptr);
549
0
    PageSetLSN(npage, recptr);
550
0
    PageSetLSN(parent->page, recptr);
551
0
  }
552
553
0
  END_CRIT_SECTION();
554
555
  /* Update local free-space cache and release new buffer */
556
0
  SpGistSetLastUsedPage(index, nbuf);
557
0
  UnlockReleaseBuffer(nbuf);
558
0
}
559
560
/*
561
 * Update previously-created redirection tuple with appropriate destination
562
 *
563
 * We use this when it's not convenient to know the destination first.
564
 * The tuple should have been made with the "impossible" destination of
565
 * the metapage.
566
 */
567
static void
568
setRedirectionTuple(SPPageDesc *current, OffsetNumber position,
569
          BlockNumber blkno, OffsetNumber offnum)
570
0
{
571
0
  SpGistDeadTuple dt;
572
573
0
  dt = (SpGistDeadTuple) PageGetItem(current->page,
574
0
                     PageGetItemId(current->page, position));
575
0
  Assert(dt->tupstate == SPGIST_REDIRECT);
576
0
  Assert(ItemPointerGetBlockNumber(&dt->pointer) == SPGIST_METAPAGE_BLKNO);
577
0
  ItemPointerSet(&dt->pointer, blkno, offnum);
578
0
}
579
580
/*
581
 * Test to see if the user-defined picksplit function failed to do its job,
582
 * ie, it put all the leaf tuples into the same node.
583
 * If so, randomly divide the tuples into several nodes (all with the same
584
 * label) and return true to select allTheSame mode for this inner tuple.
585
 *
586
 * (This code is also used to forcibly select allTheSame mode for nulls.)
587
 *
588
 * If we know that the leaf tuples wouldn't all fit on one page, then we
589
 * exclude the last tuple (which is the incoming new tuple that forced a split)
590
 * from the check to see if more than one node is used.  The reason for this
591
 * is that if the existing tuples are put into only one chain, then even if
592
 * we move them all to an empty page, there would still not be room for the
593
 * new tuple, so we'd get into an infinite loop of picksplit attempts.
594
 * Forcing allTheSame mode dodges this problem by ensuring the old tuples will
595
 * be split across pages.  (Exercise for the reader: figure out why this
596
 * fixes the problem even when there is only one old tuple.)
597
 */
598
static bool
599
checkAllTheSame(spgPickSplitIn *in, spgPickSplitOut *out, bool tooBig,
600
        bool *includeNew)
601
0
{
602
0
  int     theNode;
603
0
  int     limit;
604
0
  int     i;
605
606
  /* For the moment, assume we can include the new leaf tuple */
607
0
  *includeNew = true;
608
609
  /* If there's only the new leaf tuple, don't select allTheSame mode */
610
0
  if (in->nTuples <= 1)
611
0
    return false;
612
613
  /* If tuple set doesn't fit on one page, ignore the new tuple in test */
614
0
  limit = tooBig ? in->nTuples - 1 : in->nTuples;
615
616
  /* Check to see if more than one node is populated */
617
0
  theNode = out->mapTuplesToNodes[0];
618
0
  for (i = 1; i < limit; i++)
619
0
  {
620
0
    if (out->mapTuplesToNodes[i] != theNode)
621
0
      return false;
622
0
  }
623
624
  /* Nope, so override the picksplit function's decisions */
625
626
  /* If the new tuple is in its own node, it can't be included in split */
627
0
  if (tooBig && out->mapTuplesToNodes[in->nTuples - 1] != theNode)
628
0
    *includeNew = false;
629
630
0
  out->nNodes = 8;      /* arbitrary number of child nodes */
631
632
  /* Random assignment of tuples to nodes (note we include new tuple) */
633
0
  for (i = 0; i < in->nTuples; i++)
634
0
    out->mapTuplesToNodes[i] = i % out->nNodes;
635
636
  /* The opclass may not use node labels, but if it does, duplicate 'em */
637
0
  if (out->nodeLabels)
638
0
  {
639
0
    Datum   theLabel = out->nodeLabels[theNode];
640
641
0
    out->nodeLabels = (Datum *) palloc(sizeof(Datum) * out->nNodes);
642
0
    for (i = 0; i < out->nNodes; i++)
643
0
      out->nodeLabels[i] = theLabel;
644
0
  }
645
646
  /* We don't touch the prefix or the leaf tuple datum assignments */
647
648
0
  return true;
649
0
}
650
651
/*
652
 * current points to a leaf-tuple chain that we wanted to add newLeafTuple to,
653
 * but the chain has to be split because there's not enough room to add
654
 * newLeafTuple to its page.
655
 *
656
 * This function splits the leaf tuple set according to picksplit's rules,
657
 * creating one or more new chains that are spread across the current page
658
 * and an additional leaf page (we assume that two leaf pages will be
659
 * sufficient).  A new inner tuple is created, and the parent downlink
660
 * pointer is updated to point to that inner tuple instead of the leaf chain.
661
 *
662
 * On exit, current contains the address of the new inner tuple.
663
 *
664
 * Returns true if we successfully inserted newLeafTuple during this function,
665
 * false if caller still has to do it (meaning another picksplit operation is
666
 * probably needed).  Failure could occur if the picksplit result is fairly
667
 * unbalanced, or if newLeafTuple is just plain too big to fit on a page.
668
 * Because we force the picksplit result to be at least two chains, each
669
 * cycle will get rid of at least one leaf tuple from the chain, so the loop
670
 * will eventually terminate if lack of balance is the issue.  If the tuple
671
 * is too big, we assume that repeated picksplit operations will eventually
672
 * make it small enough by repeated prefix-stripping.  A broken opclass could
673
 * make this an infinite loop, though, so spgdoinsert() checks that the
674
 * leaf datums get smaller each time.
675
 */
676
static bool
677
doPickSplit(Relation index, SpGistState *state,
678
      SPPageDesc *current, SPPageDesc *parent,
679
      SpGistLeafTuple newLeafTuple,
680
      int level, bool isNulls, bool isNew)
681
0
{
682
0
  bool    insertedNew = false;
683
0
  spgPickSplitIn in;
684
0
  spgPickSplitOut out;
685
0
  FmgrInfo   *procinfo;
686
0
  bool    includeNew;
687
0
  int     i,
688
0
        max,
689
0
        n;
690
0
  SpGistInnerTuple innerTuple;
691
0
  SpGistNodeTuple node,
692
0
         *nodes;
693
0
  Buffer    newInnerBuffer,
694
0
        newLeafBuffer;
695
0
  uint8    *leafPageSelect;
696
0
  int      *leafSizes;
697
0
  OffsetNumber *toDelete;
698
0
  OffsetNumber *toInsert;
699
0
  OffsetNumber redirectTuplePos = InvalidOffsetNumber;
700
0
  OffsetNumber startOffsets[2];
701
0
  SpGistLeafTuple *oldLeafs;
702
0
  SpGistLeafTuple *newLeafs;
703
0
  Datum   leafDatums[INDEX_MAX_KEYS];
704
0
  bool    leafIsnulls[INDEX_MAX_KEYS];
705
0
  int     spaceToDelete;
706
0
  int     currentFreeSpace;
707
0
  int     totalLeafSizes;
708
0
  bool    allTheSame;
709
0
  spgxlogPickSplit xlrec;
710
0
  char     *leafdata,
711
0
         *leafptr;
712
0
  SPPageDesc  saveCurrent;
713
0
  int     nToDelete,
714
0
        nToInsert,
715
0
        maxToInclude;
716
717
0
  in.level = level;
718
719
  /*
720
   * Allocate per-leaf-tuple work arrays with max possible size
721
   */
722
0
  max = PageGetMaxOffsetNumber(current->page);
723
0
  n = max + 1;
724
0
  in.datums = (Datum *) palloc(sizeof(Datum) * n);
725
0
  toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
726
0
  toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
727
0
  oldLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
728
0
  newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
729
0
  leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
730
731
0
  STORE_STATE(state, xlrec.stateSrc);
732
733
  /*
734
   * Form list of leaf tuples which will be distributed as split result;
735
   * also, count up the amount of space that will be freed from current.
736
   * (Note that in the non-root case, we won't actually delete the old
737
   * tuples, only replace them with redirects or placeholders.)
738
   */
739
0
  nToInsert = 0;
740
0
  nToDelete = 0;
741
0
  spaceToDelete = 0;
742
0
  if (SpGistBlockIsRoot(current->blkno))
743
0
  {
744
    /*
745
     * We are splitting the root (which up to now is also a leaf page).
746
     * Its tuples are not linked, so scan sequentially to get them all. We
747
     * ignore the original value of current->offnum.
748
     */
749
0
    for (i = FirstOffsetNumber; i <= max; i++)
750
0
    {
751
0
      SpGistLeafTuple it;
752
753
0
      it = (SpGistLeafTuple) PageGetItem(current->page,
754
0
                         PageGetItemId(current->page, i));
755
0
      if (it->tupstate == SPGIST_LIVE)
756
0
      {
757
0
        in.datums[nToInsert] =
758
0
          isNulls ? (Datum) 0 : SGLTDATUM(it, state);
759
0
        oldLeafs[nToInsert] = it;
760
0
        nToInsert++;
761
0
        toDelete[nToDelete] = i;
762
0
        nToDelete++;
763
        /* we will delete the tuple altogether, so count full space */
764
0
        spaceToDelete += it->size + sizeof(ItemIdData);
765
0
      }
766
0
      else        /* tuples on root should be live */
767
0
        elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
768
0
    }
769
0
  }
770
0
  else
771
0
  {
772
    /* Normal case, just collect the leaf tuples in the chain */
773
0
    i = current->offnum;
774
0
    while (i != InvalidOffsetNumber)
775
0
    {
776
0
      SpGistLeafTuple it;
777
778
0
      Assert(i >= FirstOffsetNumber && i <= max);
779
0
      it = (SpGistLeafTuple) PageGetItem(current->page,
780
0
                         PageGetItemId(current->page, i));
781
0
      if (it->tupstate == SPGIST_LIVE)
782
0
      {
783
0
        in.datums[nToInsert] =
784
0
          isNulls ? (Datum) 0 : SGLTDATUM(it, state);
785
0
        oldLeafs[nToInsert] = it;
786
0
        nToInsert++;
787
0
        toDelete[nToDelete] = i;
788
0
        nToDelete++;
789
        /* we will not delete the tuple, only replace with dead */
790
0
        Assert(it->size >= SGDTSIZE);
791
0
        spaceToDelete += it->size - SGDTSIZE;
792
0
      }
793
0
      else if (it->tupstate == SPGIST_DEAD)
794
0
      {
795
        /* We could see a DEAD tuple as first/only chain item */
796
0
        Assert(i == current->offnum);
797
0
        Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
798
0
        toDelete[nToDelete] = i;
799
0
        nToDelete++;
800
        /* replacing it with redirect will save no space */
801
0
      }
802
0
      else
803
0
        elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
804
805
0
      i = SGLT_GET_NEXTOFFSET(it);
806
0
    }
807
0
  }
808
0
  in.nTuples = nToInsert;
809
810
  /*
811
   * We may not actually insert new tuple because another picksplit may be
812
   * necessary due to too large value, but we will try to allocate enough
813
   * space to include it; and in any case it has to be included in the input
814
   * for the picksplit function.  So don't increment nToInsert yet.
815
   */
816
0
  in.datums[in.nTuples] =
817
0
    isNulls ? (Datum) 0 : SGLTDATUM(newLeafTuple, state);
818
0
  oldLeafs[in.nTuples] = newLeafTuple;
819
0
  in.nTuples++;
820
821
0
  memset(&out, 0, sizeof(out));
822
823
0
  if (!isNulls)
824
0
  {
825
    /*
826
     * Perform split using user-defined method.
827
     */
828
0
    procinfo = index_getprocinfo(index, 1, SPGIST_PICKSPLIT_PROC);
829
0
    FunctionCall2Coll(procinfo,
830
0
              index->rd_indcollation[0],
831
0
              PointerGetDatum(&in),
832
0
              PointerGetDatum(&out));
833
834
    /*
835
     * Form new leaf tuples and count up the total space needed.
836
     */
837
0
    totalLeafSizes = 0;
838
0
    for (i = 0; i < in.nTuples; i++)
839
0
    {
840
0
      if (state->leafTupDesc->natts > 1)
841
0
        spgDeformLeafTuple(oldLeafs[i],
842
0
                   state->leafTupDesc,
843
0
                   leafDatums,
844
0
                   leafIsnulls,
845
0
                   isNulls);
846
847
0
      leafDatums[spgKeyColumn] = out.leafTupleDatums[i];
848
0
      leafIsnulls[spgKeyColumn] = false;
849
850
0
      newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
851
0
                       leafDatums,
852
0
                       leafIsnulls);
853
0
      totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
854
0
    }
855
0
  }
856
0
  else
857
0
  {
858
    /*
859
     * Perform dummy split that puts all tuples into one node.
860
     * checkAllTheSame will override this and force allTheSame mode.
861
     */
862
0
    out.hasPrefix = false;
863
0
    out.nNodes = 1;
864
0
    out.nodeLabels = NULL;
865
0
    out.mapTuplesToNodes = palloc0(sizeof(int) * in.nTuples);
866
867
    /*
868
     * Form new leaf tuples and count up the total space needed.
869
     */
870
0
    totalLeafSizes = 0;
871
0
    for (i = 0; i < in.nTuples; i++)
872
0
    {
873
0
      if (state->leafTupDesc->natts > 1)
874
0
        spgDeformLeafTuple(oldLeafs[i],
875
0
                   state->leafTupDesc,
876
0
                   leafDatums,
877
0
                   leafIsnulls,
878
0
                   isNulls);
879
880
      /*
881
       * Nulls tree can contain only null key values.
882
       */
883
0
      leafDatums[spgKeyColumn] = (Datum) 0;
884
0
      leafIsnulls[spgKeyColumn] = true;
885
886
0
      newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
887
0
                       leafDatums,
888
0
                       leafIsnulls);
889
0
      totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
890
0
    }
891
0
  }
892
893
  /*
894
   * Check to see if the picksplit function failed to separate the values,
895
   * ie, it put them all into the same child node.  If so, select allTheSame
896
   * mode and create a random split instead.  See comments for
897
   * checkAllTheSame as to why we need to know if the new leaf tuples could
898
   * fit on one page.
899
   */
900
0
  allTheSame = checkAllTheSame(&in, &out,
901
0
                 totalLeafSizes > SPGIST_PAGE_CAPACITY,
902
0
                 &includeNew);
903
904
  /*
905
   * If checkAllTheSame decided we must exclude the new tuple, don't
906
   * consider it any further.
907
   */
908
0
  if (includeNew)
909
0
    maxToInclude = in.nTuples;
910
0
  else
911
0
  {
912
0
    maxToInclude = in.nTuples - 1;
913
0
    totalLeafSizes -= newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
914
0
  }
915
916
  /*
917
   * Allocate per-node work arrays.  Since checkAllTheSame could replace
918
   * out.nNodes with a value larger than the number of tuples on the input
919
   * page, we can't allocate these arrays before here.
920
   */
921
0
  nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) * out.nNodes);
922
0
  leafSizes = (int *) palloc0(sizeof(int) * out.nNodes);
923
924
  /*
925
   * Form nodes of inner tuple and inner tuple itself
926
   */
927
0
  for (i = 0; i < out.nNodes; i++)
928
0
  {
929
0
    Datum   label = (Datum) 0;
930
0
    bool    labelisnull = (out.nodeLabels == NULL);
931
932
0
    if (!labelisnull)
933
0
      label = out.nodeLabels[i];
934
0
    nodes[i] = spgFormNodeTuple(state, label, labelisnull);
935
0
  }
936
0
  innerTuple = spgFormInnerTuple(state,
937
0
                   out.hasPrefix, out.prefixDatum,
938
0
                   out.nNodes, nodes);
939
0
  innerTuple->allTheSame = allTheSame;
940
941
  /*
942
   * Update nodes[] array to point into the newly formed innerTuple, so that
943
   * we can adjust their downlinks below.
944
   */
945
0
  SGITITERATE(innerTuple, i, node)
946
0
  {
947
0
    nodes[i] = node;
948
0
  }
949
950
  /*
951
   * Re-scan new leaf tuples and count up the space needed under each node.
952
   */
953
0
  for (i = 0; i < maxToInclude; i++)
954
0
  {
955
0
    n = out.mapTuplesToNodes[i];
956
0
    if (n < 0 || n >= out.nNodes)
957
0
      elog(ERROR, "inconsistent result of SPGiST picksplit function");
958
0
    leafSizes[n] += newLeafs[i]->size + sizeof(ItemIdData);
959
0
  }
960
961
  /*
962
   * To perform the split, we must insert a new inner tuple, which can't go
963
   * on a leaf page; and unless we are splitting the root page, we must then
964
   * update the parent tuple's downlink to point to the inner tuple.  If
965
   * there is room, we'll put the new inner tuple on the same page as the
966
   * parent tuple, otherwise we need another non-leaf buffer. But if the
967
   * parent page is the root, we can't add the new inner tuple there,
968
   * because the root page must have only one inner tuple.
969
   */
970
0
  xlrec.initInner = false;
971
0
  if (parent->buffer != InvalidBuffer &&
972
0
    !SpGistBlockIsRoot(parent->blkno) &&
973
0
    (SpGistPageGetFreeSpace(parent->page, 1) >=
974
0
     innerTuple->size + sizeof(ItemIdData)))
975
0
  {
976
    /* New inner tuple will fit on parent page */
977
0
    newInnerBuffer = parent->buffer;
978
0
  }
979
0
  else if (parent->buffer != InvalidBuffer)
980
0
  {
981
    /* Send tuple to page with next triple parity (see README) */
982
0
    newInnerBuffer = SpGistGetBuffer(index,
983
0
                     GBUF_INNER_PARITY(parent->blkno + 1) |
984
0
                     (isNulls ? GBUF_NULLS : 0),
985
0
                     innerTuple->size + sizeof(ItemIdData),
986
0
                     &xlrec.initInner);
987
0
  }
988
0
  else
989
0
  {
990
    /* Root page split ... inner tuple will go to root page */
991
0
    newInnerBuffer = InvalidBuffer;
992
0
  }
993
994
  /*
995
   * The new leaf tuples converted from the existing ones should require the
996
   * same or less space, and therefore should all fit onto one page
997
   * (although that's not necessarily the current page, since we can't
998
   * delete the old tuples but only replace them with placeholders).
999
   * However, the incoming new tuple might not also fit, in which case we
1000
   * might need another picksplit cycle to reduce it some more.
1001
   *
1002
   * If there's not room to put everything back onto the current page, then
1003
   * we decide on a per-node basis which tuples go to the new page. (We do
1004
   * it like that because leaf tuple chains can't cross pages, so we must
1005
   * place all leaf tuples belonging to the same parent node on the same
1006
   * page.)
1007
   *
1008
   * If we are splitting the root page (turning it from a leaf page into an
1009
   * inner page), then no leaf tuples can go back to the current page; they
1010
   * must all go somewhere else.
1011
   */
1012
0
  if (!SpGistBlockIsRoot(current->blkno))
1013
0
    currentFreeSpace = PageGetExactFreeSpace(current->page) + spaceToDelete;
1014
0
  else
1015
0
    currentFreeSpace = 0; /* prevent assigning any tuples to current */
1016
1017
0
  xlrec.initDest = false;
1018
1019
0
  if (totalLeafSizes <= currentFreeSpace)
1020
0
  {
1021
    /* All the leaf tuples will fit on current page */
1022
0
    newLeafBuffer = InvalidBuffer;
1023
    /* mark new leaf tuple as included in insertions, if allowed */
1024
0
    if (includeNew)
1025
0
    {
1026
0
      nToInsert++;
1027
0
      insertedNew = true;
1028
0
    }
1029
0
    for (i = 0; i < nToInsert; i++)
1030
0
      leafPageSelect[i] = 0; /* signifies current page */
1031
0
  }
1032
0
  else if (in.nTuples == 1 && totalLeafSizes > SPGIST_PAGE_CAPACITY)
1033
0
  {
1034
    /*
1035
     * We're trying to split up a long value by repeated suffixing, but
1036
     * it's not going to fit yet.  Don't bother allocating a second leaf
1037
     * buffer that we won't be able to use.
1038
     */
1039
0
    newLeafBuffer = InvalidBuffer;
1040
0
    Assert(includeNew);
1041
0
    Assert(nToInsert == 0);
1042
0
  }
1043
0
  else
1044
0
  {
1045
    /* We will need another leaf page */
1046
0
    uint8    *nodePageSelect;
1047
0
    int     curspace;
1048
0
    int     newspace;
1049
1050
0
    newLeafBuffer = SpGistGetBuffer(index,
1051
0
                    GBUF_LEAF | (isNulls ? GBUF_NULLS : 0),
1052
0
                    Min(totalLeafSizes,
1053
0
                      SPGIST_PAGE_CAPACITY),
1054
0
                    &xlrec.initDest);
1055
1056
    /*
1057
     * Attempt to assign node groups to the two pages.  We might fail to
1058
     * do so, even if totalLeafSizes is less than the available space,
1059
     * because we can't split a group across pages.
1060
     */
1061
0
    nodePageSelect = (uint8 *) palloc(sizeof(uint8) * out.nNodes);
1062
1063
0
    curspace = currentFreeSpace;
1064
0
    newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1065
0
    for (i = 0; i < out.nNodes; i++)
1066
0
    {
1067
0
      if (leafSizes[i] <= curspace)
1068
0
      {
1069
0
        nodePageSelect[i] = 0;  /* signifies current page */
1070
0
        curspace -= leafSizes[i];
1071
0
      }
1072
0
      else
1073
0
      {
1074
0
        nodePageSelect[i] = 1;  /* signifies new leaf page */
1075
0
        newspace -= leafSizes[i];
1076
0
      }
1077
0
    }
1078
0
    if (curspace >= 0 && newspace >= 0)
1079
0
    {
1080
      /* Successful assignment, so we can include the new leaf tuple */
1081
0
      if (includeNew)
1082
0
      {
1083
0
        nToInsert++;
1084
0
        insertedNew = true;
1085
0
      }
1086
0
    }
1087
0
    else if (includeNew)
1088
0
    {
1089
      /* We must exclude the new leaf tuple from the split */
1090
0
      int     nodeOfNewTuple = out.mapTuplesToNodes[in.nTuples - 1];
1091
1092
0
      leafSizes[nodeOfNewTuple] -=
1093
0
        newLeafs[in.nTuples - 1]->size + sizeof(ItemIdData);
1094
1095
      /* Repeat the node assignment process --- should succeed now */
1096
0
      curspace = currentFreeSpace;
1097
0
      newspace = PageGetExactFreeSpace(BufferGetPage(newLeafBuffer));
1098
0
      for (i = 0; i < out.nNodes; i++)
1099
0
      {
1100
0
        if (leafSizes[i] <= curspace)
1101
0
        {
1102
0
          nodePageSelect[i] = 0;  /* signifies current page */
1103
0
          curspace -= leafSizes[i];
1104
0
        }
1105
0
        else
1106
0
        {
1107
0
          nodePageSelect[i] = 1;  /* signifies new leaf page */
1108
0
          newspace -= leafSizes[i];
1109
0
        }
1110
0
      }
1111
0
      if (curspace < 0 || newspace < 0)
1112
0
        elog(ERROR, "failed to divide leaf tuple groups across pages");
1113
0
    }
1114
0
    else
1115
0
    {
1116
      /* oops, we already excluded new tuple ... should not get here */
1117
0
      elog(ERROR, "failed to divide leaf tuple groups across pages");
1118
0
    }
1119
    /* Expand the per-node assignments to be shown per leaf tuple */
1120
0
    for (i = 0; i < nToInsert; i++)
1121
0
    {
1122
0
      n = out.mapTuplesToNodes[i];
1123
0
      leafPageSelect[i] = nodePageSelect[n];
1124
0
    }
1125
0
  }
1126
1127
  /* Start preparing WAL record */
1128
0
  xlrec.nDelete = 0;
1129
0
  xlrec.initSrc = isNew;
1130
0
  xlrec.storesNulls = isNulls;
1131
0
  xlrec.isRootSplit = SpGistBlockIsRoot(current->blkno);
1132
1133
0
  leafdata = leafptr = (char *) palloc(totalLeafSizes);
1134
1135
  /* Here we begin making the changes to the target pages */
1136
0
  START_CRIT_SECTION();
1137
1138
  /*
1139
   * Delete old leaf tuples from current buffer, except when we're splitting
1140
   * the root; in that case there's no need because we'll re-init the page
1141
   * below.  We do this first to make room for reinserting new leaf tuples.
1142
   */
1143
0
  if (!SpGistBlockIsRoot(current->blkno))
1144
0
  {
1145
    /*
1146
     * Init buffer instead of deleting individual tuples, but only if
1147
     * there aren't any other live tuples and only during build; otherwise
1148
     * we need to set a redirection tuple for concurrent scans.
1149
     */
1150
0
    if (state->isBuild &&
1151
0
      nToDelete + SpGistPageGetOpaque(current->page)->nPlaceholder ==
1152
0
      PageGetMaxOffsetNumber(current->page))
1153
0
    {
1154
0
      SpGistInitBuffer(current->buffer,
1155
0
               SPGIST_LEAF | (isNulls ? SPGIST_NULLS : 0));
1156
0
      xlrec.initSrc = true;
1157
0
    }
1158
0
    else if (isNew)
1159
0
    {
1160
      /* don't expose the freshly init'd buffer as a backup block */
1161
0
      Assert(nToDelete == 0);
1162
0
    }
1163
0
    else
1164
0
    {
1165
0
      xlrec.nDelete = nToDelete;
1166
1167
0
      if (!state->isBuild)
1168
0
      {
1169
        /*
1170
         * Need to create redirect tuple (it will point to new inner
1171
         * tuple) but right now the new tuple's location is not known
1172
         * yet.  So, set the redirection pointer to "impossible" value
1173
         * and remember its position to update tuple later.
1174
         */
1175
0
        if (nToDelete > 0)
1176
0
          redirectTuplePos = toDelete[0];
1177
0
        spgPageIndexMultiDelete(state, current->page,
1178
0
                    toDelete, nToDelete,
1179
0
                    SPGIST_REDIRECT,
1180
0
                    SPGIST_PLACEHOLDER,
1181
0
                    SPGIST_METAPAGE_BLKNO,
1182
0
                    FirstOffsetNumber);
1183
0
      }
1184
0
      else
1185
0
      {
1186
        /*
1187
         * During index build there are no concurrent searches, so we
1188
         * don't need to create redirection tuple.
1189
         */
1190
0
        spgPageIndexMultiDelete(state, current->page,
1191
0
                    toDelete, nToDelete,
1192
0
                    SPGIST_PLACEHOLDER,
1193
0
                    SPGIST_PLACEHOLDER,
1194
0
                    InvalidBlockNumber,
1195
0
                    InvalidOffsetNumber);
1196
0
      }
1197
0
    }
1198
0
  }
1199
1200
  /*
1201
   * Put leaf tuples on proper pages, and update downlinks in innerTuple's
1202
   * nodes.
1203
   */
1204
0
  startOffsets[0] = startOffsets[1] = InvalidOffsetNumber;
1205
0
  for (i = 0; i < nToInsert; i++)
1206
0
  {
1207
0
    SpGistLeafTuple it = newLeafs[i];
1208
0
    Buffer    leafBuffer;
1209
0
    BlockNumber leafBlock;
1210
0
    OffsetNumber newoffset;
1211
1212
    /* Which page is it going to? */
1213
0
    leafBuffer = leafPageSelect[i] ? newLeafBuffer : current->buffer;
1214
0
    leafBlock = BufferGetBlockNumber(leafBuffer);
1215
1216
    /* Link tuple into correct chain for its node */
1217
0
    n = out.mapTuplesToNodes[i];
1218
1219
0
    if (ItemPointerIsValid(&nodes[n]->t_tid))
1220
0
    {
1221
0
      Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
1222
0
      SGLT_SET_NEXTOFFSET(it, ItemPointerGetOffsetNumber(&nodes[n]->t_tid));
1223
0
    }
1224
0
    else
1225
0
      SGLT_SET_NEXTOFFSET(it, InvalidOffsetNumber);
1226
1227
    /* Insert it on page */
1228
0
    newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
1229
0
                     (Item) it, it->size,
1230
0
                     &startOffsets[leafPageSelect[i]],
1231
0
                     false);
1232
0
    toInsert[i] = newoffset;
1233
1234
    /* ... and complete the chain linking */
1235
0
    ItemPointerSet(&nodes[n]->t_tid, leafBlock, newoffset);
1236
1237
    /* Also copy leaf tuple into WAL data */
1238
0
    memcpy(leafptr, newLeafs[i], newLeafs[i]->size);
1239
0
    leafptr += newLeafs[i]->size;
1240
0
  }
1241
1242
  /*
1243
   * We're done modifying the other leaf buffer (if any), so mark it dirty.
1244
   * current->buffer will be marked below, after we're entirely done
1245
   * modifying it.
1246
   */
1247
0
  if (newLeafBuffer != InvalidBuffer)
1248
0
  {
1249
0
    MarkBufferDirty(newLeafBuffer);
1250
0
  }
1251
1252
  /* Remember current buffer, since we're about to change "current" */
1253
0
  saveCurrent = *current;
1254
1255
  /*
1256
   * Store the new innerTuple
1257
   */
1258
0
  if (newInnerBuffer == parent->buffer && newInnerBuffer != InvalidBuffer)
1259
0
  {
1260
    /*
1261
     * new inner tuple goes to parent page
1262
     */
1263
0
    Assert(current->buffer != parent->buffer);
1264
1265
    /* Repoint "current" at the new inner tuple */
1266
0
    current->blkno = parent->blkno;
1267
0
    current->buffer = parent->buffer;
1268
0
    current->page = parent->page;
1269
0
    xlrec.offnumInner = current->offnum =
1270
0
      SpGistPageAddNewItem(state, current->page,
1271
0
                 (Item) innerTuple, innerTuple->size,
1272
0
                 NULL, false);
1273
1274
    /*
1275
     * Update parent node link and mark parent page dirty
1276
     */
1277
0
    xlrec.innerIsParent = true;
1278
0
    xlrec.offnumParent = parent->offnum;
1279
0
    xlrec.nodeI = parent->node;
1280
0
    saveNodeLink(index, parent, current->blkno, current->offnum);
1281
1282
    /*
1283
     * Update redirection link (in old current buffer)
1284
     */
1285
0
    if (redirectTuplePos != InvalidOffsetNumber)
1286
0
      setRedirectionTuple(&saveCurrent, redirectTuplePos,
1287
0
                current->blkno, current->offnum);
1288
1289
    /* Done modifying old current buffer, mark it dirty */
1290
0
    MarkBufferDirty(saveCurrent.buffer);
1291
0
  }
1292
0
  else if (parent->buffer != InvalidBuffer)
1293
0
  {
1294
    /*
1295
     * new inner tuple will be stored on a new page
1296
     */
1297
0
    Assert(newInnerBuffer != InvalidBuffer);
1298
1299
    /* Repoint "current" at the new inner tuple */
1300
0
    current->buffer = newInnerBuffer;
1301
0
    current->blkno = BufferGetBlockNumber(current->buffer);
1302
0
    current->page = BufferGetPage(current->buffer);
1303
0
    xlrec.offnumInner = current->offnum =
1304
0
      SpGistPageAddNewItem(state, current->page,
1305
0
                 (Item) innerTuple, innerTuple->size,
1306
0
                 NULL, false);
1307
1308
    /* Done modifying new current buffer, mark it dirty */
1309
0
    MarkBufferDirty(current->buffer);
1310
1311
    /*
1312
     * Update parent node link and mark parent page dirty
1313
     */
1314
0
    xlrec.innerIsParent = (parent->buffer == current->buffer);
1315
0
    xlrec.offnumParent = parent->offnum;
1316
0
    xlrec.nodeI = parent->node;
1317
0
    saveNodeLink(index, parent, current->blkno, current->offnum);
1318
1319
    /*
1320
     * Update redirection link (in old current buffer)
1321
     */
1322
0
    if (redirectTuplePos != InvalidOffsetNumber)
1323
0
      setRedirectionTuple(&saveCurrent, redirectTuplePos,
1324
0
                current->blkno, current->offnum);
1325
1326
    /* Done modifying old current buffer, mark it dirty */
1327
0
    MarkBufferDirty(saveCurrent.buffer);
1328
0
  }
1329
0
  else
1330
0
  {
1331
    /*
1332
     * Splitting root page, which was a leaf but now becomes inner page
1333
     * (and so "current" continues to point at it)
1334
     */
1335
0
    Assert(SpGistBlockIsRoot(current->blkno));
1336
0
    Assert(redirectTuplePos == InvalidOffsetNumber);
1337
1338
0
    SpGistInitBuffer(current->buffer, (isNulls ? SPGIST_NULLS : 0));
1339
0
    xlrec.initInner = true;
1340
0
    xlrec.innerIsParent = false;
1341
1342
0
    xlrec.offnumInner = current->offnum =
1343
0
      PageAddItem(current->page, (Item) innerTuple, innerTuple->size,
1344
0
            InvalidOffsetNumber, false, false);
1345
0
    if (current->offnum != FirstOffsetNumber)
1346
0
      elog(ERROR, "failed to add item of size %u to SPGiST index page",
1347
0
         innerTuple->size);
1348
1349
    /* No parent link to update, nor redirection to do */
1350
0
    xlrec.offnumParent = InvalidOffsetNumber;
1351
0
    xlrec.nodeI = 0;
1352
1353
    /* Done modifying new current buffer, mark it dirty */
1354
0
    MarkBufferDirty(current->buffer);
1355
1356
    /* saveCurrent doesn't represent a different buffer */
1357
0
    saveCurrent.buffer = InvalidBuffer;
1358
0
  }
1359
1360
0
  if (RelationNeedsWAL(index) && !state->isBuild)
1361
0
  {
1362
0
    XLogRecPtr  recptr;
1363
0
    int     flags;
1364
1365
0
    XLogBeginInsert();
1366
1367
0
    xlrec.nInsert = nToInsert;
1368
0
    XLogRegisterData(&xlrec, SizeOfSpgxlogPickSplit);
1369
1370
0
    XLogRegisterData(toDelete,
1371
0
             sizeof(OffsetNumber) * xlrec.nDelete);
1372
0
    XLogRegisterData(toInsert,
1373
0
             sizeof(OffsetNumber) * xlrec.nInsert);
1374
0
    XLogRegisterData(leafPageSelect,
1375
0
             sizeof(uint8) * xlrec.nInsert);
1376
0
    XLogRegisterData(innerTuple, innerTuple->size);
1377
0
    XLogRegisterData(leafdata, leafptr - leafdata);
1378
1379
    /* Old leaf page */
1380
0
    if (BufferIsValid(saveCurrent.buffer))
1381
0
    {
1382
0
      flags = REGBUF_STANDARD;
1383
0
      if (xlrec.initSrc)
1384
0
        flags |= REGBUF_WILL_INIT;
1385
0
      XLogRegisterBuffer(0, saveCurrent.buffer, flags);
1386
0
    }
1387
1388
    /* New leaf page */
1389
0
    if (BufferIsValid(newLeafBuffer))
1390
0
    {
1391
0
      flags = REGBUF_STANDARD;
1392
0
      if (xlrec.initDest)
1393
0
        flags |= REGBUF_WILL_INIT;
1394
0
      XLogRegisterBuffer(1, newLeafBuffer, flags);
1395
0
    }
1396
1397
    /* Inner page */
1398
0
    flags = REGBUF_STANDARD;
1399
0
    if (xlrec.initInner)
1400
0
      flags |= REGBUF_WILL_INIT;
1401
0
    XLogRegisterBuffer(2, current->buffer, flags);
1402
1403
    /* Parent page, if different from inner page */
1404
0
    if (parent->buffer != InvalidBuffer)
1405
0
    {
1406
0
      if (parent->buffer != current->buffer)
1407
0
        XLogRegisterBuffer(3, parent->buffer, REGBUF_STANDARD);
1408
0
      else
1409
0
        Assert(xlrec.innerIsParent);
1410
0
    }
1411
1412
    /* Issue the WAL record */
1413
0
    recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_PICKSPLIT);
1414
1415
    /* Update page LSNs on all affected pages */
1416
0
    if (newLeafBuffer != InvalidBuffer)
1417
0
    {
1418
0
      Page    page = BufferGetPage(newLeafBuffer);
1419
1420
0
      PageSetLSN(page, recptr);
1421
0
    }
1422
1423
0
    if (saveCurrent.buffer != InvalidBuffer)
1424
0
    {
1425
0
      Page    page = BufferGetPage(saveCurrent.buffer);
1426
1427
0
      PageSetLSN(page, recptr);
1428
0
    }
1429
1430
0
    PageSetLSN(current->page, recptr);
1431
1432
0
    if (parent->buffer != InvalidBuffer)
1433
0
    {
1434
0
      PageSetLSN(parent->page, recptr);
1435
0
    }
1436
0
  }
1437
1438
0
  END_CRIT_SECTION();
1439
1440
  /* Update local free-space cache and unlock buffers */
1441
0
  if (newLeafBuffer != InvalidBuffer)
1442
0
  {
1443
0
    SpGistSetLastUsedPage(index, newLeafBuffer);
1444
0
    UnlockReleaseBuffer(newLeafBuffer);
1445
0
  }
1446
0
  if (saveCurrent.buffer != InvalidBuffer)
1447
0
  {
1448
0
    SpGistSetLastUsedPage(index, saveCurrent.buffer);
1449
0
    UnlockReleaseBuffer(saveCurrent.buffer);
1450
0
  }
1451
1452
0
  return insertedNew;
1453
0
}
1454
1455
/*
1456
 * spgMatchNode action: descend to N'th child node of current inner tuple
1457
 */
1458
static void
1459
spgMatchNodeAction(Relation index, SpGistState *state,
1460
           SpGistInnerTuple innerTuple,
1461
           SPPageDesc *current, SPPageDesc *parent, int nodeN)
1462
0
{
1463
0
  int     i;
1464
0
  SpGistNodeTuple node;
1465
1466
  /* Release previous parent buffer if any */
1467
0
  if (parent->buffer != InvalidBuffer &&
1468
0
    parent->buffer != current->buffer)
1469
0
  {
1470
0
    SpGistSetLastUsedPage(index, parent->buffer);
1471
0
    UnlockReleaseBuffer(parent->buffer);
1472
0
  }
1473
1474
  /* Repoint parent to specified node of current inner tuple */
1475
0
  parent->blkno = current->blkno;
1476
0
  parent->buffer = current->buffer;
1477
0
  parent->page = current->page;
1478
0
  parent->offnum = current->offnum;
1479
0
  parent->node = nodeN;
1480
1481
  /* Locate that node */
1482
0
  SGITITERATE(innerTuple, i, node)
1483
0
  {
1484
0
    if (i == nodeN)
1485
0
      break;
1486
0
  }
1487
1488
0
  if (i != nodeN)
1489
0
    elog(ERROR, "failed to find requested node %d in SPGiST inner tuple",
1490
0
       nodeN);
1491
1492
  /* Point current to the downlink location, if any */
1493
0
  if (ItemPointerIsValid(&node->t_tid))
1494
0
  {
1495
0
    current->blkno = ItemPointerGetBlockNumber(&node->t_tid);
1496
0
    current->offnum = ItemPointerGetOffsetNumber(&node->t_tid);
1497
0
  }
1498
0
  else
1499
0
  {
1500
    /* Downlink is empty, so we'll need to find a new page */
1501
0
    current->blkno = InvalidBlockNumber;
1502
0
    current->offnum = InvalidOffsetNumber;
1503
0
  }
1504
1505
0
  current->buffer = InvalidBuffer;
1506
0
  current->page = NULL;
1507
0
}
1508
1509
/*
1510
 * spgAddNode action: add a node to the inner tuple at current
1511
 */
1512
static void
1513
spgAddNodeAction(Relation index, SpGistState *state,
1514
         SpGistInnerTuple innerTuple,
1515
         SPPageDesc *current, SPPageDesc *parent,
1516
         int nodeN, Datum nodeLabel)
1517
0
{
1518
0
  SpGistInnerTuple newInnerTuple;
1519
0
  spgxlogAddNode xlrec;
1520
1521
  /* Should not be applied to nulls */
1522
0
  Assert(!SpGistPageStoresNulls(current->page));
1523
1524
  /* Construct new inner tuple with additional node */
1525
0
  newInnerTuple = addNode(state, innerTuple, nodeLabel, nodeN);
1526
1527
  /* Prepare WAL record */
1528
0
  STORE_STATE(state, xlrec.stateSrc);
1529
0
  xlrec.offnum = current->offnum;
1530
1531
  /* we don't fill these unless we need to change the parent downlink */
1532
0
  xlrec.parentBlk = -1;
1533
0
  xlrec.offnumParent = InvalidOffsetNumber;
1534
0
  xlrec.nodeI = 0;
1535
1536
  /* we don't fill these unless tuple has to be moved */
1537
0
  xlrec.offnumNew = InvalidOffsetNumber;
1538
0
  xlrec.newPage = false;
1539
1540
0
  if (PageGetExactFreeSpace(current->page) >=
1541
0
    newInnerTuple->size - innerTuple->size)
1542
0
  {
1543
    /*
1544
     * We can replace the inner tuple by new version in-place
1545
     */
1546
0
    START_CRIT_SECTION();
1547
1548
0
    PageIndexTupleDelete(current->page, current->offnum);
1549
0
    if (PageAddItem(current->page,
1550
0
            (Item) newInnerTuple, newInnerTuple->size,
1551
0
            current->offnum, false, false) != current->offnum)
1552
0
      elog(ERROR, "failed to add item of size %u to SPGiST index page",
1553
0
         newInnerTuple->size);
1554
1555
0
    MarkBufferDirty(current->buffer);
1556
1557
0
    if (RelationNeedsWAL(index) && !state->isBuild)
1558
0
    {
1559
0
      XLogRecPtr  recptr;
1560
1561
0
      XLogBeginInsert();
1562
0
      XLogRegisterData(&xlrec, sizeof(xlrec));
1563
0
      XLogRegisterData(newInnerTuple, newInnerTuple->size);
1564
1565
0
      XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
1566
1567
0
      recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1568
1569
0
      PageSetLSN(current->page, recptr);
1570
0
    }
1571
1572
0
    END_CRIT_SECTION();
1573
0
  }
1574
0
  else
1575
0
  {
1576
    /*
1577
     * move inner tuple to another page, and update parent
1578
     */
1579
0
    SpGistDeadTuple dt;
1580
0
    SPPageDesc  saveCurrent;
1581
1582
    /*
1583
     * It should not be possible to get here for the root page, since we
1584
     * allow only one inner tuple on the root page, and spgFormInnerTuple
1585
     * always checks that inner tuples don't exceed the size of a page.
1586
     */
1587
0
    if (SpGistBlockIsRoot(current->blkno))
1588
0
      elog(ERROR, "cannot enlarge root tuple any more");
1589
0
    Assert(parent->buffer != InvalidBuffer);
1590
1591
0
    saveCurrent = *current;
1592
1593
0
    xlrec.offnumParent = parent->offnum;
1594
0
    xlrec.nodeI = parent->node;
1595
1596
    /*
1597
     * obtain new buffer with the same parity as current, since it will be
1598
     * a child of same parent tuple
1599
     */
1600
0
    current->buffer = SpGistGetBuffer(index,
1601
0
                      GBUF_INNER_PARITY(current->blkno),
1602
0
                      newInnerTuple->size + sizeof(ItemIdData),
1603
0
                      &xlrec.newPage);
1604
0
    current->blkno = BufferGetBlockNumber(current->buffer);
1605
0
    current->page = BufferGetPage(current->buffer);
1606
1607
    /*
1608
     * Let's just make real sure new current isn't same as old.  Right now
1609
     * that's impossible, but if SpGistGetBuffer ever got smart enough to
1610
     * delete placeholder tuples before checking space, maybe it wouldn't
1611
     * be impossible.  The case would appear to work except that WAL
1612
     * replay would be subtly wrong, so I think a mere assert isn't enough
1613
     * here.
1614
     */
1615
0
    if (current->blkno == saveCurrent.blkno)
1616
0
      elog(ERROR, "SPGiST new buffer shouldn't be same as old buffer");
1617
1618
    /*
1619
     * New current and parent buffer will both be modified; but note that
1620
     * parent buffer could be same as either new or old current.
1621
     */
1622
0
    if (parent->buffer == saveCurrent.buffer)
1623
0
      xlrec.parentBlk = 0;
1624
0
    else if (parent->buffer == current->buffer)
1625
0
      xlrec.parentBlk = 1;
1626
0
    else
1627
0
      xlrec.parentBlk = 2;
1628
1629
0
    START_CRIT_SECTION();
1630
1631
    /* insert new ... */
1632
0
    xlrec.offnumNew = current->offnum =
1633
0
      SpGistPageAddNewItem(state, current->page,
1634
0
                 (Item) newInnerTuple, newInnerTuple->size,
1635
0
                 NULL, false);
1636
1637
0
    MarkBufferDirty(current->buffer);
1638
1639
    /* update parent's downlink and mark parent page dirty */
1640
0
    saveNodeLink(index, parent, current->blkno, current->offnum);
1641
1642
    /*
1643
     * Replace old tuple with a placeholder or redirection tuple.  Unless
1644
     * doing an index build, we have to insert a redirection tuple for
1645
     * possible concurrent scans.  We can't just delete it in any case,
1646
     * because that could change the offsets of other tuples on the page,
1647
     * breaking downlinks from their parents.
1648
     */
1649
0
    if (state->isBuild)
1650
0
      dt = spgFormDeadTuple(state, SPGIST_PLACEHOLDER,
1651
0
                  InvalidBlockNumber, InvalidOffsetNumber);
1652
0
    else
1653
0
      dt = spgFormDeadTuple(state, SPGIST_REDIRECT,
1654
0
                  current->blkno, current->offnum);
1655
1656
0
    PageIndexTupleDelete(saveCurrent.page, saveCurrent.offnum);
1657
0
    if (PageAddItem(saveCurrent.page, (Item) dt, dt->size,
1658
0
            saveCurrent.offnum,
1659
0
            false, false) != saveCurrent.offnum)
1660
0
      elog(ERROR, "failed to add item of size %u to SPGiST index page",
1661
0
         dt->size);
1662
1663
0
    if (state->isBuild)
1664
0
      SpGistPageGetOpaque(saveCurrent.page)->nPlaceholder++;
1665
0
    else
1666
0
      SpGistPageGetOpaque(saveCurrent.page)->nRedirection++;
1667
1668
0
    MarkBufferDirty(saveCurrent.buffer);
1669
1670
0
    if (RelationNeedsWAL(index) && !state->isBuild)
1671
0
    {
1672
0
      XLogRecPtr  recptr;
1673
0
      int     flags;
1674
1675
0
      XLogBeginInsert();
1676
1677
      /* orig page */
1678
0
      XLogRegisterBuffer(0, saveCurrent.buffer, REGBUF_STANDARD);
1679
      /* new page */
1680
0
      flags = REGBUF_STANDARD;
1681
0
      if (xlrec.newPage)
1682
0
        flags |= REGBUF_WILL_INIT;
1683
0
      XLogRegisterBuffer(1, current->buffer, flags);
1684
      /* parent page (if different from orig and new) */
1685
0
      if (xlrec.parentBlk == 2)
1686
0
        XLogRegisterBuffer(2, parent->buffer, REGBUF_STANDARD);
1687
1688
0
      XLogRegisterData(&xlrec, sizeof(xlrec));
1689
0
      XLogRegisterData(newInnerTuple, newInnerTuple->size);
1690
1691
0
      recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_ADD_NODE);
1692
1693
      /* we don't bother to check if any of these are redundant */
1694
0
      PageSetLSN(current->page, recptr);
1695
0
      PageSetLSN(parent->page, recptr);
1696
0
      PageSetLSN(saveCurrent.page, recptr);
1697
0
    }
1698
1699
0
    END_CRIT_SECTION();
1700
1701
    /* Release saveCurrent if it's not same as current or parent */
1702
0
    if (saveCurrent.buffer != current->buffer &&
1703
0
      saveCurrent.buffer != parent->buffer)
1704
0
    {
1705
0
      SpGistSetLastUsedPage(index, saveCurrent.buffer);
1706
0
      UnlockReleaseBuffer(saveCurrent.buffer);
1707
0
    }
1708
0
  }
1709
0
}
1710
1711
/*
1712
 * spgSplitNode action: split inner tuple at current into prefix and postfix
1713
 */
1714
static void
1715
spgSplitNodeAction(Relation index, SpGistState *state,
1716
           SpGistInnerTuple innerTuple,
1717
           SPPageDesc *current, spgChooseOut *out)
1718
0
{
1719
0
  SpGistInnerTuple prefixTuple,
1720
0
        postfixTuple;
1721
0
  SpGistNodeTuple node,
1722
0
         *nodes;
1723
0
  BlockNumber postfixBlkno;
1724
0
  OffsetNumber postfixOffset;
1725
0
  int     i;
1726
0
  spgxlogSplitTuple xlrec;
1727
0
  Buffer    newBuffer = InvalidBuffer;
1728
1729
  /* Should not be applied to nulls */
1730
0
  Assert(!SpGistPageStoresNulls(current->page));
1731
1732
  /* Check opclass gave us sane values */
1733
0
  if (out->result.splitTuple.prefixNNodes <= 0 ||
1734
0
    out->result.splitTuple.prefixNNodes > SGITMAXNNODES)
1735
0
    elog(ERROR, "invalid number of prefix nodes: %d",
1736
0
       out->result.splitTuple.prefixNNodes);
1737
0
  if (out->result.splitTuple.childNodeN < 0 ||
1738
0
    out->result.splitTuple.childNodeN >=
1739
0
    out->result.splitTuple.prefixNNodes)
1740
0
    elog(ERROR, "invalid child node number: %d",
1741
0
       out->result.splitTuple.childNodeN);
1742
1743
  /*
1744
   * Construct new prefix tuple with requested number of nodes.  We'll fill
1745
   * in the childNodeN'th node's downlink below.
1746
   */
1747
0
  nodes = (SpGistNodeTuple *) palloc(sizeof(SpGistNodeTuple) *
1748
0
                     out->result.splitTuple.prefixNNodes);
1749
1750
0
  for (i = 0; i < out->result.splitTuple.prefixNNodes; i++)
1751
0
  {
1752
0
    Datum   label = (Datum) 0;
1753
0
    bool    labelisnull;
1754
1755
0
    labelisnull = (out->result.splitTuple.prefixNodeLabels == NULL);
1756
0
    if (!labelisnull)
1757
0
      label = out->result.splitTuple.prefixNodeLabels[i];
1758
0
    nodes[i] = spgFormNodeTuple(state, label, labelisnull);
1759
0
  }
1760
1761
0
  prefixTuple = spgFormInnerTuple(state,
1762
0
                  out->result.splitTuple.prefixHasPrefix,
1763
0
                  out->result.splitTuple.prefixPrefixDatum,
1764
0
                  out->result.splitTuple.prefixNNodes,
1765
0
                  nodes);
1766
1767
  /* it must fit in the space that innerTuple now occupies */
1768
0
  if (prefixTuple->size > innerTuple->size)
1769
0
    elog(ERROR, "SPGiST inner-tuple split must not produce longer prefix");
1770
1771
  /*
1772
   * Construct new postfix tuple, containing all nodes of innerTuple with
1773
   * same node datums, but with the prefix specified by the picksplit
1774
   * function.
1775
   */
1776
0
  nodes = palloc(sizeof(SpGistNodeTuple) * innerTuple->nNodes);
1777
0
  SGITITERATE(innerTuple, i, node)
1778
0
  {
1779
0
    nodes[i] = node;
1780
0
  }
1781
1782
0
  postfixTuple = spgFormInnerTuple(state,
1783
0
                   out->result.splitTuple.postfixHasPrefix,
1784
0
                   out->result.splitTuple.postfixPrefixDatum,
1785
0
                   innerTuple->nNodes, nodes);
1786
1787
  /* Postfix tuple is allTheSame if original tuple was */
1788
0
  postfixTuple->allTheSame = innerTuple->allTheSame;
1789
1790
  /* prep data for WAL record */
1791
0
  xlrec.newPage = false;
1792
1793
  /*
1794
   * If we can't fit both tuples on the current page, get a new page for the
1795
   * postfix tuple.  In particular, can't split to the root page.
1796
   *
1797
   * For the space calculation, note that prefixTuple replaces innerTuple
1798
   * but postfixTuple will be a new entry.
1799
   */
1800
0
  if (SpGistBlockIsRoot(current->blkno) ||
1801
0
    SpGistPageGetFreeSpace(current->page, 1) + innerTuple->size <
1802
0
    prefixTuple->size + postfixTuple->size + sizeof(ItemIdData))
1803
0
  {
1804
    /*
1805
     * Choose page with next triple parity, because postfix tuple is a
1806
     * child of prefix one
1807
     */
1808
0
    newBuffer = SpGistGetBuffer(index,
1809
0
                  GBUF_INNER_PARITY(current->blkno + 1),
1810
0
                  postfixTuple->size + sizeof(ItemIdData),
1811
0
                  &xlrec.newPage);
1812
0
  }
1813
1814
0
  START_CRIT_SECTION();
1815
1816
  /*
1817
   * Replace old tuple by prefix tuple
1818
   */
1819
0
  PageIndexTupleDelete(current->page, current->offnum);
1820
0
  xlrec.offnumPrefix = PageAddItem(current->page,
1821
0
                   (Item) prefixTuple, prefixTuple->size,
1822
0
                   current->offnum, false, false);
1823
0
  if (xlrec.offnumPrefix != current->offnum)
1824
0
    elog(ERROR, "failed to add item of size %u to SPGiST index page",
1825
0
       prefixTuple->size);
1826
1827
  /*
1828
   * put postfix tuple into appropriate page
1829
   */
1830
0
  if (newBuffer == InvalidBuffer)
1831
0
  {
1832
0
    postfixBlkno = current->blkno;
1833
0
    xlrec.offnumPostfix = postfixOffset =
1834
0
      SpGistPageAddNewItem(state, current->page,
1835
0
                 (Item) postfixTuple, postfixTuple->size,
1836
0
                 NULL, false);
1837
0
    xlrec.postfixBlkSame = true;
1838
0
  }
1839
0
  else
1840
0
  {
1841
0
    postfixBlkno = BufferGetBlockNumber(newBuffer);
1842
0
    xlrec.offnumPostfix = postfixOffset =
1843
0
      SpGistPageAddNewItem(state, BufferGetPage(newBuffer),
1844
0
                 (Item) postfixTuple, postfixTuple->size,
1845
0
                 NULL, false);
1846
0
    MarkBufferDirty(newBuffer);
1847
0
    xlrec.postfixBlkSame = false;
1848
0
  }
1849
1850
  /*
1851
   * And set downlink pointer in the prefix tuple to point to postfix tuple.
1852
   * (We can't avoid this step by doing the above two steps in opposite
1853
   * order, because there might not be enough space on the page to insert
1854
   * the postfix tuple first.)  We have to update the local copy of the
1855
   * prefixTuple too, because that's what will be written to WAL.
1856
   */
1857
0
  spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1858
0
            postfixBlkno, postfixOffset);
1859
0
  prefixTuple = (SpGistInnerTuple) PageGetItem(current->page,
1860
0
                         PageGetItemId(current->page, current->offnum));
1861
0
  spgUpdateNodeLink(prefixTuple, out->result.splitTuple.childNodeN,
1862
0
            postfixBlkno, postfixOffset);
1863
1864
0
  MarkBufferDirty(current->buffer);
1865
1866
0
  if (RelationNeedsWAL(index) && !state->isBuild)
1867
0
  {
1868
0
    XLogRecPtr  recptr;
1869
1870
0
    XLogBeginInsert();
1871
0
    XLogRegisterData(&xlrec, sizeof(xlrec));
1872
0
    XLogRegisterData(prefixTuple, prefixTuple->size);
1873
0
    XLogRegisterData(postfixTuple, postfixTuple->size);
1874
1875
0
    XLogRegisterBuffer(0, current->buffer, REGBUF_STANDARD);
1876
0
    if (newBuffer != InvalidBuffer)
1877
0
    {
1878
0
      int     flags;
1879
1880
0
      flags = REGBUF_STANDARD;
1881
0
      if (xlrec.newPage)
1882
0
        flags |= REGBUF_WILL_INIT;
1883
0
      XLogRegisterBuffer(1, newBuffer, flags);
1884
0
    }
1885
1886
0
    recptr = XLogInsert(RM_SPGIST_ID, XLOG_SPGIST_SPLIT_TUPLE);
1887
1888
0
    PageSetLSN(current->page, recptr);
1889
1890
0
    if (newBuffer != InvalidBuffer)
1891
0
    {
1892
0
      PageSetLSN(BufferGetPage(newBuffer), recptr);
1893
0
    }
1894
0
  }
1895
1896
0
  END_CRIT_SECTION();
1897
1898
  /* Update local free-space cache and release buffer */
1899
0
  if (newBuffer != InvalidBuffer)
1900
0
  {
1901
0
    SpGistSetLastUsedPage(index, newBuffer);
1902
0
    UnlockReleaseBuffer(newBuffer);
1903
0
  }
1904
0
}
1905
1906
/*
1907
 * Insert one item into the index.
1908
 *
1909
 * Returns true on success, false if we failed to complete the insertion
1910
 * (typically because of conflict with a concurrent insert).  In the latter
1911
 * case, caller should re-call spgdoinsert() with the same args.
1912
 */
1913
bool
1914
spgdoinsert(Relation index, SpGistState *state,
1915
      ItemPointer heapPtr, Datum *datums, bool *isnulls)
1916
0
{
1917
0
  bool    result = true;
1918
0
  TupleDesc leafDescriptor = state->leafTupDesc;
1919
0
  bool    isnull = isnulls[spgKeyColumn];
1920
0
  int     level = 0;
1921
0
  Datum   leafDatums[INDEX_MAX_KEYS];
1922
0
  int     leafSize;
1923
0
  int     bestLeafSize;
1924
0
  int     numNoProgressCycles = 0;
1925
0
  SPPageDesc  current,
1926
0
        parent;
1927
0
  FmgrInfo   *procinfo = NULL;
1928
1929
  /*
1930
   * Look up FmgrInfo of the user-defined choose function once, to save
1931
   * cycles in the loop below.
1932
   */
1933
0
  if (!isnull)
1934
0
    procinfo = index_getprocinfo(index, 1, SPGIST_CHOOSE_PROC);
1935
1936
  /*
1937
   * Prepare the leaf datum to insert.
1938
   *
1939
   * If an optional "compress" method is provided, then call it to form the
1940
   * leaf key datum from the input datum.  Otherwise, store the input datum
1941
   * as is.  Since we don't use index_form_tuple in this AM, we have to make
1942
   * sure value to be inserted is not toasted; FormIndexDatum doesn't
1943
   * guarantee that.  But we assume the "compress" method to return an
1944
   * untoasted value.
1945
   */
1946
0
  if (!isnull)
1947
0
  {
1948
0
    if (OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
1949
0
    {
1950
0
      FmgrInfo   *compressProcinfo = NULL;
1951
1952
0
      compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC);
1953
0
      leafDatums[spgKeyColumn] =
1954
0
        FunctionCall1Coll(compressProcinfo,
1955
0
                  index->rd_indcollation[spgKeyColumn],
1956
0
                  datums[spgKeyColumn]);
1957
0
    }
1958
0
    else
1959
0
    {
1960
0
      Assert(state->attLeafType.type == state->attType.type);
1961
1962
0
      if (state->attType.attlen == -1)
1963
0
        leafDatums[spgKeyColumn] =
1964
0
          PointerGetDatum(PG_DETOAST_DATUM(datums[spgKeyColumn]));
1965
0
      else
1966
0
        leafDatums[spgKeyColumn] = datums[spgKeyColumn];
1967
0
    }
1968
0
  }
1969
0
  else
1970
0
    leafDatums[spgKeyColumn] = (Datum) 0;
1971
1972
  /* Likewise, ensure that any INCLUDE values are not toasted */
1973
0
  for (int i = spgFirstIncludeColumn; i < leafDescriptor->natts; i++)
1974
0
  {
1975
0
    if (!isnulls[i])
1976
0
    {
1977
0
      if (TupleDescCompactAttr(leafDescriptor, i)->attlen == -1)
1978
0
        leafDatums[i] = PointerGetDatum(PG_DETOAST_DATUM(datums[i]));
1979
0
      else
1980
0
        leafDatums[i] = datums[i];
1981
0
    }
1982
0
    else
1983
0
      leafDatums[i] = (Datum) 0;
1984
0
  }
1985
1986
  /*
1987
   * Compute space needed for a leaf tuple containing the given data.
1988
   */
1989
0
  leafSize = SpGistGetLeafTupleSize(leafDescriptor, leafDatums, isnulls);
1990
  /* Account for an item pointer, too */
1991
0
  leafSize += sizeof(ItemIdData);
1992
1993
  /*
1994
   * If it isn't gonna fit, and the opclass can't reduce the datum size by
1995
   * suffixing, bail out now rather than doing a lot of useless work.
1996
   */
1997
0
  if (leafSize > SPGIST_PAGE_CAPACITY &&
1998
0
    (isnull || !state->config.longValuesOK))
1999
0
    ereport(ERROR,
2000
0
        (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2001
0
         errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2002
0
            leafSize - sizeof(ItemIdData),
2003
0
            SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
2004
0
            RelationGetRelationName(index)),
2005
0
         errhint("Values larger than a buffer page cannot be indexed.")));
2006
0
  bestLeafSize = leafSize;
2007
2008
  /* Initialize "current" to the appropriate root page */
2009
0
  current.blkno = isnull ? SPGIST_NULL_BLKNO : SPGIST_ROOT_BLKNO;
2010
0
  current.buffer = InvalidBuffer;
2011
0
  current.page = NULL;
2012
0
  current.offnum = FirstOffsetNumber;
2013
0
  current.node = -1;
2014
2015
  /* "parent" is invalid for the moment */
2016
0
  parent.blkno = InvalidBlockNumber;
2017
0
  parent.buffer = InvalidBuffer;
2018
0
  parent.page = NULL;
2019
0
  parent.offnum = InvalidOffsetNumber;
2020
0
  parent.node = -1;
2021
2022
  /*
2023
   * Before entering the loop, try to clear any pending interrupt condition.
2024
   * If a query cancel is pending, we might as well accept it now not later;
2025
   * while if a non-canceling condition is pending, servicing it here avoids
2026
   * having to restart the insertion and redo all the work so far.
2027
   */
2028
0
  CHECK_FOR_INTERRUPTS();
2029
2030
0
  for (;;)
2031
0
  {
2032
0
    bool    isNew = false;
2033
2034
    /*
2035
     * Bail out if query cancel is pending.  We must have this somewhere
2036
     * in the loop since a broken opclass could produce an infinite
2037
     * picksplit loop.  However, because we'll be holding buffer lock(s)
2038
     * after the first iteration, ProcessInterrupts() wouldn't be able to
2039
     * throw a cancel error here.  Hence, if we see that an interrupt is
2040
     * pending, break out of the loop and deal with the situation below.
2041
     * Set result = false because we must restart the insertion if the
2042
     * interrupt isn't a query-cancel-or-die case.
2043
     */
2044
0
    if (INTERRUPTS_PENDING_CONDITION())
2045
0
    {
2046
0
      result = false;
2047
0
      break;
2048
0
    }
2049
2050
0
    if (current.blkno == InvalidBlockNumber)
2051
0
    {
2052
      /*
2053
       * Create a leaf page.  If leafSize is too large to fit on a page,
2054
       * we won't actually use the page yet, but it simplifies the API
2055
       * for doPickSplit to always have a leaf page at hand; so just
2056
       * quietly limit our request to a page size.
2057
       */
2058
0
      current.buffer =
2059
0
        SpGistGetBuffer(index,
2060
0
                GBUF_LEAF | (isnull ? GBUF_NULLS : 0),
2061
0
                Min(leafSize, SPGIST_PAGE_CAPACITY),
2062
0
                &isNew);
2063
0
      current.blkno = BufferGetBlockNumber(current.buffer);
2064
0
    }
2065
0
    else if (parent.buffer == InvalidBuffer)
2066
0
    {
2067
      /* we hold no parent-page lock, so no deadlock is possible */
2068
0
      current.buffer = ReadBuffer(index, current.blkno);
2069
0
      LockBuffer(current.buffer, BUFFER_LOCK_EXCLUSIVE);
2070
0
    }
2071
0
    else if (current.blkno != parent.blkno)
2072
0
    {
2073
      /* descend to a new child page */
2074
0
      current.buffer = ReadBuffer(index, current.blkno);
2075
2076
      /*
2077
       * Attempt to acquire lock on child page.  We must beware of
2078
       * deadlock against another insertion process descending from that
2079
       * page to our parent page (see README).  If we fail to get lock,
2080
       * abandon the insertion and tell our caller to start over.
2081
       *
2082
       * XXX this could be improved, because failing to get lock on a
2083
       * buffer is not proof of a deadlock situation; the lock might be
2084
       * held by a reader, or even just background writer/checkpointer
2085
       * process.  Perhaps it'd be worth retrying after sleeping a bit?
2086
       */
2087
0
      if (!ConditionalLockBuffer(current.buffer))
2088
0
      {
2089
0
        ReleaseBuffer(current.buffer);
2090
0
        UnlockReleaseBuffer(parent.buffer);
2091
0
        return false;
2092
0
      }
2093
0
    }
2094
0
    else
2095
0
    {
2096
      /* inner tuple can be stored on the same page as parent one */
2097
0
      current.buffer = parent.buffer;
2098
0
    }
2099
0
    current.page = BufferGetPage(current.buffer);
2100
2101
    /* should not arrive at a page of the wrong type */
2102
0
    if (isnull ? !SpGistPageStoresNulls(current.page) :
2103
0
      SpGistPageStoresNulls(current.page))
2104
0
      elog(ERROR, "SPGiST index page %u has wrong nulls flag",
2105
0
         current.blkno);
2106
2107
0
    if (SpGistPageIsLeaf(current.page))
2108
0
    {
2109
0
      SpGistLeafTuple leafTuple;
2110
0
      int     nToSplit,
2111
0
            sizeToSplit;
2112
2113
0
      leafTuple = spgFormLeafTuple(state, heapPtr, leafDatums, isnulls);
2114
0
      if (leafTuple->size + sizeof(ItemIdData) <=
2115
0
        SpGistPageGetFreeSpace(current.page, 1))
2116
0
      {
2117
        /* it fits on page, so insert it and we're done */
2118
0
        addLeafTuple(index, state, leafTuple,
2119
0
               &current, &parent, isnull, isNew);
2120
0
        break;
2121
0
      }
2122
0
      else if ((sizeToSplit =
2123
0
            checkSplitConditions(index, state, &current,
2124
0
                       &nToSplit)) < SPGIST_PAGE_CAPACITY / 2 &&
2125
0
           nToSplit < 64 &&
2126
0
           leafTuple->size + sizeof(ItemIdData) + sizeToSplit <= SPGIST_PAGE_CAPACITY)
2127
0
      {
2128
        /*
2129
         * the amount of data is pretty small, so just move the whole
2130
         * chain to another leaf page rather than splitting it.
2131
         */
2132
0
        Assert(!isNew);
2133
0
        moveLeafs(index, state, &current, &parent, leafTuple, isnull);
2134
0
        break;      /* we're done */
2135
0
      }
2136
0
      else
2137
0
      {
2138
        /* picksplit */
2139
0
        if (doPickSplit(index, state, &current, &parent,
2140
0
                leafTuple, level, isnull, isNew))
2141
0
          break;   /* doPickSplit installed new tuples */
2142
2143
        /* leaf tuple will not be inserted yet */
2144
0
        pfree(leafTuple);
2145
2146
        /*
2147
         * current now describes new inner tuple, go insert into it
2148
         */
2149
0
        Assert(!SpGistPageIsLeaf(current.page));
2150
0
        goto process_inner_tuple;
2151
0
      }
2152
0
    }
2153
0
    else          /* non-leaf page */
2154
0
    {
2155
      /*
2156
       * Apply the opclass choose function to figure out how to insert
2157
       * the given datum into the current inner tuple.
2158
       */
2159
0
      SpGistInnerTuple innerTuple;
2160
0
      spgChooseIn in;
2161
0
      spgChooseOut out;
2162
2163
      /*
2164
       * spgAddNode and spgSplitTuple cases will loop back to here to
2165
       * complete the insertion operation.  Just in case the choose
2166
       * function is broken and produces add or split requests
2167
       * repeatedly, check for query cancel (see comments above).
2168
       */
2169
0
  process_inner_tuple:
2170
0
      if (INTERRUPTS_PENDING_CONDITION())
2171
0
      {
2172
0
        result = false;
2173
0
        break;
2174
0
      }
2175
2176
0
      innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
2177
0
                            PageGetItemId(current.page, current.offnum));
2178
2179
0
      in.datum = datums[spgKeyColumn];
2180
0
      in.leafDatum = leafDatums[spgKeyColumn];
2181
0
      in.level = level;
2182
0
      in.allTheSame = innerTuple->allTheSame;
2183
0
      in.hasPrefix = (innerTuple->prefixSize > 0);
2184
0
      in.prefixDatum = SGITDATUM(innerTuple, state);
2185
0
      in.nNodes = innerTuple->nNodes;
2186
0
      in.nodeLabels = spgExtractNodeLabels(state, innerTuple);
2187
2188
0
      memset(&out, 0, sizeof(out));
2189
2190
0
      if (!isnull)
2191
0
      {
2192
        /* use user-defined choose method */
2193
0
        FunctionCall2Coll(procinfo,
2194
0
                  index->rd_indcollation[0],
2195
0
                  PointerGetDatum(&in),
2196
0
                  PointerGetDatum(&out));
2197
0
      }
2198
0
      else
2199
0
      {
2200
        /* force "match" action (to insert to random subnode) */
2201
0
        out.resultType = spgMatchNode;
2202
0
      }
2203
2204
0
      if (innerTuple->allTheSame)
2205
0
      {
2206
        /*
2207
         * It's not allowed to do an AddNode at an allTheSame tuple.
2208
         * Opclass must say "match", in which case we choose a random
2209
         * one of the nodes to descend into, or "split".
2210
         */
2211
0
        if (out.resultType == spgAddNode)
2212
0
          elog(ERROR, "cannot add a node to an allTheSame inner tuple");
2213
0
        else if (out.resultType == spgMatchNode)
2214
0
          out.result.matchNode.nodeN =
2215
0
            pg_prng_uint64_range(&pg_global_prng_state,
2216
0
                       0, innerTuple->nNodes - 1);
2217
0
      }
2218
2219
0
      switch (out.resultType)
2220
0
      {
2221
0
        case spgMatchNode:
2222
          /* Descend to N'th child node */
2223
0
          spgMatchNodeAction(index, state, innerTuple,
2224
0
                     &current, &parent,
2225
0
                     out.result.matchNode.nodeN);
2226
          /* Adjust level as per opclass request */
2227
0
          level += out.result.matchNode.levelAdd;
2228
          /* Replace leafDatum and recompute leafSize */
2229
0
          if (!isnull)
2230
0
          {
2231
0
            leafDatums[spgKeyColumn] = out.result.matchNode.restDatum;
2232
0
            leafSize = SpGistGetLeafTupleSize(leafDescriptor,
2233
0
                              leafDatums, isnulls);
2234
0
            leafSize += sizeof(ItemIdData);
2235
0
          }
2236
2237
          /*
2238
           * Check new tuple size; fail if it can't fit, unless the
2239
           * opclass says it can handle the situation by suffixing.
2240
           *
2241
           * However, the opclass can only shorten the leaf datum,
2242
           * which may not be enough to ever make the tuple fit,
2243
           * since INCLUDE columns might alone use more than a page.
2244
           * Depending on the opclass' behavior, that could lead to
2245
           * an infinite loop --- spgtextproc.c, for example, will
2246
           * just repeatedly generate an empty-string leaf datum
2247
           * once it runs out of data.  Actual bugs in opclasses
2248
           * might cause infinite looping, too.  To detect such a
2249
           * loop, check to see if we are making progress by
2250
           * reducing the leafSize in each pass.  This is a bit
2251
           * tricky though.  Because of alignment considerations,
2252
           * the total tuple size might not decrease on every pass.
2253
           * Also, there are edge cases where the choose method
2254
           * might seem to not make progress for a cycle or two.
2255
           * Somewhat arbitrarily, we allow up to 10 no-progress
2256
           * iterations before failing.  (This limit should be more
2257
           * than MAXALIGN, to accommodate opclasses that trim one
2258
           * byte from the leaf datum per pass.)
2259
           */
2260
0
          if (leafSize > SPGIST_PAGE_CAPACITY)
2261
0
          {
2262
0
            bool    ok = false;
2263
2264
0
            if (state->config.longValuesOK && !isnull)
2265
0
            {
2266
0
              if (leafSize < bestLeafSize)
2267
0
              {
2268
0
                ok = true;
2269
0
                bestLeafSize = leafSize;
2270
0
                numNoProgressCycles = 0;
2271
0
              }
2272
0
              else if (++numNoProgressCycles < 10)
2273
0
                ok = true;
2274
0
            }
2275
0
            if (!ok)
2276
0
              ereport(ERROR,
2277
0
                  (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
2278
0
                   errmsg("index row size %zu exceeds maximum %zu for index \"%s\"",
2279
0
                      leafSize - sizeof(ItemIdData),
2280
0
                      SPGIST_PAGE_CAPACITY - sizeof(ItemIdData),
2281
0
                      RelationGetRelationName(index)),
2282
0
                   errhint("Values larger than a buffer page cannot be indexed.")));
2283
0
          }
2284
2285
          /*
2286
           * Loop around and attempt to insert the new leafDatum at
2287
           * "current" (which might reference an existing child
2288
           * tuple, or might be invalid to force us to find a new
2289
           * page for the tuple).
2290
           */
2291
0
          break;
2292
0
        case spgAddNode:
2293
          /* AddNode is not sensible if nodes don't have labels */
2294
0
          if (in.nodeLabels == NULL)
2295
0
            elog(ERROR, "cannot add a node to an inner tuple without node labels");
2296
          /* Add node to inner tuple, per request */
2297
0
          spgAddNodeAction(index, state, innerTuple,
2298
0
                   &current, &parent,
2299
0
                   out.result.addNode.nodeN,
2300
0
                   out.result.addNode.nodeLabel);
2301
2302
          /*
2303
           * Retry insertion into the enlarged node.  We assume that
2304
           * we'll get a MatchNode result this time.
2305
           */
2306
0
          goto process_inner_tuple;
2307
0
          break;
2308
0
        case spgSplitTuple:
2309
          /* Split inner tuple, per request */
2310
0
          spgSplitNodeAction(index, state, innerTuple,
2311
0
                     &current, &out);
2312
2313
          /* Retry insertion into the split node */
2314
0
          goto process_inner_tuple;
2315
0
          break;
2316
0
        default:
2317
0
          elog(ERROR, "unrecognized SPGiST choose result: %d",
2318
0
             (int) out.resultType);
2319
0
          break;
2320
0
      }
2321
0
    }
2322
0
  }             /* end loop */
2323
2324
  /*
2325
   * Release any buffers we're still holding.  Beware of possibility that
2326
   * current and parent reference same buffer.
2327
   */
2328
0
  if (current.buffer != InvalidBuffer)
2329
0
  {
2330
0
    SpGistSetLastUsedPage(index, current.buffer);
2331
0
    UnlockReleaseBuffer(current.buffer);
2332
0
  }
2333
0
  if (parent.buffer != InvalidBuffer &&
2334
0
    parent.buffer != current.buffer)
2335
0
  {
2336
0
    SpGistSetLastUsedPage(index, parent.buffer);
2337
0
    UnlockReleaseBuffer(parent.buffer);
2338
0
  }
2339
2340
  /*
2341
   * We do not support being called while some outer function is holding a
2342
   * buffer lock (or any other reason to postpone query cancels).  If that
2343
   * were the case, telling the caller to retry would create an infinite
2344
   * loop.
2345
   */
2346
0
  Assert(INTERRUPTS_CAN_BE_PROCESSED());
2347
2348
  /*
2349
   * Finally, check for interrupts again.  If there was a query cancel,
2350
   * ProcessInterrupts() will be able to throw the error here.  If it was
2351
   * some other kind of interrupt that can just be cleared, return false to
2352
   * tell our caller to retry.
2353
   */
2354
0
  CHECK_FOR_INTERRUPTS();
2355
2356
0
  return result;
2357
0
}