Coverage Report

Created: 2025-06-13 06:06

/src/postgres/src/backend/access/transam/slru.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * slru.c
4
 *    Simple LRU buffering for wrap-around-able permanent metadata
5
 *
6
 * This module is used to maintain various pieces of transaction status
7
 * indexed by TransactionId (such as commit status, parent transaction ID,
8
 * commit timestamp), as well as storage for multixacts, serializable
9
 * isolation locks and NOTIFY traffic.  Extensions can define their own
10
 * SLRUs, too.
11
 *
12
 * Under ordinary circumstances we expect that write traffic will occur
13
 * mostly to the latest page (and to the just-prior page, soon after a
14
 * page transition).  Read traffic will probably touch a larger span of
15
 * pages, but a relatively small number of buffers should be sufficient.
16
 *
17
 * We use a simple least-recently-used scheme to manage a pool of shared
18
 * page buffers, split in banks by the lowest bits of the page number, and
19
 * the management algorithm only processes the bank to which the desired
20
 * page belongs, so a linear search is sufficient; there's no need for a
21
 * hashtable or anything fancy.  The algorithm is straight LRU except that
22
 * we will never swap out the latest page (since we know it's going to be
23
 * hit again eventually).
24
 *
25
 * We use per-bank control LWLocks to protect the shared data structures,
26
 * plus per-buffer LWLocks that synchronize I/O for each buffer.  The
27
 * bank's control lock must be held to examine or modify any of the bank's
28
 * shared state.  A process that is reading in or writing out a page
29
 * buffer does not hold the control lock, only the per-buffer lock for the
30
 * buffer it is working on.  One exception is latest_page_number, which is
31
 * read and written using atomic ops.
32
 *
33
 * "Holding the bank control lock" means exclusive lock in all cases
34
 * except for SimpleLruReadPage_ReadOnly(); see comments for
35
 * SlruRecentlyUsed() for the implications of that.
36
 *
37
 * When initiating I/O on a buffer, we acquire the per-buffer lock exclusively
38
 * before releasing the control lock.  The per-buffer lock is released after
39
 * completing the I/O, re-acquiring the control lock, and updating the shared
40
 * state.  (Deadlock is not possible here, because we never try to initiate
41
 * I/O when someone else is already doing I/O on the same buffer.)
42
 * To wait for I/O to complete, release the control lock, acquire the
43
 * per-buffer lock in shared mode, immediately release the per-buffer lock,
44
 * reacquire the control lock, and then recheck state (since arbitrary things
45
 * could have happened while we didn't have the lock).
46
 *
47
 * As with the regular buffer manager, it is possible for another process
48
 * to re-dirty a page that is currently being written out.  This is handled
49
 * by re-setting the page's page_dirty flag.
50
 *
51
 *
52
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
53
 * Portions Copyright (c) 1994, Regents of the University of California
54
 *
55
 * src/backend/access/transam/slru.c
56
 *
57
 *-------------------------------------------------------------------------
58
 */
59
#include "postgres.h"
60
61
#include <fcntl.h>
62
#include <sys/stat.h>
63
#include <unistd.h>
64
65
#include "access/slru.h"
66
#include "access/transam.h"
67
#include "access/xlog.h"
68
#include "access/xlogutils.h"
69
#include "miscadmin.h"
70
#include "pgstat.h"
71
#include "storage/fd.h"
72
#include "storage/shmem.h"
73
#include "utils/guc.h"
74
75
/*
76
 * Converts segment number to the filename of the segment.
77
 *
78
 * "path" should point to a buffer at least MAXPGPATH characters long.
79
 *
80
 * If ctl->long_segment_names is true, segno can be in the range [0, 2^60-1].
81
 * The resulting file name is made of 15 characters, e.g. dir/123456789ABCDEF.
82
 *
83
 * If ctl->long_segment_names is false, segno can be in the range [0, 2^24-1].
84
 * The resulting file name is made of 4 to 6 characters, as of:
85
 *
86
 *  dir/1234   for [0, 2^16-1]
87
 *  dir/12345  for [2^16, 2^20-1]
88
 *  dir/123456 for [2^20, 2^24-1]
89
 */
90
static inline int
91
SlruFileName(SlruCtl ctl, char *path, int64 segno)
92
0
{
93
0
  if (ctl->long_segment_names)
94
0
  {
95
    /*
96
     * We could use 16 characters here but the disadvantage would be that
97
     * the SLRU segments will be hard to distinguish from WAL segments.
98
     *
99
     * For this reason we use 15 characters. It is enough but also means
100
     * that in the future we can't decrease SLRU_PAGES_PER_SEGMENT easily.
101
     */
102
0
    Assert(segno >= 0 && segno <= INT64CONST(0xFFFFFFFFFFFFFFF));
103
0
    return snprintf(path, MAXPGPATH, "%s/%015" PRIX64, ctl->Dir, segno);
104
0
  }
105
0
  else
106
0
  {
107
    /*
108
     * Despite the fact that %04X format string is used up to 24 bit
109
     * integers are allowed. See SlruCorrectSegmentFilenameLength()
110
     */
111
0
    Assert(segno >= 0 && segno <= INT64CONST(0xFFFFFF));
112
0
    return snprintf(path, MAXPGPATH, "%s/%04X", (ctl)->Dir,
113
0
            (unsigned int) segno);
114
0
  }
115
0
}
116
117
/*
118
 * During SimpleLruWriteAll(), we will usually not need to write more than one
119
 * or two physical files, but we may need to write several pages per file.  We
120
 * can consolidate the I/O requests by leaving files open until control returns
121
 * to SimpleLruWriteAll().  This data structure remembers which files are open.
122
 */
123
0
#define MAX_WRITEALL_BUFFERS  16
124
125
/* Open-file bookkeeping handed around as "fdata" during SimpleLruWriteAll(). */
typedef struct SlruWriteAllData
126
{
127
  int     num_files;    /* # files actually open; valid entries are [0, num_files) */
128
  int     fd[MAX_WRITEALL_BUFFERS]; /* their FD's (closed with CloseTransientFile) */
129
  int64   segno[MAX_WRITEALL_BUFFERS];  /* their log seg#s; presumably parallel to fd[] */
130
} SlruWriteAllData;
131
132
typedef struct SlruWriteAllData *SlruWriteAll;
133
134
135
/*
136
 * Bank size for the slot array.  Pages are assigned a bank according to their
137
 * page number, with each bank being this size.  We want a power of 2 so that
138
 * we can determine the bank number for a page with just bit shifting; we also
139
 * want to keep the bank size small so that LRU victim search is fast.  16
140
 * buffers per bank seems a good number.
141
 */
142
0
#define SLRU_BANK_BITSHIFT    4
143
0
#define SLRU_BANK_SIZE      (1 << SLRU_BANK_BITSHIFT)
144
145
/*
146
 * Macro to get the bank number to which the slot belongs.
147
 */
148
0
#define SlotGetBankNumber(slotno) ((slotno) >> SLRU_BANK_BITSHIFT)
149
150
151
/*
152
 * Populate a file tag describing a segment file.  We only use the segment
153
 * number, since we can derive everything else we need by having separate
154
 * sync handler functions for clog, multixact etc.
155
 */
156
0
#define INIT_SLRUFILETAG(a,xx_handler,xx_segno) \
157
0
( \
158
0
  memset(&(a), 0, sizeof(FileTag)), \
159
0
  (a).handler = (xx_handler), \
160
0
  (a).segno = (xx_segno) \
161
0
)
162
163
/*
 * Saved info for SlruReportIOError
 *
 * Identifies which file operation failed.  The physical I/O routines store
 * one of these in slru_errcause (and errno in slru_errno) instead of
 * raising an error themselves, so that the report can be made later.
 */
164
typedef enum
165
{
166
  SLRU_OPEN_FAILED,		/* OpenTransientFile() failed */
167
  SLRU_SEEK_FAILED,		/* lseek() failed */
168
  SLRU_READ_FAILED,		/* pg_pread() did not return a full page */
169
  SLRU_WRITE_FAILED,	/* presumably a failed page write; set outside this excerpt */
170
  SLRU_FSYNC_FAILED,	/* presumably a failed fsync; set outside this excerpt */
171
  SLRU_CLOSE_FAILED,	/* CloseTransientFile() failed */
172
} SlruErrorCause;
173
174
static SlruErrorCause slru_errcause;
175
static int  slru_errno;
176
177
178
static void SimpleLruZeroLSNs(SlruCtl ctl, int slotno);
179
static void SimpleLruWaitIO(SlruCtl ctl, int slotno);
180
static void SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata);
181
static bool SlruPhysicalReadPage(SlruCtl ctl, int64 pageno, int slotno);
182
static bool SlruPhysicalWritePage(SlruCtl ctl, int64 pageno, int slotno,
183
                  SlruWriteAll fdata);
184
static void SlruReportIOError(SlruCtl ctl, int64 pageno, TransactionId xid);
185
static int  SlruSelectLRUPage(SlruCtl ctl, int64 pageno);
186
187
static bool SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename,
188
                    int64 segpage, void *data);
189
static void SlruInternalDeleteSegment(SlruCtl ctl, int64 segno);
190
static inline void SlruRecentlyUsed(SlruShared shared, int slotno);
191
192
193
/*
194
 * Initialization of shared memory
195
 */
196
197
Size
198
SimpleLruShmemSize(int nslots, int nlsns)
199
0
{
200
0
  int     nbanks = nslots / SLRU_BANK_SIZE;
201
0
  Size    sz;
202
203
0
  Assert(nslots <= SLRU_MAX_ALLOWED_BUFFERS);
204
0
  Assert(nslots % SLRU_BANK_SIZE == 0);
205
206
  /* we assume nslots isn't so large as to risk overflow */
207
0
  sz = MAXALIGN(sizeof(SlruSharedData));
208
0
  sz += MAXALIGN(nslots * sizeof(char *));  /* page_buffer[] */
209
0
  sz += MAXALIGN(nslots * sizeof(SlruPageStatus));  /* page_status[] */
210
0
  sz += MAXALIGN(nslots * sizeof(bool));  /* page_dirty[] */
211
0
  sz += MAXALIGN(nslots * sizeof(int64)); /* page_number[] */
212
0
  sz += MAXALIGN(nslots * sizeof(int)); /* page_lru_count[] */
213
0
  sz += MAXALIGN(nslots * sizeof(LWLockPadded));  /* buffer_locks[] */
214
0
  sz += MAXALIGN(nbanks * sizeof(LWLockPadded));  /* bank_locks[] */
215
0
  sz += MAXALIGN(nbanks * sizeof(int)); /* bank_cur_lru_count[] */
216
217
0
  if (nlsns > 0)
218
0
    sz += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr));  /* group_lsn[] */
