/src/postgres/src/backend/commands/async.c
Line | Count | Source |
1 | | /*------------------------------------------------------------------------- |
2 | | * |
3 | | * async.c |
4 | | * Asynchronous notification: NOTIFY, LISTEN, UNLISTEN |
5 | | * |
6 | | * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group |
7 | | * Portions Copyright (c) 1994, Regents of the University of California |
8 | | * |
9 | | * IDENTIFICATION |
10 | | * src/backend/commands/async.c |
11 | | * |
12 | | *------------------------------------------------------------------------- |
13 | | */ |
14 | | |
15 | | /*------------------------------------------------------------------------- |
16 | | * Async Notification Model as of 9.0: |
17 | | * |
18 | | * 1. Multiple backends on the same machine. Multiple backends listening on |
19 | | * several channels. (Channels are also called "conditions" in other |
20 | | * parts of the code.) |
21 | | * |
22 | | * 2. There is one central queue in disk-based storage (directory pg_notify/), |
23 | | * with actively-used pages mapped into shared memory by the slru.c module. |
24 | | * All notification messages are placed in the queue and later read out |
25 | | * by listening backends. |
26 | | * |
27 | | * There is no central knowledge of which backend listens on which channel; |
28 | | * every backend has its own list of interesting channels. |
29 | | * |
30 | | * Although there is only one queue, notifications are treated as being |
31 | | * database-local; this is done by including the sender's database OID |
32 | | * in each notification message. Listening backends ignore messages |
33 | | * that don't match their database OID. This is important because it |
34 | | * ensures senders and receivers have the same database encoding and won't |
35 | | * misinterpret non-ASCII text in the channel name or payload string. |
36 | | * |
37 | | * Since notifications are not expected to survive database crashes, |
38 | | * we can simply clean out the pg_notify data at any reboot, and there |
39 | | * is no need for WAL support or fsync'ing. |
40 | | * |
41 | | * 3. Every backend that is listening on at least one channel registers by |
42 | | * entering its PID into the array in AsyncQueueControl. It then scans all |
43 | | * incoming notifications in the central queue and first compares the |
44 | | * database OID of the notification with its own database OID and then |
45 | | * compares the notified channel with the list of channels that it listens |
46 | | * to. In case there is a match it delivers the notification event to its |
47 | | * frontend. Non-matching events are simply skipped. |
48 | | * |
49 | | * 4. The NOTIFY statement (routine Async_Notify) stores the notification in |
50 | | * a backend-local list which will not be processed until transaction end. |
51 | | * |
52 | | * Duplicate notifications from the same transaction are sent out as one |
53 | | * notification only. This is done to save work when for example a trigger |
54 | | * on a 2 million row table fires a notification for each row that has been |
55 | | * changed. If the application needs to receive every single notification |
56 | | * that has been sent, it can easily add some unique string into the extra |
57 | | * payload parameter. |
58 | | * |
59 | | * When the transaction is ready to commit, PreCommit_Notify() adds the |
60 | | * pending notifications to the head of the queue. The head pointer of the |
61 | | * queue always points to the next free position and a position is just a |
62 | | * page number and the offset in that page. This is done before marking the |
63 | | * transaction as committed in clog. If we run into problems writing the |
64 | | * notifications, we can still call elog(ERROR, ...) and the transaction |
65 | | * will roll back. |
66 | | * |
67 | | * Once we have put all of the notifications into the queue, we return to |
68 | | * CommitTransaction() which will then do the actual transaction commit. |
69 | | * |
70 | | * After commit we are called another time (AtCommit_Notify()). Here we |
71 | | * make any actual updates to the effective listen state (listenChannels). |
72 | | * Then we signal any backends that may be interested in our messages |
73 | | * (including our own backend, if listening). This is done by |
74 | | * SignalBackends(), which scans the list of listening backends and sends a |
75 | | * PROCSIG_NOTIFY_INTERRUPT signal to every listening backend (we don't |
76 | | * know which backend is listening on which channel so we must signal them |
77 | | * all). We can exclude backends that are already up to date, though, and |
78 | | * we can also exclude backends that are in other databases (unless they |
79 | | * are way behind and should be kicked to make them advance their |
80 | | * pointers). |
81 | | * |
82 | | * Finally, after we are out of the transaction altogether and about to go |
83 | | * idle, we scan the queue for messages that need to be sent to our |
84 | | * frontend (which might be notifies from other backends, or self-notifies |
85 | | * from our own). This step is not part of the CommitTransaction sequence |
86 | | * for two important reasons. First, we could get errors while sending |
87 | | * data to our frontend, and it's really bad for errors to happen in |
88 | | * post-commit cleanup. Second, in cases where a procedure issues commits |
89 | | * within a single frontend command, we don't want to send notifies to our |
90 | | * frontend until the command is done; but notifies to other backends |
91 | | * should go out immediately after each commit. |
92 | | * |
93 | | * 5. Upon receipt of a PROCSIG_NOTIFY_INTERRUPT signal, the signal handler |
94 | | * sets the process's latch, which triggers the event to be processed |
95 | | * immediately if this backend is idle (i.e., it is waiting for a frontend |
96 | | * command and is not within a transaction block. Cf. |
97 | | * ProcessClientReadInterrupt()). Otherwise the handler may only set a |
98 | | * flag, which will cause the processing to occur just before we next go |
99 | | * idle. |
100 | | * |
101 | | * Inbound-notify processing consists of reading all of the notifications |
102 | | * that have arrived since scanning last time. We read every notification |
103 | | * until we reach either a notification from an uncommitted transaction or |
104 | | * the head pointer's position. |
105 | | * |
106 | | * 6. To limit disk space consumption, the tail pointer needs to be advanced |
107 | | * so that old pages can be truncated. This is relatively expensive |
108 | | * (notably, it requires an exclusive lock), so we don't want to do it |
109 | | * often. We make sending backends do this work if they advanced the queue |
110 | | * head into a new page, but only once every QUEUE_CLEANUP_DELAY pages. |
111 | | * |
112 | | * An application that listens on the same channel it notifies will get |
113 | | * NOTIFY messages for its own NOTIFYs. These can be ignored, if not useful, |
114 | | * by comparing be_pid in the NOTIFY message to the application's own backend's |
115 | | * PID. (As of FE/BE protocol 2.0, the backend's PID is provided to the |
116 | | * frontend during startup.) The above design guarantees that notifies from |
117 | | * other backends will never be missed by ignoring self-notifies. |
118 | | * |
119 | | * The amount of shared memory used for notify management (notify_buffers) |
120 | | * can be varied without affecting anything but performance. The maximum |
121 | | * amount of notification data that can be queued at one time is determined |
122 | | * by max_notify_queue_pages GUC. |
123 | | *------------------------------------------------------------------------- |
124 | | */ |
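
As a concrete illustration of the self-notify filtering described above, here is a minimal client-side sketch using libpq (not part of async.c itself); it assumes a connection that has already executed LISTEN, and "my_channel" is purely an illustrative name:

    #include <stdio.h>
    #include <libpq-fe.h>

    /*
     * Drain all notifications currently buffered for this connection,
     * skipping self-notifies by comparing be_pid with PQbackendPID().
     */
    static void
    drain_notifies(PGconn *conn)
    {
        PGnotify   *n;

        if (!PQconsumeInput(conn))
            return;             /* connection trouble; handle elsewhere */
        while ((n = PQnotifies(conn)) != NULL)
        {
            if (n->be_pid != PQbackendPID(conn))
                printf("channel \"%s\": %s\n", n->relname, n->extra);
            PQfreemem(n);
        }
    }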
125 | | |
126 | | #include "postgres.h" |
127 | | |
128 | | #include <limits.h> |
129 | | #include <unistd.h> |
130 | | #include <signal.h> |
131 | | |
132 | | #include "access/parallel.h" |
133 | | #include "access/slru.h" |
134 | | #include "access/transam.h" |
135 | | #include "access/xact.h" |
136 | | #include "catalog/pg_database.h" |
137 | | #include "commands/async.h" |
138 | | #include "common/hashfn.h" |
139 | | #include "funcapi.h" |
140 | | #include "libpq/libpq.h" |
141 | | #include "libpq/pqformat.h" |
142 | | #include "miscadmin.h" |
143 | | #include "storage/ipc.h" |
144 | | #include "storage/lmgr.h" |
145 | | #include "storage/procsignal.h" |
146 | | #include "tcop/tcopprot.h" |
147 | | #include "utils/builtins.h" |
148 | | #include "utils/guc_hooks.h" |
149 | | #include "utils/memutils.h" |
150 | | #include "utils/ps_status.h" |
151 | | #include "utils/snapmgr.h" |
152 | | #include "utils/timestamp.h" |
153 | | |
154 | | |
155 | | /* |
156 | | * Maximum size of a NOTIFY payload, including terminating NULL. This |
157 | | * must be kept small enough so that a notification message fits on one |
158 | | * SLRU page. The magic fudge factor here is noncritical as long as it's |
159 | | * more than AsyncQueueEntryEmptySize --- we make it significantly bigger |
160 | | * than that, so changes in that data structure won't affect user-visible |
161 | | * restrictions. |
162 | | */ |
163 | 0 | #define NOTIFY_PAYLOAD_MAX_LENGTH (BLCKSZ - NAMEDATALEN - 128) |
164 | | |
165 | | /* |
166 | | * Struct representing an entry in the global notify queue |
167 | | * |
168 | | * This struct declaration has the maximal length, but in a real queue entry |
169 | | * the data area is only big enough for the actual channel and payload strings |
170 | | * (each null-terminated). AsyncQueueEntryEmptySize is the minimum possible |
171 | | * entry size, if both channel and payload strings are empty (but note it |
172 | | * doesn't include alignment padding). |
173 | | * |
174 | | * The "length" field should always be rounded up to the next QUEUEALIGN |
175 | | * multiple so that all fields are properly aligned. |
176 | | */ |
177 | | typedef struct AsyncQueueEntry |
178 | | { |
179 | | int length; /* total allocated length of entry */ |
180 | | Oid dboid; /* sender's database OID */ |
181 | | TransactionId xid; /* sender's XID */ |
182 | | int32 srcPid; /* sender's PID */ |
183 | | char data[NAMEDATALEN + NOTIFY_PAYLOAD_MAX_LENGTH]; |
184 | | } AsyncQueueEntry; |
185 | | |
186 | | /* Currently, no field of AsyncQueueEntry requires more than int alignment */ |
187 | 0 | #define QUEUEALIGN(len) INTALIGN(len) |
188 | | |
189 | 0 | #define AsyncQueueEntryEmptySize (offsetof(AsyncQueueEntry, data) + 2) |
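
To make the sizing rules concrete, here is a small sketch (illustrative values only; the authoritative computation is in asyncQueueNotificationToEntry() later in this file):

    /*
     * Hypothetical entry for channel "foo" with payload "bar". The two NUL
     * terminators are already counted in AsyncQueueEntryEmptySize, so only
     * the string bodies are added before rounding up to int alignment.
     */
    static int
    example_entry_length(void)
    {
        size_t      channellen = strlen("foo");     /* 3 */
        size_t      payloadlen = strlen("bar");     /* 3 */

        /* 16 bytes of fixed fields + 2 NULs + 6 chars = 24, already a
         * multiple of 4 on typical builds, so QUEUEALIGN leaves it alone */
        return QUEUEALIGN(AsyncQueueEntryEmptySize + channellen + payloadlen);
    }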
190 | | |
191 | | /* |
192 | | * Struct describing a queue position, and assorted macros for working with it |
193 | | */ |
194 | | typedef struct QueuePosition |
195 | | { |
196 | | int64 page; /* SLRU page number */ |
197 | | int offset; /* byte offset within page */ |
198 | | } QueuePosition; |
199 | | |
200 | 0 | #define QUEUE_POS_PAGE(x) ((x).page) |
201 | 0 | #define QUEUE_POS_OFFSET(x) ((x).offset) |
202 | | |
203 | | #define SET_QUEUE_POS(x,y,z) \ |
204 | 0 | do { \ |
205 | 0 | (x).page = (y); \ |
206 | 0 | (x).offset = (z); \ |
207 | 0 | } while (0) |
208 | | |
209 | | #define QUEUE_POS_EQUAL(x,y) \ |
210 | 0 | ((x).page == (y).page && (x).offset == (y).offset) |
211 | | |
212 | | #define QUEUE_POS_IS_ZERO(x) \ |
213 | 0 | ((x).page == 0 && (x).offset == 0) |
214 | | |
215 | | /* choose logically smaller QueuePosition */ |
216 | | #define QUEUE_POS_MIN(x,y) \ |
217 | 0 | (asyncQueuePagePrecedes((x).page, (y).page) ? (x) : \ |
218 | 0 | (x).page != (y).page ? (y) : \ |
219 | 0 | (x).offset < (y).offset ? (x) : (y)) |
220 | | |
221 | | /* choose logically larger QueuePosition */ |
222 | | #define QUEUE_POS_MAX(x,y) \ |
223 | 0 | (asyncQueuePagePrecedes((x).page, (y).page) ? (y) : \ |
224 | 0 | (x).page != (y).page ? (x) : \ |
225 | 0 | (x).offset > (y).offset ? (x) : (y)) |
226 | | |
227 | | /* |
228 | | * Parameter determining how often we try to advance the tail pointer: |
229 | | * we do that after every QUEUE_CLEANUP_DELAY pages of NOTIFY data. This is |
230 | | * also the distance by which a backend in another database needs to be |
231 | | * behind before we'll decide we need to wake it up to advance its pointer. |
232 | | * |
233 | | * Resist the temptation to make this really large. While that would save |
234 | | * work in some places, it would add cost in others. In particular, this |
235 | | * should likely be less than notify_buffers, to ensure that backends |
236 | | * catch up before the pages they'll need to read fall out of SLRU cache. |
237 | | */ |
238 | 0 | #define QUEUE_CLEANUP_DELAY 4 |
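
In concrete terms (a back-of-the-envelope reading, assuming the default 8 kB BLCKSZ):

    /*
     * With QUEUE_CLEANUP_DELAY = 4 and 8 kB pages, a sending backend
     * attempts a tail advance roughly once per 32 kB of queued notification
     * data, and a backend in another database gets signaled once it falls
     * at least 4 pages behind the queue head.
     */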
239 | | |
240 | | /* |
241 | | * Struct describing a listening backend's status |
242 | | */ |
243 | | typedef struct QueueBackendStatus |
244 | | { |
245 | | int32 pid; /* either a PID or InvalidPid */ |
246 | | Oid dboid; /* backend's database OID, or InvalidOid */ |
247 | | ProcNumber nextListener; /* id of next listener, or INVALID_PROC_NUMBER */ |
248 | | QueuePosition pos; /* backend has read queue up to here */ |
249 | | } QueueBackendStatus; |
250 | | |
251 | | /* |
252 | | * Shared memory state for LISTEN/NOTIFY (excluding its SLRU stuff) |
253 | | * |
254 | | * The AsyncQueueControl structure is protected by the NotifyQueueLock and |
255 | | * NotifyQueueTailLock. |
256 | | * |
257 | | * When holding NotifyQueueLock in SHARED mode, backends may only inspect |
258 | | * their own entries as well as the head and tail pointers. Consequently we |
259 | | * can allow a backend to update its own record while holding only SHARED lock |
260 | | * (since no other backend will inspect it). |
261 | | * |
262 | | * When holding NotifyQueueLock in EXCLUSIVE mode, backends can inspect the |
263 | | * entries of other backends and also change the head pointer. When holding |
264 | | * both NotifyQueueLock and NotifyQueueTailLock in EXCLUSIVE mode, backends |
265 | | * can change the tail pointers. |
266 | | * |
267 | | * The SLRU buffer pool is divided into banks, and the per-bank SLRU lock |
268 | | * serves as the control lock for the pg_notify SLRU buffers. |
269 | | * In order to avoid deadlocks, whenever we need multiple locks, we first get |
270 | | * NotifyQueueTailLock, then NotifyQueueLock, and lastly the SLRU bank lock. |
271 | | * |
272 | | * Each backend uses the backend[] array entry with index equal to its |
273 | | * ProcNumber. We rely on this to make SendProcSignal fast. |
274 | | * |
275 | | * The backend[] array entries for actively-listening backends are threaded |
276 | | * together using firstListener and the nextListener links, so that we can |
277 | | * scan them without having to iterate over inactive entries. We keep this |
278 | | * list in order by ProcNumber so that the scan is cache-friendly when there |
279 | | * are many active entries. |
280 | | */ |
281 | | typedef struct AsyncQueueControl |
282 | | { |
283 | | QueuePosition head; /* head points to the next free location */ |
284 | | QueuePosition tail; /* tail must be <= the queue position of every |
285 | | * listening backend */ |
286 | | int64 stopPage; /* oldest unrecycled page; must be <= |
287 | | * tail.page */ |
288 | | ProcNumber firstListener; /* id of first listener, or |
289 | | * INVALID_PROC_NUMBER */ |
290 | | TimestampTz lastQueueFillWarn; /* time of last queue-full msg */ |
291 | | QueueBackendStatus backend[FLEXIBLE_ARRAY_MEMBER]; |
292 | | } AsyncQueueControl; |
293 | | |
294 | | static AsyncQueueControl *asyncQueueControl; |
295 | | |
296 | 0 | #define QUEUE_HEAD (asyncQueueControl->head) |
297 | 0 | #define QUEUE_TAIL (asyncQueueControl->tail) |
298 | 0 | #define QUEUE_STOP_PAGE (asyncQueueControl->stopPage) |
299 | 0 | #define QUEUE_FIRST_LISTENER (asyncQueueControl->firstListener) |
300 | 0 | #define QUEUE_BACKEND_PID(i) (asyncQueueControl->backend[i].pid) |
301 | 0 | #define QUEUE_BACKEND_DBOID(i) (asyncQueueControl->backend[i].dboid) |
302 | 0 | #define QUEUE_NEXT_LISTENER(i) (asyncQueueControl->backend[i].nextListener) |
303 | 0 | #define QUEUE_BACKEND_POS(i) (asyncQueueControl->backend[i].pos) |
304 | | |
305 | | /* |
306 | | * The SLRU buffer area through which we access the notification queue |
307 | | */ |
308 | | static SlruCtlData NotifyCtlData; |
309 | | |
310 | 0 | #define NotifyCtl (&NotifyCtlData) |
311 | 0 | #define QUEUE_PAGESIZE BLCKSZ |
312 | | |
313 | 0 | #define QUEUE_FULL_WARN_INTERVAL 5000 /* warn at most once every 5s */ |
314 | | |
315 | | /* |
316 | | * listenChannels identifies the channels we are actually listening to |
317 | | * (ie, have committed a LISTEN on). It is a simple list of channel names, |
318 | | * allocated in TopMemoryContext. |
319 | | */ |
320 | | static List *listenChannels = NIL; /* list of C strings */ |
321 | | |
322 | | /* |
323 | | * State for pending LISTEN/UNLISTEN actions consists of an ordered list of |
324 | | * all actions requested in the current transaction. As explained above, |
325 | | * we don't actually change listenChannels until we reach transaction commit. |
326 | | * |
327 | | * The list is kept in CurTransactionContext. In subtransactions, each |
328 | | * subtransaction has its own list in its own CurTransactionContext, but |
329 | | * successful subtransactions attach their lists to their parent's list. |
330 | | * Failed subtransactions simply discard their lists. |
331 | | */ |
332 | | typedef enum |
333 | | { |
334 | | LISTEN_LISTEN, |
335 | | LISTEN_UNLISTEN, |
336 | | LISTEN_UNLISTEN_ALL, |
337 | | } ListenActionKind; |
338 | | |
339 | | typedef struct |
340 | | { |
341 | | ListenActionKind action; |
342 | | char channel[FLEXIBLE_ARRAY_MEMBER]; /* nul-terminated string */ |
343 | | } ListenAction; |
344 | | |
345 | | typedef struct ActionList |
346 | | { |
347 | | int nestingLevel; /* current transaction nesting depth */ |
348 | | List *actions; /* list of ListenAction structs */ |
349 | | struct ActionList *upper; /* details for upper transaction levels */ |
350 | | } ActionList; |
351 | | |
352 | | static ActionList *pendingActions = NULL; |
353 | | |
354 | | /* |
355 | | * State for outbound notifies consists of a list of all channels+payloads |
356 | | * NOTIFYed in the current transaction. We do not actually perform a NOTIFY |
357 | | * until and unless the transaction commits. pendingNotifies is NULL if no |
358 | | * NOTIFYs have been done in the current (sub) transaction. |
359 | | * |
360 | | * We discard duplicate notify events issued in the same transaction. |
361 | | * Hence, in addition to the list proper (which we need to track the order |
362 | | * of the events, since we guarantee to deliver them in order), we build a |
363 | | * hash table which we can probe to detect duplicates. Since building the |
364 | | * hash table is somewhat expensive, we do so only once we have at least |
365 | | * MIN_HASHABLE_NOTIFIES events queued in the current (sub) transaction; |
366 | | * before that we just scan the events linearly. |
367 | | * |
368 | | * The list is kept in CurTransactionContext. In subtransactions, each |
369 | | * subtransaction has its own list in its own CurTransactionContext, but |
370 | | * successful subtransactions add their entries to their parent's list. |
371 | | * Failed subtransactions simply discard their lists. Since these lists |
372 | | * are independent, there may be notify events in a subtransaction's list |
373 | | * that duplicate events in some ancestor (sub) transaction; we get rid of |
374 | | * the dups when merging the subtransaction's list into its parent's. |
375 | | * |
376 | | * Note: the action and notify lists do not interact within a transaction. |
377 | | * In particular, if a transaction does NOTIFY and then LISTEN on the same |
378 | | * condition name, it will get a self-notify at commit. This is a bit odd |
379 | | * but is consistent with our historical behavior. |
380 | | */ |
381 | | typedef struct Notification |
382 | | { |
383 | | uint16 channel_len; /* length of channel-name string */ |
384 | | uint16 payload_len; /* length of payload string */ |
385 | | /* null-terminated channel name, then null-terminated payload follow */ |
386 | | char data[FLEXIBLE_ARRAY_MEMBER]; |
387 | | } Notification; |
388 | | |
389 | | typedef struct NotificationList |
390 | | { |
391 | | int nestingLevel; /* current transaction nesting depth */ |
392 | | List *events; /* list of Notification structs */ |
393 | | HTAB *hashtab; /* hash of NotificationHash structs, or NULL */ |
394 | | struct NotificationList *upper; /* details for upper transaction levels */ |
395 | | } NotificationList; |
396 | | |
397 | 0 | #define MIN_HASHABLE_NOTIFIES 16 /* threshold to build hashtab */ |
398 | | |
399 | | struct NotificationHash |
400 | | { |
401 | | Notification *event; /* => the actual Notification struct */ |
402 | | }; |
403 | | |
404 | | static NotificationList *pendingNotifies = NULL; |
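
The duplicate-suppression strategy described above implies a probe of roughly the following shape. This is only a sketch under the stated design (the real AsyncExistsPendingNotify() body lies beyond this excerpt): linear scan while the list is short, hash probe once hashtab has been built.

    static bool
    exists_pending_notify_sketch(Notification *n)
    {
        ListCell   *l;

        if (pendingNotifies == NULL)
            return false;

        /* Once the hash table exists, probe it; keys are Notification *. */
        if (pendingNotifies->hashtab != NULL)
            return hash_search(pendingNotifies->hashtab, &n,
                               HASH_FIND, NULL) != NULL;

        /* Otherwise scan the (short) event list linearly. */
        foreach(l, pendingNotifies->events)
        {
            Notification *oldn = (Notification *) lfirst(l);

            if (n->channel_len == oldn->channel_len &&
                n->payload_len == oldn->payload_len &&
                memcmp(n->data, oldn->data,
                       n->channel_len + n->payload_len + 2) == 0)
                return true;
        }
        return false;
    }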
405 | | |
406 | | /* |
407 | | * Inbound notifications are initially processed by HandleNotifyInterrupt(), |
408 | | * called from inside a signal handler. That just sets the |
409 | | * notifyInterruptPending flag and sets the process |
410 | | * latch. ProcessNotifyInterrupt() will then be called whenever it's safe to |
411 | | * actually deal with the interrupt. |
412 | | */ |
413 | | volatile sig_atomic_t notifyInterruptPending = false; |
414 | | |
415 | | /* True if we've registered an on_shmem_exit cleanup */ |
416 | | static bool unlistenExitRegistered = false; |
417 | | |
418 | | /* True if we're currently registered as a listener in asyncQueueControl */ |
419 | | static bool amRegisteredListener = false; |
420 | | |
421 | | /* have we advanced to a page that's a multiple of QUEUE_CLEANUP_DELAY? */ |
422 | | static bool tryAdvanceTail = false; |
423 | | |
424 | | /* GUC parameters */ |
425 | | bool Trace_notify = false; |
426 | | |
427 | | /* For 8 KB pages this gives 8 GB of disk space */ |
428 | | int max_notify_queue_pages = 1048576; |
429 | | |
430 | | /* local function prototypes */ |
431 | | static inline int64 asyncQueuePageDiff(int64 p, int64 q); |
432 | | static inline bool asyncQueuePagePrecedes(int64 p, int64 q); |
433 | | static void queue_listen(ListenActionKind action, const char *channel); |
434 | | static void Async_UnlistenOnExit(int code, Datum arg); |
435 | | static void Exec_ListenPreCommit(void); |
436 | | static void Exec_ListenCommit(const char *channel); |
437 | | static void Exec_UnlistenCommit(const char *channel); |
438 | | static void Exec_UnlistenAllCommit(void); |
439 | | static bool IsListeningOn(const char *channel); |
440 | | static void asyncQueueUnregister(void); |
441 | | static bool asyncQueueIsFull(void); |
442 | | static bool asyncQueueAdvance(volatile QueuePosition *position, int entryLength); |
443 | | static void asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe); |
444 | | static ListCell *asyncQueueAddEntries(ListCell *nextNotify); |
445 | | static double asyncQueueUsage(void); |
446 | | static void asyncQueueFillWarning(void); |
447 | | static void SignalBackends(void); |
448 | | static void asyncQueueReadAllNotifications(void); |
449 | | static bool asyncQueueProcessPageEntries(volatile QueuePosition *current, |
450 | | QueuePosition stop, |
451 | | char *page_buffer, |
452 | | Snapshot snapshot); |
453 | | static void asyncQueueAdvanceTail(void); |
454 | | static void ProcessIncomingNotify(bool flush); |
455 | | static bool AsyncExistsPendingNotify(Notification *n); |
456 | | static void AddEventToPendingNotifies(Notification *n); |
457 | | static uint32 notification_hash(const void *key, Size keysize); |
458 | | static int notification_match(const void *key1, const void *key2, Size keysize); |
459 | | static void ClearPendingActionsAndNotifies(void); |
460 | | |
461 | | /* |
462 | | * Compute the difference between two queue page numbers. |
463 | | * Previously this function accounted for a wraparound. |
464 | | */ |
465 | | static inline int64 |
466 | | asyncQueuePageDiff(int64 p, int64 q) |
467 | 0 | { |
468 | 0 | return p - q; |
469 | 0 | } |
470 | | |
471 | | /* |
472 | | * Determines whether p precedes q. |
473 | | * Previously this function accounted for a wraparound. |
474 | | */ |
475 | | static inline bool |
476 | | asyncQueuePagePrecedes(int64 p, int64 q) |
477 | 0 | { |
478 | 0 | return p < q; |
479 | 0 | } |
480 | | |
481 | | /* |
482 | | * Report space needed for our shared memory area |
483 | | */ |
484 | | Size |
485 | | AsyncShmemSize(void) |
486 | 0 | { |
487 | 0 | Size size; |
488 | | |
489 | | /* This had better match AsyncShmemInit */ |
490 | 0 | size = mul_size(MaxBackends, sizeof(QueueBackendStatus)); |
491 | 0 | size = add_size(size, offsetof(AsyncQueueControl, backend)); |
492 | |
493 | 0 | size = add_size(size, SimpleLruShmemSize(notify_buffers, 0)); |
494 | |
495 | 0 | return size; |
496 | 0 | } |
497 | | |
498 | | /* |
499 | | * Initialize our shared memory area |
500 | | */ |
501 | | void |
502 | | AsyncShmemInit(void) |
503 | 0 | { |
504 | 0 | bool found; |
505 | 0 | Size size; |
506 | | |
507 | | /* |
508 | | * Create or attach to the AsyncQueueControl structure. |
509 | | */ |
510 | 0 | size = mul_size(MaxBackends, sizeof(QueueBackendStatus)); |
511 | 0 | size = add_size(size, offsetof(AsyncQueueControl, backend)); |
512 | |
513 | 0 | asyncQueueControl = (AsyncQueueControl *) |
514 | 0 | ShmemInitStruct("Async Queue Control", size, &found); |
515 | |
516 | 0 | if (!found) |
517 | 0 | { |
518 | | /* First time through, so initialize it */ |
519 | 0 | SET_QUEUE_POS(QUEUE_HEAD, 0, 0); |
520 | 0 | SET_QUEUE_POS(QUEUE_TAIL, 0, 0); |
521 | 0 | QUEUE_STOP_PAGE = 0; |
522 | 0 | QUEUE_FIRST_LISTENER = INVALID_PROC_NUMBER; |
523 | 0 | asyncQueueControl->lastQueueFillWarn = 0; |
524 | 0 | for (int i = 0; i < MaxBackends; i++) |
525 | 0 | { |
526 | 0 | QUEUE_BACKEND_PID(i) = InvalidPid; |
527 | 0 | QUEUE_BACKEND_DBOID(i) = InvalidOid; |
528 | 0 | QUEUE_NEXT_LISTENER(i) = INVALID_PROC_NUMBER; |
529 | 0 | SET_QUEUE_POS(QUEUE_BACKEND_POS(i), 0, 0); |
530 | 0 | } |
531 | 0 | } |
532 | | |
533 | | /* |
534 | | * Set up SLRU management of the pg_notify data. Note that long segment |
535 | | * names are used in order to avoid wraparound. |
536 | | */ |
537 | 0 | NotifyCtl->PagePrecedes = asyncQueuePagePrecedes; |
538 | 0 | SimpleLruInit(NotifyCtl, "notify", notify_buffers, 0, |
539 | 0 | "pg_notify", LWTRANCHE_NOTIFY_BUFFER, LWTRANCHE_NOTIFY_SLRU, |
540 | 0 | SYNC_HANDLER_NONE, true); |
541 | |
542 | 0 | if (!found) |
543 | 0 | { |
544 | | /* |
545 | | * During start or reboot, clean out the pg_notify directory. |
546 | | */ |
547 | 0 | (void) SlruScanDirectory(NotifyCtl, SlruScanDirCbDeleteAll, NULL); |
548 | 0 | } |
549 | 0 | } |
550 | | |
551 | | |
552 | | /* |
553 | | * pg_notify - |
554 | | * SQL function to send a notification event |
555 | | */ |
556 | | Datum |
557 | | pg_notify(PG_FUNCTION_ARGS) |
558 | 0 | { |
559 | 0 | const char *channel; |
560 | 0 | const char *payload; |
561 | |
562 | 0 | if (PG_ARGISNULL(0)) |
563 | 0 | channel = ""; |
564 | 0 | else |
565 | 0 | channel = text_to_cstring(PG_GETARG_TEXT_PP(0)); |
566 | |
567 | 0 | if (PG_ARGISNULL(1)) |
568 | 0 | payload = ""; |
569 | 0 | else |
570 | 0 | payload = text_to_cstring(PG_GETARG_TEXT_PP(1)); |
571 | | |
572 | | /* For NOTIFY as a statement, this is checked in ProcessUtility */ |
573 | 0 | PreventCommandDuringRecovery("NOTIFY"); |
574 | |
575 | 0 | Async_Notify(channel, payload); |
576 | |
577 | 0 | PG_RETURN_VOID(); |
578 | 0 | } |
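
For reference, the SQL-level spellings this function and the NOTIFY statement provide (channel and payload names are illustrative):

    /*
     * Illustrative SQL usage:
     *
     *      NOTIFY my_channel, 'hello';
     *      SELECT pg_notify('my_channel', 'hello');
     *
     * Unlike the NOTIFY statement, pg_notify() takes ordinary expression
     * arguments, so the channel name can be computed at run time, e.g.
     * inside a trigger function.
     */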
579 | | |
580 | | |
581 | | /* |
582 | | * Async_Notify |
583 | | * |
584 | | * This is executed by the SQL notify command. |
585 | | * |
586 | | * Adds the message to the list of pending notifies. |
587 | | * Actual notification happens during transaction commit. |
588 | | * ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ |
589 | | */ |
590 | | void |
591 | | Async_Notify(const char *channel, const char *payload) |
592 | 0 | { |
593 | 0 | int my_level = GetCurrentTransactionNestLevel(); |
594 | 0 | size_t channel_len; |
595 | 0 | size_t payload_len; |
596 | 0 | Notification *n; |
597 | 0 | MemoryContext oldcontext; |
598 | |
599 | 0 | if (IsParallelWorker()) |
600 | 0 | elog(ERROR, "cannot send notifications from a parallel worker"); |
601 | | |
602 | 0 | if (Trace_notify) |
603 | 0 | elog(DEBUG1, "Async_Notify(%s)", channel); |
604 | | |
605 | 0 | channel_len = channel ? strlen(channel) : 0; |
606 | 0 | payload_len = payload ? strlen(payload) : 0; |
607 | | |
608 | | /* a channel name must be specified */ |
609 | 0 | if (channel_len == 0) |
610 | 0 | ereport(ERROR, |
611 | 0 | (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
612 | 0 | errmsg("channel name cannot be empty"))); |
613 | | |
614 | | /* enforce length limits */ |
615 | 0 | if (channel_len >= NAMEDATALEN) |
616 | 0 | ereport(ERROR, |
617 | 0 | (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
618 | 0 | errmsg("channel name too long"))); |
619 | | |
620 | 0 | if (payload_len >= NOTIFY_PAYLOAD_MAX_LENGTH) |
621 | 0 | ereport(ERROR, |
622 | 0 | (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
623 | 0 | errmsg("payload string too long"))); |
624 | | |
625 | | /* |
626 | | * We must construct the Notification entry, even if we end up not using |
627 | | * it, in order to compare it cheaply to existing list entries. |
628 | | * |
629 | | * The notification list needs to live until end of transaction, so store |
630 | | * it in the transaction context. |
631 | | */ |
632 | 0 | oldcontext = MemoryContextSwitchTo(CurTransactionContext); |
633 | |
634 | 0 | n = (Notification *) palloc(offsetof(Notification, data) + |
635 | 0 | channel_len + payload_len + 2); |
636 | 0 | n->channel_len = channel_len; |
637 | 0 | n->payload_len = payload_len; |
638 | 0 | strcpy(n->data, channel); |
639 | 0 | if (payload) |
640 | 0 | strcpy(n->data + channel_len + 1, payload); |
641 | 0 | else |
642 | 0 | n->data[channel_len + 1] = '\0'; |
643 | |
644 | 0 | if (pendingNotifies == NULL || my_level > pendingNotifies->nestingLevel) |
645 | 0 | { |
646 | 0 | NotificationList *notifies; |
647 | | |
648 | | /* |
649 | | * First notify event in current (sub)xact. Note that we allocate the |
650 | | * NotificationList in TopTransactionContext; the nestingLevel might |
651 | | * get changed later by AtSubCommit_Notify. |
652 | | */ |
653 | 0 | notifies = (NotificationList *) |
654 | 0 | MemoryContextAlloc(TopTransactionContext, |
655 | 0 | sizeof(NotificationList)); |
656 | 0 | notifies->nestingLevel = my_level; |
657 | 0 | notifies->events = list_make1(n); |
658 | | /* We certainly don't need a hashtable yet */ |
659 | 0 | notifies->hashtab = NULL; |
660 | 0 | notifies->upper = pendingNotifies; |
661 | 0 | pendingNotifies = notifies; |
662 | 0 | } |
663 | 0 | else |
664 | 0 | { |
665 | | /* Now check for duplicates */ |
666 | 0 | if (AsyncExistsPendingNotify(n)) |
667 | 0 | { |
668 | | /* It's a dup, so forget it */ |
669 | 0 | pfree(n); |
670 | 0 | MemoryContextSwitchTo(oldcontext); |
671 | 0 | return; |
672 | 0 | } |
673 | | |
674 | | /* Append more events to existing list */ |
675 | 0 | AddEventToPendingNotifies(n); |
676 | 0 | } |
677 | | |
678 | 0 | MemoryContextSwitchTo(oldcontext); |
679 | 0 | } |
680 | | |
681 | | /* |
682 | | * queue_listen |
683 | | * Common code for listen, unlisten, unlisten all commands. |
684 | | * |
685 | | * Adds the request to the list of pending actions. |
686 | | * Actual update of the listenChannels list happens during transaction |
687 | | * commit. |
688 | | */ |
689 | | static void |
690 | | queue_listen(ListenActionKind action, const char *channel) |
691 | 0 | { |
692 | 0 | MemoryContext oldcontext; |
693 | 0 | ListenAction *actrec; |
694 | 0 | int my_level = GetCurrentTransactionNestLevel(); |
695 | | |
696 | | /* |
697 | | * Unlike Async_Notify, we don't try to collapse out duplicates. It would |
698 | | * be too complicated to ensure we get the right interactions of |
699 | | * conflicting LISTEN/UNLISTEN/UNLISTEN_ALL, and it's unlikely that there |
700 | | * would be any performance benefit anyway in sane applications. |
701 | | */ |
702 | 0 | oldcontext = MemoryContextSwitchTo(CurTransactionContext); |
703 | | |
704 | | /* the + 1 provides space for the terminating null */ |
705 | 0 | actrec = (ListenAction *) palloc(offsetof(ListenAction, channel) + |
706 | 0 | strlen(channel) + 1); |
707 | 0 | actrec->action = action; |
708 | 0 | strcpy(actrec->channel, channel); |
709 | |
710 | 0 | if (pendingActions == NULL || my_level > pendingActions->nestingLevel) |
711 | 0 | { |
712 | 0 | ActionList *actions; |
713 | | |
714 | | /* |
715 | | * First action in current sub(xact). Note that we allocate the |
716 | | * ActionList in TopTransactionContext; the nestingLevel might get |
717 | | * changed later by AtSubCommit_Notify. |
718 | | */ |
719 | 0 | actions = (ActionList *) |
720 | 0 | MemoryContextAlloc(TopTransactionContext, sizeof(ActionList)); |
721 | 0 | actions->nestingLevel = my_level; |
722 | 0 | actions->actions = list_make1(actrec); |
723 | 0 | actions->upper = pendingActions; |
724 | 0 | pendingActions = actions; |
725 | 0 | } |
726 | 0 | else |
727 | 0 | pendingActions->actions = lappend(pendingActions->actions, actrec); |
728 | |
729 | 0 | MemoryContextSwitchTo(oldcontext); |
730 | 0 | } |
731 | | |
732 | | /* |
733 | | * Async_Listen |
734 | | * |
735 | | * This is executed by the SQL listen command. |
736 | | */ |
737 | | void |
738 | | Async_Listen(const char *channel) |
739 | 0 | { |
740 | 0 | if (Trace_notify) |
741 | 0 | elog(DEBUG1, "Async_Listen(%s,%d)", channel, MyProcPid); |
742 | | |
743 | 0 | queue_listen(LISTEN_LISTEN, channel); |
744 | 0 | } |
745 | | |
746 | | /* |
747 | | * Async_Unlisten |
748 | | * |
749 | | * This is executed by the SQL unlisten command. |
750 | | */ |
751 | | void |
752 | | Async_Unlisten(const char *channel) |
753 | 0 | { |
754 | 0 | if (Trace_notify) |
755 | 0 | elog(DEBUG1, "Async_Unlisten(%s,%d)", channel, MyProcPid); |
756 | | |
757 | | /* If we couldn't possibly be listening, no need to queue anything */ |
758 | 0 | if (pendingActions == NULL && !unlistenExitRegistered) |
759 | 0 | return; |
760 | | |
761 | 0 | queue_listen(LISTEN_UNLISTEN, channel); |
762 | 0 | } |
763 | | |
764 | | /* |
765 | | * Async_UnlistenAll |
766 | | * |
767 | | * This is invoked by UNLISTEN * command, and also at backend exit. |
768 | | */ |
769 | | void |
770 | | Async_UnlistenAll(void) |
771 | 0 | { |
772 | 0 | if (Trace_notify) |
773 | 0 | elog(DEBUG1, "Async_UnlistenAll(%d)", MyProcPid); |
774 | | |
775 | | /* If we couldn't possibly be listening, no need to queue anything */ |
776 | 0 | if (pendingActions == NULL && !unlistenExitRegistered) |
777 | 0 | return; |
778 | | |
779 | 0 | queue_listen(LISTEN_UNLISTEN_ALL, ""); |
780 | 0 | } |
781 | | |
782 | | /* |
783 | | * SQL function: return a set of the channel names this backend is actively |
784 | | * listening to. |
785 | | * |
786 | | * Note: this coding relies on the fact that the listenChannels list cannot |
787 | | * change within a transaction. |
788 | | */ |
789 | | Datum |
790 | | pg_listening_channels(PG_FUNCTION_ARGS) |
791 | 0 | { |
792 | 0 | FuncCallContext *funcctx; |
793 | | |
794 | | /* stuff done only on the first call of the function */ |
795 | 0 | if (SRF_IS_FIRSTCALL()) |
796 | 0 | { |
797 | | /* create a function context for cross-call persistence */ |
798 | 0 | funcctx = SRF_FIRSTCALL_INIT(); |
799 | 0 | } |
800 | | |
801 | | /* stuff done on every call of the function */ |
802 | 0 | funcctx = SRF_PERCALL_SETUP(); |
803 | |
804 | 0 | if (funcctx->call_cntr < list_length(listenChannels)) |
805 | 0 | { |
806 | 0 | char *channel = (char *) list_nth(listenChannels, |
807 | 0 | funcctx->call_cntr); |
808 | |
809 | 0 | SRF_RETURN_NEXT(funcctx, CStringGetTextDatum(channel)); |
810 | 0 | } |
811 | | |
812 | 0 | SRF_RETURN_DONE(funcctx); |
813 | 0 | } |
814 | | |
815 | | /* |
816 | | * Async_UnlistenOnExit |
817 | | * |
818 | | * This is executed at backend exit if we have done any LISTENs in this |
819 | | * backend. It might not be necessary anymore, if the user UNLISTENed |
820 | | * everything, but we don't try to detect that case. |
821 | | */ |
822 | | static void |
823 | | Async_UnlistenOnExit(int code, Datum arg) |
824 | 0 | { |
825 | 0 | Exec_UnlistenAllCommit(); |
826 | 0 | asyncQueueUnregister(); |
827 | 0 | } |
828 | | |
829 | | /* |
830 | | * AtPrepare_Notify |
831 | | * |
832 | | * This is called at the prepare phase of a two-phase |
833 | | * transaction. Save the state for possible commit later. |
834 | | */ |
835 | | void |
836 | | AtPrepare_Notify(void) |
837 | 0 | { |
838 | | /* It's not allowed to have any pending LISTEN/UNLISTEN/NOTIFY actions */ |
839 | 0 | if (pendingActions || pendingNotifies) |
840 | 0 | ereport(ERROR, |
841 | 0 | (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
842 | 0 | errmsg("cannot PREPARE a transaction that has executed LISTEN, UNLISTEN, or NOTIFY"))); |
843 | 0 | } |
844 | | |
845 | | /* |
846 | | * PreCommit_Notify |
847 | | * |
848 | | * This is called at transaction commit, before actually committing to |
849 | | * clog. |
850 | | * |
851 | | * If there are pending LISTEN actions, make sure we are listed in the |
852 | | * shared-memory listener array. This must happen before commit to |
853 | | * ensure we don't miss any notifies from transactions that commit |
854 | | * just after ours. |
855 | | * |
856 | | * If there are outbound notify requests in the pendingNotifies list, |
857 | | * add them to the global queue. We do that before commit so that |
858 | | * we can still throw error if we run out of queue space. |
859 | | */ |
860 | | void |
861 | | PreCommit_Notify(void) |
862 | 0 | { |
863 | 0 | ListCell *p; |
864 | |
|
865 | 0 | if (!pendingActions && !pendingNotifies) |
866 | 0 | return; /* no relevant statements in this xact */ |
867 | | |
868 | 0 | if (Trace_notify) |
869 | 0 | elog(DEBUG1, "PreCommit_Notify"); |
870 | | |
871 | | /* Preflight for any pending listen/unlisten actions */ |
872 | 0 | if (pendingActions != NULL) |
873 | 0 | { |
874 | 0 | foreach(p, pendingActions->actions) |
875 | 0 | { |
876 | 0 | ListenAction *actrec = (ListenAction *) lfirst(p); |
877 | |
878 | 0 | switch (actrec->action) |
879 | 0 | { |
880 | 0 | case LISTEN_LISTEN: |
881 | 0 | Exec_ListenPreCommit(); |
882 | 0 | break; |
883 | 0 | case LISTEN_UNLISTEN: |
884 | | /* there is no Exec_UnlistenPreCommit() */ |
885 | 0 | break; |
886 | 0 | case LISTEN_UNLISTEN_ALL: |
887 | | /* there is no Exec_UnlistenAllPreCommit() */ |
888 | 0 | break; |
889 | 0 | } |
890 | 0 | } |
891 | 0 | } |
892 | | |
893 | | /* Queue any pending notifies (must happen after the above) */ |
894 | 0 | if (pendingNotifies) |
895 | 0 | { |
896 | 0 | ListCell *nextNotify; |
897 | | |
898 | | /* |
899 | | * Make sure that we have an XID assigned to the current transaction. |
900 | | * GetCurrentTransactionId is cheap if we already have an XID, but not |
901 | | * so cheap if we don't, and we'd prefer not to do that work while |
902 | | * holding NotifyQueueLock. |
903 | | */ |
904 | 0 | (void) GetCurrentTransactionId(); |
905 | | |
906 | | /* |
907 | | * Serialize writers by acquiring a special lock that we hold till |
908 | | * after commit. This ensures that queue entries appear in commit |
909 | | * order, and in particular that there are never uncommitted queue |
910 | | * entries ahead of committed ones, so an uncommitted transaction |
911 | | * can't block delivery of deliverable notifications. |
912 | | * |
913 | | * We use a heavyweight lock so that it'll automatically be released |
914 | | * after either commit or abort. This also allows deadlocks to be |
915 | | * detected, though really a deadlock shouldn't be possible here. |
916 | | * |
917 | | * The lock is on "database 0", which is pretty ugly but it doesn't |
918 | | * seem worth inventing a special locktag category just for this. |
919 | | * (Historical note: before PG 9.0, a similar lock on "database 0" was |
920 | | * used by the flatfiles mechanism.) |
921 | | */ |
922 | 0 | LockSharedObject(DatabaseRelationId, InvalidOid, 0, |
923 | 0 | AccessExclusiveLock); |
924 | | |
925 | | /* Now push the notifications into the queue */ |
926 | 0 | nextNotify = list_head(pendingNotifies->events); |
927 | 0 | while (nextNotify != NULL) |
928 | 0 | { |
929 | | /* |
930 | | * Add the pending notifications to the queue. We acquire and |
931 | | * release NotifyQueueLock once per page, which might be overkill |
932 | | * but it does allow readers to get in while we're doing this. |
933 | | * |
934 | | * A full queue is very uncommon and should really not happen, |
935 | | * given that we have so much space available in the SLRU pages. |
936 | | * Nevertheless we need to deal with this possibility. Note that |
937 | | * when we get here we are in the process of committing our |
938 | | * transaction, but we have not yet committed to clog, so at this |
939 | | * point in time we can still roll the transaction back. |
940 | | */ |
941 | 0 | LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); |
942 | 0 | asyncQueueFillWarning(); |
943 | 0 | if (asyncQueueIsFull()) |
944 | 0 | ereport(ERROR, |
945 | 0 | (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), |
946 | 0 | errmsg("too many notifications in the NOTIFY queue"))); |
947 | 0 | nextNotify = asyncQueueAddEntries(nextNotify); |
948 | 0 | LWLockRelease(NotifyQueueLock); |
949 | 0 | } |
950 | | |
951 | | /* Note that we don't clear pendingNotifies; AtCommit_Notify will. */ |
952 | 0 | } |
953 | 0 | } |
954 | | |
955 | | /* |
956 | | * AtCommit_Notify |
957 | | * |
958 | | * This is called at transaction commit, after committing to clog. |
959 | | * |
960 | | * Update listenChannels and clear transaction-local state. |
961 | | * |
962 | | * If we issued any notifications in the transaction, send signals to |
963 | | * listening backends (possibly including ourselves) to process them. |
964 | | * Also, if we filled enough queue pages with new notifies, try to |
965 | | * advance the queue tail pointer. |
966 | | */ |
967 | | void |
968 | | AtCommit_Notify(void) |
969 | 0 | { |
970 | 0 | ListCell *p; |
971 | | |
972 | | /* |
973 | | * Allow transactions that have not executed LISTEN/UNLISTEN/NOTIFY to |
974 | | * return as soon as possible |
975 | | */ |
976 | 0 | if (!pendingActions && !pendingNotifies) |
977 | 0 | return; |
978 | | |
979 | 0 | if (Trace_notify) |
980 | 0 | elog(DEBUG1, "AtCommit_Notify"); |
981 | | |
982 | | /* Perform any pending listen/unlisten actions */ |
983 | 0 | if (pendingActions != NULL) |
984 | 0 | { |
985 | 0 | foreach(p, pendingActions->actions) |
986 | 0 | { |
987 | 0 | ListenAction *actrec = (ListenAction *) lfirst(p); |
988 | |
989 | 0 | switch (actrec->action) |
990 | 0 | { |
991 | 0 | case LISTEN_LISTEN: |
992 | 0 | Exec_ListenCommit(actrec->channel); |
993 | 0 | break; |
994 | 0 | case LISTEN_UNLISTEN: |
995 | 0 | Exec_UnlistenCommit(actrec->channel); |
996 | 0 | break; |
997 | 0 | case LISTEN_UNLISTEN_ALL: |
998 | 0 | Exec_UnlistenAllCommit(); |
999 | 0 | break; |
1000 | 0 | } |
1001 | 0 | } |
1002 | 0 | } |
1003 | | |
1004 | | /* If no longer listening to anything, get out of listener array */ |
1005 | 0 | if (amRegisteredListener && listenChannels == NIL) |
1006 | 0 | asyncQueueUnregister(); |
1007 | | |
1008 | | /* |
1009 | | * Send signals to listening backends. We need to do this only if there are |
1010 | | * pending notifies, which were previously added to the shared queue by |
1011 | | * PreCommit_Notify(). |
1012 | | */ |
1013 | 0 | if (pendingNotifies != NULL) |
1014 | 0 | SignalBackends(); |
1015 | | |
1016 | | /* |
1017 | | * If it's time to try to advance the global tail pointer, do that. |
1018 | | * |
1019 | | * (It might seem odd to do this in the sender, when more than likely the |
1020 | | * listeners won't yet have read the messages we just sent. However, |
1021 | | * there's less contention if only the sender does it, and there is little |
1022 | | * need for urgency in advancing the global tail. So this typically will |
1023 | | * be clearing out messages that were sent some time ago.) |
1024 | | */ |
1025 | 0 | if (tryAdvanceTail) |
1026 | 0 | { |
1027 | 0 | tryAdvanceTail = false; |
1028 | 0 | asyncQueueAdvanceTail(); |
1029 | 0 | } |
1030 | | |
1031 | | /* And clean up */ |
1032 | 0 | ClearPendingActionsAndNotifies(); |
1033 | 0 | } |
1034 | | |
1035 | | /* |
1036 | | * Exec_ListenPreCommit --- subroutine for PreCommit_Notify |
1037 | | * |
1038 | | * This function must make sure we are ready to catch any incoming messages. |
1039 | | */ |
1040 | | static void |
1041 | | Exec_ListenPreCommit(void) |
1042 | 0 | { |
1043 | 0 | QueuePosition head; |
1044 | 0 | QueuePosition max; |
1045 | 0 | ProcNumber prevListener; |
1046 | | |
1047 | | /* |
1048 | | * Nothing to do if we are already listening to something, nor if we |
1049 | | * already ran this routine in this transaction. |
1050 | | */ |
1051 | 0 | if (amRegisteredListener) |
1052 | 0 | return; |
1053 | | |
1054 | 0 | if (Trace_notify) |
1055 | 0 | elog(DEBUG1, "Exec_ListenPreCommit(%d)", MyProcPid); |
1056 | | |
1057 | | /* |
1058 | | * Before registering, make sure we will unlisten before dying. (Note: |
1059 | | * this action does not get undone if we abort later.) |
1060 | | */ |
1061 | 0 | if (!unlistenExitRegistered) |
1062 | 0 | { |
1063 | 0 | before_shmem_exit(Async_UnlistenOnExit, 0); |
1064 | 0 | unlistenExitRegistered = true; |
1065 | 0 | } |
1066 | | |
1067 | | /* |
1068 | | * This is our first LISTEN, so establish our pointer. |
1069 | | * |
1070 | | * We set our pointer to the global tail pointer and then move it forward |
1071 | | * over already-committed notifications. This ensures we cannot miss any |
1072 | | * not-yet-committed notifications. We might get a few more but that |
1073 | | * doesn't hurt. |
1074 | | * |
1075 | | * In some scenarios there might be a lot of committed notifications that |
1076 | | * have not yet been pruned away (because some backend is being lazy about |
1077 | | * reading them). To reduce our startup time, we can look at other |
1078 | | * backends and adopt the maximum "pos" pointer of any backend that's in |
1079 | | * our database; any notifications it's already advanced over are surely |
1080 | | * committed and need not be re-examined by us. (We must consider only |
1081 | | * backends connected to our DB, because others will not have bothered to |
1082 | | * check committed-ness of notifications in our DB.) |
1083 | | * |
1084 | | * We need exclusive lock here so we can look at other backends' entries |
1085 | | * and manipulate the list links. |
1086 | | */ |
1087 | 0 | LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); |
1088 | 0 | head = QUEUE_HEAD; |
1089 | 0 | max = QUEUE_TAIL; |
1090 | 0 | prevListener = INVALID_PROC_NUMBER; |
1091 | 0 | for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) |
1092 | 0 | { |
1093 | 0 | if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId) |
1094 | 0 | max = QUEUE_POS_MAX(max, QUEUE_BACKEND_POS(i)); |
1095 | | /* Also find last listening backend before this one */ |
1096 | 0 | if (i < MyProcNumber) |
1097 | 0 | prevListener = i; |
1098 | 0 | } |
1099 | 0 | QUEUE_BACKEND_POS(MyProcNumber) = max; |
1100 | 0 | QUEUE_BACKEND_PID(MyProcNumber) = MyProcPid; |
1101 | 0 | QUEUE_BACKEND_DBOID(MyProcNumber) = MyDatabaseId; |
1102 | | /* Insert backend into list of listeners at correct position */ |
1103 | 0 | if (prevListener != INVALID_PROC_NUMBER) |
1104 | 0 | { |
1105 | 0 | QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_NEXT_LISTENER(prevListener); |
1106 | 0 | QUEUE_NEXT_LISTENER(prevListener) = MyProcNumber; |
1107 | 0 | } |
1108 | 0 | else |
1109 | 0 | { |
1110 | 0 | QUEUE_NEXT_LISTENER(MyProcNumber) = QUEUE_FIRST_LISTENER; |
1111 | 0 | QUEUE_FIRST_LISTENER = MyProcNumber; |
1112 | 0 | } |
1113 | 0 | LWLockRelease(NotifyQueueLock); |
1114 | | |
1115 | | /* Now we are listed in the global array, so remember we're listening */ |
1116 | 0 | amRegisteredListener = true; |
1117 | | |
1118 | | /* |
1119 | | * Try to move our pointer forward as far as possible. This will skip |
1120 | | * over already-committed notifications, which we want to do because they |
1121 | | * might be quite stale. Note that we are not yet listening on anything, |
1122 | | * so we won't deliver such notifications to our frontend. Also, although |
1123 | | * our transaction might have executed NOTIFY, those message(s) aren't |
1124 | | * queued yet so we won't skip them here. |
1125 | | */ |
1126 | 0 | if (!QUEUE_POS_EQUAL(max, head)) |
1127 | 0 | asyncQueueReadAllNotifications(); |
1128 | 0 | } |
1129 | | |
1130 | | /* |
1131 | | * Exec_ListenCommit --- subroutine for AtCommit_Notify |
1132 | | * |
1133 | | * Add the channel to the list of channels we are listening on. |
1134 | | */ |
1135 | | static void |
1136 | | Exec_ListenCommit(const char *channel) |
1137 | 0 | { |
1138 | 0 | MemoryContext oldcontext; |
1139 | | |
1140 | | /* Do nothing if we are already listening on this channel */ |
1141 | 0 | if (IsListeningOn(channel)) |
1142 | 0 | return; |
1143 | | |
1144 | | /* |
1145 | | * Add the new channel name to listenChannels. |
1146 | | * |
1147 | | * XXX It is theoretically possible to get an out-of-memory failure here, |
1148 | | * which would be bad because we already committed. For the moment it |
1149 | | * doesn't seem worth trying to guard against that, but maybe improve this |
1150 | | * later. |
1151 | | */ |
1152 | 0 | oldcontext = MemoryContextSwitchTo(TopMemoryContext); |
1153 | 0 | listenChannels = lappend(listenChannels, pstrdup(channel)); |
1154 | 0 | MemoryContextSwitchTo(oldcontext); |
1155 | 0 | } |
1156 | | |
1157 | | /* |
1158 | | * Exec_UnlistenCommit --- subroutine for AtCommit_Notify |
1159 | | * |
1160 | | * Remove the specified channel name from listenChannels. |
1161 | | */ |
1162 | | static void |
1163 | | Exec_UnlistenCommit(const char *channel) |
1164 | 0 | { |
1165 | 0 | ListCell *q; |
1166 | |
1167 | 0 | if (Trace_notify) |
1168 | 0 | elog(DEBUG1, "Exec_UnlistenCommit(%s,%d)", channel, MyProcPid); |
1169 | | |
1170 | 0 | foreach(q, listenChannels) |
1171 | 0 | { |
1172 | 0 | char *lchan = (char *) lfirst(q); |
1173 | |
|
1174 | 0 | if (strcmp(lchan, channel) == 0) |
1175 | 0 | { |
1176 | 0 | listenChannels = foreach_delete_current(listenChannels, q); |
1177 | 0 | pfree(lchan); |
1178 | 0 | break; |
1179 | 0 | } |
1180 | 0 | } |
1181 | | |
1182 | | /* |
1183 | | * We do not complain about unlistening something not being listened on; |
1184 | | * should we? |
1185 | | */ |
1186 | 0 | } |
1187 | | |
1188 | | /* |
1189 | | * Exec_UnlistenAllCommit --- subroutine for AtCommit_Notify |
1190 | | * |
1191 | | * Unlisten on all channels for this backend. |
1192 | | */ |
1193 | | static void |
1194 | | Exec_UnlistenAllCommit(void) |
1195 | 0 | { |
1196 | 0 | if (Trace_notify) |
1197 | 0 | elog(DEBUG1, "Exec_UnlistenAllCommit(%d)", MyProcPid); |
1198 | | |
1199 | 0 | list_free_deep(listenChannels); |
1200 | 0 | listenChannels = NIL; |
1201 | 0 | } |
1202 | | |
1203 | | /* |
1204 | | * Test whether we are actively listening on the given channel name. |
1205 | | * |
1206 | | * Note: this function is executed for every notification found in the queue. |
1207 | | * Perhaps it is worth further optimization, e.g. convert the list to a sorted |
1208 | | * array so we can binary-search it. In practice the list is likely to be |
1209 | | * fairly short, though. |
1210 | | */ |
1211 | | static bool |
1212 | | IsListeningOn(const char *channel) |
1213 | 0 | { |
1214 | 0 | ListCell *p; |
1215 | |
1216 | 0 | foreach(p, listenChannels) |
1217 | 0 | { |
1218 | 0 | char *lchan = (char *) lfirst(p); |
1219 | |
1220 | 0 | if (strcmp(lchan, channel) == 0) |
1221 | 0 | return true; |
1222 | 0 | } |
1223 | 0 | return false; |
1224 | 0 | } |
1225 | | |
1226 | | /* |
1227 | | * Remove our entry from the listeners array when we are no longer listening |
1228 | | * on any channel. NB: must not fail if we're already not listening. |
1229 | | */ |
1230 | | static void |
1231 | | asyncQueueUnregister(void) |
1232 | 0 | { |
1233 | 0 | Assert(listenChannels == NIL); /* else caller error */ |
1234 | |
1235 | 0 | if (!amRegisteredListener) /* nothing to do */ |
1236 | 0 | return; |
1237 | | |
1238 | | /* |
1239 | | * Need exclusive lock here to manipulate list links. |
1240 | | */ |
1241 | 0 | LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); |
1242 | | /* Mark our entry as invalid */ |
1243 | 0 | QUEUE_BACKEND_PID(MyProcNumber) = InvalidPid; |
1244 | 0 | QUEUE_BACKEND_DBOID(MyProcNumber) = InvalidOid; |
1245 | | /* and remove it from the list */ |
1246 | 0 | if (QUEUE_FIRST_LISTENER == MyProcNumber) |
1247 | 0 | QUEUE_FIRST_LISTENER = QUEUE_NEXT_LISTENER(MyProcNumber); |
1248 | 0 | else |
1249 | 0 | { |
1250 | 0 | for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) |
1251 | 0 | { |
1252 | 0 | if (QUEUE_NEXT_LISTENER(i) == MyProcNumber) |
1253 | 0 | { |
1254 | 0 | QUEUE_NEXT_LISTENER(i) = QUEUE_NEXT_LISTENER(MyProcNumber); |
1255 | 0 | break; |
1256 | 0 | } |
1257 | 0 | } |
1258 | 0 | } |
1259 | 0 | QUEUE_NEXT_LISTENER(MyProcNumber) = INVALID_PROC_NUMBER; |
1260 | 0 | LWLockRelease(NotifyQueueLock); |
1261 | | |
1262 | | /* mark ourselves as no longer listed in the global array */ |
1263 | 0 | amRegisteredListener = false; |
1264 | 0 | } |
1265 | | |
1266 | | /* |
1267 | | * Test whether there is room to insert more notification messages. |
1268 | | * |
1269 | | * Caller must hold at least shared NotifyQueueLock. |
1270 | | */ |
1271 | | static bool |
1272 | | asyncQueueIsFull(void) |
1273 | 0 | { |
1274 | 0 | int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD); |
1275 | 0 | int64 tailPage = QUEUE_POS_PAGE(QUEUE_TAIL); |
1276 | 0 | int64 occupied = headPage - tailPage; |
1277 | |
1278 | 0 | return occupied >= max_notify_queue_pages; |
1279 | 0 | } |
1280 | | |
1281 | | /* |
1282 | | * Advance the QueuePosition to the next entry, assuming that the current |
1283 | | * entry is of length entryLength. If we jump to a new page the function |
1284 | | * returns true, else false. |
1285 | | */ |
1286 | | static bool |
1287 | | asyncQueueAdvance(volatile QueuePosition *position, int entryLength) |
1288 | 0 | { |
1289 | 0 | int64 pageno = QUEUE_POS_PAGE(*position); |
1290 | 0 | int offset = QUEUE_POS_OFFSET(*position); |
1291 | 0 | bool pageJump = false; |
1292 | | |
1293 | | /* |
1294 | | * Move to the next writing position: First jump over what we have just |
1295 | | * written or read. |
1296 | | */ |
1297 | 0 | offset += entryLength; |
1298 | 0 | Assert(offset <= QUEUE_PAGESIZE); |
1299 | | |
1300 | | /* |
1301 | | * In a second step check if another entry can possibly be written to the |
1302 | | * page. If so, stay here, we have reached the next position. If not, then |
1303 | | * we need to move on to the next page. |
1304 | | */ |
1305 | 0 | if (offset + QUEUEALIGN(AsyncQueueEntryEmptySize) > QUEUE_PAGESIZE) |
1306 | 0 | { |
1307 | 0 | pageno++; |
1308 | 0 | offset = 0; |
1309 | 0 | pageJump = true; |
1310 | 0 | } |
1311 | |
1312 | 0 | SET_QUEUE_POS(*position, pageno, offset); |
1313 | 0 | return pageJump; |
1314 | 0 | } |
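
A worked example of the page-jump test, assuming the default 8 kB BLCKSZ and the typical layout where QUEUEALIGN(AsyncQueueEntryEmptySize) is 20 bytes (16 bytes of fixed fields plus two NUL terminators, int-aligned):

    /*
     * If the entry just consumed ends at offset 8180, then
     * 8180 + 20 > 8192: not even a minimal entry could follow, so both
     * readers and writers jump to offset 0 of the next page. When a
     * minimal entry would still fit but the next real entry doesn't,
     * the writer instead pads the page with a dummy entry (see
     * asyncQueueAddEntries below), so readers never encounter a gap.
     */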
1315 | | |
1316 | | /* |
1317 | | * Fill the AsyncQueueEntry at *qe with an outbound notification message. |
1318 | | */ |
1319 | | static void |
1320 | | asyncQueueNotificationToEntry(Notification *n, AsyncQueueEntry *qe) |
1321 | 0 | { |
1322 | 0 | size_t channellen = n->channel_len; |
1323 | 0 | size_t payloadlen = n->payload_len; |
1324 | 0 | int entryLength; |
1325 | |
1326 | 0 | Assert(channellen < NAMEDATALEN); |
1327 | 0 | Assert(payloadlen < NOTIFY_PAYLOAD_MAX_LENGTH); |
1328 | | |
1329 | | /* The terminators are already included in AsyncQueueEntryEmptySize */ |
1330 | 0 | entryLength = AsyncQueueEntryEmptySize + payloadlen + channellen; |
1331 | 0 | entryLength = QUEUEALIGN(entryLength); |
1332 | 0 | qe->length = entryLength; |
1333 | 0 | qe->dboid = MyDatabaseId; |
1334 | 0 | qe->xid = GetCurrentTransactionId(); |
1335 | 0 | qe->srcPid = MyProcPid; |
1336 | 0 | memcpy(qe->data, n->data, channellen + payloadlen + 2); |
1337 | 0 | } |
1338 | | |
1339 | | /* |
1340 | | * Add pending notifications to the queue. |
1341 | | * |
1342 | | * We go page by page here, i.e. we stop once we have to go to a new page but |
1343 | | * we will be called again and then fill that next page. If an entry does not |
1344 | | * fit into the current page, we write a dummy entry with an InvalidOid as the |
1345 | | * database OID in order to fill the page. So every page is always used up to |
1346 | | * the last byte, which simplifies reading the page later. |
1347 | | * |
1348 | | * We are passed the list cell (in pendingNotifies->events) containing the next |
1349 | | * notification to write and return the first still-unwritten cell back. |
1350 | | * Eventually we will return NULL indicating all is done. |
1351 | | * |
1352 | | * We are holding NotifyQueueLock already from the caller and grab |
1353 | | * page specific SLRU bank lock locally in this function. |
1354 | | */ |
1355 | | static ListCell * |
1356 | | asyncQueueAddEntries(ListCell *nextNotify) |
1357 | 0 | { |
1358 | 0 | AsyncQueueEntry qe; |
1359 | 0 | QueuePosition queue_head; |
1360 | 0 | int64 pageno; |
1361 | 0 | int offset; |
1362 | 0 | int slotno; |
1363 | 0 | LWLock *prevlock; |
1364 | | |
1365 | | /* |
1366 | | * We work with a local copy of QUEUE_HEAD, which we write back to shared |
1367 | | * memory upon exiting. The reason for this is that if we have to advance |
1368 | | * to a new page, SimpleLruZeroPage might fail (out of disk space, for |
1369 | | * instance), and we must not advance QUEUE_HEAD if it does. (Otherwise, |
1370 | | * subsequent insertions would try to put entries into a page that slru.c |
1371 | | * thinks doesn't exist yet.) So, use a local position variable. Note |
1372 | | * that if we do fail, any already-inserted queue entries are forgotten; |
1373 | | * this is okay, since they'd be useless anyway after our transaction |
1374 | | * rolls back. |
1375 | | */ |
1376 | 0 | queue_head = QUEUE_HEAD; |
1377 | | |
1378 | | /* |
1379 | | * If this is the first write since the postmaster started, we need to |
1380 | | * initialize the first page of the async SLRU. Otherwise, the current |
1381 | | * page should be initialized already, so just fetch it. |
1382 | | */ |
1383 | 0 | pageno = QUEUE_POS_PAGE(queue_head); |
1384 | 0 | prevlock = SimpleLruGetBankLock(NotifyCtl, pageno); |
1385 | | |
1386 | | /* We hold both NotifyQueueLock and SLRU bank lock during this operation */ |
1387 | 0 | LWLockAcquire(prevlock, LW_EXCLUSIVE); |
1388 | 0 | 
1389 | 0 | if (QUEUE_POS_IS_ZERO(queue_head)) |
1390 | 0 | slotno = SimpleLruZeroPage(NotifyCtl, pageno); |
1391 | 0 | else |
1392 | 0 | slotno = SimpleLruReadPage(NotifyCtl, pageno, true, |
1393 | 0 | InvalidTransactionId); |
1394 | | |
1395 | | /* Note we mark the page dirty before writing in it */ |
1396 | 0 | NotifyCtl->shared->page_dirty[slotno] = true; |
1397 | 0 | 
1398 | 0 | while (nextNotify != NULL) |
1399 | 0 | { |
1400 | 0 | Notification *n = (Notification *) lfirst(nextNotify); |
1401 | | |
1402 | | /* Construct a valid queue entry in local variable qe */ |
1403 | 0 | asyncQueueNotificationToEntry(n, &qe); |
1404 | 0 | 
1405 | 0 | offset = QUEUE_POS_OFFSET(queue_head); |
1406 | | |
1407 | | /* Check whether the entry really fits on the current page */ |
1408 | 0 | if (offset + qe.length <= QUEUE_PAGESIZE) |
1409 | 0 | { |
1410 | | /* OK, so advance nextNotify past this item */ |
1411 | 0 | nextNotify = lnext(pendingNotifies->events, nextNotify); |
1412 | 0 | } |
1413 | 0 | else |
1414 | 0 | { |
1415 | | /* |
1416 | |  * Write a dummy entry to fill up the page. Readers will only check |
1417 | |  * dboid, and since it won't match any reader's database OID, they |
1418 | |  * will ignore this entry and move on. |
1419 | | */ |
1420 | 0 | qe.length = QUEUE_PAGESIZE - offset; |
1421 | 0 | qe.dboid = InvalidOid; |
1422 | 0 | qe.data[0] = '\0'; /* empty channel */ |
1423 | 0 | qe.data[1] = '\0'; /* empty payload */ |
1424 | 0 | } |
1425 | | |
1426 | | /* Now copy qe into the shared buffer page */ |
1427 | 0 | memcpy(NotifyCtl->shared->page_buffer[slotno] + offset, |
1428 | 0 | &qe, |
1429 | 0 | qe.length); |
1430 | | |
1431 | | /* Advance queue_head appropriately, and detect if page is full */ |
1432 | 0 | if (asyncQueueAdvance(&(queue_head), qe.length)) |
1433 | 0 | { |
1434 | 0 | LWLock *lock; |
1435 | 0 | 
1436 | 0 | pageno = QUEUE_POS_PAGE(queue_head); |
1437 | 0 | lock = SimpleLruGetBankLock(NotifyCtl, pageno); |
1438 | 0 | if (lock != prevlock) |
1439 | 0 | { |
1440 | 0 | LWLockRelease(prevlock); |
1441 | 0 | LWLockAcquire(lock, LW_EXCLUSIVE); |
1442 | 0 | prevlock = lock; |
1443 | 0 | } |
1444 | | |
1445 | | /* |
1446 | | * Page is full, so we're done here, but first fill the next page |
1447 | | * with zeroes. The reason to do this is to ensure that slru.c's |
1448 | | * idea of the head page is always the same as ours, which avoids |
1449 | | * boundary problems in SimpleLruTruncate. The test in |
1450 | | * asyncQueueIsFull() ensured that there is room to create this |
1451 | | * page without overrunning the queue. |
1452 | | */ |
1453 | 0 | slotno = SimpleLruZeroPage(NotifyCtl, QUEUE_POS_PAGE(queue_head)); |
1454 | | |
1455 | | /* |
1456 | | * If the new page address is a multiple of QUEUE_CLEANUP_DELAY, |
1457 | | * set flag to remember that we should try to advance the tail |
1458 | | * pointer (we don't want to actually do that right here). |
1459 | | */ |
1460 | 0 | if (QUEUE_POS_PAGE(queue_head) % QUEUE_CLEANUP_DELAY == 0) |
1461 | 0 | tryAdvanceTail = true; |
1462 | | |
1463 | | /* And exit the loop */ |
1464 | 0 | break; |
1465 | 0 | } |
1466 | 0 | } |
1467 | | |
1468 | | /* Success, so update the global QUEUE_HEAD */ |
1469 | 0 | QUEUE_HEAD = queue_head; |
1470 | 0 | 
1471 | 0 | LWLockRelease(prevlock); |
1472 | 0 | 
1473 | 0 | return nextNotify; |
1474 | 0 | } |
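
The page-fill discipline is easy to model in isolation: write entries while they fit, and close out the page with a filler entry whose database OID (0 below, standing in for InvalidOid) matches no reader. A sketch under simplified types, not the shared-memory code above:

/* Sketch only: a tiny page and bare entry headers stand in for the
 * SLRU page and AsyncQueueEntry. */
#include <stdio.h>
#include <string.h>

#define PAGE_SIZE 128                   /* deliberately tiny */

typedef struct { int length; unsigned dboid; } EntryHdr;

int
main(void)
{
    char page[PAGE_SIZE];
    int  lengths[] = {48, 48, 48};      /* the third one won't fit */
    int  offset = 0;
    int  i = 0;

    while (i < 3)
    {
        EntryHdr qe = {lengths[i], 1};  /* dboid 1 = a real database */

        if (offset + qe.length <= PAGE_SIZE)
            i++;                        /* fits: consume this entry */
        else
        {
            /* doesn't fit: emit a filler that readers will skip */
            qe.length = PAGE_SIZE - offset;
            qe.dboid = 0;
            printf("filler of %d bytes at offset %d\n", qe.length, offset);
        }
        memcpy(page + offset, &qe, sizeof(qe));
        offset += qe.length;
        if (offset >= PAGE_SIZE)
            break;                      /* page used to the last byte;
                                         * entry i waits for the next call */
    }
    return 0;
}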
1475 | | |
1476 | | /* |
1477 | | * SQL function to return the fraction of the notification queue currently |
1478 | | * occupied. |
1479 | | */ |
1480 | | Datum |
1481 | | pg_notification_queue_usage(PG_FUNCTION_ARGS) |
1482 | 0 | { |
1483 | 0 | double usage; |
1484 | | |
1485 | | /* Advance the queue tail so we don't report a too-large result */ |
1486 | 0 | asyncQueueAdvanceTail(); |
1487 | 0 | 
1488 | 0 | LWLockAcquire(NotifyQueueLock, LW_SHARED); |
1489 | 0 | usage = asyncQueueUsage(); |
1490 | 0 | LWLockRelease(NotifyQueueLock); |
1491 | 0 | 
1492 | 0 | PG_RETURN_FLOAT8(usage); |
1493 | 0 | } |
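
A monitoring client might poll this function over libpq roughly as follows; the connection string and the 0.5 alert threshold are placeholders:

/* Sketch only: connection string and the 0.5 threshold are placeholders. */
#include <stdio.h>
#include <stdlib.h>
#include <libpq-fe.h>

int
main(void)
{
    PGconn   *conn = PQconnectdb("dbname=postgres");
    PGresult *res;

    if (PQstatus(conn) != CONNECTION_OK)
    {
        fprintf(stderr, "%s", PQerrorMessage(conn));
        PQfinish(conn);
        return 1;
    }
    res = PQexec(conn, "SELECT pg_notification_queue_usage()");
    if (PQresultStatus(res) == PGRES_TUPLES_OK)
    {
        double usage = atof(PQgetvalue(res, 0, 0));

        if (usage > 0.5)
            fprintf(stderr, "notify queue %.0f%% full\n", usage * 100);
    }
    PQclear(res);
    PQfinish(conn);
    return 0;
}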
1494 | | |
1495 | | /* |
1496 | | * Return the fraction of the queue that is currently occupied. |
1497 | | * |
1498 | | * The caller must hold NotifyQueueLock in (at least) shared mode. |
1499 | | * |
1500 | | * Note: we measure the distance to the logical tail page, not the physical |
1501 | | * tail page. In some sense that's wrong, but the relative position of the |
1502 | | * physical tail is affected by details such as SLRU segment boundaries, |
1503 | |  * so a result based on it would be unpleasantly unstable. |
1504 | | */ |
1505 | | static double |
1506 | | asyncQueueUsage(void) |
1507 | 0 | { |
1508 | 0 | int64 headPage = QUEUE_POS_PAGE(QUEUE_HEAD); |
1509 | 0 | int64 tailPage = QUEUE_POS_PAGE(QUEUE_TAIL); |
1510 | 0 | int64 occupied = headPage - tailPage; |
1511 | 0 | 
1512 | 0 | if (occupied == 0) |
1513 | 0 | return (double) 0; /* fast exit for common case */ |
1514 | | |
1515 | 0 | return (double) occupied / (double) max_notify_queue_pages; |
1516 | 0 | } |
1517 | | |
1518 | | /* |
1519 | | * Check whether the queue is at least half full, and emit a warning if so. |
1520 | | * |
1521 | | * This is unlikely given the size of the queue, but possible. |
1522 | | * The warnings show up at most once every QUEUE_FULL_WARN_INTERVAL. |
1523 | | * |
1524 | | * Caller must hold exclusive NotifyQueueLock. |
1525 | | */ |
1526 | | static void |
1527 | | asyncQueueFillWarning(void) |
1528 | 0 | { |
1529 | 0 | double fillDegree; |
1530 | 0 | TimestampTz t; |
1531 | 0 | 
1532 | 0 | fillDegree = asyncQueueUsage(); |
1533 | 0 | if (fillDegree < 0.5) |
1534 | 0 | return; |
1535 | | |
1536 | 0 | t = GetCurrentTimestamp(); |
1537 | 0 | 
1538 | 0 | if (TimestampDifferenceExceeds(asyncQueueControl->lastQueueFillWarn, |
1539 | 0 | t, QUEUE_FULL_WARN_INTERVAL)) |
1540 | 0 | { |
1541 | 0 | QueuePosition min = QUEUE_HEAD; |
1542 | 0 | int32 minPid = InvalidPid; |
1543 | 0 | 
1544 | 0 | for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) |
1545 | 0 | { |
1546 | 0 | Assert(QUEUE_BACKEND_PID(i) != InvalidPid); |
1547 | 0 | min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i)); |
1548 | 0 | if (QUEUE_POS_EQUAL(min, QUEUE_BACKEND_POS(i))) |
1549 | 0 | minPid = QUEUE_BACKEND_PID(i); |
1550 | 0 | } |
1551 | 0 | 
1552 | 0 | ereport(WARNING, |
1553 | 0 | (errmsg("NOTIFY queue is %.0f%% full", fillDegree * 100), |
1554 | 0 | (minPid != InvalidPid ? |
1555 | 0 | errdetail("The server process with PID %d is among those with the oldest transactions.", minPid) |
1556 | 0 | : 0), |
1557 | 0 | (minPid != InvalidPid ? |
1558 | 0 | errhint("The NOTIFY queue cannot be emptied until that process ends its current transaction.") |
1559 | 0 | : 0))); |
1560 | | |
1561 | 0 | asyncQueueControl->lastQueueFillWarn = t; |
1562 | 0 | } |
1563 | 0 | } |
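
The rate limiting amounts to the familiar warn-at-most-once-per-interval pattern; a minimal sketch, with a placeholder interval rather than the real QUEUE_FULL_WARN_INTERVAL:

/* Sketch only: the 5-second interval is a placeholder value. */
#include <stdio.h>
#include <time.h>

#define WARN_INTERVAL 5                 /* seconds; placeholder */

static time_t last_warn;                /* lastQueueFillWarn is in shared memory */

static void
maybe_warn(double fill)
{
    time_t now = time(NULL);

    if (fill < 0.5)
        return;
    if (difftime(now, last_warn) >= WARN_INTERVAL)
    {
        fprintf(stderr, "queue is %.0f%% full\n", fill * 100);
        last_warn = now;
    }
}

int
main(void)
{
    maybe_warn(0.75);                   /* warns */
    maybe_warn(0.80);                   /* suppressed: within the interval */
    return 0;
}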
1564 | | |
1565 | | /* |
1566 | | * Send signals to listening backends. |
1567 | | * |
1568 | | * Normally we signal only backends in our own database, since only those |
1569 | | * backends could be interested in notifies we send. However, if there's |
1570 | | * notify traffic in our database but no traffic in another database that |
1571 | | * does have listener(s), those listeners will fall further and further |
1572 | |  * behind. Wake them anyway if they're far enough behind, so that they'll |
1573 | | * advance their queue position pointers, allowing the global tail to advance. |
1574 | | * |
1575 | |  * Since we know the ProcNumber and the PID, the signaling is quite cheap. |
1576 | | * |
1577 | | * This is called during CommitTransaction(), so it's important for it |
1578 | | * to have very low probability of failure. |
1579 | | */ |
1580 | | static void |
1581 | | SignalBackends(void) |
1582 | 0 | { |
1583 | 0 | int32 *pids; |
1584 | 0 | ProcNumber *procnos; |
1585 | 0 | int count; |
1586 | | |
1587 | | /* |
1588 | | * Identify backends that we need to signal. We don't want to send |
1589 | | * signals while holding the NotifyQueueLock, so this loop just builds a |
1590 | | * list of target PIDs. |
1591 | | * |
1592 | | * XXX in principle these pallocs could fail, which would be bad. Maybe |
1593 | | * preallocate the arrays? They're not that large, though. |
1594 | | */ |
1595 | 0 | pids = (int32 *) palloc(MaxBackends * sizeof(int32)); |
1596 | 0 | procnos = (ProcNumber *) palloc(MaxBackends * sizeof(ProcNumber)); |
1597 | 0 | count = 0; |
1598 | 0 | 
1599 | 0 | LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); |
1600 | 0 | for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) |
1601 | 0 | { |
1602 | 0 | int32 pid = QUEUE_BACKEND_PID(i); |
1603 | 0 | QueuePosition pos; |
1604 | 0 | 
1605 | 0 | Assert(pid != InvalidPid); |
1606 | 0 | pos = QUEUE_BACKEND_POS(i); |
1607 | 0 | if (QUEUE_BACKEND_DBOID(i) == MyDatabaseId) |
1608 | 0 | { |
1609 | | /* |
1610 | | * Always signal listeners in our own database, unless they're |
1611 | | * already caught up (unlikely, but possible). |
1612 | | */ |
1613 | 0 | if (QUEUE_POS_EQUAL(pos, QUEUE_HEAD)) |
1614 | 0 | continue; |
1615 | 0 | } |
1616 | 0 | else |
1617 | 0 | { |
1618 | | /* |
1619 | | * Listeners in other databases should be signaled only if they |
1620 | | * are far behind. |
1621 | | */ |
1622 | 0 | if (asyncQueuePageDiff(QUEUE_POS_PAGE(QUEUE_HEAD), |
1623 | 0 | QUEUE_POS_PAGE(pos)) < QUEUE_CLEANUP_DELAY) |
1624 | 0 | continue; |
1625 | 0 | } |
1626 | | /* OK, need to signal this one */ |
1627 | 0 | pids[count] = pid; |
1628 | 0 | procnos[count] = i; |
1629 | 0 | count++; |
1630 | 0 | } |
1631 | 0 | LWLockRelease(NotifyQueueLock); |
1632 | | |
1633 | | /* Now send signals */ |
1634 | 0 | for (int i = 0; i < count; i++) |
1635 | 0 | { |
1636 | 0 | int32 pid = pids[i]; |
1637 | | |
1638 | | /* |
1639 | | * If we are signaling our own process, no need to involve the kernel; |
1640 | | * just set the flag directly. |
1641 | | */ |
1642 | 0 | if (pid == MyProcPid) |
1643 | 0 | { |
1644 | 0 | notifyInterruptPending = true; |
1645 | 0 | continue; |
1646 | 0 | } |
1647 | | |
1648 | | /* |
1649 | | * Note: assuming things aren't broken, a signal failure here could |
1650 | | * only occur if the target backend exited since we released |
1651 | |  * NotifyQueueLock, which is unlikely but certainly possible. So we |
1652 | | * just log a low-level debug message if it happens. |
1653 | | */ |
1654 | 0 | if (SendProcSignal(pid, PROCSIG_NOTIFY_INTERRUPT, procnos[i]) < 0) |
1655 | 0 | elog(DEBUG3, "could not signal backend with PID %d: %m", pid); |
1656 | 0 | } |
1657 | | |
1658 | 0 | pfree(pids); |
1659 | 0 | pfree(procnos); |
1660 | 0 | } |
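
The function is an instance of a common two-phase pattern: collect targets under the lock, then act on them unlocked. A standalone sketch using a pthread mutex and plain kill() as stand-ins for NotifyQueueLock and SendProcSignal():

/* Sketch only: mutex and kill() stand in for LWLocks and ProcSignal. */
#include <pthread.h>
#include <signal.h>
#include <stdio.h>
#include <sys/types.h>
#include <unistd.h>

#define MAX_TARGETS 64

static pthread_mutex_t registry_lock = PTHREAD_MUTEX_INITIALIZER;
static pid_t listeners[MAX_TARGETS];    /* stand-in for the shared array */
static int   nlisteners;

static void
signal_listeners(void)
{
    pid_t targets[MAX_TARGETS];
    int   count = 0;

    /* Phase 1: copy the targets while holding the lock */
    pthread_mutex_lock(&registry_lock);
    for (int i = 0; i < nlisteners; i++)
        targets[count++] = listeners[i];
    pthread_mutex_unlock(&registry_lock);

    /* Phase 2: signal without the lock; a target may have exited
     * meanwhile, so failure is logged, not fatal */
    for (int i = 0; i < count; i++)
        if (kill(targets[i], SIGUSR1) < 0)
            perror("kill");
}

int
main(void)
{
    signal(SIGUSR1, SIG_IGN);           /* make self-signaling harmless */
    pthread_mutex_lock(&registry_lock);
    listeners[nlisteners++] = getpid();
    pthread_mutex_unlock(&registry_lock);
    signal_listeners();
    return 0;
}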
1661 | | |
1662 | | /* |
1663 | | * AtAbort_Notify |
1664 | | * |
1665 | | * This is called at transaction abort. |
1666 | | * |
1667 | | * Gets rid of pending actions and outbound notifies that we would have |
1668 | | * executed if the transaction got committed. |
1669 | | */ |
1670 | | void |
1671 | | AtAbort_Notify(void) |
1672 | 0 | { |
1673 | | /* |
1674 | | * If we LISTEN but then roll back the transaction after PreCommit_Notify, |
1675 | | * we have registered as a listener but have not made any entry in |
1676 | | * listenChannels. In that case, deregister again. |
1677 | | */ |
1678 | 0 | if (amRegisteredListener && listenChannels == NIL) |
1679 | 0 | asyncQueueUnregister(); |
1680 | | |
1681 | | /* And clean up */ |
1682 | 0 | ClearPendingActionsAndNotifies(); |
1683 | 0 | } |
1684 | | |
1685 | | /* |
1686 | | * AtSubCommit_Notify() --- Take care of subtransaction commit. |
1687 | | * |
1688 | | * Reassign all items in the pending lists to the parent transaction. |
1689 | | */ |
1690 | | void |
1691 | | AtSubCommit_Notify(void) |
1692 | 0 | { |
1693 | 0 | int my_level = GetCurrentTransactionNestLevel(); |
1694 | | |
1695 | | /* If there are actions at our nesting level, we must reparent them. */ |
1696 | 0 | if (pendingActions != NULL && |
1697 | 0 | pendingActions->nestingLevel >= my_level) |
1698 | 0 | { |
1699 | 0 | if (pendingActions->upper == NULL || |
1700 | 0 | pendingActions->upper->nestingLevel < my_level - 1) |
1701 | 0 | { |
1702 | | /* nothing to merge; give the whole thing to the parent */ |
1703 | 0 | --pendingActions->nestingLevel; |
1704 | 0 | } |
1705 | 0 | else |
1706 | 0 | { |
1707 | 0 | ActionList *childPendingActions = pendingActions; |
1708 | 0 | 
1709 | 0 | pendingActions = pendingActions->upper; |
1710 | | |
1711 | | /* |
1712 | | * Mustn't try to eliminate duplicates here --- see queue_listen() |
1713 | | */ |
1714 | 0 | pendingActions->actions = |
1715 | 0 | list_concat(pendingActions->actions, |
1716 | 0 | childPendingActions->actions); |
1717 | 0 | pfree(childPendingActions); |
1718 | 0 | } |
1719 | 0 | } |
1720 | | |
1721 | | /* If there are notifies at our nesting level, we must reparent them. */ |
1722 | 0 | if (pendingNotifies != NULL && |
1723 | 0 | pendingNotifies->nestingLevel >= my_level) |
1724 | 0 | { |
1725 | 0 | Assert(pendingNotifies->nestingLevel == my_level); |
1726 | 0 | 
1727 | 0 | if (pendingNotifies->upper == NULL || |
1728 | 0 | pendingNotifies->upper->nestingLevel < my_level - 1) |
1729 | 0 | { |
1730 | | /* nothing to merge; give the whole thing to the parent */ |
1731 | 0 | --pendingNotifies->nestingLevel; |
1732 | 0 | } |
1733 | 0 | else |
1734 | 0 | { |
1735 | | /* |
1736 | | * Formerly, we didn't bother to eliminate duplicates here, but |
1737 | | * now we must, else we fall foul of "Assert(!found)", either here |
1738 | | * or during a later attempt to build the parent-level hashtable. |
1739 | | */ |
1740 | 0 | NotificationList *childPendingNotifies = pendingNotifies; |
1741 | 0 | ListCell *l; |
1742 | 0 | 
1743 | 0 | pendingNotifies = pendingNotifies->upper; |
1744 | | /* Insert all the subxact's events into parent, except for dups */ |
1745 | 0 | foreach(l, childPendingNotifies->events) |
1746 | 0 | { |
1747 | 0 | Notification *childn = (Notification *) lfirst(l); |
1748 | 0 | 
1749 | 0 | if (!AsyncExistsPendingNotify(childn)) |
1750 | 0 | AddEventToPendingNotifies(childn); |
1751 | 0 | } |
1752 | 0 | pfree(childPendingNotifies); |
1753 | 0 | } |
1754 | 0 | } |
1755 | 0 | } |
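
The reparenting rule (relabel the top of the stack for the parent, or merge it into the parent's own list) can be sketched with a simplified stack; item counts stand in for the event lists and the dedup step is omitted:

/* Sketch only: item counts stand in for the per-level event lists. */
#include <stdio.h>
#include <stdlib.h>

typedef struct Level
{
    int           nestingLevel;
    int           nitems;               /* stand-in for the events List */
    struct Level *upper;
} Level;

static Level *top;                      /* like pendingNotifies */

static void
sub_commit(int my_level)
{
    if (top == NULL || top->nestingLevel < my_level)
        return;                         /* nothing pending at this level */

    if (top->upper == NULL || top->upper->nestingLevel < my_level - 1)
    {
        /* nothing to merge: relabel the list for the parent */
        top->nestingLevel--;
    }
    else
    {
        /* parent has its own list: fold the child's items into it */
        Level *child = top;

        top = top->upper;
        top->nitems += child->nitems;   /* dedup omitted in this model */
        free(child);
    }
}

int
main(void)
{
    Level *l2 = malloc(sizeof(Level));
    Level *l3 = malloc(sizeof(Level));

    *l2 = (Level) {2, 1, NULL};
    *l3 = (Level) {3, 2, l2};
    top = l3;
    sub_commit(3);                      /* level 3 commits into level 2 */
    printf("level=%d items=%d\n", top->nestingLevel, top->nitems);
    free(top);
    return 0;
}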
1756 | | |
1757 | | /* |
1758 | | * AtSubAbort_Notify() --- Take care of subtransaction abort. |
1759 | | */ |
1760 | | void |
1761 | | AtSubAbort_Notify(void) |
1762 | 0 | { |
1763 | 0 | int my_level = GetCurrentTransactionNestLevel(); |
1764 | | |
1765 | | /* |
1766 | | * All we have to do is pop the stack --- the actions/notifies made in |
1767 | | * this subxact are no longer interesting, and the space will be freed |
1768 | | * when CurTransactionContext is recycled. We still have to free the |
1769 | | * ActionList and NotificationList objects themselves, though, because |
1770 | | * those are allocated in TopTransactionContext. |
1771 | | * |
1772 | | * Note that there might be no entries at all, or no entries for the |
1773 | | * current subtransaction level, either because none were ever created, or |
1774 | | * because we reentered this routine due to trouble during subxact abort. |
1775 | | */ |
1776 | 0 | while (pendingActions != NULL && |
1777 | 0 | pendingActions->nestingLevel >= my_level) |
1778 | 0 | { |
1779 | 0 | ActionList *childPendingActions = pendingActions; |
1780 | 0 | 
1781 | 0 | pendingActions = pendingActions->upper; |
1782 | 0 | pfree(childPendingActions); |
1783 | 0 | } |
1784 | 0 | 
1785 | 0 | while (pendingNotifies != NULL && |
1786 | 0 | pendingNotifies->nestingLevel >= my_level) |
1787 | 0 | { |
1788 | 0 | NotificationList *childPendingNotifies = pendingNotifies; |
1789 | 0 | 
1790 | 0 | pendingNotifies = pendingNotifies->upper; |
1791 | 0 | pfree(childPendingNotifies); |
1792 | 0 | } |
1793 | 0 | } |
1794 | | |
1795 | | /* |
1796 | | * HandleNotifyInterrupt |
1797 | | * |
1798 | | * Signal handler portion of interrupt handling. Let the backend know |
1799 | | * that there's a pending notify interrupt. If we're currently reading |
1800 | | * from the client, this will interrupt the read and |
1801 | | * ProcessClientReadInterrupt() will call ProcessNotifyInterrupt(). |
1802 | | */ |
1803 | | void |
1804 | | HandleNotifyInterrupt(void) |
1805 | 0 | { |
1806 | | /* |
1807 | | * Note: this is called by a SIGNAL HANDLER. You must be very wary what |
1808 | | * you do here. |
1809 | | */ |
1810 | | |
1811 | | /* signal that work needs to be done */ |
1812 | 0 | notifyInterruptPending = true; |
1813 | | |
1814 | | /* make sure the event is processed in due course */ |
1815 | 0 | SetLatch(MyLatch); |
1816 | 0 | } |
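
This is the standard minimal-signal-handler discipline: the handler only sets a flag (and, in the real code, a latch); all actual work is deferred to the main loop. A self-contained sketch:

/* Sketch only: a flag plus deferred processing, as in the real code. */
#include <signal.h>
#include <stdio.h>

static volatile sig_atomic_t pending;   /* like notifyInterruptPending */

static void
on_signal(int signo)
{
    (void) signo;
    pending = 1;        /* async-signal-safe: only set the flag; the
                         * real handler also calls SetLatch(MyLatch) */
}

int
main(void)
{
    signal(SIGUSR1, on_signal);
    raise(SIGUSR1);
    if (pending)                        /* checked from the main loop */
    {
        pending = 0;
        puts("processing deferred work");
    }
    return 0;
}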
1817 | | |
1818 | | /* |
1819 | | * ProcessNotifyInterrupt |
1820 | | * |
1821 | | * This is called if we see notifyInterruptPending set, just before |
1822 | | * transmitting ReadyForQuery at the end of a frontend command, and |
1823 | | * also if a notify signal occurs while reading from the frontend. |
1824 | | * HandleNotifyInterrupt() will cause the read to be interrupted |
1825 | | * via the process's latch, and this routine will get called. |
1826 | | * If we are truly idle (ie, *not* inside a transaction block), |
1827 | | * process the incoming notifies. |
1828 | | * |
1829 | | * If "flush" is true, force any frontend messages out immediately. |
1830 | | * This can be false when being called at the end of a frontend command, |
1831 | | * since we'll flush after sending ReadyForQuery. |
1832 | | */ |
1833 | | void |
1834 | | ProcessNotifyInterrupt(bool flush) |
1835 | 0 | { |
1836 | 0 | if (IsTransactionOrTransactionBlock()) |
1837 | 0 | return; /* not really idle */ |
1838 | | |
1839 | | /* Loop in case another signal arrives while sending messages */ |
1840 | 0 | while (notifyInterruptPending) |
1841 | 0 | ProcessIncomingNotify(flush); |
1842 | 0 | } |
1843 | | |
1844 | | |
1845 | | /* |
1846 | | * Read all pending notifications from the queue, and deliver appropriate |
1847 | | * ones to my frontend. Stop when we reach queue head or an uncommitted |
1848 | |  * ones to my frontend. Stop when we reach the queue head or an uncommitted |
1849 | | */ |
1850 | | static void |
1851 | | asyncQueueReadAllNotifications(void) |
1852 | 0 | { |
1853 | 0 | volatile QueuePosition pos; |
1854 | 0 | QueuePosition head; |
1855 | 0 | Snapshot snapshot; |
1856 | | |
1857 | | /* page_buffer must be adequately aligned, so use a union */ |
1858 | 0 | union |
1859 | 0 | { |
1860 | 0 | char buf[QUEUE_PAGESIZE]; |
1861 | 0 | AsyncQueueEntry align; |
1862 | 0 | } page_buffer; |
1863 | | |
1864 | | /* Fetch current state */ |
1865 | 0 | LWLockAcquire(NotifyQueueLock, LW_SHARED); |
1866 | | /* Assert checks that we have a valid state entry */ |
1867 | 0 | Assert(MyProcPid == QUEUE_BACKEND_PID(MyProcNumber)); |
1868 | 0 | pos = QUEUE_BACKEND_POS(MyProcNumber); |
1869 | 0 | head = QUEUE_HEAD; |
1870 | 0 | LWLockRelease(NotifyQueueLock); |
1871 | 0 | 
1872 | 0 | if (QUEUE_POS_EQUAL(pos, head)) |
1873 | 0 | { |
1874 | | /* Nothing to do, we have read all notifications already. */ |
1875 | 0 | return; |
1876 | 0 | } |
1877 | | |
1878 | | /*---------- |
1879 | | * Get snapshot we'll use to decide which xacts are still in progress. |
1880 | | * This is trickier than it might seem, because of race conditions. |
1881 | | * Consider the following example: |
1882 | | * |
1883 | | * Backend 1: Backend 2: |
1884 | | * |
1885 | | * transaction starts |
1886 | | * UPDATE foo SET ...; |
1887 | | * NOTIFY foo; |
1888 | | * commit starts |
1889 | | * queue the notify message |
1890 | | * transaction starts |
1891 | | * LISTEN foo; -- first LISTEN in session |
1892 | | * SELECT * FROM foo WHERE ...; |
1893 | | * commit to clog |
1894 | | * commit starts |
1895 | | * add backend 2 to array of listeners |
1896 | | * advance to queue head (this code) |
1897 | | * commit to clog |
1898 | | * |
1899 | | * Transaction 2's SELECT has not seen the UPDATE's effects, since that |
1900 | | * wasn't committed yet. Ideally we'd ensure that client 2 would |
1901 | | * eventually get transaction 1's notify message, but there's no way |
1902 | | * to do that; until we're in the listener array, there's no guarantee |
1903 | | * that the notify message doesn't get removed from the queue. |
1904 | | * |
1905 | | * Therefore the coding technique transaction 2 is using is unsafe: |
1906 | | * applications must commit a LISTEN before inspecting database state, |
1907 | | * if they want to ensure they will see notifications about subsequent |
1908 | | * changes to that state. |
1909 | | * |
1910 | | * What we do guarantee is that we'll see all notifications from |
1911 | | * transactions committing after the snapshot we take here. |
1912 | | * Exec_ListenPreCommit has already added us to the listener array, |
1913 | | * so no not-yet-committed messages can be removed from the queue |
1914 | | * before we see them. |
1915 | | *---------- |
1916 | | */ |
1917 | 0 | snapshot = RegisterSnapshot(GetLatestSnapshot()); |
1918 | | |
1919 | | /* |
1920 | | * It is possible that we fail while trying to send a message to our |
1921 | | * frontend (for example, because of encoding conversion failure). If |
1922 | | * that happens it is critical that we not try to send the same message |
1923 | | * over and over again. Therefore, we place a PG_TRY block here that will |
1924 | | * forcibly advance our queue position before we lose control to an error. |
1925 | | * (We could alternatively retake NotifyQueueLock and move the position |
1926 | | * before handling each individual message, but that seems like too much |
1927 | | * lock traffic.) |
1928 | | */ |
1929 | 0 | PG_TRY(); |
1930 | 0 | { |
1931 | 0 | bool reachedStop; |
1932 | 0 | 
1933 | 0 | do |
1934 | 0 | { |
1935 | 0 | int64 curpage = QUEUE_POS_PAGE(pos); |
1936 | 0 | int curoffset = QUEUE_POS_OFFSET(pos); |
1937 | 0 | int slotno; |
1938 | 0 | int copysize; |
1939 | | |
1940 | | /* |
1941 | | * We copy the data from SLRU into a local buffer, so as to avoid |
1942 | | * holding the SLRU lock while we are examining the entries and |
1943 | | * possibly transmitting them to our frontend. Copy only the part |
1944 | | * of the page we will actually inspect. |
1945 | | */ |
1946 | 0 | slotno = SimpleLruReadPage_ReadOnly(NotifyCtl, curpage, |
1947 | 0 | InvalidTransactionId); |
1948 | 0 | if (curpage == QUEUE_POS_PAGE(head)) |
1949 | 0 | { |
1950 | | /* we only want to read as far as head */ |
1951 | 0 | copysize = QUEUE_POS_OFFSET(head) - curoffset; |
1952 | 0 | if (copysize < 0) |
1953 | 0 | copysize = 0; /* just for safety */ |
1954 | 0 | } |
1955 | 0 | else |
1956 | 0 | { |
1957 | | /* fetch all the rest of the page */ |
1958 | 0 | copysize = QUEUE_PAGESIZE - curoffset; |
1959 | 0 | } |
1960 | 0 | memcpy(page_buffer.buf + curoffset, |
1961 | 0 | NotifyCtl->shared->page_buffer[slotno] + curoffset, |
1962 | 0 | copysize); |
1963 | | /* Release lock that we got from SimpleLruReadPage_ReadOnly() */ |
1964 | 0 | LWLockRelease(SimpleLruGetBankLock(NotifyCtl, curpage)); |
1965 | | |
1966 | | /* |
1967 | | * Process messages up to the stop position, end of page, or an |
1968 | | * uncommitted message. |
1969 | | * |
1970 | | * Our stop position is what we found to be the head's position |
1971 | | * when we entered this function. It might have changed already. |
1972 | | * But if it has, we will receive (or have already received and |
1973 | | * queued) another signal and come here again. |
1974 | | * |
1975 | | * We are not holding NotifyQueueLock here! The queue can only |
1976 | | * extend beyond the head pointer (see above) and we leave our |
1977 | | * backend's pointer where it is so nobody will truncate or |
1978 | |  * rewrite pages under us. In particular, we don't want to hold a |
1979 | |  * lock while sending the notifications to the frontend. |
1980 | | */ |
1981 | 0 | reachedStop = asyncQueueProcessPageEntries(&pos, head, |
1982 | 0 | page_buffer.buf, |
1983 | 0 | snapshot); |
1984 | 0 | } while (!reachedStop); |
1985 | 0 | } |
1986 | 0 | PG_FINALLY(); |
1987 | 0 | { |
1988 | | /* Update shared state */ |
1989 | 0 | LWLockAcquire(NotifyQueueLock, LW_SHARED); |
1990 | 0 | QUEUE_BACKEND_POS(MyProcNumber) = pos; |
1991 | 0 | LWLockRelease(NotifyQueueLock); |
1992 | 0 | } |
1993 | 0 | PG_END_TRY(); |
1994 | | |
1995 | | /* Done with snapshot */ |
1996 | 0 | UnregisterSnapshot(snapshot); |
1997 | 0 | } |
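
From the client side, the rule stated in the comment above translates into: commit the LISTEN first, then inspect state, then wait for notifications. A libpq sketch of that pattern; the connection string, channel name, and query are placeholders:

/* Sketch only: connection string, channel, and query are placeholders. */
#include <stdio.h>
#include <sys/select.h>
#include <libpq-fe.h>

int
main(void)
{
    PGconn   *conn = PQconnectdb("dbname=postgres");
    PGnotify *note;

    if (PQstatus(conn) != CONNECTION_OK)
        return 1;

    /* Commit the LISTEN before looking at the data: any change made
     * after this point is guaranteed to produce a NOTIFY we will see. */
    PQclear(PQexec(conn, "LISTEN jobs"));
    PQclear(PQexec(conn, "SELECT 1 /* inspect current state here */"));

    for (;;)                            /* wait for notifications */
    {
        int    sock = PQsocket(conn);
        fd_set mask;

        FD_ZERO(&mask);
        FD_SET(sock, &mask);
        if (select(sock + 1, &mask, NULL, NULL, NULL) < 0)
            break;
        if (!PQconsumeInput(conn))
            break;
        while ((note = PQnotifies(conn)) != NULL)
        {
            printf("channel=%s payload=%s pid=%d\n",
                   note->relname, note->extra, note->be_pid);
            PQfreemem(note);
        }
    }
    PQfinish(conn);
    return 0;
}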
1998 | | |
1999 | | /* |
2000 | | * Fetch notifications from the shared queue, beginning at position current, |
2001 | | * and deliver relevant ones to my frontend. |
2002 | | * |
2003 | | * The current page must have been fetched into page_buffer from shared |
2004 | | * memory. (We could access the page right in shared memory, but that |
2005 | | * would imply holding the SLRU bank lock throughout this routine.) |
2006 | | * |
2007 | | * We stop if we reach the "stop" position, or reach a notification from an |
2008 | | * uncommitted transaction, or reach the end of the page. |
2009 | | * |
2010 | | * The function returns true once we have reached the stop position or an |
2011 | | * uncommitted notification, and false if we have finished with the page. |
2012 | | * In other words: once it returns true there is no need to look further. |
2013 | | * The QueuePosition *current is advanced past all processed messages. |
2014 | | */ |
2015 | | static bool |
2016 | | asyncQueueProcessPageEntries(volatile QueuePosition *current, |
2017 | | QueuePosition stop, |
2018 | | char *page_buffer, |
2019 | | Snapshot snapshot) |
2020 | 0 | { |
2021 | 0 | bool reachedStop = false; |
2022 | 0 | bool reachedEndOfPage; |
2023 | 0 | AsyncQueueEntry *qe; |
2024 | 0 | 
2025 | 0 | do |
2026 | 0 | { |
2027 | 0 | QueuePosition thisentry = *current; |
2028 | 0 | 
2029 | 0 | if (QUEUE_POS_EQUAL(thisentry, stop)) |
2030 | 0 | break; |
2031 | | |
2032 | 0 | qe = (AsyncQueueEntry *) (page_buffer + QUEUE_POS_OFFSET(thisentry)); |
2033 | | |
2034 | | /* |
2035 | | * Advance *current over this message, possibly to the next page. As |
2036 | | * noted in the comments for asyncQueueReadAllNotifications, we must |
2037 | | * do this before possibly failing while processing the message. |
2038 | | */ |
2039 | 0 | reachedEndOfPage = asyncQueueAdvance(current, qe->length); |
2040 | | |
2041 | | /* Ignore messages destined for other databases */ |
2042 | 0 | if (qe->dboid == MyDatabaseId) |
2043 | 0 | { |
2044 | 0 | if (XidInMVCCSnapshot(qe->xid, snapshot)) |
2045 | 0 | { |
2046 | | /* |
2047 | | * The source transaction is still in progress, so we can't |
2048 | | * process this message yet. Break out of the loop, but first |
2049 | | * back up *current so we will reprocess the message next |
2050 | | * time. (Note: it is unlikely but not impossible for |
2051 | | * TransactionIdDidCommit to fail, so we can't really avoid |
2052 | | * this advance-then-back-up behavior when dealing with an |
2053 | | * uncommitted message.) |
2054 | | * |
2055 | | * Note that we must test XidInMVCCSnapshot before we test |
2056 | | * TransactionIdDidCommit, else we might return a message from |
2057 | | * a transaction that is not yet visible to snapshots; compare |
2058 | | * the comments at the head of heapam_visibility.c. |
2059 | | * |
2060 | | * Also, while our own xact won't be listed in the snapshot, |
2061 | | * we need not check for TransactionIdIsCurrentTransactionId |
2062 | | * because our transaction cannot (yet) have queued any |
2063 | | * messages. |
2064 | | */ |
2065 | 0 | *current = thisentry; |
2066 | 0 | reachedStop = true; |
2067 | 0 | break; |
2068 | 0 | } |
2069 | 0 | else if (TransactionIdDidCommit(qe->xid)) |
2070 | 0 | { |
2071 | | /* qe->data is the null-terminated channel name */ |
2072 | 0 | char *channel = qe->data; |
2073 | 0 | 
2074 | 0 | if (IsListeningOn(channel)) |
2075 | 0 | { |
2076 | | /* payload follows channel name */ |
2077 | 0 | char *payload = qe->data + strlen(channel) + 1; |
2078 | 0 | 
2079 | 0 | NotifyMyFrontEnd(channel, payload, qe->srcPid); |
2080 | 0 | } |
2081 | 0 | } |
2082 | 0 | else |
2083 | 0 | { |
2084 | | /* |
2085 | | * The source transaction aborted or crashed, so we just |
2086 | | * ignore its notifications. |
2087 | | */ |
2088 | 0 | } |
2089 | 0 | } |
2090 | | |
2091 | | /* Loop back if we're not at end of page */ |
2092 | 0 | } while (!reachedEndOfPage); |
2093 | | |
2094 | 0 | if (QUEUE_POS_EQUAL(*current, stop)) |
2095 | 0 | reachedStop = true; |
2096 | 0 | 
2097 | 0 | return reachedStop; |
2098 | 0 | } |
2099 | | |
2100 | | /* |
2101 | | * Advance the shared queue tail variable to the minimum of all the |
2102 | | * per-backend tail pointers. Truncate pg_notify space if possible. |
2103 | | * |
2104 | | * This is (usually) called during CommitTransaction(), so it's important for |
2105 | | * it to have very low probability of failure. |
2106 | | */ |
2107 | | static void |
2108 | | asyncQueueAdvanceTail(void) |
2109 | 0 | { |
2110 | 0 | QueuePosition min; |
2111 | 0 | int64 oldtailpage; |
2112 | 0 | int64 newtailpage; |
2113 | 0 | int64 boundary; |
2114 | | |
2115 | | /* Restrict task to one backend per cluster; see SimpleLruTruncate(). */ |
2116 | 0 | LWLockAcquire(NotifyQueueTailLock, LW_EXCLUSIVE); |
2117 | | |
2118 | | /* |
2119 | | * Compute the new tail. Pre-v13, it's essential that QUEUE_TAIL be exact |
2120 | | * (ie, exactly match at least one backend's queue position), so it must |
2121 | | * be updated atomically with the actual computation. Since v13, we could |
2122 | | * get away with not doing it like that, but it seems prudent to keep it |
2123 | | * so. |
2124 | | * |
2125 | | * Also, because incoming backends will scan forward from QUEUE_TAIL, that |
2126 | | * must be advanced before we can truncate any data. Thus, QUEUE_TAIL is |
2127 | | * the logical tail, while QUEUE_STOP_PAGE is the physical tail, or oldest |
2128 | | * un-truncated page. When QUEUE_STOP_PAGE != QUEUE_POS_PAGE(QUEUE_TAIL), |
2129 | | * there are pages we can truncate but haven't yet finished doing so. |
2130 | | * |
2131 | | * For concurrency's sake, we don't want to hold NotifyQueueLock while |
2132 | | * performing SimpleLruTruncate. This is OK because no backend will try |
2133 | | * to access the pages we are in the midst of truncating. |
2134 | | */ |
2135 | 0 | LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); |
2136 | 0 | min = QUEUE_HEAD; |
2137 | 0 | for (ProcNumber i = QUEUE_FIRST_LISTENER; i != INVALID_PROC_NUMBER; i = QUEUE_NEXT_LISTENER(i)) |
2138 | 0 | { |
2139 | 0 | Assert(QUEUE_BACKEND_PID(i) != InvalidPid); |
2140 | 0 | min = QUEUE_POS_MIN(min, QUEUE_BACKEND_POS(i)); |
2141 | 0 | } |
2142 | 0 | QUEUE_TAIL = min; |
2143 | 0 | oldtailpage = QUEUE_STOP_PAGE; |
2144 | 0 | LWLockRelease(NotifyQueueLock); |
2145 | | |
2146 | | /* |
2147 | | * We can truncate something if the global tail advanced across an SLRU |
2148 | | * segment boundary. |
2149 | | * |
2150 | | * XXX it might be better to truncate only once every several segments, to |
2151 | | * reduce the number of directory scans. |
2152 | | */ |
2153 | 0 | newtailpage = QUEUE_POS_PAGE(min); |
2154 | 0 | boundary = newtailpage - (newtailpage % SLRU_PAGES_PER_SEGMENT); |
2155 | 0 | if (asyncQueuePagePrecedes(oldtailpage, boundary)) |
2156 | 0 | { |
2157 | | /* |
2158 | | * SimpleLruTruncate() will ask for SLRU bank locks but will also |
2159 | | * release the lock again. |
2160 | | */ |
2161 | 0 | SimpleLruTruncate(NotifyCtl, newtailpage); |
2162 | 0 | 
2163 | 0 | LWLockAcquire(NotifyQueueLock, LW_EXCLUSIVE); |
2164 | 0 | QUEUE_STOP_PAGE = newtailpage; |
2165 | 0 | LWLockRelease(NotifyQueueLock); |
2166 | 0 | } |
2167 | 0 | 
2168 | 0 | LWLockRelease(NotifyQueueTailLock); |
2169 | 0 | } |
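
The truncation condition reduces to rounding the new tail down to its segment start and checking whether the old physical tail precedes that boundary; a worked sketch with a placeholder pages-per-segment count:

/* Sketch only: 32 is a placeholder for SLRU_PAGES_PER_SEGMENT. */
#include <stdint.h>
#include <stdio.h>

#define PAGES_PER_SEGMENT 32            /* placeholder value */

int
main(void)
{
    int64_t oldtailpage = 30;           /* physical tail (QUEUE_STOP_PAGE) */
    int64_t newtailpage = 70;           /* logical tail's page */

    /* round the new tail down to the start of its segment */
    int64_t boundary = newtailpage - (newtailpage % PAGES_PER_SEGMENT);

    if (oldtailpage < boundary)         /* cf. asyncQueuePagePrecedes() */
        printf("segments before page %lld can go\n", (long long) boundary);
    return 0;
}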
2170 | | |
2171 | | /* |
2172 | | * ProcessIncomingNotify |
2173 | | * |
2174 | | * Scan the queue for arriving notifications and report them to the front |
2175 | | * end. The notifications might be from other sessions, or our own; |
2176 | | * there's no need to distinguish here. |
2177 | | * |
2178 | | * If "flush" is true, force any frontend messages out immediately. |
2179 | | * |
2180 | | * NOTE: since we are outside any transaction, we must create our own. |
2181 | | */ |
2182 | | static void |
2183 | | ProcessIncomingNotify(bool flush) |
2184 | 0 | { |
2185 | | /* We *must* reset the flag */ |
2186 | 0 | notifyInterruptPending = false; |
2187 | | |
2188 | | /* Do nothing else if we aren't actively listening */ |
2189 | 0 | if (listenChannels == NIL) |
2190 | 0 | return; |
2191 | | |
2192 | 0 | if (Trace_notify) |
2193 | 0 | elog(DEBUG1, "ProcessIncomingNotify"); |
2194 | | |
2195 | 0 | set_ps_display("notify interrupt"); |
2196 | | |
2197 | | /* |
2198 | | * We must run asyncQueueReadAllNotifications inside a transaction, else |
2199 | | * bad things happen if it gets an error. |
2200 | | */ |
2201 | 0 | StartTransactionCommand(); |
2202 | 0 | 
2203 | 0 | asyncQueueReadAllNotifications(); |
2204 | 0 | 
2205 | 0 | CommitTransactionCommand(); |
2206 | | |
2207 | | /* |
2208 | | * If this isn't an end-of-command case, we must flush the notify messages |
2209 | |  * to ensure the frontend gets them promptly. |
2210 | | */ |
2211 | 0 | if (flush) |
2212 | 0 | pq_flush(); |
2213 | 0 | 
2214 | 0 | set_ps_display("idle"); |
2215 | 0 | 
2216 | 0 | if (Trace_notify) |
2217 | 0 | elog(DEBUG1, "ProcessIncomingNotify: done"); |
2218 | 0 | } |
2219 | | |
2220 | | /* |
2221 | | * Send NOTIFY message to my front end. |
2222 | | */ |
2223 | | void |
2224 | | NotifyMyFrontEnd(const char *channel, const char *payload, int32 srcPid) |
2225 | 0 | { |
2226 | 0 | if (whereToSendOutput == DestRemote) |
2227 | 0 | { |
2228 | 0 | StringInfoData buf; |
2229 | 0 | 
2230 | 0 | pq_beginmessage(&buf, PqMsg_NotificationResponse); |
2231 | 0 | pq_sendint32(&buf, srcPid); |
2232 | 0 | pq_sendstring(&buf, channel); |
2233 | 0 | pq_sendstring(&buf, payload); |
2234 | 0 | pq_endmessage(&buf); |
2235 | | |
2236 | | /* |
2237 | | * NOTE: we do not do pq_flush() here. Some level of caller will |
2238 | | * handle it later, allowing this message to be combined into a packet |
2239 | | * with other ones. |
2240 | | */ |
2241 | 0 | } |
2242 | 0 | else |
2243 | 0 | elog(INFO, "NOTIFY for \"%s\" payload \"%s\"", channel, payload); |
2244 | 0 | } |
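
On the wire this becomes a NotificationResponse message: the type byte 'A', a self-inclusive int32 length, the sender's PID, and the two NUL-terminated strings. A sketch that assembles such a message into a local buffer; 4711 is an arbitrary PID:

/* Sketch only: builds the message into a local buffer. */
#include <arpa/inet.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

int
main(void)
{
    const char *channel = "jobs";
    const char *payload = "id=42";
    char        buf[256];
    size_t      n = 0;
    uint32_t    pid = htonl(4711);
    uint32_t    len;

    buf[n++] = 'A';                     /* PqMsg_NotificationResponse */
    n += 4;                             /* length placeholder, filled below */
    memcpy(buf + n, &pid, 4);           /* srcPid, network byte order */
    n += 4;
    memcpy(buf + n, channel, strlen(channel) + 1);
    n += strlen(channel) + 1;
    memcpy(buf + n, payload, strlen(payload) + 1);
    n += strlen(payload) + 1;
    len = htonl((uint32_t) (n - 1));    /* counts itself but not the type byte */
    memcpy(buf + 1, &len, 4);
    printf("message is %zu bytes on the wire\n", n);
    return 0;
}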
2245 | | |
2246 | | /* Does pendingNotifies include a match for the given event? */ |
2247 | | static bool |
2248 | | AsyncExistsPendingNotify(Notification *n) |
2249 | 0 | { |
2250 | 0 | if (pendingNotifies == NULL) |
2251 | 0 | return false; |
2252 | | |
2253 | 0 | if (pendingNotifies->hashtab != NULL) |
2254 | 0 | { |
2255 | | /* Use the hash table to probe for a match */ |
2256 | 0 | if (hash_search(pendingNotifies->hashtab, |
2257 | 0 | &n, |
2258 | 0 | HASH_FIND, |
2259 | 0 | NULL)) |
2260 | 0 | return true; |
2261 | 0 | } |
2262 | 0 | else |
2263 | 0 | { |
2264 | | /* Must scan the event list */ |
2265 | 0 | ListCell *l; |
2266 | 0 | 
2267 | 0 | foreach(l, pendingNotifies->events) |
2268 | 0 | { |
2269 | 0 | Notification *oldn = (Notification *) lfirst(l); |
2270 | 0 | 
2271 | 0 | if (n->channel_len == oldn->channel_len && |
2272 | 0 | n->payload_len == oldn->payload_len && |
2273 | 0 | memcmp(n->data, oldn->data, |
2274 | 0 | n->channel_len + n->payload_len + 2) == 0) |
2275 | 0 | return true; |
2276 | 0 | } |
2277 | 0 | } |
2278 | | |
2279 | 0 | return false; |
2280 | 0 | } |
2281 | | |
2282 | | /* |
2283 | | * Add a notification event to a pre-existing pendingNotifies list. |
2284 | | * |
2285 | | * Because pendingNotifies->events is already nonempty, this works |
2286 | | * correctly no matter what CurrentMemoryContext is. |
2287 | | */ |
2288 | | static void |
2289 | | AddEventToPendingNotifies(Notification *n) |
2290 | 0 | { |
2291 | 0 | Assert(pendingNotifies->events != NIL); |
2292 | | |
2293 | | /* Create the hash table if it's time to */ |
2294 | 0 | if (list_length(pendingNotifies->events) >= MIN_HASHABLE_NOTIFIES && |
2295 | 0 | pendingNotifies->hashtab == NULL) |
2296 | 0 | { |
2297 | 0 | HASHCTL hash_ctl; |
2298 | 0 | ListCell *l; |
2299 | | |
2300 | | /* Create the hash table */ |
2301 | 0 | hash_ctl.keysize = sizeof(Notification *); |
2302 | 0 | hash_ctl.entrysize = sizeof(struct NotificationHash); |
2303 | 0 | hash_ctl.hash = notification_hash; |
2304 | 0 | hash_ctl.match = notification_match; |
2305 | 0 | hash_ctl.hcxt = CurTransactionContext; |
2306 | 0 | pendingNotifies->hashtab = |
2307 | 0 | hash_create("Pending Notifies", |
2308 | 0 | 256L, |
2309 | 0 | &hash_ctl, |
2310 | 0 | HASH_ELEM | HASH_FUNCTION | HASH_COMPARE | HASH_CONTEXT); |
2311 | | |
2312 | | /* Insert all the already-existing events */ |
2313 | 0 | foreach(l, pendingNotifies->events) |
2314 | 0 | { |
2315 | 0 | Notification *oldn = (Notification *) lfirst(l); |
2316 | 0 | bool found; |
2317 | 0 | 
2318 | 0 | (void) hash_search(pendingNotifies->hashtab, |
2319 | 0 | &oldn, |
2320 | 0 | HASH_ENTER, |
2321 | 0 | &found); |
2322 | 0 | Assert(!found); |
2323 | 0 | } |
2324 | 0 | } |
2325 | | |
2326 | | /* Add new event to the list, in order */ |
2327 | 0 | pendingNotifies->events = lappend(pendingNotifies->events, n); |
2328 | | |
2329 | | /* Add event to the hash table if needed */ |
2330 | 0 | if (pendingNotifies->hashtab != NULL) |
2331 | 0 | { |
2332 | 0 | bool found; |
2333 | 0 | 
2334 | 0 | (void) hash_search(pendingNotifies->hashtab, |
2335 | 0 | &n, |
2336 | 0 | HASH_ENTER, |
2337 | 0 | &found); |
2338 | 0 | Assert(!found); |
2339 | 0 | } |
2340 | 0 | } |
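
The strategy is a linear scan for small transactions, with a one-time switch to a hash table once the list crosses a threshold. A self-contained sketch; the threshold of 16 and the string-keyed table are simplifications of MIN_HASHABLE_NOTIFIES and the Notification-keyed dynahash:

/* Sketch only: strings stand in for Notification entries. */
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

#define THRESHOLD  16                   /* placeholder for MIN_HASHABLE_NOTIFIES */
#define TABLE_SIZE 256
#define MAX_EVENTS 1024

typedef struct Event { char text[64]; struct Event *next; } Event;

static Event  events[MAX_EVENTS];       /* the ordered "list" */
static int    nevents;
static Event *buckets[TABLE_SIZE];
static bool   hashing;

static unsigned
hash(const char *s)
{
    unsigned h = 5381;

    while (*s)
        h = h * 33 + (unsigned char) *s++;
    return h % TABLE_SIZE;
}

static bool
exists(const char *text)
{
    if (hashing)
    {
        for (Event *e = buckets[hash(text)]; e; e = e->next)
            if (strcmp(e->text, text) == 0)
                return true;
        return false;
    }
    for (int i = 0; i < nevents; i++)   /* small list: linear scan */
        if (strcmp(events[i].text, text) == 0)
            return true;
    return false;
}

static void
add(const char *text)
{
    Event *e;

    if (exists(text))
        return;                         /* duplicate: deliver once only */
    e = &events[nevents++];
    snprintf(e->text, sizeof(e->text), "%s", text);
    e->next = NULL;
    if (!hashing && nevents >= THRESHOLD)
    {
        /* list has grown: build the table from the existing entries */
        hashing = true;
        for (int i = 0; i < nevents; i++)
        {
            unsigned h = hash(events[i].text);

            events[i].next = buckets[h];
            buckets[h] = &events[i];
        }
    }
    else if (hashing)
    {
        unsigned h = hash(text);

        e->next = buckets[h];
        buckets[h] = e;
    }
}

int
main(void)
{
    char text[32];

    for (int i = 0; i < 40; i++)
    {
        snprintf(text, sizeof(text), "chan:%d", i % 20);
        add(text);                      /* each value arrives twice */
    }
    printf("kept %d of 40 events (hashing=%d)\n", nevents, (int) hashing);
    return 0;
}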
2341 | | |
2342 | | /* |
2343 | | * notification_hash: hash function for notification hash table |
2344 | | * |
2345 | | * The hash "keys" are pointers to Notification structs. |
2346 | | */ |
2347 | | static uint32 |
2348 | | notification_hash(const void *key, Size keysize) |
2349 | 0 | { |
2350 | 0 | const Notification *k = *(const Notification *const *) key; |
2351 | 0 | 
2352 | 0 | Assert(keysize == sizeof(Notification *)); |
2353 | | /* We don't bother to include the payload's trailing null in the hash */ |
2354 | 0 | return DatumGetUInt32(hash_any((const unsigned char *) k->data, |
2355 | 0 | k->channel_len + k->payload_len + 1)); |
2356 | 0 | } |
2357 | | |
2358 | | /* |
2359 | | * notification_match: match function to use with notification_hash |
2360 | | */ |
2361 | | static int |
2362 | | notification_match(const void *key1, const void *key2, Size keysize) |
2363 | 0 | { |
2364 | 0 | const Notification *k1 = *(const Notification *const *) key1; |
2365 | 0 | const Notification *k2 = *(const Notification *const *) key2; |
2366 | 0 | 
2367 | 0 | Assert(keysize == sizeof(Notification *)); |
2368 | 0 | if (k1->channel_len == k2->channel_len && |
2369 | 0 | k1->payload_len == k2->payload_len && |
2370 | 0 | memcmp(k1->data, k2->data, |
2371 | 0 | k1->channel_len + k1->payload_len + 2) == 0) |
2372 | 0 | return 0; /* equal */ |
2373 | 0 | return 1; /* not equal */ |
2374 | 0 | } |
2375 | | |
2376 | | /* Clear the pendingActions and pendingNotifies lists. */ |
2377 | | static void |
2378 | | ClearPendingActionsAndNotifies(void) |
2379 | 0 | { |
2380 | | /* |
2381 | | * Everything's allocated in either TopTransactionContext or the context |
2382 | | * for the subtransaction to which it corresponds. So, there's nothing to |
2383 | | * do here except reset the pointers; the space will be reclaimed when the |
2384 | | * contexts are deleted. |
2385 | | */ |
2386 | 0 | pendingActions = NULL; |
2387 | 0 | pendingNotifies = NULL; |
2388 | 0 | } |
2389 | | |
2390 | | /* |
2391 | | * GUC check_hook for notify_buffers |
2392 | | */ |
2393 | | bool |
2394 | | check_notify_buffers(int *newval, void **extra, GucSource source) |
2395 | 0 | { |
2396 | 0 | return check_slru_buffers("notify_buffers", newval); |
2397 | 0 | } |