Coverage Report

Created: 2025-06-15 06:31

/src/postgres/src/backend/access/spgist/spgutils.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * spgutils.c
4
 *    various support functions for SP-GiST
5
 *
6
 *
7
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
8
 * Portions Copyright (c) 1994, Regents of the University of California
9
 *
10
 * IDENTIFICATION
11
 *      src/backend/access/spgist/spgutils.c
12
 *
13
 *-------------------------------------------------------------------------
14
 */
15
16
#include "postgres.h"
17
18
#include "access/amvalidate.h"
19
#include "access/htup_details.h"
20
#include "access/reloptions.h"
21
#include "access/spgist_private.h"
22
#include "access/toast_compression.h"
23
#include "access/transam.h"
24
#include "access/xact.h"
25
#include "catalog/pg_amop.h"
26
#include "commands/vacuum.h"
27
#include "nodes/nodeFuncs.h"
28
#include "parser/parse_coerce.h"
29
#include "storage/bufmgr.h"
30
#include "storage/indexfsm.h"
31
#include "utils/catcache.h"
32
#include "utils/fmgrprotos.h"
33
#include "utils/index_selfuncs.h"
34
#include "utils/lsyscache.h"
35
#include "utils/rel.h"
36
#include "utils/syscache.h"
37
38
39
/*
40
 * SP-GiST handler function: return IndexAmRoutine with access method parameters
41
 * and callbacks.
42
 */
43
Datum
44
spghandler(PG_FUNCTION_ARGS)
45
0
{
46
0
  IndexAmRoutine *amroutine = makeNode(IndexAmRoutine);
47
48
0
  amroutine->amstrategies = 0;
49
0
  amroutine->amsupport = SPGISTNProc;
50
0
  amroutine->amoptsprocnum = SPGIST_OPTIONS_PROC;
51
0
  amroutine->amcanorder = false;
52
0
  amroutine->amcanorderbyop = true;
53
0
  amroutine->amcanhash = false;
54
0
  amroutine->amconsistentequality = false;
55
0
  amroutine->amconsistentordering = false;
56
0
  amroutine->amcanbackward = false;
57
0
  amroutine->amcanunique = false;
58
0
  amroutine->amcanmulticol = false;
59
0
  amroutine->amoptionalkey = true;
60
0
  amroutine->amsearcharray = false;
61
0
  amroutine->amsearchnulls = true;
62
0
  amroutine->amstorage = true;
63
0
  amroutine->amclusterable = false;
64
0
  amroutine->ampredlocks = false;
65
0
  amroutine->amcanparallel = false;
66
0
  amroutine->amcanbuildparallel = false;
67
0
  amroutine->amcaninclude = true;
68
0
  amroutine->amusemaintenanceworkmem = false;
69
0
  amroutine->amsummarizing = false;
70
0
  amroutine->amparallelvacuumoptions =
71
0
    VACUUM_OPTION_PARALLEL_BULKDEL | VACUUM_OPTION_PARALLEL_COND_CLEANUP;
72
0
  amroutine->amkeytype = InvalidOid;
73
74
0
  amroutine->ambuild = spgbuild;
75
0
  amroutine->ambuildempty = spgbuildempty;
76
0
  amroutine->aminsert = spginsert;
77
0
  amroutine->aminsertcleanup = NULL;
78
0
  amroutine->ambulkdelete = spgbulkdelete;
79
0
  amroutine->amvacuumcleanup = spgvacuumcleanup;
80
0
  amroutine->amcanreturn = spgcanreturn;
81
0
  amroutine->amcostestimate = spgcostestimate;
82
0
  amroutine->amgettreeheight = NULL;
83
0
  amroutine->amoptions = spgoptions;
84
0
  amroutine->amproperty = spgproperty;
85
0
  amroutine->ambuildphasename = NULL;
86
0
  amroutine->amvalidate = spgvalidate;
87
0
  amroutine->amadjustmembers = spgadjustmembers;
88
0
  amroutine->ambeginscan = spgbeginscan;
89
0
  amroutine->amrescan = spgrescan;
90
0
  amroutine->amgettuple = spggettuple;
91
0
  amroutine->amgetbitmap = spggetbitmap;
92
0
  amroutine->amendscan = spgendscan;
93
0
  amroutine->ammarkpos = NULL;
94
0
  amroutine->amrestrpos = NULL;
95
0
  amroutine->amestimateparallelscan = NULL;
96
0
  amroutine->aminitparallelscan = NULL;
97
0
  amroutine->amparallelrescan = NULL;
98
0
  amroutine->amtranslatestrategy = NULL;
99
0
  amroutine->amtranslatecmptype = NULL;
100
101
0
  PG_RETURN_POINTER(amroutine);
102
0
}
103
104
/*
105
 * GetIndexInputType
106
 *    Determine the nominal input data type for an index column
107
 *
108
 * We define the "nominal" input type as the associated opclass's opcintype,
109
 * or if that is a polymorphic type, the base type of the heap column or
110
 * expression that is the index's input.  The reason for preferring the
111
 * opcintype is that non-polymorphic opclasses probably don't want to hear
112
 * about binary-compatible input types.  For instance, if a text opclass
113
 * is being used with a varchar heap column, we want to report "text" not
114
 * "varchar".  Likewise, opclasses don't want to hear about domain types,
115
 * so if we do consult the actual input type, we make sure to flatten domains.
116
 *
117
 * At some point maybe this should go somewhere else, but it's not clear
118
 * if any other index AMs have a use for it.
119
 */
120
static Oid
121
GetIndexInputType(Relation index, AttrNumber indexcol)
122
0
{
123
0
  Oid     opcintype;
124
0
  AttrNumber  heapcol;
125
0
  List     *indexprs;
126
0
  ListCell   *indexpr_item;
127
128
0
  Assert(index->rd_index != NULL);
129
0
  Assert(indexcol > 0 && indexcol <= index->rd_index->indnkeyatts);
130
0
  opcintype = index->rd_opcintype[indexcol - 1];
131
0
  if (!IsPolymorphicType(opcintype))
132
0
    return opcintype;
133
0
  heapcol = index->rd_index->indkey.values[indexcol - 1];
134
0
  if (heapcol != 0)     /* Simple index column? */
135
0
    return getBaseType(get_atttype(index->rd_index->indrelid, heapcol));
136
137
  /*
138
   * If the index expressions are already cached, skip calling
139
   * RelationGetIndexExpressions, as it will make a copy which is overkill.
140
   * We're not going to modify the trees, and we're not going to do anything
141
   * that would invalidate the relcache entry before we're done.
142
   */
143
0
  if (index->rd_indexprs)
144
0
    indexprs = index->rd_indexprs;
145
0
  else
146
0
    indexprs = RelationGetIndexExpressions(index);
147
0
  indexpr_item = list_head(indexprs);
148
0
  for (int i = 1; i <= index->rd_index->indnkeyatts; i++)
149
0
  {
150
0
    if (index->rd_index->indkey.values[i - 1] == 0)
151
0
    {
152
      /* expression column */
153
0
      if (indexpr_item == NULL)
154
0
        elog(ERROR, "wrong number of index expressions");
155
0
      if (i == indexcol)
156
0
        return getBaseType(exprType((Node *) lfirst(indexpr_item)));
157
0
      indexpr_item = lnext(indexprs, indexpr_item);
158
0
    }
159
0
  }
160
0
  elog(ERROR, "wrong number of index expressions");
161
0
  return InvalidOid;     /* keep compiler quiet */
162
0
}
163
164
/* Fill in a SpGistTypeDesc struct with info about the specified data type */
165
static void
166
fillTypeDesc(SpGistTypeDesc *desc, Oid type)
167
0
{
168
0
  HeapTuple tp;
169
0
  Form_pg_type typtup;
170
171
0
  desc->type = type;
172
0
  tp = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type));