219
220
0
  return BUFFERALIGN(sz) + BLCKSZ * nslots;
221
0
}
222
223
/*
224
 * Determine a number of SLRU buffers to use.
225
 *
226
 * We simply divide shared_buffers by the divisor given and cap
227
 * that at the maximum given; but always at least SLRU_BANK_SIZE.
228
 * Round down to the nearest multiple of SLRU_BANK_SIZE.
229
 */
230
int
231
SimpleLruAutotuneBuffers(int divisor, int max)
232
0
{
233
0
  return Min(max - (max % SLRU_BANK_SIZE),
234
0
         Max(SLRU_BANK_SIZE,
235
0
           NBuffers / divisor - (NBuffers / divisor) % SLRU_BANK_SIZE));
236
0
}
237
238
/*
239
 * Initialize, or attach to, a simple LRU cache in shared memory.
240
 *
241
 * ctl: address of local (unshared) control structure.
242
 * name: name of SLRU.  (This is user-visible, pick with care!)
243
 * nslots: number of page slots to use.
244
 * nlsns: number of LSN groups per page (set to zero if not relevant).
245
 * subdir: PGDATA-relative subdirectory that will contain the files.
246
 * buffer_tranche_id: tranche ID to use for the SLRU's per-buffer LWLocks.
247
 * bank_tranche_id: tranche ID to use for the bank LWLocks.
248
 * sync_handler: which set of functions to use to handle sync requests
249
 */
250
void
251
SimpleLruInit(SlruCtl ctl, const char *name, int nslots, int nlsns,
252
        const char *subdir, int buffer_tranche_id, int bank_tranche_id,
253
        SyncRequestHandler sync_handler, bool long_segment_names)
254
0
{
255
0
  SlruShared  shared;
256
0
  bool    found;
257
0
  int     nbanks = nslots / SLRU_BANK_SIZE;
258
259
0
  Assert(nslots <= SLRU_MAX_ALLOWED_BUFFERS);
260
261
0
  shared = (SlruShared) ShmemInitStruct(name,
262
0
                      SimpleLruShmemSize(nslots, nlsns),
263
0
                      &found);
264
265
0
  if (!IsUnderPostmaster)
266
0
  {
267
    /* Initialize locks and shared memory area */
268
0
    char     *ptr;
269
0
    Size    offset;
270
271
0
    Assert(!found);
272
273
0
    memset(shared, 0, sizeof(SlruSharedData));
274
275
0
    shared->num_slots = nslots;
276
0
    shared->lsn_groups_per_page = nlsns;
277
278
0
    pg_atomic_init_u64(&shared->latest_page_number, 0);
279
280
0
    shared->slru_stats_idx = pgstat_get_slru_index(name);
281
282
0
    ptr = (char *) shared;
283
0
    offset = MAXALIGN(sizeof(SlruSharedData));
284
0
    shared->page_buffer = (char **) (ptr + offset);
285
0
    offset += MAXALIGN(nslots * sizeof(char *));
286
0
    shared->page_status = (SlruPageStatus *) (ptr + offset);
287
0
    offset += MAXALIGN(nslots * sizeof(SlruPageStatus));
288
0
    shared->page_dirty = (bool *) (ptr + offset);
289
0
    offset += MAXALIGN(nslots * sizeof(bool));
290
0
    shared->page_number = (int64 *) (ptr + offset);
291
0
    offset += MAXALIGN(nslots * sizeof(int64));
292
0
    shared->page_lru_count = (int *) (ptr + offset);
293
0
    offset += MAXALIGN(nslots * sizeof(int));
294
295
    /* Initialize LWLocks */
296
0
    shared->buffer_locks = (LWLockPadded *) (ptr + offset);
297
0
    offset += MAXALIGN(nslots * sizeof(LWLockPadded));
298
0
    shared->bank_locks = (LWLockPadded *) (ptr + offset);
299
0
    offset += MAXALIGN(nbanks * sizeof(LWLockPadded));
300
0
    shared->bank_cur_lru_count = (int *) (ptr + offset);
301
0
    offset += MAXALIGN(nbanks * sizeof(int));
302
303
0
    if (nlsns > 0)
304
0
    {
305
0
      shared->group_lsn = (XLogRecPtr *) (ptr + offset);
306
0
      offset += MAXALIGN(nslots * nlsns * sizeof(XLogRecPtr));
307
0
    }
308
309
0
    ptr += BUFFERALIGN(offset);
310
0
    for (int slotno = 0; slotno < nslots; slotno++)
311
0
    {
312
0
      LWLockInitialize(&shared->buffer_locks[slotno].lock,
313
0
               buffer_tranche_id);
314
315
0
      shared->page_buffer[slotno] = ptr;
316
0
      shared->page_status[slotno] = SLRU_PAGE_EMPTY;
317
0
      shared->page_dirty[slotno] = false;
318
0
      shared->page_lru_count[slotno] = 0;
319
0
      ptr += BLCKSZ;
320
0
    }
321
322
    /* Initialize the slot banks. */
323
0
    for (int bankno = 0; bankno < nbanks; bankno++)
324
0
    {
325
0
      LWLockInitialize(&shared->bank_locks[bankno].lock, bank_tranche_id);
326
0
      shared->bank_cur_lru_count[bankno] = 0;
327
0
    }
328
329
    /* Should fit to estimated shmem size */
330
0
    Assert(ptr - (char *) shared <= SimpleLruShmemSize(nslots, nlsns));
331
0
  }
332
0
  else
333
0
  {
334
0
    Assert(found);
335
0
    Assert(shared->num_slots == nslots);
336
0
  }
337
338
  /*
339
   * Initialize the unshared control struct, including directory path. We
340
   * assume caller set PagePrecedes.
341
   */
342
0
  ctl->shared = shared;
343
0
  ctl->sync_handler = sync_handler;
344
0
  ctl->long_segment_names = long_segment_names;
345
0
  ctl->nbanks = nbanks;
346
0
  strlcpy(ctl->Dir, subdir, sizeof(ctl->Dir));
347
0
}
348
349
/*
350
 * Helper function for GUC check_hook to check whether slru buffers are in
351
 * multiples of SLRU_BANK_SIZE.
352
 */
353
bool
354
check_slru_buffers(const char *name, int *newval)
355
0
{
356
  /* Valid values are multiples of SLRU_BANK_SIZE */
357
0
  if (*newval % SLRU_BANK_SIZE == 0)
358
0
    return true;
359
360
0
  GUC_check_errdetail("\"%s\" must be a multiple of %d.", name,
361
0
            SLRU_BANK_SIZE);
362
0
  return false;
363
0
}
364
365
/*
366
 * Initialize (or reinitialize) a page to zeroes.
367
 *
368
 * The page is not actually written, just set up in shared memory.
369
 * The slot number of the new page is returned.
370
 *
371
 * Bank lock must be held at entry, and will be held at exit.
372
 */
373
int
374
SimpleLruZeroPage(SlruCtl ctl, int64 pageno)
375
0
{
376
0
  SlruShared  shared = ctl->shared;
377
0
  int     slotno;
378
379
0
  Assert(LWLockHeldByMeInMode(SimpleLruGetBankLock(ctl, pageno), LW_EXCLUSIVE));
380
381
  /* Find a suitable buffer slot for the page */
382
0
  slotno = SlruSelectLRUPage(ctl, pageno);
383
0
  Assert(shared->page_status[slotno] == SLRU_PAGE_EMPTY ||
384
0
       (shared->page_status[slotno] == SLRU_PAGE_VALID &&
385
0
      !shared->page_dirty[slotno]) ||
386
0
       shared->page_number[slotno] == pageno);
387
388
  /* Mark the slot as containing this page */
389
0
  shared->page_number[slotno] = pageno;
390
0
  shared->page_status[slotno] = SLRU_PAGE_VALID;
391
0
  shared->page_dirty[slotno] = true;
392
0
  SlruRecentlyUsed(shared, slotno);
393
394
  /* Set the buffer to zeroes */
395
0
  MemSet(shared->page_buffer[slotno], 0, BLCKSZ);
396
397
  /* Set the LSNs for this new page to zero */
398
0
  SimpleLruZeroLSNs(ctl, slotno);
399
400
  /*
401
   * Assume this page is now the latest active page.
402
   *
403
   * Note that because both this routine and SlruSelectLRUPage run with
404
   * ControlLock held, it is not possible for this to be zeroing a page that
405
   * SlruSelectLRUPage is going to evict simultaneously.  Therefore, there's
406
   * no memory barrier here.
407
   */
408
0
  pg_atomic_write_u64(&shared->latest_page_number, pageno);
409
410
  /* update the stats counter of zeroed pages */
411
0
  pgstat_count_slru_page_zeroed(shared->slru_stats_idx);
412
413
0
  return slotno;
414
0
}
415
416
/*
417
 * Zero all the LSNs we store for this slru page.
418
 *
419
 * This should be called each time we create a new page, and each time we read
420
 * in a page from disk into an existing buffer.  (Such an old page cannot
421
 * have any interesting LSNs, since we'd have flushed them before writing
422
 * the page in the first place.)
423
 *
424
 * This assumes that InvalidXLogRecPtr is bitwise-all-0.
425
 */
426
static void
427
SimpleLruZeroLSNs(SlruCtl ctl, int slotno)
428
0
{
429
0
  SlruShared  shared = ctl->shared;
430
431
0
  if (shared->lsn_groups_per_page > 0)
432
0
    MemSet(&shared->group_lsn[slotno * shared->lsn_groups_per_page], 0,
433
0
         shared->lsn_groups_per_page * sizeof(XLogRecPtr));
434
0
}
435
436
/*
437
 * Wait for any active I/O on a page slot to finish.  (This does not
438
 * guarantee that new I/O hasn't been started before we return, though.
439
 * In fact the slot might not even contain the same page anymore.)
440
 *
441
 * Bank lock must be held at entry, and will be held at exit.
442
 */
