Coverage Report

Created: 2025-07-03 06:49

/src/postgres/src/backend/access/transam/commit_ts.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * commit_ts.c
4
 *    PostgreSQL commit timestamp manager
5
 *
6
 * This module is a pg_xact-like system that stores the commit timestamp
7
 * for each transaction.
8
 *
9
 * XLOG interactions: this module generates an XLOG record whenever a new
10
 * CommitTs page is initialized to zeroes.  Other writes of CommitTS come
11
 * from recording of transaction commit in xact.c, which generates its own
12
 * XLOG records for these events and will re-perform the status update on
13
 * redo; so we need make no additional XLOG entry here.
14
 *
15
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
16
 * Portions Copyright (c) 1994, Regents of the University of California
17
 *
18
 * src/backend/access/transam/commit_ts.c
19
 *
20
 *-------------------------------------------------------------------------
21
 */
22
#include "postgres.h"
23
24
#include "access/commit_ts.h"
25
#include "access/htup_details.h"
26
#include "access/slru.h"
27
#include "access/transam.h"
28
#include "access/xloginsert.h"
29
#include "access/xlogutils.h"
30
#include "funcapi.h"
31
#include "miscadmin.h"
32
#include "storage/shmem.h"
33
#include "utils/fmgrprotos.h"
34
#include "utils/guc_hooks.h"
35
#include "utils/timestamp.h"
36
37
/*
38
 * Defines for CommitTs page sizes.  A page is the same BLCKSZ as is used
39
 * everywhere else in Postgres.
40
 *
41
 * Note: because TransactionIds are 32 bits and wrap around at 0xFFFFFFFF,
42
 * CommitTs page numbering also wraps around at
43
 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE, and CommitTs segment numbering at
44
 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE/SLRU_PAGES_PER_SEGMENT.  We need take no
45
 * explicit notice of that fact in this module, except when comparing segment
46
 * and page numbers in TruncateCommitTs (see CommitTsPagePrecedes).
47
 */
48
49
/*
50
 * We need 8+2 bytes per xact.  Note that enlarging this struct might mean
51
 * the largest possible file name is more than 5 chars long; see
52
 * SlruScanDirectory.
53
 */
54
typedef struct CommitTimestampEntry
55
{
56
  TimestampTz time;
57
  RepOriginId nodeid;
58
} CommitTimestampEntry;
59
60
0
#define SizeOfCommitTimestampEntry (offsetof(CommitTimestampEntry, nodeid) + \
61
0
                  sizeof(RepOriginId))
62
63
#define COMMIT_TS_XACTS_PER_PAGE \
64
0
  (BLCKSZ / SizeOfCommitTimestampEntry)
65
66
67
/*
68
 * Although we return an int64 the actual value can't currently exceed
69
 * 0xFFFFFFFF/COMMIT_TS_XACTS_PER_PAGE.
70
 */
71
static inline int64
72
TransactionIdToCTsPage(TransactionId xid)
73
0
{
74
0
  return xid / (int64) COMMIT_TS_XACTS_PER_PAGE;
75
0
}
76
77
/* Position of a transaction's entry within its CommitTs page. */
#define TransactionIdToCTsEntry(xid) \
    ((xid) % (TransactionId) COMMIT_TS_XACTS_PER_PAGE)
79
80
/*
81
 * Link to shared-memory data structures for CommitTs control
82
 */
83
static SlruCtlData CommitTsCtlData;
84
85
0
#define CommitTsCtl (&CommitTsCtlData)
86
87
/*
88
 * We keep a cache of the last value set in shared memory.
89
 *
90
 * This is also good place to keep the activation status.  We keep this
91
 * separate from the GUC so that the standby can activate the module if the
92
 * primary has it active independently of the value of the GUC.
93
 *
94
 * This is protected by CommitTsLock.  In some places, we use commitTsActive
95
 * without acquiring the lock; where this happens, a comment explains the
96
 * rationale for it.
97
 */
98
typedef struct CommitTimestampShared
99
{
100
  TransactionId xidLastCommit;
101
  CommitTimestampEntry dataLastCommit;
102
  bool    commitTsActive;
103
} CommitTimestampShared;
104
105
static CommitTimestampShared *commitTsShared;
106
107
108
/* GUC variable */
109
bool    track_commit_timestamp;
110
111
static void SetXidCommitTsInPage(TransactionId xid, int nsubxids,
112
                 TransactionId *subxids, TimestampTz ts,
113
                 RepOriginId nodeid, int64 pageno);
114
static void TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
115
                   RepOriginId nodeid, int slotno);
116
static void error_commit_ts_disabled(void);
117
static int  ZeroCommitTsPage(int64 pageno, bool writeXlog);
118
static bool CommitTsPagePrecedes(int64 page1, int64 page2);
119
static void ActivateCommitTs(void);
120
static void DeactivateCommitTs(void);
121
static void WriteZeroPageXlogRec(int64 pageno);
122
static void WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid);
123
124
/*
125
 * TransactionTreeSetCommitTsData
126
 *
127
 * Record the final commit timestamp of transaction entries in the commit log
128
 * for a transaction and its subtransaction tree, as efficiently as possible.
129
 *
130
 * xid is the top level transaction id.
131
 *
132
 * subxids is an array of xids of length nsubxids, representing subtransactions
133
 * in the tree of xid. In various cases nsubxids may be zero.
134
 * The reason why tracking just the parent xid commit timestamp is not enough
135
 * is that the subtrans SLRU does not stay valid across crashes (it's not
136
 * permanent) so we need to keep the information about them here. If the
137
 * subtrans implementation changes in the future, we might want to revisit the
138
 * decision of storing timestamp info for each subxid.
139
 */