173
0
  if (!HeapTupleIsValid(tp))
174
0
    elog(ERROR, "cache lookup failed for type %u", type);
175
0
  typtup = (Form_pg_type) GETSTRUCT(tp);
176
0
  desc->attlen = typtup->typlen;
177
0
  desc->attbyval = typtup->typbyval;
178
0
  desc->attalign = typtup->typalign;
179
0
  desc->attstorage = typtup->typstorage;
180
0
  ReleaseSysCache(tp);
181
0
}
182
183
/*
184
 * Fetch local cache of AM-specific info about the index, initializing it
185
 * if necessary
186
 */
187
SpGistCache *
188
spgGetCache(Relation index)
189
0
{
190
0
  SpGistCache *cache;
191
192
0
  if (index->rd_amcache == NULL)
193
0
  {
194
0
    Oid     atttype;
195
0
    spgConfigIn in;
196
0
    FmgrInfo   *procinfo;
197
198
0
    cache = MemoryContextAllocZero(index->rd_indexcxt,
199
0
                     sizeof(SpGistCache));
200
201
    /* SPGiST must have one key column and can also have INCLUDE columns */
202
0
    Assert(IndexRelationGetNumberOfKeyAttributes(index) == 1);
203
0
    Assert(IndexRelationGetNumberOfAttributes(index) <= INDEX_MAX_KEYS);
204
205
    /*
206
     * Get the actual (well, nominal) data type of the key column.  We
207
     * pass this to the opclass config function so that polymorphic
208
     * opclasses are possible.
209
     */
210
0
    atttype = GetIndexInputType(index, spgKeyColumn + 1);
211
212
    /* Call the config function to get config info for the opclass */
213
0
    in.attType = atttype;
214
215
0
    procinfo = index_getprocinfo(index, 1, SPGIST_CONFIG_PROC);
216
0
    FunctionCall2Coll(procinfo,
217
0
              index->rd_indcollation[spgKeyColumn],
218
0
              PointerGetDatum(&in),
219
0
              PointerGetDatum(&cache->config));
220
221
    /*
222
     * If leafType isn't specified, use the declared index column type,
223
     * which index.c will have derived from the opclass's opcintype.
224
     * (Although we now make spgvalidate.c warn if these aren't the same,
225
     * old user-defined opclasses may not set the STORAGE parameter
226
     * correctly, so believe leafType if it's given.)
227
     */
228
0
    if (!OidIsValid(cache->config.leafType))
229
0
    {
230
0
      cache->config.leafType =
231
0
        TupleDescAttr(RelationGetDescr(index), spgKeyColumn)->atttypid;
232
233
      /*
234
       * If index column type is binary-coercible to atttype (for
235
       * example, it's a domain over atttype), treat it as plain atttype
236
       * to avoid thinking we need to compress.
237
       */
238
0
      if (cache->config.leafType != atttype &&
239
0
        IsBinaryCoercible(cache->config.leafType, atttype))
240
0
        cache->config.leafType = atttype;
241
0
    }
242
243
    /* Get the information we need about each relevant datatype */
244
0
    fillTypeDesc(&cache->attType, atttype);
245
246
0
    if (cache->config.leafType != atttype)
247
0
    {
248
0
      if (!OidIsValid(index_getprocid(index, 1, SPGIST_COMPRESS_PROC)))
249
0
        ereport(ERROR,
250
0
            (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
251
0
             errmsg("compress method must be defined when leaf type is different from input type")));
252
253
0
      fillTypeDesc(&cache->attLeafType, cache->config.leafType);
254
0
    }
255
0
    else
256
0
    {
257
      /* Save lookups in this common case */
258
0
      cache->attLeafType = cache->attType;
259
0
    }
260
261
0
    fillTypeDesc(&cache->attPrefixType, cache->config.prefixType);
262
0
    fillTypeDesc(&cache->attLabelType, cache->config.labelType);
263
264
    /*
265
     * Finally, if it's a real index (not a partitioned one), get the
266
     * lastUsedPages data from the metapage
267
     */
268
0
    if (index->rd_rel->relkind != RELKIND_PARTITIONED_INDEX)
269
0
    {
270
0
      Buffer    metabuffer;
271
0
      SpGistMetaPageData *metadata;
272
273
0
      metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
274
0
      LockBuffer(metabuffer, BUFFER_LOCK_SHARE);
275
276
0
      metadata = SpGistPageGetMeta(BufferGetPage(metabuffer));
277
278
0
      if (metadata->magicNumber != SPGIST_MAGIC_NUMBER)
279
0
        elog(ERROR, "index \"%s\" is not an SP-GiST index",
280
0
           RelationGetRelationName(index));
281
282
0
      cache->lastUsedPages = metadata->lastUsedPages;
283
284
0
      UnlockReleaseBuffer(metabuffer);
285
0
    }
286
287
0
    index->rd_amcache = cache;
288
0
  }
289
0
  else
290
0
  {
291
    /* assume it's up to date */
292
0
    cache = (SpGistCache *) index->rd_amcache;
293
0
  }
294
295
0
  return cache;
296
0
}
297
298
/*
299
 * Compute a tuple descriptor for leaf tuples or index-only-scan result tuples.
300
 *
301
 * We can use the relcache's tupdesc as-is in many cases, and it's always
302
 * OK so far as any INCLUDE columns are concerned.  However, the entry for
303
 * the key column has to match leafType in the first case or attType in the
304
 * second case.  While the relcache's tupdesc *should* show leafType, this
305
 * might not hold for legacy user-defined opclasses, since before v14 they
306
 * were not allowed to declare their true storage type in CREATE OPCLASS.
307
 * Also, attType can be different from what is in the relcache.
308
 *
309
 * This function gives back either a pointer to the relcache's tupdesc
310
 * if that is suitable, or a palloc'd copy that's been adjusted to match
311
 * the specified key column type.  We can avoid doing any catalog lookups
312
 * here by insisting that the caller pass an SpGistTypeDesc not just an OID.
313
 */