443
static void
444
SimpleLruWaitIO(SlruCtl ctl, int slotno)
445
0
{
446
0
  SlruShared  shared = ctl->shared;
447
0
  int     bankno = SlotGetBankNumber(slotno);
448
449
0
  Assert(shared->page_status[slotno] != SLRU_PAGE_EMPTY);
450
451
  /* See notes at top of file */
452
0
  LWLockRelease(&shared->bank_locks[bankno].lock);
453
0
  LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_SHARED);
454
0
  LWLockRelease(&shared->buffer_locks[slotno].lock);
455
0
  LWLockAcquire(&shared->bank_locks[bankno].lock, LW_EXCLUSIVE);
456
457
  /*
458
   * If the slot is still in an io-in-progress state, then either someone
459
   * already started a new I/O on the slot, or a previous I/O failed and
460
   * neglected to reset the page state.  That shouldn't happen, really, but
461
   * it seems worth a few extra cycles to check and recover from it. We can
462
   * cheaply test for failure by seeing if the buffer lock is still held (we
463
   * assume that transaction abort would release the lock).
464
   */
465
0
  if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS ||
466
0
    shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS)
467
0
  {
468
0
    if (LWLockConditionalAcquire(&shared->buffer_locks[slotno].lock, LW_SHARED))
469
0
    {
470
      /* indeed, the I/O must have failed */
471
0
      if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS)
472
0
        shared->page_status[slotno] = SLRU_PAGE_EMPTY;
473
0
      else        /* write_in_progress */
474
0
      {
475
0
        shared->page_status[slotno] = SLRU_PAGE_VALID;
476
0
        shared->page_dirty[slotno] = true;
477
0
      }
478
0
      LWLockRelease(&shared->buffer_locks[slotno].lock);
479
0
    }
480
0
  }
481
0
}
482
483
/*
484
 * Find a page in a shared buffer, reading it in if necessary.
485
 * The page number must correspond to an already-initialized page.
486
 *
487
 * If write_ok is true then it is OK to return a page that is in
488
 * WRITE_IN_PROGRESS state; it is the caller's responsibility to be sure
489
 * that modification of the page is safe.  If write_ok is false then we
490
 * will not return the page until it is not undergoing active I/O.
491
 *
492
 * The passed-in xid is used only for error reporting, and may be
493
 * InvalidTransactionId if no specific xid is associated with the action.
494
 *
495
 * Return value is the shared-buffer slot number now holding the page.
496
 * The buffer's LRU access info is updated.
497
 *
498
 * The correct bank lock must be held at entry, and will be held at exit.
499
 */
500
int
501
SimpleLruReadPage(SlruCtl ctl, int64 pageno, bool write_ok,
502
          TransactionId xid)
503
0
{
504
0
  SlruShared  shared = ctl->shared;
505
0
  LWLock     *banklock = SimpleLruGetBankLock(ctl, pageno);
506
507
0
  Assert(LWLockHeldByMeInMode(banklock, LW_EXCLUSIVE));
508
509
  /* Outer loop handles restart if we must wait for someone else's I/O */
510
0
  for (;;)
511
0
  {
512
0
    int     slotno;
513
0
    bool    ok;
514
515
    /* See if page already is in memory; if not, pick victim slot */
516
0
    slotno = SlruSelectLRUPage(ctl, pageno);
517
518
    /* Did we find the page in memory? */
519
0
    if (shared->page_status[slotno] != SLRU_PAGE_EMPTY &&
520
0
      shared->page_number[slotno] == pageno)
521
0
    {
522
      /*
523
       * If page is still being read in, we must wait for I/O.  Likewise
524
       * if the page is being written and the caller said that's not OK.
525
       */
526
0
      if (shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS ||
527
0
        (shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS &&
528
0
         !write_ok))
529
0
      {
530
0
        SimpleLruWaitIO(ctl, slotno);
531
        /* Now we must recheck state from the top */
532
0
        continue;
533
0
      }
534
      /* Otherwise, it's ready to use */
535
0
      SlruRecentlyUsed(shared, slotno);
536
537
      /* update the stats counter of pages found in the SLRU */
538
0
      pgstat_count_slru_page_hit(shared->slru_stats_idx);
539
540
0
      return slotno;
541
0
    }
542
543
    /* We found no match; assert we selected a freeable slot */
544
0
    Assert(shared->page_status[slotno] == SLRU_PAGE_EMPTY ||
545
0
         (shared->page_status[slotno] == SLRU_PAGE_VALID &&
546
0
        !shared->page_dirty[slotno]));
547
548
    /* Mark the slot read-busy */
549
0
    shared->page_number[slotno] = pageno;
550
0
    shared->page_status[slotno] = SLRU_PAGE_READ_IN_PROGRESS;
551
0
    shared->page_dirty[slotno] = false;
552
553
    /* Acquire per-buffer lock (cannot deadlock, see notes at top) */
554
0
    LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE);
555
556
    /* Release bank lock while doing I/O */
557
0
    LWLockRelease(banklock);
558
559
    /* Do the read */
560
0
    ok = SlruPhysicalReadPage(ctl, pageno, slotno);
561
562
    /* Set the LSNs for this newly read-in page to zero */
563
0
    SimpleLruZeroLSNs(ctl, slotno);
564
565
    /* Re-acquire bank control lock and update page state */
566
0
    LWLockAcquire(banklock, LW_EXCLUSIVE);
567
568
0
    Assert(shared->page_number[slotno] == pageno &&
569
0
         shared->page_status[slotno] == SLRU_PAGE_READ_IN_PROGRESS &&
570
0
         !shared->page_dirty[slotno]);
571
572
0
    shared->page_status[slotno] = ok ? SLRU_PAGE_VALID : SLRU_PAGE_EMPTY;
573
574
0
    LWLockRelease(&shared->buffer_locks[slotno].lock);
575
576
    /* Now it's okay to ereport if we failed */
577
0
    if (!ok)
578
0
      SlruReportIOError(ctl, pageno, xid);
579
580
0
    SlruRecentlyUsed(shared, slotno);
581
582
    /* update the stats counter of pages not found in SLRU */
583
0
    pgstat_count_slru_page_read(shared->slru_stats_idx);
584
585
0
    return slotno;
586
0
  }
587
0
}
588
589
/*
590
 * Find a page in a shared buffer, reading it in if necessary.
591
 * The page number must correspond to an already-initialized page.
592
 * The caller must intend only read-only access to the page.
593
 *
594
 * The passed-in xid is used only for error reporting, and may be
595
 * InvalidTransactionId if no specific xid is associated with the action.
596
 *
597
 * Return value is the shared-buffer slot number now holding the page.
598
 * The buffer's LRU access info is updated.
599
 *
600
 * Bank control lock must NOT be held at entry, but will be held at exit.
601
 * It is unspecified whether the lock will be shared or exclusive.
602
 */
603
int
604
SimpleLruReadPage_ReadOnly(SlruCtl ctl, int64 pageno, TransactionId xid)
605
0
{
606
0
  SlruShared  shared = ctl->shared;
607
0
  LWLock     *banklock = SimpleLruGetBankLock(ctl, pageno);
608
0
  int     bankno = pageno % ctl->nbanks;
609
0
  int     bankstart = bankno * SLRU_BANK_SIZE;
610
0
  int     bankend = bankstart + SLRU_BANK_SIZE;
611
612
  /* Try to find the page while holding only shared lock */
613
0
  LWLockAcquire(banklock, LW_SHARED);
614
615
  /* See if page is already in a buffer */
616
0
  for (int slotno = bankstart; slotno < bankend; slotno++)
617
0
  {
618
0
    if (shared->page_status[slotno] != SLRU_PAGE_EMPTY &&
619
0
      shared->page_number[slotno] == pageno &&
620
0
      shared->page_status[slotno] != SLRU_PAGE_READ_IN_PROGRESS)
621
0
    {
622
      /* See comments for SlruRecentlyUsed macro */
623
0
      SlruRecentlyUsed(shared, slotno);
624
625
      /* update the stats counter of pages found in the SLRU */
626
0
      pgstat_count_slru_page_hit(shared->slru_stats_idx);
627
628
0
      return slotno;
629
0
    }
630
0
  }
631
632
  /* No luck, so switch to normal exclusive lock and do regular read */
633
0
  LWLockRelease(banklock);
634
0
  LWLockAcquire(banklock, LW_EXCLUSIVE);
635
636
0
  return SimpleLruReadPage(ctl, pageno, true, xid);
637
0
}
638
639
/*
640
 * Write a page from a shared buffer, if necessary.
641
 * Does nothing if the specified slot is not dirty.
642
 *
643
 * NOTE: only one write attempt is made here.  Hence, it is possible that
644
 * the page is still dirty at exit (if someone else re-dirtied it during
645
 * the write).  However, we *do* attempt a fresh write even if the page
646
 * is already being written; this is for checkpoints.
647
 *
648
 * Bank lock must be held at entry, and will be held at exit.
649
 */
