Coverage Report

Created: 2025-08-12 06:43

/src/postgres/src/backend/storage/ipc/sinvaladt.c
Coverage: every executable line in this file has an execution count of 0 (no lines covered).

/*-------------------------------------------------------------------------
 *
 * sinvaladt.c
 *    POSTGRES shared cache invalidation data manager.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *    src/backend/storage/ipc/sinvaladt.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <signal.h>
#include <unistd.h>

#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procnumber.h"
#include "storage/procsignal.h"
#include "storage/shmem.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"

/*
 * Conceptually, the shared cache invalidation messages are stored in an
 * infinite array, where maxMsgNum is the next array subscript to store a
 * submitted message in, minMsgNum is the smallest array subscript containing
 * a message not yet read by all backends, and we always have maxMsgNum >=
 * minMsgNum.  (They are equal when there are no messages pending.)  For each
 * active backend, there is a nextMsgNum pointer indicating the next message it
 * needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
 * backend.
 *
 * (In the current implementation, minMsgNum is a lower bound for the
 * per-process nextMsgNum values, but it isn't rigorously kept equal to the
 * smallest nextMsgNum --- it may lag behind.  We only update it when
 * SICleanupQueue is called, and we try not to do that often.)
 *
 * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
 * entries.  We translate MsgNum values into circular-buffer indexes by
 * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
 * MAXNUMMESSAGES is a constant and a power of 2).  As long as maxMsgNum
 * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
 * in the buffer.  If the buffer does overflow, we recover by setting the
 * "reset" flag for each backend that has fallen too far behind.  A backend
 * that is in "reset" state is ignored while determining minMsgNum.  When
 * it does finally attempt to receive inval messages, it must discard all
 * its invalidatable state, since it won't know what it missed.
 *
 * To reduce the probability of needing resets, we send a "catchup" interrupt
 * to any backend that seems to be falling unreasonably far behind.  The
 * normal behavior is that at most one such interrupt is in flight at a time;
 * when a backend completes processing a catchup interrupt, it executes
 * SICleanupQueue, which will signal the next-furthest-behind backend if
 * needed.  This avoids undue contention from multiple backends all trying
 * to catch up at once.  However, the furthest-back backend might be stuck
 * in a state where it can't catch up.  Eventually it will get reset, so it
 * won't cause any more problems for anyone but itself.  But we don't want
 * to find that a bunch of other backends are now too close to the reset
 * threshold to be saved.  So SICleanupQueue is designed to occasionally
 * send extra catchup interrupts as the queue gets fuller, to backends that
 * are far behind and haven't gotten one yet.  As long as there aren't a lot
 * of "stuck" backends, we won't need a lot of extra interrupts, since ones
 * that aren't stuck will propagate their interrupts to the next guy.
 *
 * We would have problems if the MsgNum values overflow an integer, so
 * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
 * from all the MsgNum variables simultaneously.  MSGNUMWRAPAROUND can be
 * large so that we don't need to do this often.  It must be a multiple of
 * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
 * to be moved when we do it.
 *
 * Access to the shared sinval array is protected by two locks, SInvalReadLock
 * and SInvalWriteLock.  Readers take SInvalReadLock in shared mode; this
 * authorizes them to modify their own ProcState but not to modify or even
 * look at anyone else's.  When we need to perform array-wide updates,
 * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
 * lock out all readers.  Writers take SInvalWriteLock (always in exclusive
 * mode) to serialize adding messages to the queue.  Note that a writer
 * can operate in parallel with one or more readers, because the writer
 * has no need to touch anyone's ProcState, except in the infrequent cases
 * when SICleanupQueue is needed.  The only point of overlap is that
 * the writer wants to change maxMsgNum while readers need to read it.
 * We deal with that by having a spinlock that readers must take for just
 * long enough to read maxMsgNum, while writers take it for just long enough
 * to write maxMsgNum.  (The exact rule is that you need the spinlock to
 * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
 * spinlock to write maxMsgNum unless you are holding both locks.)
 *
 * Note: since maxMsgNum is an int and hence presumably atomically readable/
 * writable, the spinlock might seem unnecessary.  The reason it is needed
 * is to provide a memory barrier: we need to be sure that messages written
 * to the array are actually there before maxMsgNum is increased, and that
 * readers will see that data after fetching maxMsgNum.  Multiprocessors
 * that have weak memory-ordering guarantees can fail without the memory
 * barrier instructions that are included in the spinlock sequences.
 */
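
/*
 * Illustrative sketch (not part of the original file): the MsgNum arithmetic
 * described above.  Because MSGNUMWRAPAROUND is a multiple of MAXNUMMESSAGES,
 * subtracting it from every counter at once leaves each message's
 * circular-buffer slot unchanged, which is why the buffer contents never
 * need to be moved.  A standalone demonstration, using copies of the
 * constants defined below:
 */
#if 0							/* example only, never compiled */
#include <assert.h>

#define EX_MAXNUMMESSAGES 4096	/* mirrors MAXNUMMESSAGES */
#define EX_MSGNUMWRAPAROUND (EX_MAXNUMMESSAGES * 262144)

static void
example_msgnum_arithmetic(void)
{
    int         msgNum = EX_MSGNUMWRAPAROUND + 12345;   /* arbitrary counter value */

    /* MsgNum -> buffer slot; cheap because EX_MAXNUMMESSAGES is a power of 2 */
    int         slotBefore = msgNum % EX_MAXNUMMESSAGES;
    int         slotAfter = (msgNum - EX_MSGNUMWRAPAROUND) % EX_MAXNUMMESSAGES;

    assert(slotBefore == slotAfter);    /* wraparound preserves slot assignments */
}
#endif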

/*
 * Configurable parameters.
 *
 * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
 * Must be a power of 2 for speed.
 *
 * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
 * Must be a multiple of MAXNUMMESSAGES.  Should be large.
 *
 * CLEANUP_MIN: the minimum number of messages that must be in the buffer
 * before we bother to call SICleanupQueue.
 *
 * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
 * we exceed CLEANUP_MIN.  Should be a power of 2 for speed.
 *
 * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
 * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT.
 *
 * WRITE_QUANTUM: the max number of messages to push into the buffer per
 * iteration of SIInsertDataEntries.  Noncritical but should be less than
 * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
 * per iteration.
 */

#define MAXNUMMESSAGES 4096
#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
#define CLEANUP_MIN (MAXNUMMESSAGES / 2)
#define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
#define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
#define WRITE_QUANTUM 64
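
/*
 * For reference (not part of the original file), the definitions above work
 * out to these concrete values:
 *
 *     MAXNUMMESSAGES   = 4096   (2^12, a power of 2 as required)
 *     MSGNUMWRAPAROUND = 4096 * 262144 = 2^30 = 1073741824
 *     CLEANUP_MIN      = 2048
 *     CLEANUP_QUANTUM  = 256
 *     SIG_THRESHOLD    = 2048
 *     WRITE_QUANTUM    = 64     (less than CLEANUP_QUANTUM, as required)
 */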

/* Per-backend state in shared invalidation structure */
typedef struct ProcState
{
    /* procPid is zero in an inactive ProcState array entry. */
    pid_t       procPid;        /* PID of backend, for signaling */
    /* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
    int         nextMsgNum;     /* next message number to read */
    bool        resetState;     /* backend needs to reset its state */
    bool        signaled;       /* backend has been sent catchup signal */
    bool        hasMessages;    /* backend has unread messages */

    /*
     * Backend only sends invalidations, never receives them. This only makes
     * sense for Startup process during recovery because it doesn't maintain a
     * relcache, yet it fires inval messages to allow query backends to see
     * schema changes.
     */
    bool        sendOnly;       /* backend only sends, never receives */

    /*
     * Next LocalTransactionId to use for each idle backend slot.  We keep
     * this here because it is indexed by ProcNumber and it is convenient to
     * copy the value to and from local memory when MyProcNumber is set. It's
     * meaningless in an active ProcState entry.
     */
    LocalTransactionId nextLXID;
} ProcState;

/* Shared cache invalidation memory segment */
typedef struct SISeg
{
    /*
     * General state information
     */
    int         minMsgNum;      /* oldest message still needed */
    int         maxMsgNum;      /* next message number to be assigned */
    int         nextThreshold;  /* # of messages to call SICleanupQueue */

    slock_t     msgnumLock;     /* spinlock protecting maxMsgNum */

    /*
     * Circular buffer holding shared-inval messages
     */
    SharedInvalidationMessage buffer[MAXNUMMESSAGES];

    /*
     * Per-backend invalidation state info.
     *
     * 'procState' has NumProcStateSlots entries, and is indexed by pgprocno.
     * 'numProcs' is the number of slots currently in use, and 'pgprocnos' is
     * a dense array of their indexes, to speed up scanning all in-use slots.
     *
     * 'pgprocnos' is largely redundant with ProcArrayStruct->pgprocnos, but
     * having our separate copy avoids contention on ProcArrayLock, and allows
     * us to track only the processes that participate in shared cache
     * invalidations.
     */
    int         numProcs;
    int        *pgprocnos;
    ProcState   procState[FLEXIBLE_ARRAY_MEMBER];
} SISeg;

/*
 * We reserve a slot for each possible ProcNumber, plus one for each
 * possible auxiliary process type.  (This scheme assumes there is not
 * more than one of any auxiliary process type at a time, except for
 * IO workers.)
 */
#define NumProcStateSlots   (MaxBackends + NUM_AUXILIARY_PROCS)

static SISeg *shmInvalBuffer;   /* pointer to the shared inval buffer */

static LocalTransactionId nextLocalTransactionId;

static void CleanupInvalidationState(int status, Datum arg);

/*
 * SharedInvalShmemSize --- return shared-memory space needed
 */
Size
SharedInvalShmemSize(void)
{
    Size        size;

    size = offsetof(SISeg, procState);
    size = add_size(size, mul_size(sizeof(ProcState), NumProcStateSlots)); /* procState */
    size = add_size(size, mul_size(sizeof(int), NumProcStateSlots));   /* pgprocnos */

    return size;
}

/*
 * SharedInvalShmemInit
 *    Create and initialize the SI message buffer
 */
void
SharedInvalShmemInit(void)
{
    int         i;
    bool        found;

    /* Allocate space in shared memory */
    shmInvalBuffer = (SISeg *)
        ShmemInitStruct("shmInvalBuffer", SharedInvalShmemSize(), &found);
    if (found)
        return;

    /* Clear message counters, save size of procState array, init spinlock */
    shmInvalBuffer->minMsgNum = 0;
    shmInvalBuffer->maxMsgNum = 0;
    shmInvalBuffer->nextThreshold = CLEANUP_MIN;
    SpinLockInit(&shmInvalBuffer->msgnumLock);

    /* The buffer[] array is initially all unused, so we need not fill it */

    /* Mark all backends inactive, and initialize nextLXID */
    for (i = 0; i < NumProcStateSlots; i++)
    {
        shmInvalBuffer->procState[i].procPid = 0;   /* inactive */
        shmInvalBuffer->procState[i].nextMsgNum = 0;    /* meaningless */
        shmInvalBuffer->procState[i].resetState = false;
        shmInvalBuffer->procState[i].signaled = false;
        shmInvalBuffer->procState[i].hasMessages = false;
        shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
    }
    shmInvalBuffer->numProcs = 0;
    shmInvalBuffer->pgprocnos = (int *) &shmInvalBuffer->procState[i];
}
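
/*
 * Layout note (not part of the original file): the single allocation
 * initialized above is carved up exactly as SharedInvalShmemSize() computed
 * it:
 *
 *     offsetof(SISeg, procState)                 fixed-size SISeg header
 *   + NumProcStateSlots * sizeof(ProcState)      procState[] flexible array
 *   + NumProcStateSlots * sizeof(int)            pgprocnos[] dense index array
 *
 * The final assignment works because the loop leaves i == NumProcStateSlots,
 * so &shmInvalBuffer->procState[i] is the first byte past the procState
 * array, which is exactly where the pgprocnos array was sized to live.
 */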

/*
 * SharedInvalBackendInit
 *    Initialize a new backend to operate on the sinval buffer
 */
void
SharedInvalBackendInit(bool sendOnly)
{
    ProcState  *stateP;
    pid_t       oldPid;
    SISeg      *segP = shmInvalBuffer;

    if (MyProcNumber < 0)
        elog(ERROR, "MyProcNumber not set");
    if (MyProcNumber >= NumProcStateSlots)
        elog(PANIC, "unexpected MyProcNumber %d in SharedInvalBackendInit (max %d)",
             MyProcNumber, NumProcStateSlots);
    stateP = &segP->procState[MyProcNumber];

    /*
     * This can run in parallel with read operations, but not with write
     * operations, since SIInsertDataEntries relies on the pgprocnos array to
     * set hasMessages appropriately.
     */
    LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

    oldPid = stateP->procPid;
    if (oldPid != 0)
    {
        LWLockRelease(SInvalWriteLock);
        elog(ERROR, "sinval slot for backend %d is already in use by process %d",
             MyProcNumber, (int) oldPid);
    }

    shmInvalBuffer->pgprocnos[shmInvalBuffer->numProcs++] = MyProcNumber;

    /* Fetch next local transaction ID into local memory */
    nextLocalTransactionId = stateP->nextLXID;

    /* mark myself active, with all extant messages already read */
    stateP->procPid = MyProcPid;
    stateP->nextMsgNum = segP->maxMsgNum;
    stateP->resetState = false;
    stateP->signaled = false;
    stateP->hasMessages = false;
    stateP->sendOnly = sendOnly;

    LWLockRelease(SInvalWriteLock);

    /* register exit routine to mark my entry inactive at exit */
    on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
}

/*
 * CleanupInvalidationState
 *    Mark the current backend as no longer active.
 *
 * This function is called via on_shmem_exit() during backend shutdown.
 *
 * arg is really of type "SISeg*".
 */
static void
CleanupInvalidationState(int status, Datum arg)
{
    SISeg      *segP = (SISeg *) DatumGetPointer(arg);
    ProcState  *stateP;
    int         i;

    Assert(PointerIsValid(segP));

    LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

    stateP = &segP->procState[MyProcNumber];

    /* Update next local transaction ID for next holder of this proc number */
    stateP->nextLXID = nextLocalTransactionId;

    /* Mark myself inactive */
    stateP->procPid = 0;
    stateP->nextMsgNum = 0;
    stateP->resetState = false;
    stateP->signaled = false;

    for (i = segP->numProcs - 1; i >= 0; i--)
    {
        if (segP->pgprocnos[i] == MyProcNumber)
        {
            if (i != segP->numProcs - 1)
                segP->pgprocnos[i] = segP->pgprocnos[segP->numProcs - 1];
            break;
        }
    }
    if (i < 0)
        elog(PANIC, "could not find entry in sinval array");
    segP->numProcs--;

    LWLockRelease(SInvalWriteLock);
}
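
/*
 * Illustrative sketch (not part of the original file): the loop above is the
 * classic "swap with last" removal from a dense, unordered array -- linear
 * time to find the entry, constant time to delete it, no hole left behind.
 * In isolation:
 */
#if 0							/* example only, never compiled */
static void
dense_array_remove(int *arr, int *len, int victim)
{
    for (int i = *len - 1; i >= 0; i--)
    {
        if (arr[i] == victim)
        {
            /* overwrite the hole with the last element (self-copy if i is last) */
            arr[i] = arr[*len - 1];
            (*len)--;
            return;
        }
    }
    /* like the code above, a caller could PANIC here: victim must exist */
}
#endif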

/*
 * SIInsertDataEntries
 *    Add new invalidation message(s) to the buffer.
 */
void
SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
{
    SISeg      *segP = shmInvalBuffer;

    /*
     * N can be arbitrarily large.  We divide the work into groups of no more
     * than WRITE_QUANTUM messages, to be sure that we don't hold the lock for
     * an unreasonably long time.  (This is not so much because we care about
     * letting in other writers, as that some just-caught-up backend might be
     * trying to do SICleanupQueue to pass on its signal, and we don't want it
     * to have to wait a long time.)  Also, we need to consider calling
     * SICleanupQueue every so often.
     */
    while (n > 0)
    {
        int         nthistime = Min(n, WRITE_QUANTUM);
        int         numMsgs;
        int         max;
        int         i;

        n -= nthistime;

        LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

        /*
         * If the buffer is full, we *must* acquire some space.  Clean the
         * queue and reset anyone who is preventing space from being freed.
         * Otherwise, clean the queue only when it's exceeded the next
         * fullness threshold.  We have to loop and recheck the buffer state
         * after any call of SICleanupQueue.
         */
        for (;;)
        {
            numMsgs = segP->maxMsgNum - segP->minMsgNum;
            if (numMsgs + nthistime > MAXNUMMESSAGES ||
                numMsgs >= segP->nextThreshold)
                SICleanupQueue(true, nthistime);
            else
                break;
        }

        /*
         * Insert new message(s) into proper slot of circular buffer
         */
        max = segP->maxMsgNum;
        while (nthistime-- > 0)
        {
            segP->buffer[max % MAXNUMMESSAGES] = *data++;
            max++;
        }

        /* Update current value of maxMsgNum using spinlock */
        SpinLockAcquire(&segP->msgnumLock);
        segP->maxMsgNum = max;
        SpinLockRelease(&segP->msgnumLock);

        /*
         * Now that the maxMsgNum change is globally visible, we give everyone
         * a swift kick to make sure they read the newly added messages.
         * Releasing SInvalWriteLock will enforce a full memory barrier, so
         * these (unlocked) changes will be committed to memory before we exit
         * the function.
         */
        for (i = 0; i < segP->numProcs; i++)
        {
            ProcState  *stateP = &segP->procState[segP->pgprocnos[i]];

            stateP->hasMessages = true;
        }

        LWLockRelease(SInvalWriteLock);
    }
}
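
/*
 * Usage sketch (not part of the original file): callers hand an arbitrarily
 * large batch to SIInsertDataEntries and let it do the WRITE_QUANTUM-sized
 * chunking and cleanup checks internally.  In PostgreSQL the public entry
 * point is SendSharedInvalidMessages() in sinval.c, a thin wrapper along
 * these lines:
 */
#if 0							/* example only, never compiled */
void
SendSharedInvalidMessages(const SharedInvalidationMessage *msgs, int n)
{
    SIInsertDataEntries(msgs, n);
}
#endif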

/*
 * SIGetDataEntries
 *    get next SI message(s) for current backend, if there are any
 *
 * Possible return values:
 *  0:   no SI message available
 *  n>0: next n SI messages have been extracted into data[]
 * -1:   SI reset message extracted
 *
 * If the return value is less than the array size "datasize", the caller
 * can assume that there are no more SI messages after the one(s) returned.
 * Otherwise, another call is needed to collect more messages.
 *
 * NB: this can run in parallel with other instances of SIGetDataEntries
 * executing on behalf of other backends, since each instance will modify only
 * fields of its own backend's ProcState, and no instance will look at fields
 * of other backends' ProcStates.  We express this by grabbing SInvalReadLock
 * in shared mode.  Note that this is not exactly the normal (read-only)
 * interpretation of a shared lock!  Look closely at the interactions before
 * allowing SInvalReadLock to be grabbed in shared mode for any other reason!
 *
 * NB: this can also run in parallel with SIInsertDataEntries.  It is not
 * guaranteed that we will return any messages added after the routine is
 * entered.
 *
 * Note: we assume that "datasize" is not so large that it might be important
 * to break our hold on SInvalReadLock into segments.
 */
int
SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
{
    SISeg      *segP;
    ProcState  *stateP;
    int         max;
    int         n;

    segP = shmInvalBuffer;
    stateP = &segP->procState[MyProcNumber];

    /*
     * Before starting to take locks, do a quick, unlocked test to see whether
     * there can possibly be anything to read.  On a multiprocessor system,
     * it's possible that this load could migrate backwards and occur before
     * we actually enter this function, so we might miss a sinval message that
     * was just added by some other processor.  But they can't migrate
     * backwards over a preceding lock acquisition, so it should be OK.  If we
     * haven't acquired a lock preventing against further relevant
     * invalidations, any such occurrence is not much different than if the
     * invalidation had arrived slightly later in the first place.
     */
    if (!stateP->hasMessages)
        return 0;

    LWLockAcquire(SInvalReadLock, LW_SHARED);

    /*
     * We must reset hasMessages before determining how many messages we're
     * going to read.  That way, if new messages arrive after we have
     * determined how many we're reading, the flag will get reset and we'll
     * notice those messages part-way through.
     *
     * Note that, if we don't end up reading all of the messages, we had
     * better be certain to reset this flag before exiting!
     */
    stateP->hasMessages = false;

    /* Fetch current value of maxMsgNum using spinlock */
    SpinLockAcquire(&segP->msgnumLock);
    max = segP->maxMsgNum;
    SpinLockRelease(&segP->msgnumLock);

    if (stateP->resetState)
    {
        /*
         * Force reset.  We can say we have dealt with any messages added
         * since the reset, as well; and that means we should clear the
         * signaled flag, too.
         */
        stateP->nextMsgNum = max;
        stateP->resetState = false;
        stateP->signaled = false;
        LWLockRelease(SInvalReadLock);
        return -1;
    }

    /*
     * Retrieve messages and advance backend's counter, until data array is
     * full or there are no more messages.
     *
     * There may be other backends that haven't read the message(s), so we
     * cannot delete them here.  SICleanupQueue() will eventually remove them
     * from the queue.
     */
    n = 0;
    while (n < datasize && stateP->nextMsgNum < max)
    {
        data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
        stateP->nextMsgNum++;
    }

    /*
     * If we have caught up completely, reset our "signaled" flag so that
     * we'll get another signal if we fall behind again.
     *
     * If we haven't caught up completely, reset the hasMessages flag so that
     * we see the remaining messages next time.
     */
    if (stateP->nextMsgNum >= max)
        stateP->signaled = false;
    else
        stateP->hasMessages = true;

    LWLockRelease(SInvalReadLock);
    return n;
}
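
/*
 * Usage sketch (not part of the original file): a reader drains the queue in
 * array-sized gulps, treating -1 as "state lost, rebuild everything".  This
 * mirrors the general shape of ReceiveSharedInvalidMessages() in sinval.c,
 * greatly simplified here (the real function also worries about reentrancy):
 */
#if 0							/* example only, never compiled */
#define EX_BATCHSIZE 32			/* hypothetical local buffer size */

static void
example_drain_queue(void (*invalFunc) (SharedInvalidationMessage *msg),
                    void (*resetFunc) (void))
{
    SharedInvalidationMessage msgs[EX_BATCHSIZE];
    int         n;

    do
    {
        n = SIGetDataEntries(msgs, EX_BATCHSIZE);
        if (n < 0)
            resetFunc();        /* we were reset: discard all invalidatable state */
        else
            for (int i = 0; i < n; i++)
                invalFunc(&msgs[i]);
    } while (n == EX_BATCHSIZE);    /* a full batch means more may be waiting */
}
#endif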

/*
 * SICleanupQueue
 *    Remove messages that have been consumed by all active backends
 *
 * callerHasWriteLock is true if caller is holding SInvalWriteLock.
 * minFree is the minimum number of message slots to make free.
 *
 * Possible side effects of this routine include marking one or more
 * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
 * to some backend that seems to be getting too far behind.  We signal at
 * most one backend at a time, for reasons explained at the top of the file.
 *
 * Caution: because we transiently release write lock when we have to signal
 * some other backend, it is NOT guaranteed that there are still minFree
 * free message slots at exit.  Caller must recheck and perhaps retry.
 */
void
SICleanupQueue(bool callerHasWriteLock, int minFree)
{
    SISeg      *segP = shmInvalBuffer;
    int         min,
                minsig,
                lowbound,
                numMsgs,
                i;
    ProcState  *needSig = NULL;

    /* Lock out all writers and readers */
    if (!callerHasWriteLock)
        LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
    LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);

    /*
     * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify the
     * furthest-back backend that needs signaling (if any), and reset any
     * backends that are too far back.  Note that because we ignore sendOnly
     * backends here it is possible for them to keep sending messages without
     * a problem even when they are the only active backend.
     */
    min = segP->maxMsgNum;
    minsig = min - SIG_THRESHOLD;
    lowbound = min - MAXNUMMESSAGES + minFree;

    for (i = 0; i < segP->numProcs; i++)
    {
        ProcState  *stateP = &segP->procState[segP->pgprocnos[i]];
        int         n = stateP->nextMsgNum;

        /* Ignore if already in reset state */
        Assert(stateP->procPid != 0);
        if (stateP->resetState || stateP->sendOnly)
            continue;

        /*
         * If we must free some space and this backend is preventing it, force
         * him into reset state and then ignore until he catches up.
         */
        if (n < lowbound)
        {
            stateP->resetState = true;
            /* no point in signaling him ... */
            continue;
        }

        /* Track the global minimum nextMsgNum */
        if (n < min)
            min = n;

        /* Also see who's furthest back of the unsignaled backends */
        if (n < minsig && !stateP->signaled)
        {
            minsig = n;
            needSig = stateP;
        }
    }
    segP->minMsgNum = min;

    /*
     * When minMsgNum gets really large, decrement all message counters so as
     * to forestall overflow of the counters.  This happens seldom enough that
     * folding it into the previous loop would be a loser.
     */
    if (min >= MSGNUMWRAPAROUND)
    {
        segP->minMsgNum -= MSGNUMWRAPAROUND;
        segP->maxMsgNum -= MSGNUMWRAPAROUND;
        for (i = 0; i < segP->numProcs; i++)
            segP->procState[segP->pgprocnos[i]].nextMsgNum -= MSGNUMWRAPAROUND;
    }

    /*
     * Determine how many messages are still in the queue, and set the
     * threshold at which we should repeat SICleanupQueue().
     */
    numMsgs = segP->maxMsgNum - segP->minMsgNum;
    if (numMsgs < CLEANUP_MIN)
        segP->nextThreshold = CLEANUP_MIN;
    else
        segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;

    /*
     * Lastly, signal anyone who needs a catchup interrupt.  Since
     * SendProcSignal() might not be fast, we don't want to hold locks while
     * executing it.
     */
    if (needSig)
    {
        pid_t       his_pid = needSig->procPid;
        ProcNumber  his_procNumber = (needSig - &segP->procState[0]);

        needSig->signaled = true;
        LWLockRelease(SInvalReadLock);
        LWLockRelease(SInvalWriteLock);
        elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
        SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_procNumber);
        if (callerHasWriteLock)
            LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
    }
    else
    {
        LWLockRelease(SInvalReadLock);
        if (!callerHasWriteLock)
            LWLockRelease(SInvalWriteLock);
    }
}
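
/*
 * Worked example (not part of the original file) of the threshold
 * computation above: with CLEANUP_QUANTUM = 256, a queue left holding
 * numMsgs = 2500 messages (>= CLEANUP_MIN = 2048) gets
 *
 *     nextThreshold = (2500 / 256 + 1) * 256 = (9 + 1) * 256 = 2560
 *
 * i.e. the next multiple of CLEANUP_QUANTUM above numMsgs, so at most
 * CLEANUP_QUANTUM further insertions occur before the next cleanup.
 */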

/*
 * GetNextLocalTransactionId --- allocate a new LocalTransactionId
 *
 * We split VirtualTransactionIds into two parts so that it is possible
 * to allocate a new one without any contention for shared memory, except
 * for a bit of additional overhead during backend startup/shutdown.
 * The high-order part of a VirtualTransactionId is a ProcNumber, and the
 * low-order part is a LocalTransactionId, which we assign from a local
 * counter.  To avoid the risk of a VirtualTransactionId being reused
 * within a short interval, successive procs occupying the same PGPROC slot
 * should use a consecutive sequence of local IDs, which is implemented
 * by copying nextLocalTransactionId as seen above.
 */
LocalTransactionId
GetNextLocalTransactionId(void)
{
    LocalTransactionId result;

    /* loop to avoid returning InvalidLocalTransactionId at wraparound */
    do
    {
        result = nextLocalTransactionId++;
    } while (!LocalTransactionIdIsValid(result));

    return result;
}
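
/*
 * Illustrative sketch (not part of the original file): how the two halves
 * described in the comment above combine.  Field names are assumed to follow
 * the VirtualTransactionId struct declared in storage/lock.h:
 */
#if 0							/* example only, never compiled */
static VirtualTransactionId
example_new_vxid(void)
{
    VirtualTransactionId vxid;

    vxid.procNumber = MyProcNumber; /* high-order part: this proc's slot */
    vxid.localTransactionId = GetNextLocalTransactionId(); /* low-order part */

    return vxid;
}
#endif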