140
void
141
TransactionTreeSetCommitTsData(TransactionId xid, int nsubxids,
142
                 TransactionId *subxids, TimestampTz timestamp,
143
                 RepOriginId nodeid)
144
0
{
145
0
  int     i;
146
0
  TransactionId headxid;
147
0
  TransactionId newestXact;
148
149
  /*
150
   * No-op if the module is not active.
151
   *
152
   * An unlocked read here is fine, because in a standby (the only place
153
   * where the flag can change in flight) this routine is only called by the
154
   * recovery process, which is also the only process which can change the
155
   * flag.
156
   */
157
0
  if (!commitTsShared->commitTsActive)
158
0
    return;
159
160
  /*
161
   * Figure out the latest Xid in this batch: either the last subxid if
162
   * there's any, otherwise the parent xid.
163
   */
164
0
  if (nsubxids > 0)
165
0
    newestXact = subxids[nsubxids - 1];
166
0
  else
167
0
    newestXact = xid;
168
169
  /*
170
   * We split the xids to set the timestamp to in groups belonging to the
171
   * same SLRU page; the first element in each such set is its head.  The
172
   * first group has the main XID as the head; subsequent sets use the first
173
   * subxid not on the previous page as head.  This way, we only have to
174
   * lock/modify each SLRU page once.
175
   */
176
0
  headxid = xid;
177
0
  i = 0;
178
0
  for (;;)
179
0
  {
180
0
    int64   pageno = TransactionIdToCTsPage(headxid);
181
0
    int     j;
182
183
0
    for (j = i; j < nsubxids; j++)
184
0
    {
185
0
      if (TransactionIdToCTsPage(subxids[j]) != pageno)
186
0
        break;
187
0
    }
188
    /* subxids[i..j] are on the same page as the head */
189
190
0
    SetXidCommitTsInPage(headxid, j - i, subxids + i, timestamp, nodeid,
191
0
               pageno);
192
193
    /* if we wrote out all subxids, we're done. */
194
0
    if (j >= nsubxids)
195
0
      break;
196
197
    /*
198
     * Set the new head and skip over it, as well as over the subxids we
199
     * just wrote.
200
     */
201
0
    headxid = subxids[j];
202
0
    i = j + 1;
203
0
  }
204
205
  /* update the cached value in shared memory */
206
0
  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
207
0
  commitTsShared->xidLastCommit = xid;
208
0
  commitTsShared->dataLastCommit.time = timestamp;
209
0
  commitTsShared->dataLastCommit.nodeid = nodeid;
210
211
  /* and move forwards our endpoint, if needed */
212
0
  if (TransactionIdPrecedes(TransamVariables->newestCommitTsXid, newestXact))
213
0
    TransamVariables->newestCommitTsXid = newestXact;
214
0
  LWLockRelease(CommitTsLock);
215
0
}
216
217
/*
218
 * Record the commit timestamp of transaction entries in the commit log for all
219
 * entries on a single page.  Atomic only on this page.
220
 */
221
static void
222
SetXidCommitTsInPage(TransactionId xid, int nsubxids,
223
           TransactionId *subxids, TimestampTz ts,
224
           RepOriginId nodeid, int64 pageno)
225
0
{
226
0
  LWLock     *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
227
0
  int     slotno;
228
0
  int     i;
229
230
0
  LWLockAcquire(lock, LW_EXCLUSIVE);
231
232
0
  slotno = SimpleLruReadPage(CommitTsCtl, pageno, true, xid);
233
234
0
  TransactionIdSetCommitTs(xid, ts, nodeid, slotno);
235
0
  for (i = 0; i < nsubxids; i++)
236
0
    TransactionIdSetCommitTs(subxids[i], ts, nodeid, slotno);
237
238
0
  CommitTsCtl->shared->page_dirty[slotno] = true;
239
240
0
  LWLockRelease(lock);
241
0
}
242
243
/*
244
 * Sets the commit timestamp of a single transaction.
245
 *
246
 * Caller must hold the correct SLRU bank lock, will be held at exit
247
 */
248
static void
249
TransactionIdSetCommitTs(TransactionId xid, TimestampTz ts,
250
             RepOriginId nodeid, int slotno)
251
0
{
252
0
  int     entryno = TransactionIdToCTsEntry(xid);
253
0
  CommitTimestampEntry entry;
254
255
0
  Assert(TransactionIdIsNormal(xid));
256
257
0
  entry.time = ts;
258
0
  entry.nodeid = nodeid;
259
260
0
  memcpy(CommitTsCtl->shared->page_buffer[slotno] +
261
0
       SizeOfCommitTimestampEntry * entryno,
262
0
       &entry, SizeOfCommitTimestampEntry);
263
0
}
264
265
/*
266
 * Interrogate the commit timestamp of a transaction.
267
 *
268
 * The return value indicates whether a commit timestamp record was found for
269
 * the given xid.  The timestamp value is returned in *ts (which may not be
270
 * null), and the origin node for the Xid is returned in *nodeid, if it's not
271
 * null.
272
 */