650
static void
651
SlruInternalWritePage(SlruCtl ctl, int slotno, SlruWriteAll fdata)
652
0
{
653
0
  SlruShared  shared = ctl->shared;
654
0
  int64   pageno = shared->page_number[slotno];
655
0
  int     bankno = SlotGetBankNumber(slotno);
656
0
  bool    ok;
657
658
0
  Assert(shared->page_status[slotno] != SLRU_PAGE_EMPTY);
659
0
  Assert(LWLockHeldByMeInMode(SimpleLruGetBankLock(ctl, pageno), LW_EXCLUSIVE));
660
661
  /* If a write is in progress, wait for it to finish */
662
0
  while (shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS &&
663
0
       shared->page_number[slotno] == pageno)
664
0
  {
665
0
    SimpleLruWaitIO(ctl, slotno);
666
0
  }
667
668
  /*
669
   * Do nothing if page is not dirty, or if buffer no longer contains the
670
   * same page we were called for.
671
   */
672
0
  if (!shared->page_dirty[slotno] ||
673
0
    shared->page_status[slotno] != SLRU_PAGE_VALID ||
674
0
    shared->page_number[slotno] != pageno)
675
0
    return;
676
677
  /*
678
   * Mark the slot write-busy, and clear the dirtybit.  After this point, a
679
   * transaction status update on this page will mark it dirty again.
680
   */
681
0
  shared->page_status[slotno] = SLRU_PAGE_WRITE_IN_PROGRESS;
682
0
  shared->page_dirty[slotno] = false;
683
684
  /* Acquire per-buffer lock (cannot deadlock, see notes at top) */
685
0
  LWLockAcquire(&shared->buffer_locks[slotno].lock, LW_EXCLUSIVE);
686
687
  /* Release bank lock while doing I/O */
688
0
  LWLockRelease(&shared->bank_locks[bankno].lock);
689
690
  /* Do the write */
691
0
  ok = SlruPhysicalWritePage(ctl, pageno, slotno, fdata);
692
693
  /* If we failed, and we're in a flush, better close the files */
694
0
  if (!ok && fdata)
695
0
  {
696
0
    for (int i = 0; i < fdata->num_files; i++)
697
0
      CloseTransientFile(fdata->fd[i]);
698
0
  }
699
700
  /* Re-acquire bank lock and update page state */
701
0
  LWLockAcquire(&shared->bank_locks[bankno].lock, LW_EXCLUSIVE);
702
703
0
  Assert(shared->page_number[slotno] == pageno &&
704
0
       shared->page_status[slotno] == SLRU_PAGE_WRITE_IN_PROGRESS);
705
706
  /* If we failed to write, mark the page dirty again */
707
0
  if (!ok)
708
0
    shared->page_dirty[slotno] = true;
709
710
0
  shared->page_status[slotno] = SLRU_PAGE_VALID;
711
712
0
  LWLockRelease(&shared->buffer_locks[slotno].lock);
713
714
  /* Now it's okay to ereport if we failed */
715
0
  if (!ok)
716
0
    SlruReportIOError(ctl, pageno, InvalidTransactionId);
717
718
  /* If part of a checkpoint, count this as a SLRU buffer written. */
719
0
  if (fdata)
720
0
  {
721
0
    CheckpointStats.ckpt_slru_written++;
722
0
    PendingCheckpointerStats.slru_written++;
723
0
  }
724
0
}
725
726
/*
727
 * Wrapper of SlruInternalWritePage, for external callers.
728
 * fdata is always passed a NULL here.
729
 */
730
void
731
SimpleLruWritePage(SlruCtl ctl, int slotno)
732
0
{
733
0
  Assert(ctl->shared->page_status[slotno] != SLRU_PAGE_EMPTY);
734
735
0
  SlruInternalWritePage(ctl, slotno, NULL);
736
0
}
737
738
/*
739
 * Return whether the given page exists on disk.
740
 *
741
 * A false return means that either the file does not exist, or that it's not
742
 * large enough to contain the given page.
743
 */
744
bool
745
SimpleLruDoesPhysicalPageExist(SlruCtl ctl, int64 pageno)
746
0
{
747
0
  int64   segno = pageno / SLRU_PAGES_PER_SEGMENT;
748
0
  int     rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
749
0
  int     offset = rpageno * BLCKSZ;
750
0
  char    path[MAXPGPATH];
751
0
  int     fd;
752
0
  bool    result;
753
0
  off_t   endpos;
754
755
  /* update the stats counter of checked pages */
756
0
  pgstat_count_slru_page_exists(ctl->shared->slru_stats_idx);
757
758
0
  SlruFileName(ctl, path, segno);
759
760
0
  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
761
0
  if (fd < 0)
762
0
  {
763
    /* expected: file doesn't exist */
764
0
    if (errno == ENOENT)
765
0
      return false;
766
767
    /* report error normally */
768
0
    slru_errcause = SLRU_OPEN_FAILED;
769
0
    slru_errno = errno;
770
0
    SlruReportIOError(ctl, pageno, 0);
771
0
  }
772
773
0
  if ((endpos = lseek(fd, 0, SEEK_END)) < 0)
774
0
  {
775
0
    slru_errcause = SLRU_SEEK_FAILED;
776
0
    slru_errno = errno;
777
0
    SlruReportIOError(ctl, pageno, 0);
778
0
  }
779
780
0
  result = endpos >= (off_t) (offset + BLCKSZ);
781
782
0
  if (CloseTransientFile(fd) != 0)
783
0
  {
784
0
    slru_errcause = SLRU_CLOSE_FAILED;
785
0
    slru_errno = errno;
786
0
    return false;
787
0
  }
788
789
0
  return result;
790
0
}
791
792
/*
793
 * Physical read of a (previously existing) page into a buffer slot
794
 *
795
 * On failure, we cannot just ereport(ERROR) since caller has put state in
796
 * shared memory that must be undone.  So, we return false and save enough
797
 * info in static variables to let SlruReportIOError make the report.
798
 *
799
 * For now, assume it's not worth keeping a file pointer open across
800
 * read/write operations.  We could cache one virtual file pointer ...
801
 */
802
static bool
803
SlruPhysicalReadPage(SlruCtl ctl, int64 pageno, int slotno)
804
0
{
805
0
  SlruShared  shared = ctl->shared;
806
0
  int64   segno = pageno / SLRU_PAGES_PER_SEGMENT;
807
0
  int     rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
808
0
  off_t   offset = rpageno * BLCKSZ;
809
0
  char    path[MAXPGPATH];
810
0
  int     fd;
811
812
0
  SlruFileName(ctl, path, segno);
813
814
  /*
815
   * In a crash-and-restart situation, it's possible for us to receive
816
   * commands to set the commit status of transactions whose bits are in
817
   * already-truncated segments of the commit log (see notes in
818
   * SlruPhysicalWritePage).  Hence, if we are InRecovery, allow the case
819
   * where the file doesn't exist, and return zeroes instead.
820
   */
821
0
  fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
822
0
  if (fd < 0)
823
0
  {
824
0
    if (errno != ENOENT || !InRecovery)
825
0
    {
826
0
      slru_errcause = SLRU_OPEN_FAILED;
827
0
      slru_errno = errno;
828
0
      return false;
829
0
    }
830
831
0
    ereport(LOG,
832
0
        (errmsg("file \"%s\" doesn't exist, reading as zeroes",
833
0
            path)));
834
0
    MemSet(shared->page_buffer[slotno], 0, BLCKSZ);
835
0
    return true;
836
0
  }
837
838
0
  errno = 0;
839
0
  pgstat_report_wait_start(WAIT_EVENT_SLRU_READ);
840
0
  if (pg_pread(fd, shared->page_buffer[slotno], BLCKSZ, offset) != BLCKSZ)
841
0
  {
842
0
    pgstat_report_wait_end();
843
0
    slru_errcause = SLRU_READ_FAILED;
844
0
    slru_errno = errno;
845
0
    CloseTransientFile(fd);
846
0
    return false;
847
0
  }
848
0
  pgstat_report_wait_end();
849
850
0
  if (CloseTransientFile(fd) != 0)
851
0
  {
852
0
    slru_errcause = SLRU_CLOSE_FAILED;
853
0
    slru_errno = errno;
854
0
    return false;
855
0
  }
856
857
0
  return true;
858
0
}
859
860
/*
861
 * Physical write of a page from a buffer slot
862
 *
863
 * On failure, we cannot just ereport(ERROR) since caller has put state in
864
 * shared memory that must be undone.  So, we return false and save enough
865
 * info in static variables to let SlruReportIOError make the report.
866
 *
867
 * For now, assume it's not worth keeping a file pointer open across
868
 * independent read/write operations.  We do batch operations during
869
 * SimpleLruWriteAll, though.
870
 *
871
 * fdata is NULL for a standalone write, pointer to open-file info during
872
 * SimpleLruWriteAll.
873
 */
