Coverage Report

Created: 2025-06-15 06:31

/src/postgres/src/backend/access/hash/hashinsert.c
Every executable line in this file was recorded with an execution count of 0 (uncovered).

/*-------------------------------------------------------------------------
 *
 * hashinsert.c
 *    Item insertion in hash tables for Postgres.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    src/backend/access/hash/hashinsert.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/hash.h"
#include "access/hash_xlog.h"
#include "access/xloginsert.h"
#include "miscadmin.h"
#include "storage/predicate.h"
#include "utils/rel.h"

static void _hash_vacuum_one_page(Relation rel, Relation hrel,
                                  Buffer metabuf, Buffer buf);

/*
 *  _hash_doinsert() -- Handle insertion of a single index tuple.
 *
 *    This routine is called by the public interface routines, hashbuild
 *    and hashinsert.  By here, itup is completely filled in.
 *
 * 'sorted' must only be passed as 'true' when inserts are done in hashkey
 * order.
 */
void
_hash_doinsert(Relation rel, IndexTuple itup, Relation heapRel, bool sorted)
{
    Buffer      buf = InvalidBuffer;
    Buffer      bucket_buf;
    Buffer      metabuf;
    HashMetaPage metap;
    HashMetaPage usedmetap = NULL;
    Page        metapage;
    Page        page;
    HashPageOpaque pageopaque;
    Size        itemsz;
    bool        do_expand;
    uint32      hashkey;
    Bucket      bucket;
    OffsetNumber itup_off;

    /*
     * Get the hash key for the item (it's stored in the index tuple itself).
     */
    hashkey = _hash_get_indextuple_hashkey(itup);

    /* compute item size too */
    itemsz = IndexTupleSize(itup);
    itemsz = MAXALIGN(itemsz);  /* be safe, PageAddItem will do this but we
                                 * need to be consistent */

restart_insert:

    /*
     * Read the metapage.  We don't lock it yet; HashMaxItemSize() will
     * examine pd_pagesize_version, but that can't change so we can examine
     * it without a lock.
     */
    metabuf = _hash_getbuf(rel, HASH_METAPAGE, HASH_NOLOCK, LH_META_PAGE);
    metapage = BufferGetPage(metabuf);

    /*
     * Check whether the item can fit on a hash page at all.  (Eventually, we
     * ought to try to apply TOAST methods if not.)  Note that at this point,
     * itemsz doesn't include the ItemId.
     *
     * XXX this is useless code if we are only storing hash keys.
     */
    if (itemsz > HashMaxItemSize(metapage))
        ereport(ERROR,
                (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED),
                 errmsg("index row size %zu exceeds hash maximum %zu",
                        itemsz, HashMaxItemSize(metapage)),
                 errhint("Values larger than a buffer page cannot be indexed.")));

    /* Lock the primary bucket page for the target bucket. */
    buf = _hash_getbucketbuf_from_hashkey(rel, hashkey, HASH_WRITE,
                                          &usedmetap);
    Assert(usedmetap != NULL);

    CheckForSerializableConflictIn(rel, NULL, BufferGetBlockNumber(buf));

    /* remember the primary bucket buffer to release the pin on it at end. */
    bucket_buf = buf;

    page = BufferGetPage(buf);
    pageopaque = HashPageGetOpaque(page);
    bucket = pageopaque->hasho_bucket;

    /*
     * If this bucket is in the process of being split, try to finish the
     * split before inserting, because that might create room for the
     * insertion to proceed without allocating an additional overflow page.
     * It's only interesting to finish the split if we're trying to insert
     * into the bucket from which we're removing tuples (the "old" bucket),
     * not if we're trying to insert into the bucket into which tuples are
     * being moved (the "new" bucket).
     */
    if (H_BUCKET_BEING_SPLIT(pageopaque) && IsBufferCleanupOK(buf))
    {
        /* release the lock on bucket buffer, before completing the split. */
        LockBuffer(buf, BUFFER_LOCK_UNLOCK);

        _hash_finish_split(rel, metabuf, buf, bucket,
                           usedmetap->hashm_maxbucket,
                           usedmetap->hashm_highmask,
                           usedmetap->hashm_lowmask);

        /* release the pin on old and meta buffer.  retry for insert. */
        _hash_dropbuf(rel, buf);
        _hash_dropbuf(rel, metabuf);
        goto restart_insert;
    }

    /* Do the insertion */
    while (PageGetFreeSpace(page) < itemsz)
    {
        BlockNumber nextblkno;

        /*
         * Check if current page has any DEAD tuples. If yes, delete these
         * tuples and see if we can get a space for the new item to be
         * inserted before moving to the next page in the bucket chain.
         */
        if (H_HAS_DEAD_TUPLES(pageopaque))
        {
            if (IsBufferCleanupOK(buf))
            {
                _hash_vacuum_one_page(rel, heapRel, metabuf, buf);

                if (PageGetFreeSpace(page) >= itemsz)
                    break;      /* OK, now we have enough space */
            }
        }

        /*
         * no space on this page; check for an overflow page
         */
        nextblkno = pageopaque->hasho_nextblkno;

        if (BlockNumberIsValid(nextblkno))
        {
            /*
             * ovfl page exists; go get it.  if it doesn't have room, we'll
             * find out next pass through the loop test above.  we always
             * release both the lock and pin if this is an overflow page, but
             * only the lock if this is the primary bucket page, since the
             * pin on the primary bucket must be retained throughout the
             * scan.
             */
            if (buf != bucket_buf)
                _hash_relbuf(rel, buf);
            else
                LockBuffer(buf, BUFFER_LOCK_UNLOCK);
            buf = _hash_getbuf(rel, nextblkno, HASH_WRITE, LH_OVERFLOW_PAGE);
            page = BufferGetPage(buf);
        }
        else
        {
            /*
             * we're at the end of the bucket chain and we haven't found a
             * page with enough room.  allocate a new overflow page.
             */

            /* release our write lock without modifying buffer */
            LockBuffer(buf, BUFFER_LOCK_UNLOCK);

            /* chain to a new overflow page */
            buf = _hash_addovflpage(rel, metabuf, buf, (buf == bucket_buf));
            page = BufferGetPage(buf);

            /* should fit now, given test above */
            Assert(PageGetFreeSpace(page) >= itemsz);
        }
        pageopaque = HashPageGetOpaque(page);
        Assert((pageopaque->hasho_flag & LH_PAGE_TYPE) == LH_OVERFLOW_PAGE);
        Assert(pageopaque->hasho_bucket == bucket);
    }

    /*
     * Write-lock the metapage so we can increment the tuple count.  After
     * incrementing it, check to see if it's time for a split.
     */
    LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);

    /* Do the update.  No ereport(ERROR) until changes are logged */
    START_CRIT_SECTION();

    /* found page with enough space, so add the item here */
    itup_off = _hash_pgaddtup(rel, buf, itemsz, itup, sorted);
    MarkBufferDirty(buf);

    /* metapage operations */
    metap = HashPageGetMeta(metapage);
    metap->hashm_ntuples += 1;

    /* Make sure this stays in sync with _hash_expandtable() */
    do_expand = metap->hashm_ntuples >
        (double) metap->hashm_ffactor * (metap->hashm_maxbucket + 1);

    MarkBufferDirty(metabuf);

    /* XLOG stuff */
    if (RelationNeedsWAL(rel))
    {
        xl_hash_insert xlrec;
        XLogRecPtr  recptr;

        xlrec.offnum = itup_off;

        XLogBeginInsert();
        XLogRegisterData(&xlrec, SizeOfHashInsert);

        XLogRegisterBuffer(1, metabuf, REGBUF_STANDARD);

        XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
        XLogRegisterBufData(0, itup, IndexTupleSize(itup));

        recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_INSERT);

        PageSetLSN(BufferGetPage(buf), recptr);
        PageSetLSN(BufferGetPage(metabuf), recptr);
    }

    END_CRIT_SECTION();

    /* drop lock on metapage, but keep pin */
    LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);

    /*
     * Release the modified page and make sure to release the pin on the
     * primary page.
     */
    _hash_relbuf(rel, buf);
    if (buf != bucket_buf)
        _hash_dropbuf(rel, bucket_buf);

    /* Attempt to split if a split is needed */
    if (do_expand)
        _hash_expandtable(rel, metabuf);

    /* Finally drop our pin on the metapage */
    _hash_dropbuf(rel, metabuf);
}
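
/*
 * Illustrative sketch, not part of this file: roughly how an access-method
 * entry point such as hashinsert() in access/hash/hash.c is expected to
 * drive _hash_doinsert().  This is a hedged paraphrase for orientation
 * only (the guard macro and function name are hypothetical); consult
 * hash.c for the authoritative code.
 */
#ifdef HASHINSERT_CALLER_SKETCH
static void
hashinsert_caller_sketch(Relation rel, Datum *values, bool *isnull,
                         ItemPointer ht_ctid, Relation heapRel)
{
    Datum       index_values[1];
    bool        index_isnull[1];
    IndexTuple  itup;

    /* Reduce the user datum to a hash key; on failure, insert nothing. */
    if (!_hash_convert_tuple(rel, values, isnull,
                             index_values, index_isnull))
        return;

    /* Form the index tuple and point it at the heap tuple. */
    itup = index_form_tuple(RelationGetDescr(rel),
                            index_values, index_isnull);
    itup->t_tid = *ht_ctid;

    /* 'sorted' is false: single inserts arrive in arbitrary hashkey order. */
    _hash_doinsert(rel, itup, heapRel, false);

    pfree(itup);
}
#endif                          /* HASHINSERT_CALLER_SKETCH */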

/*
 *  _hash_pgaddtup() -- add a tuple to a particular page in the index.
 *
 * This routine adds the tuple to the page as requested; it does not write out
 * the page.  It is an error to call this function without pin and write lock
 * on the target buffer.
 *
 * Returns the offset number at which the tuple was inserted.  This function
 * is responsible for preserving the condition that tuples in a hash index
 * page are sorted by hashkey value.  However, if the caller is certain that
 * the hashkey for the tuple being added is >= the hashkeys of all existing
 * tuples on the page, then the 'appendtup' flag may be passed as true.  This
 * saves us from having to binary search for the correct location to insert
 * the tuple.
 */
OffsetNumber
_hash_pgaddtup(Relation rel, Buffer buf, Size itemsize, IndexTuple itup,
               bool appendtup)
{
    OffsetNumber itup_off;
    Page        page;

    _hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
    page = BufferGetPage(buf);

    /*
     * Find where to insert the tuple (preserving page's hashkey ordering).
     * If 'appendtup' is true then we just insert it at the end.
     */
    if (appendtup)
    {
        itup_off = PageGetMaxOffsetNumber(page) + 1;

#ifdef USE_ASSERT_CHECKING
        /* ensure this tuple's hashkey is >= the final existing tuple */
        if (PageGetMaxOffsetNumber(page) > 0)
        {
            IndexTuple  lasttup;
            ItemId      itemid;

            itemid = PageGetItemId(page, PageGetMaxOffsetNumber(page));
            lasttup = (IndexTuple) PageGetItem(page, itemid);

            Assert(_hash_get_indextuple_hashkey(lasttup) <=
                   _hash_get_indextuple_hashkey(itup));
        }
#endif
    }
    else
    {
        uint32      hashkey = _hash_get_indextuple_hashkey(itup);

        itup_off = _hash_binsearch(page, hashkey);
    }

    if (PageAddItem(page, (Item) itup, itemsize, itup_off, false, false)
        == InvalidOffsetNumber)
        elog(ERROR, "failed to add index item to \"%s\"",
             RelationGetRelationName(rel));

    return itup_off;
}
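
/*
 * Illustrative sketch, not part of this file: the insertion-position
 * search that _hash_binsearch() is relied on to perform above, shown as a
 * self-contained lower-bound binary search over a sorted array of hash
 * keys.  On an index page the same invariant is maintained over item
 * offsets rather than array indexes.  The guard macro and function name
 * are hypothetical.
 */
#ifdef HASHINSERT_BINSEARCH_SKETCH
#include <stdint.h>

/* Return the first index whose key is >= 'key'; that is the insert slot. */
static int
lower_bound_hashkey(const uint32_t *keys, int nkeys, uint32_t key)
{
    int         lo = 0;
    int         hi = nkeys;     /* one past the last element */

    while (lo < hi)
    {
        int         mid = lo + (hi - lo) / 2;

        if (keys[mid] < key)
            lo = mid + 1;       /* insert point lies to the right of mid */
        else
            hi = mid;           /* mid itself could be the insert point */
    }
    return lo;                  /* inserting here keeps the array sorted */
}
#endif                          /* HASHINSERT_BINSEARCH_SKETCH */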

/*
 *  _hash_pgaddmultitup() -- add a tuple vector to a particular page in the
 *                           index.
 *
 * This routine has the same requirements for locking and tuple ordering as
 * _hash_pgaddtup().
 *
 * The offset numbers at which the tuples were inserted are returned in the
 * itup_offsets array.
 */
void
_hash_pgaddmultitup(Relation rel, Buffer buf, IndexTuple *itups,
                    OffsetNumber *itup_offsets, uint16 nitups)
{
    OffsetNumber itup_off;
    Page        page;
    uint32      hashkey;
    int         i;

    _hash_checkpage(rel, buf, LH_BUCKET_PAGE | LH_OVERFLOW_PAGE);
    page = BufferGetPage(buf);

    for (i = 0; i < nitups; i++)
    {
        Size        itemsize;

        itemsize = IndexTupleSize(itups[i]);
        itemsize = MAXALIGN(itemsize);

        /* Find where to insert the tuple (preserving page's hashkey ordering) */
        hashkey = _hash_get_indextuple_hashkey(itups[i]);
        itup_off = _hash_binsearch(page, hashkey);

        itup_offsets[i] = itup_off;

        if (PageAddItem(page, (Item) itups[i], itemsize, itup_off, false, false)
            == InvalidOffsetNumber)
            elog(ERROR, "failed to add index item to \"%s\"",
                 RelationGetRelationName(rel));
    }
}
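
/*
 * Illustrative note, not part of this file: _hash_pgaddmultitup() is meant
 * for batch callers (e.g. the bucket split and squeeze code), which are
 * assumed to perform the multi-insert inside a critical section and
 * WAL-log it using the offsets returned in itup_offsets, along the lines
 * of:
 *
 *     START_CRIT_SECTION();
 *     _hash_pgaddmultitup(rel, wbuf, itups, itup_offsets, nitups);
 *     MarkBufferDirty(wbuf);
 *     ... register buffers and emit the WAL record ...
 *     END_CRIT_SECTION();
 */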

/*
 * _hash_vacuum_one_page - vacuum just one index page.
 *
 * Try to remove LP_DEAD items from the given page.  The caller must have
 * acquired a cleanup lock on the buffer being modified before calling this
 * function.
 */
static void
_hash_vacuum_one_page(Relation rel, Relation hrel, Buffer metabuf, Buffer buf)
{
    OffsetNumber deletable[MaxOffsetNumber];
    int         ndeletable = 0;
    OffsetNumber offnum,
                maxoff;
    Page        page = BufferGetPage(buf);
    HashPageOpaque pageopaque;
    HashMetaPage metap;

    /* Scan each tuple in page to see if it is marked as LP_DEAD */
    maxoff = PageGetMaxOffsetNumber(page);
    for (offnum = FirstOffsetNumber;
         offnum <= maxoff;
         offnum = OffsetNumberNext(offnum))
    {
        ItemId      itemId = PageGetItemId(page, offnum);

        if (ItemIdIsDead(itemId))
            deletable[ndeletable++] = offnum;
    }

    if (ndeletable > 0)
    {
        TransactionId snapshotConflictHorizon;

        snapshotConflictHorizon =
            index_compute_xid_horizon_for_tuples(rel, hrel, buf,
                                                 deletable, ndeletable);

        /*
         * Write-lock the meta page so that we can decrement the tuple count.
         */
        LockBuffer(metabuf, BUFFER_LOCK_EXCLUSIVE);

        /* No ereport(ERROR) until changes are logged */
        START_CRIT_SECTION();

        PageIndexMultiDelete(page, deletable, ndeletable);

        /*
         * Mark the page as not containing any LP_DEAD items.  This is not
         * certainly true (there might be some that have recently been
         * marked, but weren't included in our target-item list), but it will
         * almost always be true and it doesn't seem worth an additional page
         * scan to check it.  Remember that LH_PAGE_HAS_DEAD_TUPLES is only a
         * hint anyway.
         */
        pageopaque = HashPageGetOpaque(page);
        pageopaque->hasho_flag &= ~LH_PAGE_HAS_DEAD_TUPLES;

        metap = HashPageGetMeta(BufferGetPage(metabuf));
        metap->hashm_ntuples -= ndeletable;

        MarkBufferDirty(buf);
        MarkBufferDirty(metabuf);

        /* XLOG stuff */
        if (RelationNeedsWAL(rel))
        {
            xl_hash_vacuum_one_page xlrec;
            XLogRecPtr  recptr;

            xlrec.isCatalogRel = RelationIsAccessibleInLogicalDecoding(hrel);
            xlrec.snapshotConflictHorizon = snapshotConflictHorizon;
            xlrec.ntuples = ndeletable;

            XLogBeginInsert();
            XLogRegisterBuffer(0, buf, REGBUF_STANDARD);
            XLogRegisterData(&xlrec, SizeOfHashVacuumOnePage);

            /*
             * We need the target-offsets array whether or not we store the
             * whole buffer, to allow us to find the snapshotConflictHorizon
             * on a standby server.
             */
            XLogRegisterData(deletable,
                             ndeletable * sizeof(OffsetNumber));

            XLogRegisterBuffer(1, metabuf, REGBUF_STANDARD);

            recptr = XLogInsert(RM_HASH_ID, XLOG_HASH_VACUUM_ONE_PAGE);

            PageSetLSN(BufferGetPage(buf), recptr);
            PageSetLSN(BufferGetPage(metabuf), recptr);
        }

        END_CRIT_SECTION();

        /*
         * Release the write lock on the meta page now that we have updated
         * the tuple count.
         */
        LockBuffer(metabuf, BUFFER_LOCK_UNLOCK);
    }
}
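
/*
 * Illustrative sketch, not part of this file: the bucket-split trigger
 * computed in _hash_doinsert() above, restated as a self-contained
 * predicate.  The guard macro, function name, and example values are
 * hypothetical; hashm_ffactor is fixed when the index is created.
 */
#ifdef HASHINSERT_SPLIT_SKETCH
#include <stdbool.h>
#include <stdint.h>

/* Mirrors: ntuples > ffactor * (maxbucket + 1), as in _hash_doinsert(). */
static bool
needs_split(double ntuples, uint16_t ffactor, uint32_t maxbucket)
{
    return ntuples > (double) ffactor * (maxbucket + 1);
}

/*
 * Example: with ffactor = 75 and maxbucket = 3 (four buckets), the
 * threshold is 4 * 75 = 300 tuples, so needs_split(301, 75, 3) returns
 * true and _hash_expandtable() would be attempted.
 */
#endif                          /* HASHINSERT_SPLIT_SKETCH */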