/*-------------------------------------------------------------------------
 *
 * sinvaladt.c
 *	  POSTGRES shared cache invalidation data manager.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/storage/ipc/sinvaladt.c
 *
 *-------------------------------------------------------------------------
 */
#include "postgres.h"

#include <signal.h>
#include <unistd.h>

#include "miscadmin.h"
#include "storage/ipc.h"
#include "storage/proc.h"
#include "storage/procnumber.h"
#include "storage/procsignal.h"
#include "storage/shmem.h"
#include "storage/sinvaladt.h"
#include "storage/spin.h"

/*
 * Conceptually, the shared cache invalidation messages are stored in an
 * infinite array, where maxMsgNum is the next array subscript to store a
 * submitted message in, minMsgNum is the smallest array subscript containing
 * a message not yet read by all backends, and we always have maxMsgNum >=
 * minMsgNum.  (They are equal when there are no messages pending.)  For each
 * active backend, there is a nextMsgNum pointer indicating the next message
 * it needs to read; we have maxMsgNum >= nextMsgNum >= minMsgNum for every
 * backend.
 *
 * (In the current implementation, minMsgNum is a lower bound for the
 * per-process nextMsgNum values, but it isn't rigorously kept equal to the
 * smallest nextMsgNum --- it may lag behind.  We only update it when
 * SICleanupQueue is called, and we try not to do that often.)
 *
 * In reality, the messages are stored in a circular buffer of MAXNUMMESSAGES
 * entries.  We translate MsgNum values into circular-buffer indexes by
 * computing MsgNum % MAXNUMMESSAGES (this should be fast as long as
 * MAXNUMMESSAGES is a constant and a power of 2).  As long as maxMsgNum
 * doesn't exceed minMsgNum by more than MAXNUMMESSAGES, we have enough space
 * in the buffer.  If the buffer does overflow, we recover by setting the
 * "reset" flag for each backend that has fallen too far behind.  A backend
 * that is in "reset" state is ignored while determining minMsgNum.  When
 * it does finally attempt to receive inval messages, it must discard all
 * its invalidatable state, since it won't know what it missed.
 *
 * To reduce the probability of needing resets, we send a "catchup" interrupt
 * to any backend that seems to be falling unreasonably far behind.  The
 * normal behavior is that at most one such interrupt is in flight at a time;
 * when a backend completes processing a catchup interrupt, it executes
 * SICleanupQueue, which will signal the next-furthest-behind backend if
 * needed.  This avoids undue contention from multiple backends all trying
 * to catch up at once.  However, the furthest-back backend might be stuck
 * in a state where it can't catch up.  Eventually it will get reset, so it
 * won't cause any more problems for anyone but itself.  But we don't want
 * to find that a bunch of other backends are now too close to the reset
 * threshold to be saved.  So SICleanupQueue is designed to occasionally
 * send extra catchup interrupts as the queue gets fuller, to backends that
 * are far behind and haven't gotten one yet.  As long as there aren't a lot
 * of "stuck" backends, we won't need a lot of extra interrupts, since ones
 * that aren't stuck will propagate their interrupts to the next guy.
 *
 * We would have problems if the MsgNum values overflow an integer, so
 * whenever minMsgNum exceeds MSGNUMWRAPAROUND, we subtract MSGNUMWRAPAROUND
 * from all the MsgNum variables simultaneously.  MSGNUMWRAPAROUND can be
 * large so that we don't need to do this often.  It must be a multiple of
 * MAXNUMMESSAGES so that the existing circular-buffer entries don't need
 * to be moved when we do it.
 *
 * Access to the shared sinval array is protected by two locks, SInvalReadLock
 * and SInvalWriteLock.  Readers take SInvalReadLock in shared mode; this
 * authorizes them to modify their own ProcState but not to modify or even
 * look at anyone else's.  When we need to perform array-wide updates,
 * such as in SICleanupQueue, we take SInvalReadLock in exclusive mode to
 * lock out all readers.  Writers take SInvalWriteLock (always in exclusive
 * mode) to serialize adding messages to the queue.  Note that a writer
 * can operate in parallel with one or more readers, because the writer
 * has no need to touch anyone's ProcState, except in the infrequent cases
 * when SICleanupQueue is needed.  The only point of overlap is that
 * the writer wants to change maxMsgNum while readers need to read it.
 * We deal with that by having a spinlock that readers must take for just
 * long enough to read maxMsgNum, while writers take it for just long enough
 * to write maxMsgNum.  (The exact rule is that you need the spinlock to
 * read maxMsgNum if you are not holding SInvalWriteLock, and you need the
 * spinlock to write maxMsgNum unless you are holding both locks.)
 *
 * Note: since maxMsgNum is an int and hence presumably atomically readable/
 * writable, the spinlock might seem unnecessary.  The reason it is needed
 * is to provide a memory barrier: we need to be sure that messages written
 * to the array are actually there before maxMsgNum is increased, and that
 * readers will see that data after fetching maxMsgNum.  Multiprocessors
 * that have weak memory-ordering guarantees can fail without the memory
 * barrier instructions that are included in the spinlock sequences.
 */
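
/*
 * Worked example of the arithmetic above (illustrative only, not part of
 * the original file): with MAXNUMMESSAGES = 4096, message number 8195 maps
 * to buffer slot 8195 % 4096 = 3.  MSGNUMWRAPAROUND is 4096 * 262144 =
 * 2^30, comfortably below INT_MAX; and because 2^30 is a multiple of 4096,
 * subtracting it from every MsgNum leaves each MsgNum % MAXNUMMESSAGES
 * slot index unchanged, so no buffer entries need to move.
 */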


/*
 * Configurable parameters.
 *
 * MAXNUMMESSAGES: max number of shared-inval messages we can buffer.
 * Must be a power of 2 for speed.
 *
 * MSGNUMWRAPAROUND: how often to reduce MsgNum variables to avoid overflow.
 * Must be a multiple of MAXNUMMESSAGES.  Should be large.
 *
 * CLEANUP_MIN: the minimum number of messages that must be in the buffer
 * before we bother to call SICleanupQueue.
 *
 * CLEANUP_QUANTUM: how often (in messages) to call SICleanupQueue once
 * we exceed CLEANUP_MIN.  Should be a power of 2 for speed.
 *
 * SIG_THRESHOLD: the minimum number of messages a backend must have fallen
 * behind before we'll send it PROCSIG_CATCHUP_INTERRUPT.
 *
 * WRITE_QUANTUM: the max number of messages to push into the buffer per
 * iteration of SIInsertDataEntries.  Noncritical but should be less than
 * CLEANUP_QUANTUM, because we only consider calling SICleanupQueue once
 * per iteration.
 */

#define MAXNUMMESSAGES 4096
#define MSGNUMWRAPAROUND (MAXNUMMESSAGES * 262144)
#define CLEANUP_MIN (MAXNUMMESSAGES / 2)
#define CLEANUP_QUANTUM (MAXNUMMESSAGES / 16)
#define SIG_THRESHOLD (MAXNUMMESSAGES / 2)
#define WRITE_QUANTUM 64
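
/*
 * Compile-time checks of the invariants stated above -- an illustrative
 * sketch, not in the original file; StaticAssertDecl is assumed to be
 * available via c.h (pulled in by postgres.h).
 */
StaticAssertDecl((MAXNUMMESSAGES & (MAXNUMMESSAGES - 1)) == 0,
				 "MAXNUMMESSAGES must be a power of 2");
StaticAssertDecl(MSGNUMWRAPAROUND % MAXNUMMESSAGES == 0,
				 "MSGNUMWRAPAROUND must be a multiple of MAXNUMMESSAGES");
StaticAssertDecl(WRITE_QUANTUM < CLEANUP_QUANTUM,
				 "WRITE_QUANTUM should be less than CLEANUP_QUANTUM");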

/* Per-backend state in shared invalidation structure */
typedef struct ProcState
{
	/* procPid is zero in an inactive ProcState array entry. */
	pid_t		procPid;		/* PID of backend, for signaling */
	/* nextMsgNum is meaningless if procPid == 0 or resetState is true. */
	int			nextMsgNum;		/* next message number to read */
	bool		resetState;		/* backend needs to reset its state */
	bool		signaled;		/* backend has been sent catchup signal */
	bool		hasMessages;	/* backend has unread messages */

	/*
	 * This backend only sends invalidations, never receives them.  This
	 * only makes sense for the startup process during recovery, because it
	 * doesn't maintain a relcache, yet it fires inval messages to allow
	 * query backends to see schema changes.
	 */
	bool		sendOnly;		/* backend only sends, never receives */

	/*
	 * Next LocalTransactionId to use for each idle backend slot.  We keep
	 * this here because it is indexed by ProcNumber and it is convenient to
	 * copy the value to and from local memory when MyProcNumber is set.
	 * It's meaningless in an active ProcState entry.
	 */
	LocalTransactionId nextLXID;
} ProcState;

/* Shared cache invalidation memory segment */
typedef struct SISeg
{
	/*
	 * General state information
	 */
	int			minMsgNum;		/* oldest message still needed */
	int			maxMsgNum;		/* next message number to be assigned */
	int			nextThreshold;	/* # of messages to call SICleanupQueue */

	slock_t		msgnumLock;		/* spinlock protecting maxMsgNum */

	/*
	 * Circular buffer holding shared-inval messages
	 */
	SharedInvalidationMessage buffer[MAXNUMMESSAGES];

	/*
	 * Per-backend invalidation state info.
	 *
	 * 'procState' has NumProcStateSlots entries, and is indexed by pgprocno.
	 * 'numProcs' is the number of slots currently in use, and 'pgprocnos' is
	 * a dense array of their indexes, to speed up scanning all in-use slots.
	 *
	 * 'pgprocnos' is largely redundant with ProcArrayStruct->pgprocnos, but
	 * having our separate copy avoids contention on ProcArrayLock, and
	 * allows us to track only the processes that participate in shared
	 * cache invalidations.
	 */
	int			numProcs;
	int		   *pgprocnos;
	ProcState	procState[FLEXIBLE_ARRAY_MEMBER];
} SISeg;

/*
 * We reserve a slot for each possible ProcNumber, plus one for each
 * possible auxiliary process type.  (This scheme assumes there is not
 * more than one of any auxiliary process type at a time, except for
 * IO workers.)
 */
#define NumProcStateSlots	(MaxBackends + NUM_AUXILIARY_PROCS)

static SISeg *shmInvalBuffer;	/* pointer to the shared inval buffer */


static LocalTransactionId nextLocalTransactionId;

static void CleanupInvalidationState(int status, Datum arg);


/*
 * SharedInvalShmemSize --- return shared-memory space needed
 */
Size
SharedInvalShmemSize(void)
{
	Size		size;

	size = offsetof(SISeg, procState);
	size = add_size(size, mul_size(sizeof(ProcState), NumProcStateSlots));	/* procState */
	size = add_size(size, mul_size(sizeof(int), NumProcStateSlots));	/* pgprocnos */

	return size;
}

/*
 * SharedInvalShmemInit
 *		Create and initialize the SI message buffer
 */
void
SharedInvalShmemInit(void)
{
	int			i;
	bool		found;

	/* Allocate space in shared memory */
	shmInvalBuffer = (SISeg *)
		ShmemInitStruct("shmInvalBuffer", SharedInvalShmemSize(), &found);
	if (found)
		return;

	/* Clear message counters, save size of procState array, init spinlock */
	shmInvalBuffer->minMsgNum = 0;
	shmInvalBuffer->maxMsgNum = 0;
	shmInvalBuffer->nextThreshold = CLEANUP_MIN;
	SpinLockInit(&shmInvalBuffer->msgnumLock);

	/* The buffer[] array is initially all unused, so we need not fill it */

	/* Mark all backends inactive, and initialize nextLXID */
	for (i = 0; i < NumProcStateSlots; i++)
	{
		shmInvalBuffer->procState[i].procPid = 0;	/* inactive */
		shmInvalBuffer->procState[i].nextMsgNum = 0;	/* meaningless */
		shmInvalBuffer->procState[i].resetState = false;
		shmInvalBuffer->procState[i].signaled = false;
		shmInvalBuffer->procState[i].hasMessages = false;
		shmInvalBuffer->procState[i].nextLXID = InvalidLocalTransactionId;
	}
	shmInvalBuffer->numProcs = 0;
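
	/*
	 * The dense pgprocnos array lives immediately after the last procState
	 * slot (note that i equals NumProcStateSlots when the loop above
	 * exits); SharedInvalShmemSize() reserved exactly that much extra space
	 * for it.
	 */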
	shmInvalBuffer->pgprocnos = (int *) &shmInvalBuffer->procState[i];
}

/*
 * SharedInvalBackendInit
 *		Initialize a new backend to operate on the sinval buffer
 */
void
SharedInvalBackendInit(bool sendOnly)
{
	ProcState  *stateP;
	pid_t		oldPid;
	SISeg	   *segP = shmInvalBuffer;

	if (MyProcNumber < 0)
		elog(ERROR, "MyProcNumber not set");
	if (MyProcNumber >= NumProcStateSlots)
		elog(PANIC, "unexpected MyProcNumber %d in SharedInvalBackendInit (max %d)",
			 MyProcNumber, NumProcStateSlots);
	stateP = &segP->procState[MyProcNumber];

	/*
	 * This can run in parallel with read operations, but not with write
	 * operations, since SIInsertDataEntries relies on the pgprocnos array to
	 * set hasMessages appropriately.
	 */
	LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

	oldPid = stateP->procPid;
	if (oldPid != 0)
	{
		LWLockRelease(SInvalWriteLock);
		elog(ERROR, "sinval slot for backend %d is already in use by process %d",
			 MyProcNumber, (int) oldPid);
	}

	shmInvalBuffer->pgprocnos[shmInvalBuffer->numProcs++] = MyProcNumber;

	/* Fetch next local transaction ID into local memory */
	nextLocalTransactionId = stateP->nextLXID;

	/* mark myself active, with all extant messages already read */
	stateP->procPid = MyProcPid;
	stateP->nextMsgNum = segP->maxMsgNum;
	stateP->resetState = false;
	stateP->signaled = false;
	stateP->hasMessages = false;
	stateP->sendOnly = sendOnly;

	LWLockRelease(SInvalWriteLock);

	/* register exit routine to mark my entry inactive at exit */
	on_shmem_exit(CleanupInvalidationState, PointerGetDatum(segP));
}

/*
 * CleanupInvalidationState
 *		Mark the current backend as no longer active.
 *
 * This function is called via on_shmem_exit() during backend shutdown.
 *
 * arg is really of type "SISeg*".
 */
static void
CleanupInvalidationState(int status, Datum arg)
{
	SISeg	   *segP = (SISeg *) DatumGetPointer(arg);
	ProcState  *stateP;
	int			i;

	Assert(PointerIsValid(segP));

	LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

	stateP = &segP->procState[MyProcNumber];

	/* Update next local transaction ID for next holder of this proc number */
	stateP->nextLXID = nextLocalTransactionId;

	/* Mark myself inactive */
	stateP->procPid = 0;
	stateP->nextMsgNum = 0;
	stateP->resetState = false;
	stateP->signaled = false;

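	/* Remove myself from the dense pgprocnos array (swap with last entry) */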
	for (i = segP->numProcs - 1; i >= 0; i--)
	{
		if (segP->pgprocnos[i] == MyProcNumber)
		{
			if (i != segP->numProcs - 1)
				segP->pgprocnos[i] = segP->pgprocnos[segP->numProcs - 1];
			break;
		}
	}
	if (i < 0)
		elog(PANIC, "could not find entry in sinval array");
	segP->numProcs--;

	LWLockRelease(SInvalWriteLock);
}

/*
 * SIInsertDataEntries
 *		Add new invalidation message(s) to the buffer.
 */
void
SIInsertDataEntries(const SharedInvalidationMessage *data, int n)
{
	SISeg	   *segP = shmInvalBuffer;

	/*
	 * N can be arbitrarily large.  We divide the work into groups of no
	 * more than WRITE_QUANTUM messages, to be sure that we don't hold the
	 * lock for an unreasonably long time.  (This is not so much because we
	 * care about letting in other writers, as that some just-caught-up
	 * backend might be trying to do SICleanupQueue to pass on its signal,
	 * and we don't want it to have to wait a long time.)  Also, we need to
	 * consider calling SICleanupQueue every so often.
	 */
	while (n > 0)
	{
		int			nthistime = Min(n, WRITE_QUANTUM);
		int			numMsgs;
		int			max;
		int			i;

		n -= nthistime;

		LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);

		/*
		 * If the buffer is full, we *must* acquire some space.  Clean the
		 * queue and reset anyone who is preventing space from being freed.
		 * Otherwise, clean the queue only when it's exceeded the next
		 * fullness threshold.  We have to loop and recheck the buffer state
		 * after any call of SICleanupQueue.
		 */
		for (;;)
		{
			numMsgs = segP->maxMsgNum - segP->minMsgNum;
			if (numMsgs + nthistime > MAXNUMMESSAGES ||
				numMsgs >= segP->nextThreshold)
				SICleanupQueue(true, nthistime);
			else
				break;
		}

		/*
		 * Insert new message(s) into proper slot of circular buffer
		 */
		max = segP->maxMsgNum;
		while (nthistime-- > 0)
		{
			segP->buffer[max % MAXNUMMESSAGES] = *data++;
			max++;
		}

		/* Update current value of maxMsgNum using spinlock */
		SpinLockAcquire(&segP->msgnumLock);
		segP->maxMsgNum = max;
		SpinLockRelease(&segP->msgnumLock);

		/*
		 * Now that the maxMsgNum change is globally visible, we give
		 * everyone a swift kick to make sure they read the newly added
		 * messages.  Releasing SInvalWriteLock will enforce a full memory
		 * barrier, so these (unlocked) changes will be committed to memory
		 * before we exit the function.
		 */
		for (i = 0; i < segP->numProcs; i++)
		{
			ProcState  *stateP = &segP->procState[segP->pgprocnos[i]];

			stateP->hasMessages = true;
		}

		LWLockRelease(SInvalWriteLock);
	}
}

/*
 * SIGetDataEntries
 *		get next SI message(s) for current backend, if there are any
 *
 * Possible return values:
 *	0:	 no SI message available
 *	n>0: next n SI messages have been extracted into data[]
 *	-1:	 SI reset message extracted
 *
 * If the return value is less than the array size "datasize", the caller
 * can assume that there are no more SI messages after the one(s) returned.
 * Otherwise, another call is needed to collect more messages.
 *
 * NB: this can run in parallel with other instances of SIGetDataEntries
 * executing on behalf of other backends, since each instance will modify
 * only fields of its own backend's ProcState, and no instance will look at
 * fields of other backends' ProcStates.  We express this by grabbing
 * SInvalReadLock in shared mode.  Note that this is not exactly the normal
 * (read-only) interpretation of a shared lock!  Look closely at the
 * interactions before allowing SInvalReadLock to be grabbed in shared mode
 * for any other reason!
 *
 * NB: this can also run in parallel with SIInsertDataEntries.  It is not
 * guaranteed that we will return any messages added after the routine is
 * entered.
 *
 * Note: we assume that "datasize" is not so large that it might be
 * important to break our hold on SInvalReadLock into segments.
 */
int
SIGetDataEntries(SharedInvalidationMessage *data, int datasize)
{
	SISeg	   *segP;
	ProcState  *stateP;
	int			max;
	int			n;

	segP = shmInvalBuffer;
	stateP = &segP->procState[MyProcNumber];

	/*
	 * Before starting to take locks, do a quick, unlocked test to see
	 * whether there can possibly be anything to read.  On a multiprocessor
	 * system, it's possible that this load could migrate backwards and
	 * occur before we actually enter this function, so we might miss a
	 * sinval message that was just added by some other processor.  But they
	 * can't migrate backwards over a preceding lock acquisition, so it
	 * should be OK.  If we haven't acquired a lock that prevents further
	 * relevant invalidations, any such occurrence is not much different
	 * than if the invalidation had arrived slightly later in the first
	 * place.
	 */
	if (!stateP->hasMessages)
		return 0;

	LWLockAcquire(SInvalReadLock, LW_SHARED);

	/*
	 * We must reset hasMessages before determining how many messages we're
	 * going to read.  That way, if new messages arrive after we have
	 * determined how many we're reading, the flag will get reset and we'll
	 * notice those messages part-way through.
	 *
	 * Note that, if we don't end up reading all of the messages, we had
	 * better be certain to reset this flag before exiting!
	 */
	stateP->hasMessages = false;

	/* Fetch current value of maxMsgNum using spinlock */
	SpinLockAcquire(&segP->msgnumLock);
	max = segP->maxMsgNum;
	SpinLockRelease(&segP->msgnumLock);

	if (stateP->resetState)
	{
		/*
		 * Force reset.  We can say we have dealt with any messages added
		 * since the reset, as well; and that means we should clear the
		 * signaled flag, too.
		 */
		stateP->nextMsgNum = max;
		stateP->resetState = false;
		stateP->signaled = false;
		LWLockRelease(SInvalReadLock);
		return -1;
	}

	/*
	 * Retrieve messages and advance backend's counter, until data array is
	 * full or there are no more messages.
	 *
	 * There may be other backends that haven't read the message(s), so we
	 * cannot delete them here.  SICleanupQueue() will eventually remove them
	 * from the queue.
	 */
	n = 0;
	while (n < datasize && stateP->nextMsgNum < max)
	{
		data[n++] = segP->buffer[stateP->nextMsgNum % MAXNUMMESSAGES];
		stateP->nextMsgNum++;
	}

	/*
	 * If we have caught up completely, reset our "signaled" flag so that
	 * we'll get another signal if we fall behind again.
	 *
	 * If we haven't caught up completely, reset the hasMessages flag so
	 * that we see the remaining messages next time.
	 */
	if (stateP->nextMsgNum >= max)
		stateP->signaled = false;
	else
		stateP->hasMessages = true;

	LWLockRelease(SInvalReadLock);
	return n;
}
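
/*
 * Illustrative consumer loop (a sketch only; the real caller is
 * ReceiveSharedInvalidMessages() in sinval.c, and "HandleMessage" here is
 * a hypothetical callback, not a function in this module):
 *
 *		SharedInvalidationMessage msgs[32];
 *		int			rc;
 *
 *		do
 *		{
 *			rc = SIGetDataEntries(msgs, lengthof(msgs));
 *			if (rc < 0)
 *				InvalidateSystemCaches();	// reset: discard cached state
 *			else
 *				for (int i = 0; i < rc; i++)
 *					HandleMessage(&msgs[i]);
 *		} while (rc == lengthof(msgs));		// full array => maybe more
 */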

/*
 * SICleanupQueue
 *		Remove messages that have been consumed by all active backends
 *
 * callerHasWriteLock is true if caller is holding SInvalWriteLock.
 * minFree is the minimum number of message slots to make free.
 *
 * Possible side effects of this routine include marking one or more
 * backends as "reset" in the array, and sending PROCSIG_CATCHUP_INTERRUPT
 * to some backend that seems to be getting too far behind.  We signal at
 * most one backend at a time, for reasons explained at the top of the file.
 *
 * Caution: because we transiently release write lock when we have to signal
 * some other backend, it is NOT guaranteed that there are still minFree
 * free message slots at exit.  Caller must recheck and perhaps retry.
 */
void
SICleanupQueue(bool callerHasWriteLock, int minFree)
{
	SISeg	   *segP = shmInvalBuffer;
	int			min,
				minsig,
				lowbound,
				numMsgs,
				i;
	ProcState  *needSig = NULL;

	/* Lock out all writers and readers */
	if (!callerHasWriteLock)
		LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
	LWLockAcquire(SInvalReadLock, LW_EXCLUSIVE);

	/*
	 * Recompute minMsgNum = minimum of all backends' nextMsgNum, identify
	 * the furthest-back backend that needs signaling (if any), and reset
	 * any backends that are too far back.  Note that because we ignore
	 * sendOnly backends here it is possible for them to keep sending
	 * messages without a problem even when they are the only active
	 * backend.
	 */
	min = segP->maxMsgNum;
	minsig = min - SIG_THRESHOLD;
	lowbound = min - MAXNUMMESSAGES + minFree;

	for (i = 0; i < segP->numProcs; i++)
	{
		ProcState  *stateP = &segP->procState[segP->pgprocnos[i]];
		int			n = stateP->nextMsgNum;

		/* Ignore if already in reset state */
		Assert(stateP->procPid != 0);
		if (stateP->resetState || stateP->sendOnly)
			continue;

		/*
		 * If we must free some space and this backend is preventing it,
		 * force him into reset state and then ignore until he catches up.
		 */
		if (n < lowbound)
		{
			stateP->resetState = true;
			/* no point in signaling him ... */
			continue;
		}

		/* Track the global minimum nextMsgNum */
		if (n < min)
			min = n;

		/* Also see who's furthest back of the unsignaled backends */
		if (n < minsig && !stateP->signaled)
		{
			minsig = n;
			needSig = stateP;
		}
	}
	segP->minMsgNum = min;

	/*
	 * When minMsgNum gets really large, decrement all message counters so
	 * as to forestall overflow of the counters.  This happens seldom enough
	 * that folding it into the previous loop would be a loser.
	 */
	if (min >= MSGNUMWRAPAROUND)
	{
		segP->minMsgNum -= MSGNUMWRAPAROUND;
		segP->maxMsgNum -= MSGNUMWRAPAROUND;
		for (i = 0; i < segP->numProcs; i++)
			segP->procState[segP->pgprocnos[i]].nextMsgNum -= MSGNUMWRAPAROUND;
	}

	/*
	 * Determine how many messages are still in the queue, and set the
	 * threshold at which we should repeat SICleanupQueue().
	 */
	numMsgs = segP->maxMsgNum - segP->minMsgNum;
	if (numMsgs < CLEANUP_MIN)
		segP->nextThreshold = CLEANUP_MIN;
	else
		segP->nextThreshold = (numMsgs / CLEANUP_QUANTUM + 1) * CLEANUP_QUANTUM;
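
	/*
	 * Illustrative arithmetic (not in the original file): with
	 * CLEANUP_QUANTUM = 256 and numMsgs = 2500, the threshold becomes
	 * (2500 / 256 + 1) * 256 = 2560, so the next cleanup fires within at
	 * most CLEANUP_QUANTUM additional messages.
	 */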

	/*
	 * Lastly, signal anyone who needs a catchup interrupt.  Since
	 * SendProcSignal() might not be fast, we don't want to hold locks while
	 * executing it.
	 */
	if (needSig)
	{
		pid_t		his_pid = needSig->procPid;
		ProcNumber	his_procNumber = (needSig - &segP->procState[0]);

		needSig->signaled = true;
		LWLockRelease(SInvalReadLock);
		LWLockRelease(SInvalWriteLock);
		elog(DEBUG4, "sending sinval catchup signal to PID %d", (int) his_pid);
		SendProcSignal(his_pid, PROCSIG_CATCHUP_INTERRUPT, his_procNumber);
		if (callerHasWriteLock)
			LWLockAcquire(SInvalWriteLock, LW_EXCLUSIVE);
	}
	else
	{
		LWLockRelease(SInvalReadLock);
		if (!callerHasWriteLock)
			LWLockRelease(SInvalWriteLock);
	}
}


/*
 * GetNextLocalTransactionId --- allocate a new LocalTransactionId
 *
 * We split VirtualTransactionIds into two parts so that it is possible
 * to allocate a new one without any contention for shared memory, except
 * for a bit of additional overhead during backend startup/shutdown.
 * The high-order part of a VirtualTransactionId is a ProcNumber, and the
 * low-order part is a LocalTransactionId, which we assign from a local
 * counter.  To avoid the risk of a VirtualTransactionId being reused
 * within a short interval, successive procs occupying the same PGPROC slot
 * should use a consecutive sequence of local IDs, which is implemented
 * by copying nextLocalTransactionId as seen above.
 */
LocalTransactionId
GetNextLocalTransactionId(void)
{
	LocalTransactionId result;

	/* loop to avoid returning InvalidLocalTransactionId at wraparound */
	do
	{
		result = nextLocalTransactionId++;
	} while (!LocalTransactionIdIsValid(result));

	return result;
}
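
/*
 * Illustrative use (a sketch, not part of the original file): a backend's
 * VirtualTransactionId pairs its ProcNumber with a value drawn from this
 * counter, conceptually
 *
 *		vxid = (MyProcNumber, GetNextLocalTransactionId());
 *
 * so two backends can never produce the same VXID concurrently, and no
 * shared-memory traffic is needed to hand out the low-order part.
 */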