273
bool
274
TransactionIdGetCommitTsData(TransactionId xid, TimestampTz *ts,
275
               RepOriginId *nodeid)
276
0
{
277
0
  int64   pageno = TransactionIdToCTsPage(xid);
278
0
  int     entryno = TransactionIdToCTsEntry(xid);
279
0
  int     slotno;
280
0
  CommitTimestampEntry entry;
281
0
  TransactionId oldestCommitTsXid;
282
0
  TransactionId newestCommitTsXid;
283
284
0
  if (!TransactionIdIsValid(xid))
285
0
    ereport(ERROR,
286
0
        (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
287
0
         errmsg("cannot retrieve commit timestamp for transaction %u", xid)));
288
0
  else if (!TransactionIdIsNormal(xid))
289
0
  {
290
    /* frozen and bootstrap xids are always committed far in the past */
291
0
    *ts = 0;
292
0
    if (nodeid)
293
0
      *nodeid = 0;
294
0
    return false;
295
0
  }
296
297
0
  LWLockAcquire(CommitTsLock, LW_SHARED);
298
299
  /* Error if module not enabled */
300
0
  if (!commitTsShared->commitTsActive)
301
0
    error_commit_ts_disabled();
302
303
  /*
304
   * If we're asked for the cached value, return that.  Otherwise, fall
305
   * through to read from SLRU.
306
   */
307
0
  if (commitTsShared->xidLastCommit == xid)
308
0
  {
309
0
    *ts = commitTsShared->dataLastCommit.time;
310
0
    if (nodeid)
311
0
      *nodeid = commitTsShared->dataLastCommit.nodeid;
312
313
0
    LWLockRelease(CommitTsLock);
314
0
    return *ts != 0;
315
0
  }
316
317
0
  oldestCommitTsXid = TransamVariables->oldestCommitTsXid;
318
0
  newestCommitTsXid = TransamVariables->newestCommitTsXid;
319
  /* neither is invalid, or both are */
320
0
  Assert(TransactionIdIsValid(oldestCommitTsXid) == TransactionIdIsValid(newestCommitTsXid));
321
0
  LWLockRelease(CommitTsLock);
322
323
  /*
324
   * Return empty if the requested value is outside our valid range.
325
   */
326
0
  if (!TransactionIdIsValid(oldestCommitTsXid) ||
327
0
    TransactionIdPrecedes(xid, oldestCommitTsXid) ||
328
0
    TransactionIdPrecedes(newestCommitTsXid, xid))
329
0
  {
330
0
    *ts = 0;
331
0
    if (nodeid)
332
0
      *nodeid = InvalidRepOriginId;
333
0
    return false;
334
0
  }
335
336
  /* lock is acquired by SimpleLruReadPage_ReadOnly */
337
0
  slotno = SimpleLruReadPage_ReadOnly(CommitTsCtl, pageno, xid);
338
0
  memcpy(&entry,
339
0
       CommitTsCtl->shared->page_buffer[slotno] +
340
0
       SizeOfCommitTimestampEntry * entryno,
341
0
       SizeOfCommitTimestampEntry);
342
343
0
  *ts = entry.time;
344
0
  if (nodeid)
345
0
    *nodeid = entry.nodeid;
346
347
0
  LWLockRelease(SimpleLruGetBankLock(CommitTsCtl, pageno));
348
0
  return *ts != 0;
349
0
}
350
351
/*
352
 * Return the Xid of the latest committed transaction.  (As far as this module
353
 * is concerned, anyway; it's up to the caller to ensure the value is useful
354
 * for its purposes.)
355
 *
356
 * ts and nodeid are filled with the corresponding data; they can be passed
357
 * as NULL if not wanted.
358
 */
359
TransactionId
360
GetLatestCommitTsData(TimestampTz *ts, RepOriginId *nodeid)
361
0
{
362
0
  TransactionId xid;
363
364
0
  LWLockAcquire(CommitTsLock, LW_SHARED);
365
366
  /* Error if module not enabled */
367
0
  if (!commitTsShared->commitTsActive)
368
0
    error_commit_ts_disabled();
369
370
0
  xid = commitTsShared->xidLastCommit;
371
0
  if (ts)
372
0
    *ts = commitTsShared->dataLastCommit.time;
373
0
  if (nodeid)
374
0
    *nodeid = commitTsShared->dataLastCommit.nodeid;
375
0
  LWLockRelease(CommitTsLock);
376
377
0
  return xid;
378
0
}
379
380
static void
381
error_commit_ts_disabled(void)
382
0
{
383
0
  ereport(ERROR,
384
0
      (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
385
0
       errmsg("could not get commit timestamp data"),
386
0
       RecoveryInProgress() ?
387
0
       errhint("Make sure the configuration parameter \"%s\" is set on the primary server.",
388
0
           "track_commit_timestamp") :
389
0
       errhint("Make sure the configuration parameter \"%s\" is set.",
390
0
           "track_commit_timestamp")));
391
0
}
392
393
/*
394
 * SQL-callable wrapper to obtain commit time of a transaction
395
 */