314
TupleDesc
315
getSpGistTupleDesc(Relation index, SpGistTypeDesc *keyType)
316
0
{
317
0
  TupleDesc outTupDesc;
318
0
  Form_pg_attribute att;
319
320
0
  if (keyType->type ==
321
0
    TupleDescAttr(RelationGetDescr(index), spgKeyColumn)->atttypid)
322
0
    outTupDesc = RelationGetDescr(index);
323
0
  else
324
0
  {
325
0
    outTupDesc = CreateTupleDescCopy(RelationGetDescr(index));
326
0
    att = TupleDescAttr(outTupDesc, spgKeyColumn);
327
    /* It's sufficient to update the type-dependent fields of the column */
328
0
    att->atttypid = keyType->type;
329
0
    att->atttypmod = -1;
330
0
    att->attlen = keyType->attlen;
331
0
    att->attbyval = keyType->attbyval;
332
0
    att->attalign = keyType->attalign;
333
0
    att->attstorage = keyType->attstorage;
334
    /* We shouldn't need to bother with making these valid: */
335
0
    att->attcompression = InvalidCompressionMethod;
336
0
    att->attcollation = InvalidOid;
337
    /* In case we changed typlen, we'd better reset following offsets */
338
0
    for (int i = spgFirstIncludeColumn; i < outTupDesc->natts; i++)
339
0
      TupleDescCompactAttr(outTupDesc, i)->attcacheoff = -1;
340
341
0
    populate_compact_attribute(outTupDesc, spgKeyColumn);
342
0
  }
343
0
  return outTupDesc;
344
0
}
345
346
/* Initialize SpGistState for working with the given index */
347
void
348
initSpGistState(SpGistState *state, Relation index)
349
0
{
350
0
  SpGistCache *cache;
351
352
0
  state->index = index;
353
354
  /* Get cached static information about index */
355
0
  cache = spgGetCache(index);
356
357
0
  state->config = cache->config;
358
0
  state->attType = cache->attType;
359
0
  state->attLeafType = cache->attLeafType;
360
0
  state->attPrefixType = cache->attPrefixType;
361
0
  state->attLabelType = cache->attLabelType;
362
363
  /* Ensure we have a valid descriptor for leaf tuples */
364
0
  state->leafTupDesc = getSpGistTupleDesc(state->index, &state->attLeafType);
365
366
  /* Make workspace for constructing dead tuples */
367
0
  state->deadTupleStorage = palloc0(SGDTSIZE);
368
369
  /*
370
   * Set horizon XID to use in redirection tuples.  Use our own XID if we
371
   * have one, else use InvalidTransactionId.  The latter case can happen in
372
   * VACUUM or REINDEX CONCURRENTLY, and in neither case would it be okay to
373
   * force an XID to be assigned.  VACUUM won't create any redirection
374
   * tuples anyway, but REINDEX CONCURRENTLY can.  Fortunately, REINDEX
375
   * CONCURRENTLY doesn't mark the index valid until the end, so there could
376
   * never be any concurrent scans "in flight" to a redirection tuple it has
377
   * inserted.  And it locks out VACUUM until the end, too.  So it's okay
378
   * for VACUUM to immediately expire a redirection tuple that contains an
379
   * invalid xid.
380
   */
381
0
  state->redirectXid = GetTopTransactionIdIfAny();
382
383
  /* Assume we're not in an index build (spgbuild will override) */
384
0
  state->isBuild = false;
385
0
}
386
387
/*
388
 * Allocate a new page (either by recycling, or by extending the index file).
389
 *
390
 * The returned buffer is already pinned and exclusive-locked.
391
 * Caller is responsible for initializing the page by calling SpGistInitBuffer.
392
 */
393
Buffer
394
SpGistNewBuffer(Relation index)
395
0
{
396
0
  Buffer    buffer;
397
398
  /* First, try to get a page from FSM */
399
0
  for (;;)
400
0
  {
401
0
    BlockNumber blkno = GetFreeIndexPage(index);
402
403
0
    if (blkno == InvalidBlockNumber)
404
0
      break;       /* nothing known to FSM */
405
406
    /*
407
     * The fixed pages shouldn't ever be listed in FSM, but just in case
408
     * one is, ignore it.
409
     */
410
0
    if (SpGistBlockIsFixed(blkno))
411
0
      continue;
412
413
0
    buffer = ReadBuffer(index, blkno);
414
415
    /*
416
     * We have to guard against the possibility that someone else already
417
     * recycled this page; the buffer may be locked if so.
418
     */
419
0
    if (ConditionalLockBuffer(buffer))
420
0
    {
421
0
      Page    page = BufferGetPage(buffer);
422
423
0
      if (PageIsNew(page))
424
0
        return buffer; /* OK to use, if never initialized */
425
426
0
      if (SpGistPageIsDeleted(page) || PageIsEmpty(page))
427
0
        return buffer; /* OK to use */
428
429
0
      LockBuffer(buffer, BUFFER_LOCK_UNLOCK);
430
0
    }
431
432
    /* Can't use it, so release buffer and try again */
433
0
    ReleaseBuffer(buffer);
434
0
  }
435
436
0
  buffer = ExtendBufferedRel(BMR_REL(index), MAIN_FORKNUM, NULL,
437
0
                 EB_LOCK_FIRST);
438
439
0
  return buffer;
440
0
}
441
442
/*
443
 * Update index metapage's lastUsedPages info from local cache, if possible
444
 *
445
 * Updating meta page isn't critical for index working, so
446
 * 1 use ConditionalLockBuffer to improve concurrency
447
 * 2 don't WAL-log metabuffer changes to decrease WAL traffic
448
 */
449
void
450
SpGistUpdateMetaPage(Relation index)
451
0
{
452
0
  SpGistCache *cache = (SpGistCache *) index->rd_amcache;
453
454
0
  if (cache != NULL)
455
0
  {
456
0
    Buffer    metabuffer;
457
458
0
    metabuffer = ReadBuffer(index, SPGIST_METAPAGE_BLKNO);
459
460
0
    if (ConditionalLockBuffer(metabuffer))
461
0
    {
462
0
      Page    metapage = BufferGetPage(metabuffer);
463
0
      SpGistMetaPageData *metadata = SpGistPageGetMeta(metapage);
464
465
0
      metadata->lastUsedPages = cache->lastUsedPages;
466
467
      /*
468
       * Set pd_lower just past the end of the metadata.  This is
469
       * essential, because without doing so, metadata will be lost if
470
       * xlog.c compresses the page.  (We must do this here because
471
       * pre-v11 versions of PG did not set the metapage's pd_lower
472
       * correctly, so a pg_upgraded index might contain the wrong
473
       * value.)
474
       */
475
0
      ((PageHeader) metapage)->pd_lower =
476
0
        ((char *) metadata + sizeof(SpGistMetaPageData)) - (char *) metapage;
477
478
0
      MarkBufferDirty(metabuffer);
479
0
      UnlockReleaseBuffer(metabuffer);
480
0
    }
481
0
    else
482
0
    {
483
0
      ReleaseBuffer(metabuffer);
484
0
    }
485
0
  }
486
0
}
487
488
/*
 * GET_LUP
 *    Address of the lastUsedPages cache slot in cache "c" that corresponds
 *    to flags value "f".  Reducing the flags modulo SPGIST_CACHED_PAGES is
 *    only there for paranoia's sake.
 */