874
static bool
875
SlruPhysicalWritePage(SlruCtl ctl, int64 pageno, int slotno, SlruWriteAll fdata)
876
0
{
877
0
  SlruShared  shared = ctl->shared;
878
0
  int64   segno = pageno / SLRU_PAGES_PER_SEGMENT;
879
0
  int     rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
880
0
  off_t   offset = rpageno * BLCKSZ;
881
0
  char    path[MAXPGPATH];
882
0
  int     fd = -1;
883
884
  /* update the stats counter of written pages */
885
0
  pgstat_count_slru_page_written(shared->slru_stats_idx);
886
887
  /*
888
   * Honor the write-WAL-before-data rule, if appropriate, so that we do not
889
   * write out data before associated WAL records.  This is the same action
890
   * performed during FlushBuffer() in the main buffer manager.
891
   */
892
0
  if (shared->group_lsn != NULL)
893
0
  {
894
    /*
895
     * We must determine the largest async-commit LSN for the page. This
896
     * is a bit tedious, but since this entire function is a slow path
897
     * anyway, it seems better to do this here than to maintain a per-page
898
     * LSN variable (which'd need an extra comparison in the
899
     * transaction-commit path).
900
     */
901
0
    XLogRecPtr  max_lsn;
902
0
    int     lsnindex;
903
904
0
    lsnindex = slotno * shared->lsn_groups_per_page;
905
0
    max_lsn = shared->group_lsn[lsnindex++];
906
0
    for (int lsnoff = 1; lsnoff < shared->lsn_groups_per_page; lsnoff++)
907
0
    {
908
0
      XLogRecPtr  this_lsn = shared->group_lsn[lsnindex++];
909
910
0
      if (max_lsn < this_lsn)
911
0
        max_lsn = this_lsn;
912
0
    }
913
914
0
    if (!XLogRecPtrIsInvalid(max_lsn))
915
0
    {
916
      /*
917
       * As noted above, elog(ERROR) is not acceptable here, so if
918
       * XLogFlush were to fail, we must PANIC.  This isn't much of a
919
       * restriction because XLogFlush is just about all critical
920
       * section anyway, but let's make sure.
921
       */
922
0
      START_CRIT_SECTION();
923
0
      XLogFlush(max_lsn);
924
0
      END_CRIT_SECTION();
925
0
    }
926
0
  }
927
928
  /*
929
   * During a SimpleLruWriteAll, we may already have the desired file open.
930
   */
931
0
  if (fdata)
932
0
  {
933
0
    for (int i = 0; i < fdata->num_files; i++)
934
0
    {
935
0
      if (fdata->segno[i] == segno)
936
0
      {
937
0
        fd = fdata->fd[i];
938
0
        break;
939
0
      }
940
0
    }
941
0
  }
942
943
0
  if (fd < 0)
944
0
  {
945
    /*
946
     * If the file doesn't already exist, we should create it.  It is
947
     * possible for this to need to happen when writing a page that's not
948
     * first in its segment; we assume the OS can cope with that. (Note:
949
     * it might seem that it'd be okay to create files only when
950
     * SimpleLruZeroPage is called for the first page of a segment.
951
     * However, if after a crash and restart the REDO logic elects to
952
     * replay the log from a checkpoint before the latest one, then it's
953
     * possible that we will get commands to set transaction status of
954
     * transactions that have already been truncated from the commit log.
955
     * Easiest way to deal with that is to accept references to
956
     * nonexistent files here and in SlruPhysicalReadPage.)
957
     *
958
     * Note: it is possible for more than one backend to be executing this
959
     * code simultaneously for different pages of the same file. Hence,
960
     * don't use O_EXCL or O_TRUNC or anything like that.
961
     */
962
0
    SlruFileName(ctl, path, segno);
963
0
    fd = OpenTransientFile(path, O_RDWR | O_CREAT | PG_BINARY);
964
0
    if (fd < 0)
965
0
    {
966
0
      slru_errcause = SLRU_OPEN_FAILED;
967
0
      slru_errno = errno;
968
0
      return false;
969
0
    }
970
971
0
    if (fdata)
972
0
    {
973
0
      if (fdata->num_files < MAX_WRITEALL_BUFFERS)
974
0
      {
975
0
        fdata->fd[fdata->num_files] = fd;
976
0
        fdata->segno[fdata->num_files] = segno;
977
0
        fdata->num_files++;
978
0
      }
979
0
      else
980
0
      {
981
        /*
982
         * In the unlikely event that we exceed MAX_WRITEALL_BUFFERS,
983
         * fall back to treating it as a standalone write.
984
         */
985
0
        fdata = NULL;
986
0
      }
987
0
    }
988
0
  }
989
990
0
  errno = 0;
991
0
  pgstat_report_wait_start(WAIT_EVENT_SLRU_WRITE);
992
0
  if (pg_pwrite(fd, shared->page_buffer[slotno], BLCKSZ, offset) != BLCKSZ)
993
0
  {
994
0
    pgstat_report_wait_end();
995
    /* if write didn't set errno, assume problem is no disk space */
996
0
    if (errno == 0)
997
0
      errno = ENOSPC;
998
0
    slru_errcause = SLRU_WRITE_FAILED;
999
0
    slru_errno = errno;
1000
0
    if (!fdata)
1001
0
      CloseTransientFile(fd);
1002
0
    return false;
1003
0
  }
1004
0
  pgstat_report_wait_end();
1005
1006
  /* Queue up a sync request for the checkpointer. */
1007
0
  if (ctl->sync_handler != SYNC_HANDLER_NONE)
1008
0
  {
1009
0
    FileTag   tag;
1010
1011
0
    INIT_SLRUFILETAG(tag, ctl->sync_handler, segno);
1012
0
    if (!RegisterSyncRequest(&tag, SYNC_REQUEST, false))
1013
0
    {
1014
      /* No space to enqueue sync request.  Do it synchronously. */
1015
0
      pgstat_report_wait_start(WAIT_EVENT_SLRU_SYNC);
1016
0
      if (pg_fsync(fd) != 0)
1017
0
      {
1018
0
        pgstat_report_wait_end();
1019
0
        slru_errcause = SLRU_FSYNC_FAILED;
1020
0
        slru_errno = errno;
1021
0
        CloseTransientFile(fd);
1022
0
        return false;
1023
0
      }
1024
0
      pgstat_report_wait_end();
1025
0
    }
1026
0
  }
1027
1028
  /* Close file, unless part of flush request. */
1029
0
  if (!fdata)
1030
0
  {
1031
0
    if (CloseTransientFile(fd) != 0)
1032
0
    {
1033
0
      slru_errcause = SLRU_CLOSE_FAILED;
1034
0
      slru_errno = errno;
1035
0
      return false;
1036
0
    }
1037
0
  }
1038
1039
0
  return true;
1040
0
}
1041
1042
/*
1043
 * Issue the error message after failure of SlruPhysicalReadPage or
1044
 * SlruPhysicalWritePage.  Call this after cleaning up shared-memory state.
1045
 */
1046
static void
1047
SlruReportIOError(SlruCtl ctl, int64 pageno, TransactionId xid)
1048
0
{
1049
0
  int64   segno = pageno / SLRU_PAGES_PER_SEGMENT;
1050
0
  int     rpageno = pageno % SLRU_PAGES_PER_SEGMENT;
1051
0
  int     offset = rpageno * BLCKSZ;
1052
0
  char    path[MAXPGPATH];
1053
1054
0
  SlruFileName(ctl, path, segno);
1055
0
  errno = slru_errno;
1056
0
  switch (slru_errcause)
1057
0
  {
1058
0
    case SLRU_OPEN_FAILED:
1059
0
      ereport(ERROR,
1060
0
          (errcode_for_file_access(),
1061
0
           errmsg("could not access status of transaction %u", xid),
1062
0
           errdetail("Could not open file \"%s\": %m.", path)));
1063
0
      break;
1064
0
    case SLRU_SEEK_FAILED:
1065
0
      ereport(ERROR,
1066
0
          (errcode_for_file_access(),
1067
0
           errmsg("could not access status of transaction %u", xid),
1068
0
           errdetail("Could not seek in file \"%s\" to offset %d: %m.",
1069
0
                 path, offset)));
1070
0
      break;
1071
0
    case SLRU_READ_FAILED:
1072
0
      if (errno)
1073
0
        ereport(ERROR,
1074
0
            (errcode_for_file_access(),
1075
0
             errmsg("could not access status of transaction %u", xid),
1076
0
             errdetail("Could not read from file \"%s\" at offset %d: %m.",
1077
0
                   path, offset)));
1078
0
      else
1079
0
        ereport(ERROR,
1080
0
            (errmsg("could not access status of transaction %u", xid),
1081
0
             errdetail("Could not read from file \"%s\" at offset %d: read too few bytes.", path, offset)));
1082
0
      break;
1083
0
    case SLRU_WRITE_FAILED:
1084
0
      if (errno)
1085
0
        ereport(ERROR,
1086
0
            (errcode_for_file_access(),
1087
0
             errmsg("could not access status of transaction %u", xid),
1088
0
             errdetail("Could not write to file \"%s\" at offset %d: %m.",
1089
0
                   path, offset)));
1090
0
      else
1091
0
        ereport(ERROR,
1092
0
            (errmsg("could not access status of transaction %u", xid),
1093
0
             errdetail("Could not write to file \"%s\" at offset %d: wrote too few bytes.",
1094
0
                   path, offset)));
1095
0
      break;
1096
0
    case SLRU_FSYNC_FAILED:
1097
0
      ereport(data_sync_elevel(ERROR),
1098
0
          (errcode_for_file_access(),
1099
0
           errmsg("could not access status of transaction %u", xid),
1100
0
           errdetail("Could not fsync file \"%s\": %m.",
1101
0
                 path)));
1102
0
      break;
1103
0
    case SLRU_CLOSE_FAILED:
1104
0
      ereport(ERROR,
1105
0
          (errcode_for_file_access(),
1106
0
           errmsg("could not access status of transaction %u", xid),
1107
0
           errdetail("Could not close file \"%s\": %m.",
1108
0
                 path)));
1109
0
      break;
1110
0
    default:
1111
      /* can't get here, we trust */
1112
0
      elog(ERROR, "unrecognized SimpleLru error cause: %d",
1113
0
         (int) slru_errcause);
1114
0
      break;
1115
0
  }
1116
0
}
1117
1118
/*
1119
 * Mark a buffer slot "most recently used".
1120
 */
1121
static inline void
1122
SlruRecentlyUsed(SlruShared shared, int slotno)
1123
0
{
1124
0
  int     bankno = SlotGetBankNumber(slotno);
1125
0
  int     new_lru_count = shared->bank_cur_lru_count[bankno];
1126
1127
0
  Assert(shared->page_status[slotno] != SLRU_PAGE_EMPTY);
1128
1129
  /*
1130
   * The reason for the if-test is that there are often many consecutive
1131
   * accesses to the same page (particularly the latest page).  By
1132
   * suppressing useless increments of bank_cur_lru_count, we reduce the
1133
   * probability that old pages' counts will "wrap around" and make them
1134
   * appear recently used.
1135
   *
1136
   * We allow this code to be executed concurrently by multiple processes
1137
   * within SimpleLruReadPage_ReadOnly().  As long as int reads and writes
1138
   * are atomic, this should not cause any completely-bogus values to enter
1139
   * the computation.  However, it is possible for either bank_cur_lru_count
1140
   * or individual page_lru_count entries to be "reset" to lower values than
1141
   * they should have, in case a process is delayed while it executes this
1142
   * function.  With care in SlruSelectLRUPage(), this does little harm, and
1143
   * in any case the absolute worst possible consequence is a nonoptimal
1144
   * choice of page to evict.  The gain from allowing concurrent reads of
1145
   * SLRU pages seems worth it.
1146
   */
1147
0
  if (new_lru_count != shared->page_lru_count[slotno])
1148
0
  {
1149
0
    shared->bank_cur_lru_count[bankno] = ++new_lru_count;
1150
0
    shared->page_lru_count[slotno] = new_lru_count;
1151
0
  }
1152
0
}
1153
1154
/*
1155
 * Select the slot to re-use when we need a free slot for the given page.
1156
 *
1157
 * The target page number is passed not only because we need to know the
1158
 * correct bank to use, but also because we need to consider the possibility
1159
 * that some other process reads in the target page while we are doing I/O to
1160
 * free a slot.  Hence, check or recheck to see if any slot already holds the
1161
 * target page, and return that slot if so.  Thus, the returned slot is
1162
 * *either* a slot already holding the pageno (could be any state except
1163
 * EMPTY), *or* a freeable slot (state EMPTY or CLEAN).
1164
 *
1165
 * The correct bank lock must be held at entry, and will be held at exit.
1166
 */