396
Datum
397
pg_xact_commit_timestamp(PG_FUNCTION_ARGS)
398
0
{
399
0
  TransactionId xid = PG_GETARG_TRANSACTIONID(0);
400
0
  TimestampTz ts;
401
0
  bool    found;
402
403
0
  found = TransactionIdGetCommitTsData(xid, &ts, NULL);
404
405
0
  if (!found)
406
0
    PG_RETURN_NULL();
407
408
0
  PG_RETURN_TIMESTAMPTZ(ts);
409
0
}
410
411
412
/*
413
 * pg_last_committed_xact
414
 *
415
 * SQL-callable wrapper to obtain some information about the latest
416
 * committed transaction: transaction ID, timestamp and replication
417
 * origin.
418
 */
419
Datum
420
pg_last_committed_xact(PG_FUNCTION_ARGS)
421
0
{
422
0
  TransactionId xid;
423
0
  RepOriginId nodeid;
424
0
  TimestampTz ts;
425
0
  Datum   values[3];
426
0
  bool    nulls[3];
427
0
  TupleDesc tupdesc;
428
0
  HeapTuple htup;
429
430
  /* and construct a tuple with our data */
431
0
  xid = GetLatestCommitTsData(&ts, &nodeid);
432
433
0
  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
434
0
    elog(ERROR, "return type must be a row type");
435
436
0
  if (!TransactionIdIsNormal(xid))
437
0
  {
438
0
    memset(nulls, true, sizeof(nulls));
439
0
  }
440
0
  else
441
0
  {
442
0
    values[0] = TransactionIdGetDatum(xid);
443
0
    nulls[0] = false;
444
445
0
    values[1] = TimestampTzGetDatum(ts);
446
0
    nulls[1] = false;
447
448
0
    values[2] = ObjectIdGetDatum((Oid) nodeid);
449
0
    nulls[2] = false;
450
0
  }
451
452
0
  htup = heap_form_tuple(tupdesc, values, nulls);
453
454
0
  PG_RETURN_DATUM(HeapTupleGetDatum(htup));
455
0
}
456
457
/*
458
 * pg_xact_commit_timestamp_origin
459
 *
460
 * SQL-callable wrapper to obtain commit timestamp and replication origin
461
 * of a given transaction.
462
 */
463
Datum
464
pg_xact_commit_timestamp_origin(PG_FUNCTION_ARGS)
465
0
{
466
0
  TransactionId xid = PG_GETARG_TRANSACTIONID(0);
467
0
  RepOriginId nodeid;
468
0
  TimestampTz ts;
469
0
  Datum   values[2];
470
0
  bool    nulls[2];
471
0
  TupleDesc tupdesc;
472
0
  HeapTuple htup;
473
0
  bool    found;
474
475
0
  found = TransactionIdGetCommitTsData(xid, &ts, &nodeid);
476
477
0
  if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
478
0
    elog(ERROR, "return type must be a row type");
479
480
0
  if (!found)
481
0
  {
482
0
    memset(nulls, true, sizeof(nulls));
483
0
  }
484
0
  else
485
0
  {
486
0
    values[0] = TimestampTzGetDatum(ts);
487
0
    nulls[0] = false;
488
489
0
    values[1] = ObjectIdGetDatum((Oid) nodeid);
490
0
    nulls[1] = false;
491
0
  }
492
493
0
  htup = heap_form_tuple(tupdesc, values, nulls);
494
495
0
  PG_RETURN_DATUM(HeapTupleGetDatum(htup));
496
0
}
497
498
/*
499
 * Number of shared CommitTS buffers.
500
 *
501
 * If asked to autotune, use 2MB for every 1GB of shared buffers, up to 8MB.
502
 * Otherwise just cap the configured amount to be between 16 and the maximum
503
 * allowed.
504
 */
505
static int
506
CommitTsShmemBuffers(void)
507
0
{
508
  /* auto-tune based on shared buffers */
509
0
  if (commit_timestamp_buffers == 0)
510
0
    return SimpleLruAutotuneBuffers(512, 1024);
511
512
0
  return Min(Max(16, commit_timestamp_buffers), SLRU_MAX_ALLOWED_BUFFERS);
513
0
}
514
515
/*
516
 * Shared memory sizing for CommitTs
517
 */
518
Size
519
CommitTsShmemSize(void)
520
0
{
521
0
  return SimpleLruShmemSize(CommitTsShmemBuffers(), 0) +
522
0
    sizeof(CommitTimestampShared);
523
0
}
524
525
/*
526
 * Initialize CommitTs at system startup (postmaster start or standalone
527
 * backend)
528
 */
529
void
530
CommitTsShmemInit(void)
531
0
{
532
0
  bool    found;
533
534
  /* If auto-tuning is requested, now is the time to do it */
535
0
  if (commit_timestamp_buffers == 0)
536
0
  {
537
0
    char    buf[32];
538
539
0
    snprintf(buf, sizeof(buf), "%d", CommitTsShmemBuffers());
540
0
    SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
541
0
            PGC_S_DYNAMIC_DEFAULT);
542
543
    /*
544
     * We prefer to report this value's source as PGC_S_DYNAMIC_DEFAULT.
545
     * However, if the DBA explicitly set commit_timestamp_buffers = 0 in
546
     * the config file, then PGC_S_DYNAMIC_DEFAULT will fail to override
547
     * that and we must force the matter with PGC_S_OVERRIDE.
548
     */
549
0
    if (commit_timestamp_buffers == 0) /* failed to apply it? */
550
0
      SetConfigOption("commit_timestamp_buffers", buf, PGC_POSTMASTER,
551
0
              PGC_S_OVERRIDE);
552
0
  }