#define GET_LUP(c, f) \
    (&(c)->lastUsedPages.cachedPage[((unsigned int) (f)) % SPGIST_CACHED_PAGES])
491
492
/*
493
 * Allocate and initialize a new buffer of the type and parity specified by
494
 * flags.  The returned buffer is already pinned and exclusive-locked.
495
 *
496
 * When requesting an inner page, if we get one with the wrong parity,
497
 * we just release the buffer and try again.  We will get a different page
498
 * because GetFreeIndexPage will have marked the page used in FSM.  The page
499
 * is entered in our local lastUsedPages cache, so there's some hope of
500
 * making use of it later in this session, but otherwise we rely on VACUUM
501
 * to eventually re-enter the page in FSM, making it available for recycling.
502
 * Note that such a page does not get marked dirty here, so unless it's used
503
 * fairly soon, the buffer will just get discarded and the page will remain
504
 * as it was on disk.
505
 *
506
 * When we return a buffer to the caller, the page is *not* entered into
507
 * the lastUsedPages cache; we expect the caller will do so after it's taken
508
 * whatever space it will use.  This is because after the caller has used up
509
 * some space, the page might have less space than whatever was cached already
510
 * so we'd rather not trash the old cache entry.
511
 */
512
static Buffer
513
allocNewBuffer(Relation index, int flags)
514
0
{
515
0
  SpGistCache *cache = spgGetCache(index);
516
0
  uint16    pageflags = 0;
517
518
0
  if (GBUF_REQ_LEAF(flags))
519
0
    pageflags |= SPGIST_LEAF;
520
0
  if (GBUF_REQ_NULLS(flags))
521
0
    pageflags |= SPGIST_NULLS;
522
523
0
  for (;;)
524
0
  {
525
0
    Buffer    buffer;
526
527
0
    buffer = SpGistNewBuffer(index);
528
0
    SpGistInitBuffer(buffer, pageflags);
529
530
0
    if (pageflags & SPGIST_LEAF)
531
0
    {
532
      /* Leaf pages have no parity concerns, so just use it */
533
0
      return buffer;
534
0
    }
535
0
    else
536
0
    {
537
0
      BlockNumber blkno = BufferGetBlockNumber(buffer);
538
0
      int     blkFlags = GBUF_INNER_PARITY(blkno);
539
540
0
      if ((flags & GBUF_PARITY_MASK) == blkFlags)
541
0
      {
542
        /* Page has right parity, use it */
543
0
        return buffer;
544
0
      }
545
0
      else
546
0
      {
547
        /* Page has wrong parity, record it in cache and try again */
548
0
        if (pageflags & SPGIST_NULLS)
549
0
          blkFlags |= GBUF_NULLS;
550
0
        cache->lastUsedPages.cachedPage[blkFlags].blkno = blkno;
551
0
        cache->lastUsedPages.cachedPage[blkFlags].freeSpace =
552
0
          PageGetExactFreeSpace(BufferGetPage(buffer));
553
0
        UnlockReleaseBuffer(buffer);
554
0
      }
555
0
    }
556
0
  }
557
0
}
558
559
/*
560
 * Get a buffer of the type and parity specified by flags, having at least
561
 * as much free space as indicated by needSpace.  We use the lastUsedPages
562
 * cache to assign the same buffer previously requested when possible.
563
 * The returned buffer is already pinned and exclusive-locked.
564
 *
565
 * *isNew is set true if the page was initialized here, false if it was
566
 * already valid.
567
 */
568
Buffer
569
SpGistGetBuffer(Relation index, int flags, int needSpace, bool *isNew)
570
0
{
571
0
  SpGistCache *cache = spgGetCache(index);
572
0
  SpGistLastUsedPage *lup;
573
574
  /* Bail out if even an empty page wouldn't meet the demand */
575
0
  if (needSpace > SPGIST_PAGE_CAPACITY)
576
0
    elog(ERROR, "desired SPGiST tuple size is too big");
577
578
  /*
579
   * If possible, increase the space request to include relation's
580
   * fillfactor.  This ensures that when we add unrelated tuples to a page,
581
   * we try to keep 100-fillfactor% available for adding tuples that are
582
   * related to the ones already on it.  But fillfactor mustn't cause an
583
   * error for requests that would otherwise be legal.
584
   */
585
0
  needSpace += SpGistGetTargetPageFreeSpace(index);
586
0
  needSpace = Min(needSpace, SPGIST_PAGE_CAPACITY);
587
588
  /* Get the cache entry for this flags setting */
589
0
  lup = GET_LUP(cache, flags);
590
591
  /* If we have nothing cached, just turn it over to allocNewBuffer */
592
0
  if (lup->blkno == InvalidBlockNumber)
593
0
  {
594
0
    *isNew = true;
595
0
    return allocNewBuffer(index, flags);
596
0
  }
597
598
  /* fixed pages should never be in cache */
599
0
  Assert(!SpGistBlockIsFixed(lup->blkno));
600
601
  /* If cached freeSpace isn't enough, don't bother looking at the page */
602
0
  if (lup->freeSpace >= needSpace)
603
0
  {
604
0
    Buffer    buffer;
605
0
    Page    page;
606
607
0
    buffer = ReadBuffer(index, lup->blkno);
608
609
0
    if (!ConditionalLockBuffer(buffer))
610
0
    {
611
      /*
612
       * buffer is locked by another process, so return a new buffer
613
       */
614
0
      ReleaseBuffer(buffer);
615
0
      *isNew = true;
616
0
      return allocNewBuffer(index, flags);
617
0
    }
618
619
0
    page = BufferGetPage(buffer);
620
621
0
    if (PageIsNew(page) || SpGistPageIsDeleted(page) || PageIsEmpty(page))
622
0
    {
623
      /* OK to initialize the page */
624
0
      uint16    pageflags = 0;
625
626
0
      if (GBUF_REQ_LEAF(flags))
627
0
        pageflags |= SPGIST_LEAF;
628
0
      if (GBUF_REQ_NULLS(flags))
629
0
        pageflags |= SPGIST_NULLS;
630
0
      SpGistInitBuffer(buffer, pageflags);
631
0
      lup->freeSpace = PageGetExactFreeSpace(page) - needSpace;
632
0
      *isNew = true;
633
0
      return buffer;
634
0
    }
635
636
    /*
637
     * Check that page is of right type and has enough space.  We must
638
     * recheck this since our cache isn't necessarily up to date.
639
     */
640
0
    if ((GBUF_REQ_LEAF(flags) ? SpGistPageIsLeaf(page) : !SpGistPageIsLeaf(page)) &&
641
0
      (GBUF_REQ_NULLS(flags) ? SpGistPageStoresNulls(page) : !SpGistPageStoresNulls(page)))
642
0
    {
643
0
      int     freeSpace = PageGetExactFreeSpace(page);
644
645
0
      if (freeSpace >= needSpace)
646
0
      {
647
        /* Success, update freespace info and return the buffer */
648
0
        lup->freeSpace = freeSpace - needSpace;
649
0
        *isNew = false;
650
0
        return buffer;
651
0
      }
652
0
    }
653
654
    /*
655
     * fallback to allocation of new buffer
656
     */
657
0
    UnlockReleaseBuffer(buffer);
658
0
  }
659
660
  /* No success with cache, so return a new buffer */
661
0
  *isNew = true;
662
0
  return allocNewBuffer(index, flags);
663
0
}
664
665
/*
666
 * Update lastUsedPages cache when done modifying a page.
667
 *
668
 * We update the appropriate cache entry if it already contained this page
669
 * (its freeSpace is likely obsolete), or if this page has more space than
670
 * whatever we had cached.
671
 */