1167
static int
1168
SlruSelectLRUPage(SlruCtl ctl, int64 pageno)
1169
0
{
1170
0
  SlruShared  shared = ctl->shared;
1171
1172
  /* Outer loop handles restart after I/O */
1173
0
  for (;;)
1174
0
  {
1175
0
    int     cur_count;
1176
0
    int     bestvalidslot = 0;  /* keep compiler quiet */
1177
0
    int     best_valid_delta = -1;
1178
0
    int64   best_valid_page_number = 0; /* keep compiler quiet */
1179
0
    int     bestinvalidslot = 0;  /* keep compiler quiet */
1180
0
    int     best_invalid_delta = -1;
1181
0
    int64   best_invalid_page_number = 0; /* keep compiler quiet */
1182
0
    int     bankno = pageno % ctl->nbanks;
1183
0
    int     bankstart = bankno * SLRU_BANK_SIZE;
1184
0
    int     bankend = bankstart + SLRU_BANK_SIZE;
1185
1186
0
    Assert(LWLockHeldByMe(SimpleLruGetBankLock(ctl, pageno)));
1187
1188
    /* See if page already has a buffer assigned */
1189
0
    for (int slotno = bankstart; slotno < bankend; slotno++)
1190
0
    {
1191
0
      if (shared->page_status[slotno] != SLRU_PAGE_EMPTY &&
1192
0
        shared->page_number[slotno] == pageno)
1193
0
        return slotno;
1194
0
    }
1195
1196
    /*
1197
     * If we find any EMPTY slot, just select that one. Else choose a
1198
     * victim page to replace.  We normally take the least recently used
1199
     * valid page, but we will never take the slot containing
1200
     * latest_page_number, even if it appears least recently used.  We
1201
     * will select a slot that is already I/O busy only if there is no
1202
     * other choice: a read-busy slot will not be least recently used once
1203
     * the read finishes, and waiting for an I/O on a write-busy slot is
1204
     * inferior to just picking some other slot.  Testing shows the slot
1205
     * we pick instead will often be clean, allowing us to begin a read at
1206
     * once.
1207
     *
1208
     * Normally the page_lru_count values will all be different and so
1209
     * there will be a well-defined LRU page.  But since we allow
1210
     * concurrent execution of SlruRecentlyUsed() within
1211
     * SimpleLruReadPage_ReadOnly(), it is possible that multiple pages
1212
     * acquire the same lru_count values.  In that case we break ties by
1213
     * choosing the furthest-back page.
1214
     *
1215
     * Notice that this next line forcibly advances cur_lru_count to a
1216
     * value that is certainly beyond any value that will be in the
1217
     * page_lru_count array after the loop finishes.  This ensures that
1218
     * the next execution of SlruRecentlyUsed will mark the page newly
1219
     * used, even if it's for a page that has the current counter value.
1220
     * That gets us back on the path to having good data when there are
1221
     * multiple pages with the same lru_count.
1222
     */
1223
0
    cur_count = (shared->bank_cur_lru_count[bankno])++;
1224
0
    for (int slotno = bankstart; slotno < bankend; slotno++)
1225
0
    {
1226
0
      int     this_delta;
1227
0
      int64   this_page_number;
1228
1229
0
      if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
1230
0
        return slotno;
1231
1232
0
      this_delta = cur_count - shared->page_lru_count[slotno];
1233
0
      if (this_delta < 0)
1234
0
      {
1235
        /*
1236
         * Clean up in case shared updates have caused cur_count
1237
         * increments to get "lost".  We back off the page counts,
1238
         * rather than trying to increase cur_count, to avoid any
1239
         * question of infinite loops or failure in the presence of
1240
         * wrapped-around counts.
1241
         */
1242
0
        shared->page_lru_count[slotno] = cur_count;
1243
0
        this_delta = 0;
1244
0
      }
1245
1246
      /*
1247
       * If this page is the one most recently zeroed, don't consider it
1248
       * an eviction candidate. See comments in SimpleLruZeroPage for an
1249
       * explanation about the lack of a memory barrier here.
1250
       */
1251
0
      this_page_number = shared->page_number[slotno];
1252
0
      if (this_page_number ==
1253
0
        pg_atomic_read_u64(&shared->latest_page_number))
1254
0
        continue;
1255
1256
0
      if (shared->page_status[slotno] == SLRU_PAGE_VALID)
1257
0
      {
1258
0
        if (this_delta > best_valid_delta ||
1259
0
          (this_delta == best_valid_delta &&
1260
0
           ctl->PagePrecedes(this_page_number,
1261
0
                     best_valid_page_number)))
1262
0
        {
1263
0
          bestvalidslot = slotno;
1264
0
          best_valid_delta = this_delta;
1265
0
          best_valid_page_number = this_page_number;
1266
0
        }
1267
0
      }
1268
0
      else
1269
0
      {
1270
0
        if (this_delta > best_invalid_delta ||
1271
0
          (this_delta == best_invalid_delta &&
1272
0
           ctl->PagePrecedes(this_page_number,
1273
0
                     best_invalid_page_number)))
1274
0
        {
1275
0
          bestinvalidslot = slotno;
1276
0
          best_invalid_delta = this_delta;
1277
0
          best_invalid_page_number = this_page_number;
1278
0
        }
1279
0
      }
1280
0
    }
1281
1282
    /*
1283
     * If all pages (except possibly the latest one) are I/O busy, we'll
1284
     * have to wait for an I/O to complete and then retry.  In that
1285
     * unhappy case, we choose to wait for the I/O on the least recently
1286
     * used slot, on the assumption that it was likely initiated first of
1287
     * all the I/Os in progress and may therefore finish first.
1288
     */
1289
0
    if (best_valid_delta < 0)
1290
0
    {
1291
0
      SimpleLruWaitIO(ctl, bestinvalidslot);
1292
0
      continue;
1293
0
    }
1294
1295
    /*
1296
     * If the selected page is clean, we're set.
1297
     */
1298
0
    if (!shared->page_dirty[bestvalidslot])
1299
0
      return bestvalidslot;
1300
1301
    /*
1302
     * Write the page.
1303
     */
1304
0
    SlruInternalWritePage(ctl, bestvalidslot, NULL);
1305
1306
    /*
1307
     * Now loop back and try again.  This is the easiest way of dealing
1308
     * with corner cases such as the victim page being re-dirtied while we
1309
     * wrote it.
1310
     */
1311
0
  }
1312
0
}
1313
1314
/*
1315
 * Write dirty pages to disk during checkpoint or database shutdown.  Flushing
1316
 * is deferred until the next call to ProcessSyncRequests(), though we do fsync
1317
 * the containing directory here to make sure that newly created directory
1318
 * entries are on disk.
1319
 */
1320
void
1321
SimpleLruWriteAll(SlruCtl ctl, bool allow_redirtied)
1322
0
{
1323
0
  SlruShared  shared = ctl->shared;
1324
0
  SlruWriteAllData fdata;
1325
0
  int64   pageno = 0;
1326
0
  int     prevbank = SlotGetBankNumber(0);
1327
0
  bool    ok;
1328
1329
  /* update the stats counter of flushes */
1330
0
  pgstat_count_slru_flush(shared->slru_stats_idx);
1331
1332
  /*
1333
   * Find and write dirty pages
1334
   */
1335
0
  fdata.num_files = 0;
1336
1337
0
  LWLockAcquire(&shared->bank_locks[prevbank].lock, LW_EXCLUSIVE);
1338
1339
0
  for (int slotno = 0; slotno < shared->num_slots; slotno++)
1340
0
  {
1341
0
    int     curbank = SlotGetBankNumber(slotno);
1342
1343
    /*
1344
     * If the current bank lock is not same as the previous bank lock then
1345
     * release the previous lock and acquire the new lock.
1346
     */
1347
0
    if (curbank != prevbank)
1348
0
    {
1349
0
      LWLockRelease(&shared->bank_locks[prevbank].lock);
1350
0
      LWLockAcquire(&shared->bank_locks[curbank].lock, LW_EXCLUSIVE);
1351
0
      prevbank = curbank;
1352
0
    }
1353
1354
    /* Do nothing if slot is unused */
1355
0
    if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
1356
0
      continue;
1357
1358
0
    SlruInternalWritePage(ctl, slotno, &fdata);
1359
1360
    /*
1361
     * In some places (e.g. checkpoints), we cannot assert that the slot
1362
     * is clean now, since another process might have re-dirtied it
1363
     * already.  That's okay.
1364
     */
1365
0
    Assert(allow_redirtied ||
1366
0
         shared->page_status[slotno] == SLRU_PAGE_EMPTY ||
1367
0
         (shared->page_status[slotno] == SLRU_PAGE_VALID &&
1368
0
        !shared->page_dirty[slotno]));
1369
0
  }
1370
1371
0
  LWLockRelease(&shared->bank_locks[prevbank].lock);
1372
1373
  /*
1374
   * Now close any files that were open
1375
   */
1376
0
  ok = true;
1377
0
  for (int i = 0; i < fdata.num_files; i++)
1378
0
  {
1379
0
    if (CloseTransientFile(fdata.fd[i]) != 0)
1380
0
    {
1381
0
      slru_errcause = SLRU_CLOSE_FAILED;
1382
0
      slru_errno = errno;
1383
0
      pageno = fdata.segno[i] * SLRU_PAGES_PER_SEGMENT;
1384
0
      ok = false;
1385
0
    }
1386
0
  }
1387
0
  if (!ok)
1388
0
    SlruReportIOError(ctl, pageno, InvalidTransactionId);
1389
1390
  /* Ensure that directory entries for new files are on disk. */
1391
0
  if (ctl->sync_handler != SYNC_HANDLER_NONE)
1392
0
    fsync_fname(ctl->Dir, true);
1393
0
}
1394
1395
/*
1396
 * Remove all segments before the one holding the passed page number
1397
 *
1398
 * All SLRUs prevent concurrent calls to this function, either with an LWLock
1399
 * or by calling it only as part of a checkpoint.  Mutual exclusion must begin
1400
 * before computing cutoffPage.  Mutual exclusion must end after any limit
1401
 * update that would permit other backends to write fresh data into the
1402
 * segment immediately preceding the one containing cutoffPage.  Otherwise,
1403
 * when the SLRU is quite full, SimpleLruTruncate() might delete that segment
1404
 * after it has accrued freshly-written data.
1405
 */