553
0
  Assert(commit_timestamp_buffers != 0);
554
555
0
  CommitTsCtl->PagePrecedes = CommitTsPagePrecedes;
556
0
  SimpleLruInit(CommitTsCtl, "commit_timestamp", CommitTsShmemBuffers(), 0,
557
0
          "pg_commit_ts", LWTRANCHE_COMMITTS_BUFFER,
558
0
          LWTRANCHE_COMMITTS_SLRU,
559
0
          SYNC_HANDLER_COMMIT_TS,
560
0
          false);
561
0
  SlruPagePrecedesUnitTests(CommitTsCtl, COMMIT_TS_XACTS_PER_PAGE);
562
563
0
  commitTsShared = ShmemInitStruct("CommitTs shared",
564
0
                   sizeof(CommitTimestampShared),
565
0
                   &found);
566
567
0
  if (!IsUnderPostmaster)
568
0
  {
569
0
    Assert(!found);
570
571
0
    commitTsShared->xidLastCommit = InvalidTransactionId;
572
0
    TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
573
0
    commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
574
0
    commitTsShared->commitTsActive = false;
575
0
  }
576
0
  else
577
0
    Assert(found);
578
0
}
579
580
/*
581
 * GUC check_hook for commit_timestamp_buffers
582
 */
583
bool
584
check_commit_ts_buffers(int *newval, void **extra, GucSource source)
585
2
{
586
2
  return check_slru_buffers("commit_timestamp_buffers", newval);
587
2
}
588
589
/*
 * To be called ONCE on system install.
 *
 * (Assumes that initdb has created the CommitTs directory and that
 * CommitTsShmemInit has already been called.)
 */
void
BootStrapCommitTs(void)
{
    /*
     * Unlike most other SLRU modules, there is nothing to do here at
     * present: segments are created when the server is started with this
     * module enabled.  See ActivateCommitTs.
     */
}
604
605
/*
606
 * Initialize (or reinitialize) a page of CommitTs to zeroes.
607
 * If writeXlog is true, also emit an XLOG record saying we did this.
608
 *
609
 * The page is not actually written, just set up in shared memory.
610
 * The slot number of the new page is returned.
611
 *
612
 * Control lock must be held at entry, and will be held at exit.
613
 */
614
static int
615
ZeroCommitTsPage(int64 pageno, bool writeXlog)
616
0
{
617
0
  int     slotno;
618
619
0
  slotno = SimpleLruZeroPage(CommitTsCtl, pageno);
620
621
0
  if (writeXlog)
622
0
    WriteZeroPageXlogRec(pageno);
623
624
0
  return slotno;
625
0
}
626
627
/*
 * To be called ONCE during postmaster or standalone-backend startup, after
 * StartupXLOG has initialized TransamVariables->nextXid.
 *
 * All the work is done by ActivateCommitTs.
 */
void
StartupCommitTs(void)
{
    ActivateCommitTs();
}
636
637
/*
638
 * This must be called ONCE during postmaster or standalone-backend startup,
639
 * after recovery has finished.
640
 */
641
void
642
CompleteCommitTsInitialization(void)
643
0
{
644
  /*
645
   * If the feature is not enabled, turn it off for good.  This also removes
646
   * any leftover data.
647
   *
648
   * Conversely, we activate the module if the feature is enabled.  This is
649
   * necessary for primary and standby as the activation depends on the
650
   * control file contents at the beginning of recovery or when a
651
   * XLOG_PARAMETER_CHANGE is replayed.
652
   */
653
0
  if (!track_commit_timestamp)
654
0
    DeactivateCommitTs();
655
0
  else
656
0
    ActivateCommitTs();
657
0
}
658
659
/*
660
 * Activate or deactivate CommitTs' upon reception of a XLOG_PARAMETER_CHANGE
661
 * XLog record during recovery.
662
 */
663
void
664
CommitTsParameterChange(bool newvalue, bool oldvalue)
665
0
{
666
  /*
667
   * If the commit_ts module is disabled in this server and we get word from
668
   * the primary server that it is enabled there, activate it so that we can
669
   * replay future WAL records involving it; also mark it as active on
670
   * pg_control.  If the old value was already set, we already did this, so
671
   * don't do anything.
672
   *
673
   * If the module is disabled in the primary, disable it here too, unless
674
   * the module is enabled locally.
675
   *
676
   * Note this only runs in the recovery process, so an unlocked read is
677
   * fine.
678
   */
679
0
  if (newvalue)
680
0
  {
681
0
    if (!commitTsShared->commitTsActive)
682
0
      ActivateCommitTs();
683
0
  }
684
0
  else if (commitTsShared->commitTsActive)
685
0
    DeactivateCommitTs();
686
0
}
687
688
/*
689
 * Activate this module whenever necessary.
690
 *    This must happen during postmaster or standalone-backend startup,
691
 *    or during WAL replay anytime the track_commit_timestamp setting is
692
 *    changed in the primary.
693
 *
694
 * The reason why this SLRU needs separate activation/deactivation functions is
695
 * that it can be enabled/disabled during start and the activation/deactivation
696
 * on the primary is propagated to the standby via replay. Other SLRUs don't
697
 * have this property and they can be just initialized during normal startup.
698
 *
699
 * This is in charge of creating the currently active segment, if it's not
700
 * already there.  The reason for this is that the server might have been
701
 * running with this module disabled for a while and thus might have skipped
702
 * the normal creation point.
703
 */