672
void
673
SpGistSetLastUsedPage(Relation index, Buffer buffer)
674
0
{
675
0
  SpGistCache *cache = spgGetCache(index);
676
0
  SpGistLastUsedPage *lup;
677
0
  int     freeSpace;
678
0
  Page    page = BufferGetPage(buffer);
679
0
  BlockNumber blkno = BufferGetBlockNumber(buffer);
680
0
  int     flags;
681
682
  /* Never enter fixed pages (root pages) in cache, though */
683
0
  if (SpGistBlockIsFixed(blkno))
684
0
    return;
685
686
0
  if (SpGistPageIsLeaf(page))
687
0
    flags = GBUF_LEAF;
688
0
  else
689
0
    flags = GBUF_INNER_PARITY(blkno);
690
0
  if (SpGistPageStoresNulls(page))
691
0
    flags |= GBUF_NULLS;
692
693
0
  lup = GET_LUP(cache, flags);
694
695
0
  freeSpace = PageGetExactFreeSpace(page);
696
0
  if (lup->blkno == InvalidBlockNumber || lup->blkno == blkno ||
697
0
    lup->freeSpace < freeSpace)
698
0
  {
699
0
    lup->blkno = blkno;
700
0
    lup->freeSpace = freeSpace;
701
0
  }
702
0
}
703
704
/*
705
 * Initialize an SPGiST page to empty, with specified flags
706
 */
707
void
708
SpGistInitPage(Page page, uint16 f)
709
0
{
710
0
  SpGistPageOpaque opaque;
711
712
0
  PageInit(page, BLCKSZ, sizeof(SpGistPageOpaqueData));
713
0
  opaque = SpGistPageGetOpaque(page);
714
0
  opaque->flags = f;
715
0
  opaque->spgist_page_id = SPGIST_PAGE_ID;
716
0
}
717
718
/*
719
 * Initialize a buffer's page to empty, with specified flags
720
 */
721
void
722
SpGistInitBuffer(Buffer b, uint16 f)
723
0
{
724
0
  Assert(BufferGetPageSize(b) == BLCKSZ);
725
0
  SpGistInitPage(BufferGetPage(b), f);
726
0
}
727
728
/*
729
 * Initialize metadata page
730
 */
731
void
732
SpGistInitMetapage(Page page)
733
0
{
734
0
  SpGistMetaPageData *metadata;
735
0
  int     i;
736
737
0
  SpGistInitPage(page, SPGIST_META);
738
0
  metadata = SpGistPageGetMeta(page);
739
0
  memset(metadata, 0, sizeof(SpGistMetaPageData));
740
0
  metadata->magicNumber = SPGIST_MAGIC_NUMBER;
741
742
  /* initialize last-used-page cache to empty */
743
0
  for (i = 0; i < SPGIST_CACHED_PAGES; i++)
744
0
    metadata->lastUsedPages.cachedPage[i].blkno = InvalidBlockNumber;
745
746
  /*
747
   * Set pd_lower just past the end of the metadata.  This is essential,
748
   * because without doing so, metadata will be lost if xlog.c compresses
749
   * the page.
750
   */
751
0
  ((PageHeader) page)->pd_lower =
752
0
    ((char *) metadata + sizeof(SpGistMetaPageData)) - (char *) page;
753
0
}
754
755
/*
756
 * reloptions processing for SPGiST
757
 */
758
bytea *
759
spgoptions(Datum reloptions, bool validate)
760
0
{
761
0
  static const relopt_parse_elt tab[] = {
762
0
    {"fillfactor", RELOPT_TYPE_INT, offsetof(SpGistOptions, fillfactor)},
763
0
  };
764
765
0
  return (bytea *) build_reloptions(reloptions, validate,
766
0
                    RELOPT_KIND_SPGIST,
767
0
                    sizeof(SpGistOptions),
768
0
                    tab, lengthof(tab));
769
0
}
770
771
/*
772
 * Get the space needed to store a non-null datum of the indicated type
773
 * in an inner tuple (that is, as a prefix or node label).
774
 * Note the result is already rounded up to a MAXALIGN boundary.
775
 * Here we follow the convention that pass-by-val types are just stored
776
 * in their Datum representation (compare memcpyInnerDatum).
777
 */
778
unsigned int
779
SpGistGetInnerTypeSize(SpGistTypeDesc *att, Datum datum)
780
0
{
781
0
  unsigned int size;
782
783
0
  if (att->attbyval)
784
0
    size = sizeof(Datum);
785
0
  else if (att->attlen > 0)
786
0
    size = att->attlen;
787
0
  else
788
0
    size = VARSIZE_ANY(datum);
789
790
0
  return MAXALIGN(size);
791
0
}
792
793
/*
794
 * Copy the given non-null datum to *target, in the inner-tuple case
795
 */
796
static void
797
memcpyInnerDatum(void *target, SpGistTypeDesc *att, Datum datum)
798
0
{
799
0
  unsigned int size;
800
801
0
  if (att->attbyval)
802
0
  {
803
0
    memcpy(target, &datum, sizeof(Datum));
804
0
  }
805
0
  else
806
0
  {
807
0
    size = (att->attlen > 0) ? att->attlen : VARSIZE_ANY(datum);
808
0
    memcpy(target, DatumGetPointer(datum), size);
809
0
  }
810
0
}
811
812
/*
813
 * Compute space required for a leaf tuple holding the given data.
814
 *
815
 * This must match the size-calculation portion of spgFormLeafTuple.
816
 */
817
Size
818
SpGistGetLeafTupleSize(TupleDesc tupleDescriptor,
819
             const Datum *datums, const bool *isnulls)