1406
void
1407
SimpleLruTruncate(SlruCtl ctl, int64 cutoffPage)
1408
0
{
1409
0
  SlruShared  shared = ctl->shared;
1410
0
  int     prevbank;
1411
1412
  /* update the stats counter of truncates */
1413
0
  pgstat_count_slru_truncate(shared->slru_stats_idx);
1414
1415
  /*
1416
   * Scan shared memory and remove any pages preceding the cutoff page, to
1417
   * ensure we won't rewrite them later.  (Since this is normally called in
1418
   * or just after a checkpoint, any dirty pages should have been flushed
1419
   * already ... we're just being extra careful here.)
1420
   */
1421
0
restart:
1422
1423
  /*
1424
   * An important safety check: the current endpoint page must not be
1425
   * eligible for removal.  This check is just a backstop against wraparound
1426
   * bugs elsewhere in SLRU handling, so we don't care if we read a slightly
1427
   * outdated value; therefore we don't add a memory barrier.
1428
   */
1429
0
  if (ctl->PagePrecedes(pg_atomic_read_u64(&shared->latest_page_number),
1430
0
              cutoffPage))
1431
0
  {
1432
0
    ereport(LOG,
1433
0
        (errmsg("could not truncate directory \"%s\": apparent wraparound",
1434
0
            ctl->Dir)));
1435
0
    return;
1436
0
  }
1437
1438
0
  prevbank = SlotGetBankNumber(0);
1439
0
  LWLockAcquire(&shared->bank_locks[prevbank].lock, LW_EXCLUSIVE);
1440
0
  for (int slotno = 0; slotno < shared->num_slots; slotno++)
1441
0
  {
1442
0
    int     curbank = SlotGetBankNumber(slotno);
1443
1444
    /*
1445
     * If the current bank lock is not same as the previous bank lock then
1446
     * release the previous lock and acquire the new lock.
1447
     */
1448
0
    if (curbank != prevbank)
1449
0
    {
1450
0
      LWLockRelease(&shared->bank_locks[prevbank].lock);
1451
0
      LWLockAcquire(&shared->bank_locks[curbank].lock, LW_EXCLUSIVE);
1452
0
      prevbank = curbank;
1453
0
    }
1454
1455
0
    if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
1456
0
      continue;
1457
0
    if (!ctl->PagePrecedes(shared->page_number[slotno], cutoffPage))
1458
0
      continue;
1459
1460
    /*
1461
     * If page is clean, just change state to EMPTY (expected case).
1462
     */
1463
0
    if (shared->page_status[slotno] == SLRU_PAGE_VALID &&
1464
0
      !shared->page_dirty[slotno])
1465
0
    {
1466
0
      shared->page_status[slotno] = SLRU_PAGE_EMPTY;
1467
0
      continue;
1468
0
    }
1469
1470
    /*
1471
     * Hmm, we have (or may have) I/O operations acting on the page, so
1472
     * we've got to wait for them to finish and then start again. This is
1473
     * the same logic as in SlruSelectLRUPage.  (XXX if page is dirty,
1474
     * wouldn't it be OK to just discard it without writing it?
1475
     * SlruMayDeleteSegment() uses a stricter qualification, so we might
1476
     * not delete this page in the end; even if we don't delete it, we
1477
     * won't have cause to read its data again.  For now, keep the logic
1478
     * the same as it was.)
1479
     */
1480
0
    if (shared->page_status[slotno] == SLRU_PAGE_VALID)
1481
0
      SlruInternalWritePage(ctl, slotno, NULL);
1482
0
    else
1483
0
      SimpleLruWaitIO(ctl, slotno);
1484
1485
0
    LWLockRelease(&shared->bank_locks[prevbank].lock);
1486
0
    goto restart;
1487
0
  }
1488
1489
0
  LWLockRelease(&shared->bank_locks[prevbank].lock);
1490
1491
  /* Now we can remove the old segment(s) */
1492
0
  (void) SlruScanDirectory(ctl, SlruScanDirCbDeleteCutoff, &cutoffPage);
1493
0
}
1494
1495
/*
1496
 * Delete an individual SLRU segment.
1497
 *
1498
 * NB: This does not touch the SLRU buffers themselves, callers have to ensure
1499
 * they either can't yet contain anything, or have already been cleaned out.
1500
 */
1501
static void
1502
SlruInternalDeleteSegment(SlruCtl ctl, int64 segno)
1503
0
{
1504
0
  char    path[MAXPGPATH];
1505
1506
  /* Forget any fsync requests queued for this segment. */
1507
0
  if (ctl->sync_handler != SYNC_HANDLER_NONE)
1508
0
  {
1509
0
    FileTag   tag;
1510
1511
0
    INIT_SLRUFILETAG(tag, ctl->sync_handler, segno);
1512
0
    RegisterSyncRequest(&tag, SYNC_FORGET_REQUEST, true);
1513
0
  }
1514
1515
  /* Unlink the file. */
1516
0
  SlruFileName(ctl, path, segno);
1517
0
  ereport(DEBUG2, (errmsg_internal("removing file \"%s\"", path)));
1518
0
  unlink(path);
1519
0
}
1520
1521
/*
1522
 * Delete an individual SLRU segment, identified by the segment number.
1523
 */
1524
void
1525
SlruDeleteSegment(SlruCtl ctl, int64 segno)
1526
0
{
1527
0
  SlruShared  shared = ctl->shared;
1528
0
  int     prevbank = SlotGetBankNumber(0);
1529
0
  bool    did_write;
1530
1531
  /* Clean out any possibly existing references to the segment. */
1532
0
  LWLockAcquire(&shared->bank_locks[prevbank].lock, LW_EXCLUSIVE);
1533
0
restart:
1534
0
  did_write = false;
1535
0
  for (int slotno = 0; slotno < shared->num_slots; slotno++)
1536
0
  {
1537
0
    int64   pagesegno;
1538
0
    int     curbank = SlotGetBankNumber(slotno);
1539
1540
    /*
1541
     * If the current bank lock is not same as the previous bank lock then
1542
     * release the previous lock and acquire the new lock.
1543
     */
1544
0
    if (curbank != prevbank)
1545
0
    {
1546
0
      LWLockRelease(&shared->bank_locks[prevbank].lock);
1547
0
      LWLockAcquire(&shared->bank_locks[curbank].lock, LW_EXCLUSIVE);
1548
0
      prevbank = curbank;
1549
0
    }
1550
1551
0
    if (shared->page_status[slotno] == SLRU_PAGE_EMPTY)
1552
0
      continue;
1553
1554
0
    pagesegno = shared->page_number[slotno] / SLRU_PAGES_PER_SEGMENT;
1555
    /* not the segment we're looking for */
1556
0
    if (pagesegno != segno)
1557
0
      continue;
1558
1559
    /* If page is clean, just change state to EMPTY (expected case). */
1560
0
    if (shared->page_status[slotno] == SLRU_PAGE_VALID &&
1561
0
      !shared->page_dirty[slotno])
1562
0
    {
1563
0
      shared->page_status[slotno] = SLRU_PAGE_EMPTY;
1564
0
      continue;
1565
0
    }
1566
1567
    /* Same logic as SimpleLruTruncate() */
1568
0
    if (shared->page_status[slotno] == SLRU_PAGE_VALID)
1569
0
      SlruInternalWritePage(ctl, slotno, NULL);
1570
0
    else
1571
0
      SimpleLruWaitIO(ctl, slotno);
1572
1573
0
    did_write = true;
1574
0
  }
1575
1576
  /*
1577
   * Be extra careful and re-check. The IO functions release the control
1578
   * lock, so new pages could have been read in.
1579
   */
1580
0
  if (did_write)
1581
0
    goto restart;
1582
1583
0
  SlruInternalDeleteSegment(ctl, segno);
1584
1585
0
  LWLockRelease(&shared->bank_locks[prevbank].lock);
1586
0
}
1587
1588
/*
1589
 * Determine whether a segment is okay to delete.
1590
 *
1591
 * segpage is the first page of the segment, and cutoffPage is the oldest (in
1592
 * PagePrecedes order) page in the SLRU containing still-useful data.  Since
1593
 * every core PagePrecedes callback implements "wrap around", check the
1594
 * segment's first and last pages:
1595
 *
1596
 * first<cutoff  && last<cutoff:  yes
1597
 * first<cutoff  && last>=cutoff: no; cutoff falls inside this segment
1598
 * first>=cutoff && last<cutoff:  no; wrap point falls inside this segment
1599
 * first>=cutoff && last>=cutoff: no; every page of this segment is too young
1600
 */