704
static void
705
ActivateCommitTs(void)
706
0
{
707
0
  TransactionId xid;
708
0
  int64   pageno;
709
710
  /* If we've done this already, there's nothing to do */
711
0
  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
712
0
  if (commitTsShared->commitTsActive)
713
0
  {
714
0
    LWLockRelease(CommitTsLock);
715
0
    return;
716
0
  }
717
0
  LWLockRelease(CommitTsLock);
718
719
0
  xid = XidFromFullTransactionId(TransamVariables->nextXid);
720
0
  pageno = TransactionIdToCTsPage(xid);
721
722
  /*
723
   * Re-Initialize our idea of the latest page number.
724
   */
725
0
  pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number, pageno);
726
727
  /*
728
   * If CommitTs is enabled, but it wasn't in the previous server run, we
729
   * need to set the oldest and newest values to the next Xid; that way, we
730
   * will not try to read data that might not have been set.
731
   *
732
   * XXX does this have a problem if a server is started with commitTs
733
   * enabled, then started with commitTs disabled, then restarted with it
734
   * enabled again?  It doesn't look like it does, because there should be a
735
   * checkpoint that sets the value to InvalidTransactionId at end of
736
   * recovery; and so any chance of injecting new transactions without
737
   * CommitTs values would occur after the oldestCommitTsXid has been set to
738
   * Invalid temporarily.
739
   */
740
0
  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
741
0
  if (TransamVariables->oldestCommitTsXid == InvalidTransactionId)
742
0
  {
743
0
    TransamVariables->oldestCommitTsXid =
744
0
      TransamVariables->newestCommitTsXid = ReadNextTransactionId();
745
0
  }
746
0
  LWLockRelease(CommitTsLock);
747
748
  /* Create the current segment file, if necessary */
749
0
  if (!SimpleLruDoesPhysicalPageExist(CommitTsCtl, pageno))
750
0
  {
751
0
    LWLock     *lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
752
0
    int     slotno;
753
754
0
    LWLockAcquire(lock, LW_EXCLUSIVE);
755
0
    slotno = ZeroCommitTsPage(pageno, false);
756
0
    SimpleLruWritePage(CommitTsCtl, slotno);
757
0
    Assert(!CommitTsCtl->shared->page_dirty[slotno]);
758
0
    LWLockRelease(lock);
759
0
  }
760
761
  /* Change the activation status in shared memory. */
762
0
  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
763
0
  commitTsShared->commitTsActive = true;
764
0
  LWLockRelease(CommitTsLock);
765
0
}
766
767
/*
768
 * Deactivate this module.
769
 *
770
 * This must be called when the track_commit_timestamp parameter is turned off.
771
 * This happens during postmaster or standalone-backend startup, or during WAL
772
 * replay.
773
 *
774
 * Resets CommitTs into invalid state to make sure we don't hand back
775
 * possibly-invalid data; also removes segments of old data.
776
 */
777
static void
778
DeactivateCommitTs(void)
779
0
{
780
  /*
781
   * Cleanup the status in the shared memory.
782
   *
783
   * We reset everything in the commitTsShared record to prevent user from
784
   * getting confusing data about last committed transaction on the standby
785
   * when the module was activated repeatedly on the primary.
786
   */
787
0
  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
788
789
0
  commitTsShared->commitTsActive = false;
790
0
  commitTsShared->xidLastCommit = InvalidTransactionId;
791
0
  TIMESTAMP_NOBEGIN(commitTsShared->dataLastCommit.time);
792
0
  commitTsShared->dataLastCommit.nodeid = InvalidRepOriginId;
793
794
0
  TransamVariables->oldestCommitTsXid = InvalidTransactionId;
795
0
  TransamVariables->newestCommitTsXid = InvalidTransactionId;
796
797
  /*
798
   * Remove *all* files.  This is necessary so that there are no leftover
799
   * files; in the case where this feature is later enabled after running
800
   * with it disabled for some time there may be a gap in the file sequence.
801
   * (We can probably tolerate out-of-sequence files, as they are going to
802
   * be overwritten anyway when we wrap around, but it seems better to be
803
   * tidy.)
804
   *
805
   * Note that we do this with CommitTsLock acquired in exclusive mode. This
806
   * is very heavy-handed, but since this routine can only be called in the
807
   * replica and should happen very rarely, we don't worry too much about
808
   * it.  Note also that no process should be consulting this SLRU if we
809
   * have just deactivated it.
810
   */
811
0
  (void) SlruScanDirectory(CommitTsCtl, SlruScanDirCbDeleteAll, NULL);
812
813
0
  LWLockRelease(CommitTsLock);
814
0
}
815
816
/*
817
 * Perform a checkpoint --- either during shutdown, or on-the-fly
818
 */