820
0
{
821
0
  Size    size;
822
0
  Size    data_size;
823
0
  bool    needs_null_mask = false;
824
0
  int     natts = tupleDescriptor->natts;
825
826
  /*
827
   * Decide whether we need a nulls bitmask.
828
   *
829
   * If there is only a key attribute (natts == 1), never use a bitmask, for
830
   * compatibility with the pre-v14 layout of leaf tuples.  Otherwise, we
831
   * need one if any attribute is null.
832
   */
833
0
  if (natts > 1)
834
0
  {
835
0
    for (int i = 0; i < natts; i++)
836
0
    {
837
0
      if (isnulls[i])
838
0
      {
839
0
        needs_null_mask = true;
840
0
        break;
841
0
      }
842
0
    }
843
0
  }
844
845
  /*
846
   * Calculate size of the data part; same as for heap tuples.
847
   */
848
0
  data_size = heap_compute_data_size(tupleDescriptor, datums, isnulls);
849
850
  /*
851
   * Compute total size.
852
   */
853
0
  size = SGLTHDRSZ(needs_null_mask);
854
0
  size += data_size;
855
0
  size = MAXALIGN(size);
856
857
  /*
858
   * Ensure that we can replace the tuple with a dead tuple later. This test
859
   * is unnecessary when there are any non-null attributes, but be safe.
860
   */
861
0
  if (size < SGDTSIZE)
862
0
    size = SGDTSIZE;
863
864
0
  return size;
865
0
}
866
867
/*
868
 * Construct a leaf tuple containing the given heap TID and datum values
869
 */
870
SpGistLeafTuple
871
spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr,
872
         const Datum *datums, const bool *isnulls)
873
0
{
874
0
  SpGistLeafTuple tup;
875
0
  TupleDesc tupleDescriptor = state->leafTupDesc;
876
0
  Size    size;
877
0
  Size    hoff;
878
0
  Size    data_size;
879
0
  bool    needs_null_mask = false;
880
0
  int     natts = tupleDescriptor->natts;
881
0
  char     *tp;       /* ptr to tuple data */
882
0
  uint16    tupmask = 0;  /* unused heap_fill_tuple output */
883
884
  /*
885
   * Decide whether we need a nulls bitmask.
886
   *
887
   * If there is only a key attribute (natts == 1), never use a bitmask, for
888
   * compatibility with the pre-v14 layout of leaf tuples.  Otherwise, we
889
   * need one if any attribute is null.
890
   */
891
0
  if (natts > 1)
892
0
  {
893
0
    for (int i = 0; i < natts; i++)
894
0
    {
895
0
      if (isnulls[i])
896
0
      {
897
0
        needs_null_mask = true;
898
0
        break;
899
0
      }
900
0
    }
901
0
  }
902
903
  /*
904
   * Calculate size of the data part; same as for heap tuples.
905
   */
906
0
  data_size = heap_compute_data_size(tupleDescriptor, datums, isnulls);
907
908
  /*
909
   * Compute total size.
910
   */
911
0
  hoff = SGLTHDRSZ(needs_null_mask);
912
0
  size = hoff + data_size;
913
0
  size = MAXALIGN(size);
914
915
  /*
916
   * Ensure that we can replace the tuple with a dead tuple later. This test
917
   * is unnecessary when there are any non-null attributes, but be safe.
918
   */
919
0
  if (size < SGDTSIZE)
920
0
    size = SGDTSIZE;
921
922
  /* OK, form the tuple */
923
0
  tup = (SpGistLeafTuple) palloc0(size);
924
925
0
  tup->size = size;
926
0
  SGLT_SET_NEXTOFFSET(tup, InvalidOffsetNumber);
927
0
  tup->heapPtr = *heapPtr;
928
929
0
  tp = (char *) tup + hoff;
930
931
0
  if (needs_null_mask)
932
0
  {
933
0
    bits8    *bp;     /* ptr to null bitmap in tuple */
934
935
    /* Set nullmask presence bit in SpGistLeafTuple header */
936
0
    SGLT_SET_HASNULLMASK(tup, true);
937
    /* Fill the data area and null mask */
938
0
    bp = (bits8 *) ((char *) tup + sizeof(SpGistLeafTupleData));
939
0
    heap_fill_tuple(tupleDescriptor, datums, isnulls, tp, data_size,
940
0
            &tupmask, bp);
941
0
  }
942
0
  else if (natts > 1 || !isnulls[spgKeyColumn])
943
0
  {
944
    /* Fill data area only */
945
0
    heap_fill_tuple(tupleDescriptor, datums, isnulls, tp, data_size,
946
0
            &tupmask, (bits8 *) NULL);
947
0
  }
948
  /* otherwise we have no data, nor a bitmap, to fill */
949
950
0
  return tup;
951
0
}
952
953
/*
954
 * Construct a node (to go into an inner tuple) containing the given label
955
 *
956
 * Note that the node's downlink is just set invalid here.  Caller will fill
957
 * it in later.
958
 */
959
SpGistNodeTuple
960
spgFormNodeTuple(SpGistState *state, Datum label, bool isnull)
961
0
{
962
0
  SpGistNodeTuple tup;
963
0
  unsigned int size;
964
0
  unsigned short infomask = 0;
965
966
  /* compute space needed (note result is already maxaligned) */
967
0
  size = SGNTHDRSZ;
968
0
  if (!isnull)
969
0
    size += SpGistGetInnerTypeSize(&state->attLabelType, label);
970
971
  /*
972
   * Here we make sure that the size will fit in the field reserved for it
973
   * in t_info.
974
   */
975
0
  if ((size & INDEX_SIZE_MASK) != size)
976
0
    ereport(ERROR,
977
0
        (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
978
0
         errmsg("index row requires %zu bytes, maximum size is %zu",
979
0
            (Size) size, (Size) INDEX_SIZE_MASK)));
980
981
0
  tup = (SpGistNodeTuple) palloc0(size);
982
983
0
  if (isnull)
984
0
    infomask |= INDEX_NULL_MASK;
985
  /* we don't bother setting the INDEX_VAR_MASK bit */
986
0
  infomask |= size;
987
0
  tup->t_info = infomask;
988
989
  /* The TID field will be filled in later */
990
0
  ItemPointerSetInvalid(&tup->t_tid);
991
992
0
  if (!isnull)
993
0
    memcpyInnerDatum(SGNTDATAPTR(tup), &state->attLabelType, label);
994
995
0
  return tup;
996
0
}
997
998
/*
999
 * Construct an inner tuple containing the given prefix and node array
1000
 */
1001
SpGistInnerTuple
1002
spgFormInnerTuple(SpGistState *state, bool hasPrefix, Datum prefix,
1003
          int nNodes, SpGistNodeTuple *nodes)