1601
static bool
1602
SlruMayDeleteSegment(SlruCtl ctl, int64 segpage, int64 cutoffPage)
1603
0
{
1604
0
  int64   seg_last_page = segpage + SLRU_PAGES_PER_SEGMENT - 1;
1605
1606
0
  Assert(segpage % SLRU_PAGES_PER_SEGMENT == 0);
1607
1608
0
  return (ctl->PagePrecedes(segpage, cutoffPage) &&
1609
0
      ctl->PagePrecedes(seg_last_page, cutoffPage));
1610
0
}
1611
1612
#ifdef USE_ASSERT_CHECKING
1613
static void
1614
SlruPagePrecedesTestOffset(SlruCtl ctl, int per_page, uint32 offset)
1615
{
1616
  TransactionId lhs,
1617
        rhs;
1618
  int64   newestPage,
1619
        oldestPage;
1620
  TransactionId newestXact,
1621
        oldestXact;
1622
1623
  /*
1624
   * Compare an XID pair having undefined order (see RFC 1982), a pair at
1625
   * "opposite ends" of the XID space.  TransactionIdPrecedes() treats each
1626
   * as preceding the other.  If RHS is oldestXact, LHS is the first XID we
1627
   * must not assign.
1628
   */
1629
  lhs = per_page + offset;  /* skip first page to avoid non-normal XIDs */
1630
  rhs = lhs + (1U << 31);
1631
  Assert(TransactionIdPrecedes(lhs, rhs));
1632
  Assert(TransactionIdPrecedes(rhs, lhs));
1633
  Assert(!TransactionIdPrecedes(lhs - 1, rhs));
1634
  Assert(TransactionIdPrecedes(rhs, lhs - 1));
1635
  Assert(TransactionIdPrecedes(lhs + 1, rhs));
1636
  Assert(!TransactionIdPrecedes(rhs, lhs + 1));
1637
  Assert(!TransactionIdFollowsOrEquals(lhs, rhs));
1638
  Assert(!TransactionIdFollowsOrEquals(rhs, lhs));
1639
  Assert(!ctl->PagePrecedes(lhs / per_page, lhs / per_page));
1640
  Assert(!ctl->PagePrecedes(lhs / per_page, rhs / per_page));
1641
  Assert(!ctl->PagePrecedes(rhs / per_page, lhs / per_page));
1642
  Assert(!ctl->PagePrecedes((lhs - per_page) / per_page, rhs / per_page));
1643
  Assert(ctl->PagePrecedes(rhs / per_page, (lhs - 3 * per_page) / per_page));
1644
  Assert(ctl->PagePrecedes(rhs / per_page, (lhs - 2 * per_page) / per_page));
1645
  Assert(ctl->PagePrecedes(rhs / per_page, (lhs - 1 * per_page) / per_page)
1646
       || (1U << 31) % per_page != 0);  /* See CommitTsPagePrecedes() */
1647
  Assert(ctl->PagePrecedes((lhs + 1 * per_page) / per_page, rhs / per_page)
1648
       || (1U << 31) % per_page != 0);
1649
  Assert(ctl->PagePrecedes((lhs + 2 * per_page) / per_page, rhs / per_page));
1650
  Assert(ctl->PagePrecedes((lhs + 3 * per_page) / per_page, rhs / per_page));
1651
  Assert(!ctl->PagePrecedes(rhs / per_page, (lhs + per_page) / per_page));
1652
1653
  /*
1654
   * GetNewTransactionId() has assigned the last XID it can safely use, and
1655
   * that XID is in the *LAST* page of the second segment.  We must not
1656
   * delete that segment.
1657
   */
1658
  newestPage = 2 * SLRU_PAGES_PER_SEGMENT - 1;
1659
  newestXact = newestPage * per_page + offset;
1660
  Assert(newestXact / per_page == newestPage);
1661
  oldestXact = newestXact + 1;
1662
  oldestXact -= 1U << 31;
1663
  oldestPage = oldestXact / per_page;
1664
  Assert(!SlruMayDeleteSegment(ctl,
1665
                 (newestPage -
1666
                  newestPage % SLRU_PAGES_PER_SEGMENT),
1667
                 oldestPage));
1668
1669
  /*
1670
   * GetNewTransactionId() has assigned the last XID it can safely use, and
1671
   * that XID is in the *FIRST* page of the second segment.  We must not
1672
   * delete that segment.
1673
   */
1674
  newestPage = SLRU_PAGES_PER_SEGMENT;
1675
  newestXact = newestPage * per_page + offset;
1676
  Assert(newestXact / per_page == newestPage);
1677
  oldestXact = newestXact + 1;
1678
  oldestXact -= 1U << 31;
1679
  oldestPage = oldestXact / per_page;
1680
  Assert(!SlruMayDeleteSegment(ctl,
1681
                 (newestPage -
1682
                  newestPage % SLRU_PAGES_PER_SEGMENT),
1683
                 oldestPage));
1684
}
1685
1686
/*
1687
 * Unit-test a PagePrecedes function.
1688
 *
1689
 * This assumes every uint32 >= FirstNormalTransactionId is a valid key.  It
1690
 * assumes each value occupies a contiguous, fixed-size region of SLRU bytes.
1691
 * (MultiXactMemberCtl separates flags from XIDs.  NotifyCtl has
1692
 * variable-length entries, no keys, and no random access.  These unit tests
1693
 * do not apply to them.)
1694
 */
1695
void
1696
SlruPagePrecedesUnitTests(SlruCtl ctl, int per_page)
1697
{
1698
  /* Test first, middle and last entries of a page. */
1699
  SlruPagePrecedesTestOffset(ctl, per_page, 0);
1700
  SlruPagePrecedesTestOffset(ctl, per_page, per_page / 2);
1701
  SlruPagePrecedesTestOffset(ctl, per_page, per_page - 1);
1702
}
1703
#endif
1704
1705
/*
1706
 * SlruScanDirectory callback
1707
 *    This callback reports true if there's any segment wholly prior to the
1708
 *    one containing the page passed as "data".
1709
 */
1710
bool
1711
SlruScanDirCbReportPresence(SlruCtl ctl, char *filename, int64 segpage,
1712
              void *data)
1713
0
{
1714
0
  int64   cutoffPage = *(int64 *) data;
1715
1716
0
  if (SlruMayDeleteSegment(ctl, segpage, cutoffPage))
1717
0
    return true;     /* found one; don't iterate any more */
1718
1719
0
  return false;       /* keep going */
1720
0
}
1721
1722
/*
1723
 * SlruScanDirectory callback.
1724
 *    This callback deletes segments prior to the one passed in as "data".
1725
 */
1726
static bool
1727
SlruScanDirCbDeleteCutoff(SlruCtl ctl, char *filename, int64 segpage,
1728
              void *data)
1729
0
{
1730
0
  int64   cutoffPage = *(int64 *) data;
1731
1732
0
  if (SlruMayDeleteSegment(ctl, segpage, cutoffPage))
1733
0
    SlruInternalDeleteSegment(ctl, segpage / SLRU_PAGES_PER_SEGMENT);
1734
1735
0
  return false;       /* keep going */
1736
0
}
1737
1738
/*
1739
 * SlruScanDirectory callback.
1740
 *    This callback deletes all segments.
1741
 */
1742
bool
1743
SlruScanDirCbDeleteAll(SlruCtl ctl, char *filename, int64 segpage, void *data)
1744
0
{
1745
0
  SlruInternalDeleteSegment(ctl, segpage / SLRU_PAGES_PER_SEGMENT);
1746
1747
0
  return false;       /* keep going */
1748
0
}
1749
1750
/*
1751
 * An internal function used by SlruScanDirectory().
1752
 *
1753
 * Returns true if a file with a name of a given length may be a correct
1754
 * SLRU segment.
1755
 */
1756
static inline bool
1757
SlruCorrectSegmentFilenameLength(SlruCtl ctl, size_t len)
1758
0
{
1759
0
  if (ctl->long_segment_names)
1760
0
    return (len == 15);   /* see SlruFileName() */
1761
0
  else
1762
1763
    /*
1764
     * Commit 638cf09e76d allowed 5-character lengths. Later commit
1765
     * 73c986adde5 allowed 6-character length.
1766
     *
1767
     * Note: There is an ongoing plan to migrate all SLRUs to 64-bit page
1768
     * numbers, and the corresponding 15-character file names, which may
1769
     * eventually deprecate the support for 4, 5, and 6-character names.
1770
     */
1771
0
    return (len == 4 || len == 5 || len == 6);
1772
0
}
1773
1774
/*
1775
 * Scan the SimpleLru directory and apply a callback to each file found in it.
1776
 *
1777
 * If the callback returns true, the scan is stopped.  The last return value
1778
 * from the callback is returned.
1779
 *
1780
 * The callback receives the following arguments: 1. the SlruCtl struct for the
1781
 * slru being truncated; 2. the filename being considered; 3. the page number
1782
 * for the first page of that file; 4. a pointer to the opaque data given to us
1783
 * by the caller.
1784
 *
1785
 * Note that the ordering in which the directory is scanned is not guaranteed.
1786
 *
1787
 * Note that no locking is applied.
1788
 */
1789
bool
1790
SlruScanDirectory(SlruCtl ctl, SlruScanCallback callback, void *data)
1791
0
{
1792
0
  bool    retval = false;
1793
0
  DIR      *cldir;
1794
0
  struct dirent *clde;
1795
0
  int64   segno;
1796
0
  int64   segpage;
1797
1798
0
  cldir = AllocateDir(ctl->Dir);
1799
0
  while ((clde = ReadDir(cldir, ctl->Dir)) != NULL)
1800
0
  {
1801
0
    size_t    len;
1802
1803
0
    len = strlen(clde->d_name);
1804
1805
0
    if (SlruCorrectSegmentFilenameLength(ctl, len) &&
1806
0
      strspn(clde->d_name, "0123456789ABCDEF") == len)
1807
0
    {
1808
0
      segno = strtoi64(clde->d_name, NULL, 16);
1809
0
      segpage = segno * SLRU_PAGES_PER_SEGMENT;
1810
1811
0
      elog(DEBUG2, "SlruScanDirectory invoking callback on %s/%s",
1812
0
         ctl->Dir, clde->d_name);
1813
0
      retval = callback(ctl, clde->d_name, segpage, data);
1814
0
      if (retval)
1815
0
        break;
1816
0
    }
1817
0
  }
1818
0
  FreeDir(cldir);
1819
1820
0
  return retval;
1821
0
}
1822
1823
/*
1824
 * Individual SLRUs (clog, ...) have to provide a sync.c handler function so
1825
 * that they can provide the correct "SlruCtl" (otherwise we don't know how to
1826
 * build the path), but they just forward to this common implementation that
1827
 * performs the fsync.
1828
 */
1829
int
1830
SlruSyncFileTag(SlruCtl ctl, const FileTag *ftag, char *path)
1831
0
{
1832
0
  int     fd;
1833
0
  int     save_errno;
1834
0
  int     result;
1835
1836
0
  SlruFileName(ctl, path, ftag->segno);
1837
1838
0
  fd = OpenTransientFile(path, O_RDWR | PG_BINARY);
1839
0
  if (fd < 0)
1840
0
    return -1;
1841
1842
0
  pgstat_report_wait_start(WAIT_EVENT_SLRU_FLUSH_SYNC);
1843
0
  result = pg_fsync(fd);
1844
0
  pgstat_report_wait_end();
1845
0
  save_errno = errno;
1846
1847
0
  CloseTransientFile(fd);
1848
1849
0
  errno = save_errno;
1850
0
  return result;
1851
0
}