819
void
820
CheckPointCommitTs(void)
821
0
{
822
  /*
823
   * Write dirty CommitTs pages to disk.  This may result in sync requests
824
   * queued for later handling by ProcessSyncRequests(), as part of the
825
   * checkpoint.
826
   */
827
0
  SimpleLruWriteAll(CommitTsCtl, true);
828
0
}
829
830
/*
831
 * Make sure that CommitTs has room for a newly-allocated XID.
832
 *
833
 * NB: this is called while holding XidGenLock.  We want it to be very fast
834
 * most of the time; even when it's not so fast, no actual I/O need happen
835
 * unless we're forced to write out a dirty CommitTs or xlog page to make room
836
 * in shared memory.
837
 *
838
 * NB: the current implementation relies on track_commit_timestamp being
839
 * PGC_POSTMASTER.
840
 */
841
void
842
ExtendCommitTs(TransactionId newestXact)
843
0
{
844
0
  int64   pageno;
845
0
  LWLock     *lock;
846
847
  /*
848
   * Nothing to do if module not enabled.  Note we do an unlocked read of
849
   * the flag here, which is okay because this routine is only called from
850
   * GetNewTransactionId, which is never called in a standby.
851
   */
852
0
  Assert(!InRecovery);
853
0
  if (!commitTsShared->commitTsActive)
854
0
    return;
855
856
  /*
857
   * No work except at first XID of a page.  But beware: just after
858
   * wraparound, the first XID of page zero is FirstNormalTransactionId.
859
   */
860
0
  if (TransactionIdToCTsEntry(newestXact) != 0 &&
861
0
    !TransactionIdEquals(newestXact, FirstNormalTransactionId))
862
0
    return;
863
864
0
  pageno = TransactionIdToCTsPage(newestXact);
865
866
0
  lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
867
868
0
  LWLockAcquire(lock, LW_EXCLUSIVE);
869
870
  /* Zero the page and make an XLOG entry about it */
871
0
  ZeroCommitTsPage(pageno, !InRecovery);
872
873
0
  LWLockRelease(lock);
874
0
}
875
876
/*
877
 * Remove all CommitTs segments before the one holding the passed
878
 * transaction ID.
879
 *
880
 * Note that we don't need to flush XLOG here.
881
 */
882
void
883
TruncateCommitTs(TransactionId oldestXact)
884
0
{
885
0
  int64   cutoffPage;
886
887
  /*
888
   * The cutoff point is the start of the segment containing oldestXact. We
889
   * pass the *page* containing oldestXact to SimpleLruTruncate.
890
   */
891
0
  cutoffPage = TransactionIdToCTsPage(oldestXact);
892
893
  /* Check to see if there's any files that could be removed */
894
0
  if (!SlruScanDirectory(CommitTsCtl, SlruScanDirCbReportPresence,
895
0
               &cutoffPage))
896
0
    return;         /* nothing to remove */
897
898
  /* Write XLOG record */
899
0
  WriteTruncateXlogRec(cutoffPage, oldestXact);
900
901
  /* Now we can remove the old CommitTs segment(s) */
902
0
  SimpleLruTruncate(CommitTsCtl, cutoffPage);
903
0
}
904
905
/*
906
 * Set the limit values between which commit TS can be consulted.
907
 */
908
void
909
SetCommitTsLimit(TransactionId oldestXact, TransactionId newestXact)
910
0
{
911
  /*
912
   * Be careful not to overwrite values that are either further into the
913
   * "future" or signal a disabled committs.
914
   */
915
0
  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
916
0
  if (TransamVariables->oldestCommitTsXid != InvalidTransactionId)
917
0
  {
918
0
    if (TransactionIdPrecedes(TransamVariables->oldestCommitTsXid, oldestXact))
919
0
      TransamVariables->oldestCommitTsXid = oldestXact;
920
0
    if (TransactionIdPrecedes(newestXact, TransamVariables->newestCommitTsXid))
921
0
      TransamVariables->newestCommitTsXid = newestXact;
922
0
  }
923
0
  else
924
0
  {
925
0
    Assert(TransamVariables->newestCommitTsXid == InvalidTransactionId);
926
0
    TransamVariables->oldestCommitTsXid = oldestXact;
927
0
    TransamVariables->newestCommitTsXid = newestXact;
928
0
  }
929
0
  LWLockRelease(CommitTsLock);
930
0
}
931
932
/*
933
 * Move forwards the oldest commitTS value that can be consulted
934
 */