1004
0
{
1005
0
  SpGistInnerTuple tup;
1006
0
  unsigned int size;
1007
0
  unsigned int prefixSize;
1008
0
  int     i;
1009
0
  char     *ptr;
1010
1011
  /* Compute size needed */
1012
0
  if (hasPrefix)
1013
0
    prefixSize = SpGistGetInnerTypeSize(&state->attPrefixType, prefix);
1014
0
  else
1015
0
    prefixSize = 0;
1016
1017
0
  size = SGITHDRSZ + prefixSize;
1018
1019
  /* Note: we rely on node tuple sizes to be maxaligned already */
1020
0
  for (i = 0; i < nNodes; i++)
1021
0
    size += IndexTupleSize(nodes[i]);
1022
1023
  /*
1024
   * Ensure that we can replace the tuple with a dead tuple later.  This
1025
   * test is unnecessary given current tuple layouts, but let's be safe.
1026
   */
1027
0
  if (size < SGDTSIZE)
1028
0
    size = SGDTSIZE;
1029
1030
  /*
1031
   * Inner tuple should be small enough to fit on a page
1032
   */
1033
0
  if (size > SPGIST_PAGE_CAPACITY - sizeof(ItemIdData))
1034
0
    ereport(ERROR,
1035
0
        (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
1036
0
         errmsg("SP-GiST inner tuple size %zu exceeds maximum %zu",
1037
0
            (Size) size,
1038
0
            SPGIST_PAGE_CAPACITY - sizeof(ItemIdData)),
1039
0
         errhint("Values larger than a buffer page cannot be indexed.")));
1040
1041
  /*
1042
   * Check for overflow of header fields --- probably can't fail if the
1043
   * above succeeded, but let's be paranoid
1044
   */
1045
0
  if (size > SGITMAXSIZE ||
1046
0
    prefixSize > SGITMAXPREFIXSIZE ||
1047
0
    nNodes > SGITMAXNNODES)
1048
0
    elog(ERROR, "SPGiST inner tuple header field is too small");
1049
1050
  /* OK, form the tuple */
1051
0
  tup = (SpGistInnerTuple) palloc0(size);
1052
1053
0
  tup->nNodes = nNodes;
1054
0
  tup->prefixSize = prefixSize;
1055
0
  tup->size = size;
1056
1057
0
  if (hasPrefix)
1058
0
    memcpyInnerDatum(SGITDATAPTR(tup), &state->attPrefixType, prefix);
1059
1060
0
  ptr = (char *) SGITNODEPTR(tup);
1061
1062
0
  for (i = 0; i < nNodes; i++)
1063
0
  {
1064
0
    SpGistNodeTuple node = nodes[i];
1065
1066
0
    memcpy(ptr, node, IndexTupleSize(node));
1067
0
    ptr += IndexTupleSize(node);
1068
0
  }
1069
1070
0
  return tup;
1071
0
}
1072
1073
/*
1074
 * Construct a "dead" tuple to replace a tuple being deleted.
1075
 *
1076
 * The state can be SPGIST_REDIRECT, SPGIST_DEAD, or SPGIST_PLACEHOLDER.
1077
 * For a REDIRECT tuple, a pointer (blkno+offset) must be supplied, and
1078
 * the xid field is filled in automatically.
1079
 *
1080
 * This is called in critical sections, so we don't use palloc; the tuple
1081
 * is built in preallocated storage.  It should be copied before another
1082
 * call with different parameters can occur.
1083
 */
1084
SpGistDeadTuple
1085
spgFormDeadTuple(SpGistState *state, int tupstate,
1086
         BlockNumber blkno, OffsetNumber offnum)
1087
0
{
1088
0
  SpGistDeadTuple tuple = (SpGistDeadTuple) state->deadTupleStorage;
1089
1090
0
  tuple->tupstate = tupstate;
1091
0
  tuple->size = SGDTSIZE;
1092
0
  SGLT_SET_NEXTOFFSET(tuple, InvalidOffsetNumber);
1093
1094
0
  if (tupstate == SPGIST_REDIRECT)
1095
0
  {
1096
0
    ItemPointerSet(&tuple->pointer, blkno, offnum);
1097
0
    tuple->xid = state->redirectXid;
1098
0
  }
1099
0
  else
1100
0
  {
1101
0
    ItemPointerSetInvalid(&tuple->pointer);
1102
0
    tuple->xid = InvalidTransactionId;
1103
0
  }
1104
1105
0
  return tuple;
1106
0
}
1107
1108
/*
1109
 * Convert an SPGiST leaf tuple into Datum/isnull arrays.
1110
 *
1111
 * The caller must allocate sufficient storage for the output arrays.
1112
 * (INDEX_MAX_KEYS entries should be enough.)
1113
 */
1114
void
1115
spgDeformLeafTuple(SpGistLeafTuple tup, TupleDesc tupleDescriptor,
1116
           Datum *datums, bool *isnulls, bool keyColumnIsNull)
1117
0
{
1118
0
  bool    hasNullsMask = SGLT_GET_HASNULLMASK(tup);
1119
0
  char     *tp;       /* ptr to tuple data */
1120
0
  bits8    *bp;       /* ptr to null bitmap in tuple */
1121
1122
0
  if (keyColumnIsNull && tupleDescriptor->natts == 1)
1123
0
  {
1124
    /*
1125
     * Trivial case: there is only the key attribute and we're in a nulls
1126
     * tree.  The hasNullsMask bit in the tuple header should not be set
1127
     * (and thus we can't use index_deform_tuple_internal), but
1128
     * nonetheless the result is NULL.
1129
     *
1130
     * Note: currently this is dead code, because noplace calls this when
1131
     * there is only the key attribute.  But we should cover the case.
1132
     */
1133
0
    Assert(!hasNullsMask);
1134
1135
0
    datums[spgKeyColumn] = (Datum) 0;
1136
0
    isnulls[spgKeyColumn] = true;
1137
0
    return;
1138
0
  }
1139
1140
0
  tp = (char *) tup + SGLTHDRSZ(hasNullsMask);
1141
0
  bp = (bits8 *) ((char *) tup + sizeof(SpGistLeafTupleData));
1142
1143
0
  index_deform_tuple_internal(tupleDescriptor,
1144
0
                datums, isnulls,
1145
0
                tp, bp, hasNullsMask);
1146
1147
  /*
1148
   * Key column isnull value from the tuple should be consistent with
1149
   * keyColumnIsNull flag from the caller.
1150
   */
1151
0
  Assert(keyColumnIsNull == isnulls[spgKeyColumn]);
1152
0
}
1153
1154
/*
1155
 * Extract the label datums of the nodes within innerTuple
1156
 *
1157
 * Returns NULL if label datums are NULLs
1158
 */
1159
Datum *
1160
spgExtractNodeLabels(SpGistState *state, SpGistInnerTuple innerTuple)
1161
0
{
1162
0
  Datum    *nodeLabels;
1163
0
  int     i;
1164
0
  SpGistNodeTuple node;
1165
1166
  /* Either all the labels must be NULL, or none. */
1167
0
  node = SGITNODEPTR(innerTuple);
1168
0
  if (IndexTupleHasNulls(node))
1169
0
  {
1170
0
    SGITITERATE(innerTuple, i, node)
1171
0
    {
1172
0
      if (!IndexTupleHasNulls(node))
1173
0
        elog(ERROR, "some but not all node labels are null in SPGiST inner tuple");
1174
0
    }
1175
    /* They're all null, so just return NULL */
1176
0
    return NULL;
1177
0
  }
1178
0
  else
1179
0
  {
1180
0
    nodeLabels = (Datum *) palloc(sizeof(Datum) * innerTuple->nNodes);
1181
0
    SGITITERATE(innerTuple, i, node)
1182
0
    {
1183
0
      if (IndexTupleHasNulls(node))
1184
0
        elog(ERROR, "some but not all node labels are null in SPGiST inner tuple");
1185
0
      nodeLabels[i] = SGNTDATUM(node, state);
1186
0
    }
1187
0
    return nodeLabels;
1188
0
  }
1189
0
}
1190
1191
/*
1192
 * Add a new item to the page, replacing a PLACEHOLDER item if possible.
1193
 * Return the location it's inserted at, or InvalidOffsetNumber on failure.
1194
 *
1195
 * If startOffset isn't NULL, we start searching for placeholders at
1196
 * *startOffset, and update that to the next place to search.  This is just
1197
 * an optimization for repeated insertions.
1198
 *
1199
 * If errorOK is false, we throw error when there's not enough room,
1200
 * rather than returning InvalidOffsetNumber.
1201
 */
1202
OffsetNumber
1203
SpGistPageAddNewItem(SpGistState *state, Page page, Item item, Size size,
1204
           OffsetNumber *startOffset, bool errorOK)
1205
0
{
1206
0
  SpGistPageOpaque opaque = SpGistPageGetOpaque(page);
1207
0
  OffsetNumber i,
1208
0
        maxoff,
1209
0
        offnum;
1210
1211
0
  if (opaque->nPlaceholder > 0 &&
1212
0
    PageGetExactFreeSpace(page) + SGDTSIZE >= MAXALIGN(size))
1213
0
  {
1214
    /* Try to replace a placeholder */
1215
0
    maxoff = PageGetMaxOffsetNumber(page);
1216
0
    offnum = InvalidOffsetNumber;
1217
1218
0
    for (;;)
1219
0
    {
1220
0
      if (startOffset && *startOffset != InvalidOffsetNumber)
1221
0
        i = *startOffset;
1222
0
      else
1223
0
        i = FirstOffsetNumber;
1224
0
      for (; i <= maxoff; i++)
1225
0
      {
1226
0
        SpGistDeadTuple it = (SpGistDeadTuple) PageGetItem(page,
1227
0
                                   PageGetItemId(page, i));
1228
1229
0
        if (it->tupstate == SPGIST_PLACEHOLDER)
1230
0
        {
1231
0
          offnum = i;
1232
0
          break;
1233
0
        }
1234
0
      }
1235
1236
      /* Done if we found a placeholder */
1237
0
      if (offnum != InvalidOffsetNumber)
1238
0
        break;
1239
1240
0
      if (startOffset && *startOffset != InvalidOffsetNumber)
1241
0
      {
1242
        /* Hint was no good, re-search from beginning */
1243
0
        *startOffset = InvalidOffsetNumber;
1244
0
        continue;
1245
0
      }
1246
1247
      /* Hmm, no placeholder found? */
1248
0
      opaque->nPlaceholder = 0;
1249
0
      break;
1250
0
    }
1251
1252
0
    if (offnum != InvalidOffsetNumber)
1253
0
    {
1254
      /* Replace the placeholder tuple */
1255
0
      PageIndexTupleDelete(page, offnum);
1256
1257
0
      offnum = PageAddItem(page, item, size, offnum, false, false);
1258
1259
      /*
1260
       * We should not have failed given the size check at the top of
1261
       * the function, but test anyway.  If we did fail, we must PANIC
1262
       * because we've already deleted the placeholder tuple, and
1263
       * there's no other way to keep the damage from getting to disk.
1264
       */
1265
0
      if (offnum != InvalidOffsetNumber)
1266
0
      {
1267
0
        Assert(opaque->nPlaceholder > 0);
1268
0
        opaque->nPlaceholder--;
1269
0
        if (startOffset)
1270
0
          *startOffset = offnum + 1;
1271
0
      }
1272
0
      else
1273
0
        elog(PANIC, "failed to add item of size %zu to SPGiST index page",
1274
0
           size);
1275
1276
0
      return offnum;
1277
0
    }
1278
0
  }
1279
1280
  /* No luck in replacing a placeholder, so just add it to the page */
1281
0
  offnum = PageAddItem(page, item, size,
1282
0
             InvalidOffsetNumber, false, false);
1283
1284
0
  if (offnum == InvalidOffsetNumber && !errorOK)
1285
0
    elog(ERROR, "failed to add item of size %zu to SPGiST index page",
1286
0
       size);
1287
1288
0
  return offnum;
1289
0
}
1290
1291
/*
1292
 *  spgproperty() -- Check boolean properties of indexes.
1293
 *
1294
 * This is optional for most AMs, but is required for SP-GiST because the core
1295
 * property code doesn't support AMPROP_DISTANCE_ORDERABLE.
1296
 */
1297
bool
1298
spgproperty(Oid index_oid, int attno,
1299
      IndexAMProperty prop, const char *propname,
1300
      bool *res, bool *isnull)
1301
0
{
1302
0
  Oid     opclass,
1303
0
        opfamily,
1304
0
        opcintype;
1305
0
  CatCList   *catlist;
1306
0
  int     i;
1307
1308
  /* Only answer column-level inquiries */
1309
0
  if (attno == 0)
1310
0
    return false;
1311
1312
0
  switch (prop)
1313
0
  {
1314
0
    case AMPROP_DISTANCE_ORDERABLE:
1315
0
      break;
1316
0
    default:
1317
0
      return false;
1318
0
  }
1319
1320
  /*
1321
   * Currently, SP-GiST distance-ordered scans require that there be a
1322
   * distance operator in the opclass with the default types. So we assume
1323
   * that if such an operator exists, then there's a reason for it.
1324
   */
1325
1326
  /* First we need to know the column's opclass. */
1327
0
  opclass = get_index_column_opclass(index_oid, attno);
1328
0
  if (!OidIsValid(opclass))
1329
0
  {
1330
0
    *isnull = true;
1331
0
    return true;
1332
0
  }
1333
1334
  /* Now look up the opclass family and input datatype. */
1335
0
  if (!get_opclass_opfamily_and_input_type(opclass, &opfamily, &opcintype))
1336
0
  {
1337
0
    *isnull = true;
1338
0
    return true;
1339
0
  }
1340
1341
  /* And now we can check whether the operator is provided. */
1342
0
  catlist = SearchSysCacheList1(AMOPSTRATEGY,
1343
0
                  ObjectIdGetDatum(opfamily));
1344
1345
0
  *res = false;
1346
1347
0
  for (i = 0; i < catlist->n_members; i++)
1348
0
  {
1349
0
    HeapTuple amoptup = &catlist->members[i]->tuple;
1350
0
    Form_pg_amop amopform = (Form_pg_amop) GETSTRUCT(amoptup);
1351
1352
0
    if (amopform->amoppurpose == AMOP_ORDER &&
1353
0
      (amopform->amoplefttype == opcintype ||
1354
0
       amopform->amoprighttype == opcintype) &&
1355
0
      opfamily_can_sort_type(amopform->amopsortfamily,
1356
0
                   get_op_rettype(amopform->amopopr)))
1357
0
    {
1358
0
      *res = true;
1359
0
      break;
1360
0
    }
1361
0
  }
1362
1363
0
  ReleaseSysCacheList(catlist);
1364
1365
0
  *isnull = false;
1366
1367
0
  return true;
1368
0
}