935
void
936
AdvanceOldestCommitTsXid(TransactionId oldestXact)
937
0
{
938
0
  LWLockAcquire(CommitTsLock, LW_EXCLUSIVE);
939
0
  if (TransamVariables->oldestCommitTsXid != InvalidTransactionId &&
940
0
    TransactionIdPrecedes(TransamVariables->oldestCommitTsXid, oldestXact))
941
0
    TransamVariables->oldestCommitTsXid = oldestXact;
942
0
  LWLockRelease(CommitTsLock);
943
0
}
944
945
946
/*
947
 * Decide whether a commitTS page number is "older" for truncation purposes.
948
 * Analogous to CLOGPagePrecedes().
949
 *
950
 * At default BLCKSZ, (1 << 31) % COMMIT_TS_XACTS_PER_PAGE == 128.  This
951
 * introduces differences compared to CLOG and the other SLRUs having (1 <<
952
 * 31) % per_page == 0.  This function never tests exactly
953
 * TransactionIdPrecedes(x-2^31, x).  When the system reaches xidStopLimit,
954
 * there are two possible counts of page boundaries between oldestXact and the
955
 * latest XID assigned, depending on whether oldestXact is within the first
956
 * 128 entries of its page.  Since this function doesn't know the location of
957
 * oldestXact within page2, it returns false for one page that actually is
958
 * expendable.  This is a wider (yet still negligible) version of the
959
 * truncation opportunity that CLOGPagePrecedes() cannot recognize.
960
 *
961
 * For the sake of a worked example, number entries with decimal values such
962
 * that page1==1 entries range from 1.0 to 1.999.  Let N+0.15 be the number of
963
 * pages that 2^31 entries will span (N is an integer).  If oldestXact=N+2.1,
964
 * then the final safe XID assignment leaves newestXact=1.95.  We keep page 2,
965
 * because entry=2.85 is the border that toggles whether entries precede the
966
 * last entry of the oldestXact page.  While page 2 is expendable at
967
 * oldestXact=N+2.1, it would be precious at oldestXact=N+2.9.
968
 */
969
static bool
970
CommitTsPagePrecedes(int64 page1, int64 page2)
971
0
{
972
0
  TransactionId xid1;
973
0
  TransactionId xid2;
974
975
0
  xid1 = ((TransactionId) page1) * COMMIT_TS_XACTS_PER_PAGE;
976
0
  xid1 += FirstNormalTransactionId + 1;
977
0
  xid2 = ((TransactionId) page2) * COMMIT_TS_XACTS_PER_PAGE;
978
0
  xid2 += FirstNormalTransactionId + 1;
979
980
0
  return (TransactionIdPrecedes(xid1, xid2) &&
981
0
      TransactionIdPrecedes(xid1, xid2 + COMMIT_TS_XACTS_PER_PAGE - 1));
982
0
}
983
984
985
/*
986
 * Write a ZEROPAGE xlog record
987
 */
988
static void
989
WriteZeroPageXlogRec(int64 pageno)
990
0
{
991
0
  XLogBeginInsert();
992
0
  XLogRegisterData(&pageno, sizeof(pageno));
993
0
  (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_ZEROPAGE);
994
0
}
995
996
/*
997
 * Write a TRUNCATE xlog record
998
 */
999
static void
1000
WriteTruncateXlogRec(int64 pageno, TransactionId oldestXid)
1001
0
{
1002
0
  xl_commit_ts_truncate xlrec;
1003
1004
0
  xlrec.pageno = pageno;
1005
0
  xlrec.oldestXid = oldestXid;
1006
1007
0
  XLogBeginInsert();
1008
0
  XLogRegisterData(&xlrec, SizeOfCommitTsTruncate);
1009
0
  (void) XLogInsert(RM_COMMIT_TS_ID, COMMIT_TS_TRUNCATE);
1010
0
}
1011
1012
/*
1013
 * CommitTS resource manager's routines
1014
 */
1015
void
1016
commit_ts_redo(XLogReaderState *record)
1017
0
{
1018
0
  uint8   info = XLogRecGetInfo(record) & ~XLR_INFO_MASK;
1019
1020
  /* Backup blocks are not used in commit_ts records */
1021
0
  Assert(!XLogRecHasAnyBlockRefs(record));
1022
1023
0
  if (info == COMMIT_TS_ZEROPAGE)
1024
0
  {
1025
0
    int64   pageno;
1026
0
    int     slotno;
1027
0
    LWLock     *lock;
1028
1029
0
    memcpy(&pageno, XLogRecGetData(record), sizeof(pageno));
1030
1031
0
    lock = SimpleLruGetBankLock(CommitTsCtl, pageno);
1032
0
    LWLockAcquire(lock, LW_EXCLUSIVE);
1033
1034
0
    slotno = ZeroCommitTsPage(pageno, false);
1035
0
    SimpleLruWritePage(CommitTsCtl, slotno);
1036
0
    Assert(!CommitTsCtl->shared->page_dirty[slotno]);
1037
1038
0
    LWLockRelease(lock);
1039
0
  }
1040
0
  else if (info == COMMIT_TS_TRUNCATE)
1041
0
  {
1042
0
    xl_commit_ts_truncate *trunc = (xl_commit_ts_truncate *) XLogRecGetData(record);
1043
1044
0
    AdvanceOldestCommitTsXid(trunc->oldestXid);
1045
1046
    /*
1047
     * During XLOG replay, latest_page_number isn't set up yet; insert a
1048
     * suitable value to bypass the sanity test in SimpleLruTruncate.
1049
     */
1050
0
    pg_atomic_write_u64(&CommitTsCtl->shared->latest_page_number,
1051
0
              trunc->pageno);
1052
1053
0
    SimpleLruTruncate(CommitTsCtl, trunc->pageno);
1054
0
  }
1055
0
  else
1056
0
    elog(PANIC, "commit_ts_redo: unknown op code %u", info);
1057
0
}
1058
1059
/*
1060
 * Entrypoint for sync.c to sync commit_ts files.
1061
 */
1062
int
1063
committssyncfiletag(const FileTag *ftag, char *path)
1064
0
{
1065
0
  return SlruSyncFileTag(CommitTsCtl, ftag, path);
1066
0
}