/src/postgres/src/backend/replication/logical/applyparallelworker.c
Line | Count | Source (jump to first uncovered line) |
1 | | /*------------------------------------------------------------------------- |
2 | | * applyparallelworker.c |
3 | | * Support routines for applying xact by parallel apply worker |
4 | | * |
5 | | * Copyright (c) 2023-2025, PostgreSQL Global Development Group |
6 | | * |
7 | | * IDENTIFICATION |
8 | | * src/backend/replication/logical/applyparallelworker.c |
9 | | * |
10 | | * This file contains the code to launch, set up, and tear down a parallel apply |
11 | | * worker that receives the changes from the leader worker and invokes routines |
12 | | * to apply them on the subscriber database. Additionally, this file contains |
13 | | * routines that are intended to support setting up, using, and tearing down a |
14 | | * ParallelApplyWorkerInfo which is required so the leader worker and parallel |
15 | | * apply workers can communicate with each other. |
16 | | * |
17 | | * The parallel apply workers are assigned (if available) as soon as the xact's |
18 | | * first stream is received for subscriptions that have set their 'streaming' |
19 | | * option to parallel. The leader apply worker will send changes to this new |
20 | | * worker via shared memory. We keep this worker assigned until the transaction |
21 | | * commit is received and also wait for the worker to finish at commit. This |
22 | | * preserves commit ordering and avoids file I/O in most cases, although we |
23 | | * still need to spill to a file if there is no worker available. See comments |
24 | | * atop logical/worker.c to know more about streamed xacts whose changes are |
25 | | * spilled to disk. It is important to maintain commit order to avoid failures |
26 | | * due to: (a) transaction dependencies - say if we insert a row in the first |
27 | | * transaction and update it in the second transaction on the publisher, then |
28 | | * allowing the subscriber to apply both in parallel can lead to failure in the |
29 | | * update; (b) deadlocks - allowing transactions that update the same set of |
30 | | * rows/tables in the opposite order to be applied in parallel can lead to |
31 | | * deadlocks. |
32 | | * |
33 | | * A worker pool is used to avoid restarting workers for each streaming |
34 | | * transaction. We maintain each worker's information (ParallelApplyWorkerInfo) |
35 | | * in the ParallelApplyWorkerPool. After successfully launching a new worker, |
36 | | * its information is added to the ParallelApplyWorkerPool. Once the worker |
37 | | * finishes applying the transaction, it is marked as available for re-use. |
38 | | * Now, before starting a new worker to apply the streaming transaction, we |
39 | | * check the list for any available worker. Note that we retain at most half |
40 | | * of max_parallel_apply_workers_per_subscription workers in the pool; beyond |
41 | | * that, a worker simply exits after applying its transaction. |
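| | * For example, with max_parallel_apply_workers_per_subscription set to 4, at |
| | * most two workers are kept in the pool for reuse; any additional worker |
| | * exits once it has finished applying its transaction. |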
42 | | * |
43 | | * XXX This worker pool threshold is arbitrary and we can provide a GUC |
44 | | * variable for this in the future if required. |
45 | | * |
46 | | * The leader apply worker will create a separate dynamic shared memory segment |
47 | | * when each parallel apply worker starts. The reason for this design is that |
48 | | * we cannot predict how many workers will be needed. It may be possible to |
49 | | * allocate enough shared memory in one segment based on the maximum number of |
50 | | * parallel apply workers (max_parallel_apply_workers_per_subscription), but |
51 | | * this would waste memory if no process is actually started. |
52 | | * |
53 | | * The dynamic shared memory segment contains: (a) a shm_mq that is used to |
54 | | * send changes in the transaction from leader apply worker to parallel apply |
55 | | * worker; (b) another shm_mq that is used to send errors (and other messages |
56 | | * reported via elog/ereport) from the parallel apply worker to leader apply |
57 | | * worker; (c) necessary information to be shared among parallel apply workers |
58 | | * and the leader apply worker (i.e. members of ParallelApplyWorkerShared). |
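| | * |
| | * For example, the segment's table of contents ends up with three entries: |
| | * PARALLEL_APPLY_KEY_SHARED (the ParallelApplyWorkerShared struct), |
| | * PARALLEL_APPLY_KEY_MQ (the 16 MB change queue, leader -> worker), and |
| | * PARALLEL_APPLY_KEY_ERROR_QUEUE (the 16 kB error queue, worker -> leader). |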
59 | | * |
60 | | * Locking Considerations |
61 | | * ---------------------- |
62 | | * We have a risk of deadlock due to concurrently applying the transactions in |
63 | | * parallel mode that were independent on the publisher side but became |
64 | | * dependent on the subscriber side due to the different database structures |
65 | | * (like schema of subscription tables, constraints, etc.) on each side. This |
66 | | * can happen even without parallel mode when there are concurrent operations |
67 | | * on the subscriber. In order to detect the deadlocks among leader (LA) and |
68 | | * parallel apply (PA) workers, we use lmgr locks when the PA waits for the |
69 | | * next stream (set of changes) and LA waits for PA to finish the transaction. |
70 | | * An alternative approach could be to not allow parallelism when the schema of |
71 | | * tables is different between the publisher and subscriber but that would be |
72 | | * too restrictive and would require the publisher to send much more |
73 | | * information than it is currently sending. |
74 | | * |
75 | | * Consider a case where the subscribed table does not have a unique key on the |
76 | | * publisher and has a unique key on the subscriber. The deadlock can happen in |
77 | | * the following ways: |
78 | | * |
79 | | * 1) Deadlock between the leader apply worker and a parallel apply worker |
80 | | * |
81 | | * Consider that the parallel apply worker (PA) is executing TX-1 and the |
82 | | * leader apply worker (LA) is executing TX-2 concurrently on the subscriber. |
83 | | * Now, LA is waiting for PA because of the unique key constraint of the |
84 | | * subscribed table while PA is waiting for LA to send the next stream of |
85 | | * changes or transaction finish command message. |
86 | | * |
87 | | * In order for lmgr to detect this, we have LA acquire a session lock on the |
88 | | * remote transaction (by pa_lock_stream()) and have PA wait on the lock before |
89 | | * trying to receive the next stream of changes. Specifically, LA will acquire |
90 | | * the lock in AccessExclusive mode before sending the STREAM_STOP and will |
91 | | * release it if already acquired after sending the STREAM_START, STREAM_ABORT |
92 | | * (for toplevel transaction), STREAM_PREPARE, and STREAM_COMMIT. The PA will |
93 | | * acquire the lock in AccessShare mode after processing STREAM_STOP and |
94 | | * STREAM_ABORT (for subtransaction) and then release the lock immediately |
95 | | * after acquiring it. |
96 | | * |
97 | | * The lock graph for the above example will look as follows: |
98 | | * LA (waiting to acquire the lock on the unique index) -> PA (waiting to |
99 | | * acquire the stream lock) -> LA |
100 | | * |
101 | | * This way, when PA is waiting for LA for the next stream of changes, we can |
102 | | * have a wait-edge from PA to LA in lmgr, which will make us detect the |
103 | | * deadlock between LA and PA. |
104 | | * |
105 | | * 2) Deadlock between the leader apply worker and parallel apply workers |
106 | | * |
107 | | * This scenario is similar to the first case but TX-1 and TX-2 are executed by |
108 | | * two parallel apply workers (PA-1 and PA-2 respectively). In this scenario, |
109 | | * PA-2 is waiting for PA-1 to complete its transaction while PA-1 is waiting |
110 | | * for subsequent input from LA. Also, LA is waiting for PA-2 to complete its |
111 | | * transaction in order to preserve the commit order. There is a deadlock among |
112 | | * the three processes. |
113 | | * |
114 | | * In order for lmgr to detect this, we have PA acquire a session lock (this is |
115 | | * a different lock than the one referred to in the previous case, see |
116 | | * pa_lock_transaction()) on the transaction being applied and have LA wait on |
117 | | * the lock before proceeding in the transaction finish commands. Specifically, |
118 | | * PA will acquire this lock in AccessExclusive mode before executing the first |
119 | | * message of the transaction and release it at the xact end. LA will acquire |
120 | | * this lock in AccessShare mode at transaction finish commands and release it |
121 | | * immediately. |
122 | | * |
123 | | * The lock graph for the above example will look as follows: |
124 | | * LA (waiting to acquire the transaction lock) -> PA-2 (waiting to acquire the |
125 | | * lock due to unique index constraint) -> PA-1 (waiting to acquire the stream |
126 | | * lock) -> LA |
127 | | * |
128 | | * This way, when LA is waiting to finish the transaction end command to preserve |
129 | | * the commit order, we will be able to detect a deadlock, if any. |
130 | | * |
131 | | * One might think we can use XactLockTableWait(), but XactLockTableWait() |
132 | | * considers a PREPARED TRANSACTION to be still in progress, which means the lock |
133 | | * won't be released even after the parallel apply worker has prepared the |
134 | | * transaction. |
135 | | * |
136 | | * 3) Deadlock when the shm_mq buffer is full |
137 | | * |
138 | | * In the previous scenario (i.e., PA-1 and PA-2 are executing transactions |
139 | | * concurrently), if the shm_mq buffer between LA and PA-2 is full, LA has to |
140 | | * wait to send messages, and this wait doesn't appear in lmgr. |
141 | | * |
142 | | * To avoid this wait, we use a non-blocking write and wait with a timeout. If |
143 | | * the timeout is exceeded, LA will serialize all the pending messages to |
144 | | * a file and indicate to PA-2 that it needs to read that file for the remaining |
145 | | * messages. Then LA will start waiting for the commit as in the previous case, |
146 | | * which will detect a deadlock, if any. See pa_send_data() and |
147 | | * enum TransApplyAction. |
148 | | * |
149 | | * Lock types |
150 | | * ---------- |
151 | | * Both the stream lock and the transaction lock mentioned above are |
152 | | * session-level locks because both locks could be acquired outside the |
153 | | * transaction, and the stream lock in the leader needs to persist across |
154 | | * transaction boundaries, i.e., until the end of the streaming transaction. |
155 | | *------------------------------------------------------------------------- |
156 | | */ |
157 | | |
158 | | #include "postgres.h" |
159 | | |
160 | | #include "libpq/pqformat.h" |
161 | | #include "libpq/pqmq.h" |
162 | | #include "pgstat.h" |
163 | | #include "postmaster/interrupt.h" |
164 | | #include "replication/logicallauncher.h" |
165 | | #include "replication/logicalworker.h" |
166 | | #include "replication/origin.h" |
167 | | #include "replication/worker_internal.h" |
168 | | #include "storage/ipc.h" |
169 | | #include "storage/lmgr.h" |
170 | | #include "tcop/tcopprot.h" |
171 | | #include "utils/inval.h" |
172 | | #include "utils/memutils.h" |
173 | | #include "utils/syscache.h" |
174 | | |
175 | 0 | #define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067 |
176 | | |
177 | | /* |
178 | | * DSM keys for the parallel apply worker. Unlike other parallel execution |
179 | | * code, we don't need to worry about DSM keys conflicting with plan_node_id, |
180 | | * so we can use small integers. |
181 | | */ |
182 | 0 | #define PARALLEL_APPLY_KEY_SHARED 1 |
183 | 0 | #define PARALLEL_APPLY_KEY_MQ 2 |
184 | 0 | #define PARALLEL_APPLY_KEY_ERROR_QUEUE 3 |
185 | | |
186 | | /* Queue size of DSM, 16 MB for now. */ |
187 | 0 | #define DSM_QUEUE_SIZE (16 * 1024 * 1024) |
188 | | |
189 | | /* |
190 | | * Error queue size of DSM. It is desirable to make it large enough that a |
191 | | * typical ErrorResponse can be sent without blocking. That way, a worker that |
192 | | * errors out can write the whole message into the queue and terminate without |
193 | | * waiting for the user backend. |
194 | | */ |
195 | 0 | #define DSM_ERROR_QUEUE_SIZE (16 * 1024) |
196 | | |
197 | | /* |
198 | | * There are three fields in each message received by the parallel apply |
199 | | * worker: start_lsn, end_lsn and send_time. Because these statistics have |
200 | | * already been updated by the leader apply worker, we can ignore them in the |
201 | | * parallel apply worker (see function LogicalRepApplyLoop). |
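| | * |
| | * For example, each message arrives as a 'w' byte, then start_lsn (8 bytes), |
| | * end_lsn (8 bytes) and send_time (8 bytes), followed by the actual logical |
| | * replication change message; after checking the leading 'w' byte, the |
| | * parallel apply worker simply advances its read cursor past these 24 bytes. |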
202 | | */ |
203 | 0 | #define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz)) |
204 | | |
205 | | /* |
206 | | * The type of session-level lock on a transaction being applied on a logical |
207 | | * replication subscriber. |
208 | | */ |
209 | 0 | #define PARALLEL_APPLY_LOCK_STREAM 0 |
210 | 0 | #define PARALLEL_APPLY_LOCK_XACT 1 |
211 | | |
212 | | /* |
213 | | * Hash table entry to map xid to the parallel apply worker state. |
214 | | */ |
215 | | typedef struct ParallelApplyWorkerEntry |
216 | | { |
217 | | TransactionId xid; /* Hash key -- must be first */ |
218 | | ParallelApplyWorkerInfo *winfo; |
219 | | } ParallelApplyWorkerEntry; |
220 | | |
221 | | /* |
222 | | * A hash table used to cache the state of streaming transactions being applied |
223 | | * by the parallel apply workers. |
224 | | */ |
225 | | static HTAB *ParallelApplyTxnHash = NULL; |
226 | | |
227 | | /* |
228 | | * A list (pool) of active parallel apply workers. The information for |
229 | | * the new worker is added to the list after successfully launching it. The |
230 | | * list entry is removed if there are already enough workers in the worker |
231 | | * pool at the end of the transaction. For more information about the worker |
232 | | * pool, see comments atop this file. |
233 | | */ |
234 | | static List *ParallelApplyWorkerPool = NIL; |
235 | | |
236 | | /* |
237 | | * Information shared between leader apply worker and parallel apply worker. |
238 | | */ |
239 | | ParallelApplyWorkerShared *MyParallelShared = NULL; |
240 | | |
241 | | /* |
242 | | * Is there a message sent by a parallel apply worker that the leader apply |
243 | | * worker needs to receive? |
244 | | */ |
245 | | volatile sig_atomic_t ParallelApplyMessagePending = false; |
246 | | |
247 | | /* |
248 | | * Cache the parallel apply worker information required for applying the |
249 | | * current streaming transaction. It is used to save the cost of searching the |
250 | | * hash table when applying the changes between STREAM_START and STREAM_STOP. |
251 | | */ |
252 | | static ParallelApplyWorkerInfo *stream_apply_worker = NULL; |
253 | | |
254 | | /* A list to maintain subtransactions, if any. */ |
255 | | static List *subxactlist = NIL; |
256 | | |
257 | | static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo); |
258 | | static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared); |
259 | | static PartialFileSetState pa_get_fileset_state(void); |
260 | | |
261 | | /* |
262 | | * Returns true if it is OK to start a parallel apply worker, false otherwise. |
263 | | */ |
264 | | static bool |
265 | | pa_can_start(void) |
266 | 0 | { |
267 | | /* Only leader apply workers can start parallel apply workers. */ |
268 | 0 | if (!am_leader_apply_worker()) |
269 | 0 | return false; |
270 | | |
271 | | /* |
272 | | * It is good to check for any change in the subscription parameters to |
273 | | * avoid the case where a change doesn't get reflected for a very long |
274 | | * time. This can happen when there is a constant flow of streaming |
275 | | * transactions that are handled by parallel apply workers. |
276 | | * |
277 | | * It is better to do this before the checks below so that the latest |
278 | | * subscription values can be used for the checks. |
279 | | */ |
280 | 0 | maybe_reread_subscription(); |
281 | | |
282 | | /* |
283 | | * Don't start a new parallel apply worker if the subscription is not |
284 | | * using parallel streaming mode, or if the publisher does not support |
285 | | * parallel apply. |
286 | | */ |
287 | 0 | if (!MyLogicalRepWorker->parallel_apply) |
288 | 0 | return false; |
289 | | |
290 | | /* |
291 | | * Don't start a new parallel worker if the user has set skiplsn, as it's |
292 | | * possible that they want to skip the streaming transaction. For |
293 | | * streaming transactions, we need to serialize the transaction to a file |
294 | | * so that we can get the last LSN of the transaction to judge whether to |
295 | | * skip before starting to apply the change. |
296 | | * |
297 | | * One might think that we could allow parallelism if the first lsn of the |
298 | | * transaction is greater than skiplsn, but we don't send it with the |
299 | | * STREAM START message, and it doesn't seem worth sending the extra eight |
300 | | * bytes with the STREAM START to enable parallelism for this case. |
301 | | */ |
302 | 0 | if (!XLogRecPtrIsInvalid(MySubscription->skiplsn)) |
303 | 0 | return false; |
304 | | |
305 | | /* |
306 | | * For streaming transactions that are being applied using a parallel |
307 | | * apply worker, we cannot decide whether to apply the change for a |
308 | | * relation that is not in the READY state (see |
309 | | * should_apply_changes_for_rel) as we won't know remote_final_lsn by that |
310 | | * time. So, we don't start the new parallel apply worker in this case. |
311 | | */ |
312 | 0 | if (!AllTablesyncsReady()) |
313 | 0 | return false; |
314 | | |
315 | 0 | return true; |
316 | 0 | } |
317 | | |
318 | | /* |
319 | | * Set up a dynamic shared memory segment. |
320 | | * |
321 | | * We set up a control region that contains a fixed-size worker info |
322 | | * (ParallelApplyWorkerShared), a message queue, and an error queue. |
323 | | * |
324 | | * Returns true on success, false on failure. |
325 | | */ |
326 | | static bool |
327 | | pa_setup_dsm(ParallelApplyWorkerInfo *winfo) |
328 | 0 | { |
329 | 0 | shm_toc_estimator e; |
330 | 0 | Size segsize; |
331 | 0 | dsm_segment *seg; |
332 | 0 | shm_toc *toc; |
333 | 0 | ParallelApplyWorkerShared *shared; |
334 | 0 | shm_mq *mq; |
335 | 0 | Size queue_size = DSM_QUEUE_SIZE; |
336 | 0 | Size error_queue_size = DSM_ERROR_QUEUE_SIZE; |
337 | | |
338 | | /* |
339 | | * Estimate how much shared memory we need. |
340 | | * |
341 | | * Because the TOC machinery may choose to insert padding of oddly-sized |
342 | | * requests, we must estimate each chunk separately. |
343 | | * |
344 | | * We need one key to register the location of the header, and two other |
345 | | * keys to track the locations of the message queue and the error message |
346 | | * queue. |
347 | | */ |
348 | 0 | shm_toc_initialize_estimator(&e); |
349 | 0 | shm_toc_estimate_chunk(&e, sizeof(ParallelApplyWorkerShared)); |
350 | 0 | shm_toc_estimate_chunk(&e, queue_size); |
351 | 0 | shm_toc_estimate_chunk(&e, error_queue_size); |
352 | |
|
353 | 0 | shm_toc_estimate_keys(&e, 3); |
354 | 0 | segsize = shm_toc_estimate(&e); |
355 | | |
356 | | /* Create the shared memory segment and establish a table of contents. */ |
357 | 0 | seg = dsm_create(shm_toc_estimate(&e), 0); |
358 | 0 | if (!seg) |
359 | 0 | return false; |
360 | | |
361 | 0 | toc = shm_toc_create(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg), |
362 | 0 | segsize); |
363 | | |
364 | | /* Set up the header region. */ |
365 | 0 | shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared)); |
366 | 0 | SpinLockInit(&shared->mutex); |
367 | |
|
368 | 0 | shared->xact_state = PARALLEL_TRANS_UNKNOWN; |
369 | 0 | pg_atomic_init_u32(&(shared->pending_stream_count), 0); |
370 | 0 | shared->last_commit_end = InvalidXLogRecPtr; |
371 | 0 | shared->fileset_state = FS_EMPTY; |
372 | |
|
373 | 0 | shm_toc_insert(toc, PARALLEL_APPLY_KEY_SHARED, shared); |
374 | | |
375 | | /* Set up message queue for the worker. */ |
376 | 0 | mq = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size); |
377 | 0 | shm_toc_insert(toc, PARALLEL_APPLY_KEY_MQ, mq); |
378 | 0 | shm_mq_set_sender(mq, MyProc); |
379 | | |
380 | | /* Attach the queue. */ |
381 | 0 | winfo->mq_handle = shm_mq_attach(mq, seg, NULL); |
382 | | |
383 | | /* Set up error queue for the worker. */ |
384 | 0 | mq = shm_mq_create(shm_toc_allocate(toc, error_queue_size), |
385 | 0 | error_queue_size); |
386 | 0 | shm_toc_insert(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, mq); |
387 | 0 | shm_mq_set_receiver(mq, MyProc); |
388 | | |
389 | | /* Attach the queue. */ |
390 | 0 | winfo->error_mq_handle = shm_mq_attach(mq, seg, NULL); |
391 | | |
392 | | /* Return results to caller. */ |
393 | 0 | winfo->dsm_seg = seg; |
394 | 0 | winfo->shared = shared; |
395 | |
|
396 | 0 | return true; |
397 | 0 | } |
398 | | |
399 | | /* |
400 | | * Try to get a parallel apply worker from the pool. If none is available then |
401 | | * start a new one. |
402 | | */ |
403 | | static ParallelApplyWorkerInfo * |
404 | | pa_launch_parallel_worker(void) |
405 | 0 | { |
406 | 0 | MemoryContext oldcontext; |
407 | 0 | bool launched; |
408 | 0 | ParallelApplyWorkerInfo *winfo; |
409 | 0 | ListCell *lc; |
410 | | |
411 | | /* Try to get an available parallel apply worker from the worker pool. */ |
412 | 0 | foreach(lc, ParallelApplyWorkerPool) |
413 | 0 | { |
414 | 0 | winfo = (ParallelApplyWorkerInfo *) lfirst(lc); |
415 | |
|
416 | 0 | if (!winfo->in_use) |
417 | 0 | return winfo; |
418 | 0 | } |
419 | | |
420 | | /* |
421 | | * Start a new parallel apply worker. |
422 | | * |
423 | | * The worker info can be used for the lifetime of the worker process, so |
424 | | * create it in a permanent context. |
425 | | */ |
426 | 0 | oldcontext = MemoryContextSwitchTo(ApplyContext); |
427 | |
|
428 | 0 | winfo = (ParallelApplyWorkerInfo *) palloc0(sizeof(ParallelApplyWorkerInfo)); |
429 | | |
430 | | /* Setup shared memory. */ |
431 | 0 | if (!pa_setup_dsm(winfo)) |
432 | 0 | { |
433 | 0 | MemoryContextSwitchTo(oldcontext); |
434 | 0 | pfree(winfo); |
435 | 0 | return NULL; |
436 | 0 | } |
437 | | |
438 | 0 | launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY, |
439 | 0 | MyLogicalRepWorker->dbid, |
440 | 0 | MySubscription->oid, |
441 | 0 | MySubscription->name, |
442 | 0 | MyLogicalRepWorker->userid, |
443 | 0 | InvalidOid, |
444 | 0 | dsm_segment_handle(winfo->dsm_seg)); |
445 | |
|
446 | 0 | if (launched) |
447 | 0 | { |
448 | 0 | ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo); |
449 | 0 | } |
450 | 0 | else |
451 | 0 | { |
452 | 0 | pa_free_worker_info(winfo); |
453 | 0 | winfo = NULL; |
454 | 0 | } |
455 | |
|
456 | 0 | MemoryContextSwitchTo(oldcontext); |
457 | |
|
458 | 0 | return winfo; |
459 | 0 | } |
460 | | |
461 | | /* |
462 | | * Allocate a parallel apply worker that will be used for the specified xid. |
463 | | * |
464 | | * We first try to get an available worker from the pool; if none is available, |
465 | | * we try to launch a new worker. On successful allocation, remember the worker |
466 | | * information in the hash table so that we can get it later for processing the |
467 | | * streaming changes. |
468 | | */ |
469 | | void |
470 | | pa_allocate_worker(TransactionId xid) |
471 | 0 | { |
472 | 0 | bool found; |
473 | 0 | ParallelApplyWorkerInfo *winfo = NULL; |
474 | 0 | ParallelApplyWorkerEntry *entry; |
475 | |
|
476 | 0 | if (!pa_can_start()) |
477 | 0 | return; |
478 | | |
479 | 0 | winfo = pa_launch_parallel_worker(); |
480 | 0 | if (!winfo) |
481 | 0 | return; |
482 | | |
483 | | /* First time through, initialize parallel apply worker state hashtable. */ |
484 | 0 | if (!ParallelApplyTxnHash) |
485 | 0 | { |
486 | 0 | HASHCTL ctl; |
487 | |
|
488 | 0 | MemSet(&ctl, 0, sizeof(ctl)); |
489 | 0 | ctl.keysize = sizeof(TransactionId); |
490 | 0 | ctl.entrysize = sizeof(ParallelApplyWorkerEntry); |
491 | 0 | ctl.hcxt = ApplyContext; |
492 | |
|
493 | 0 | ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash", |
494 | 0 | 16, &ctl, |
495 | 0 | HASH_ELEM | HASH_BLOBS | HASH_CONTEXT); |
496 | 0 | } |
497 | | |
498 | | /* Create an entry for the requested transaction. */ |
499 | 0 | entry = hash_search(ParallelApplyTxnHash, &xid, HASH_ENTER, &found); |
500 | 0 | if (found) |
501 | 0 | elog(ERROR, "hash table corrupted"); |
502 | | |
503 | | /* Update the transaction information in shared memory. */ |
504 | 0 | SpinLockAcquire(&winfo->shared->mutex); |
505 | 0 | winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN; |
506 | 0 | winfo->shared->xid = xid; |
507 | 0 | SpinLockRelease(&winfo->shared->mutex); |
508 | |
|
509 | 0 | winfo->in_use = true; |
510 | 0 | winfo->serialize_changes = false; |
511 | 0 | entry->winfo = winfo; |
512 | 0 | } |
513 | | |
514 | | /* |
515 | | * Find the assigned worker for the given transaction, if any. |
516 | | */ |
517 | | ParallelApplyWorkerInfo * |
518 | | pa_find_worker(TransactionId xid) |
519 | 0 | { |
520 | 0 | bool found; |
521 | 0 | ParallelApplyWorkerEntry *entry; |
522 | |
|
523 | 0 | if (!TransactionIdIsValid(xid)) |
524 | 0 | return NULL; |
525 | | |
526 | 0 | if (!ParallelApplyTxnHash) |
527 | 0 | return NULL; |
528 | | |
529 | | /* Return the cached parallel apply worker if valid. */ |
530 | 0 | if (stream_apply_worker) |
531 | 0 | return stream_apply_worker; |
532 | | |
533 | | /* Find an entry for the requested transaction. */ |
534 | 0 | entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found); |
535 | 0 | if (found) |
536 | 0 | { |
537 | | /* The worker must not have exited. */ |
538 | 0 | Assert(entry->winfo->in_use); |
539 | 0 | return entry->winfo; |
540 | 0 | } |
541 | | |
542 | 0 | return NULL; |
543 | 0 | } |
544 | | |
545 | | /* |
546 | | * Makes the worker available for reuse. |
547 | | * |
548 | | * This removes the parallel apply worker entry from the hash table so that it |
549 | | * can't be used. If there are enough workers in the pool, it stops the worker |
550 | | * and frees the corresponding info. Otherwise it just marks the worker as |
551 | | * available for reuse. |
552 | | * |
553 | | * For more information about the worker pool, see comments atop this file. |
554 | | */ |
555 | | static void |
556 | | pa_free_worker(ParallelApplyWorkerInfo *winfo) |
557 | 0 | { |
558 | 0 | Assert(!am_parallel_apply_worker()); |
559 | 0 | Assert(winfo->in_use); |
560 | 0 | Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED); |
561 | |
|
562 | 0 | if (!hash_search(ParallelApplyTxnHash, &winfo->shared->xid, HASH_REMOVE, NULL)) |
563 | 0 | elog(ERROR, "hash table corrupted"); |
564 | | |
565 | | /* |
566 | | * Stop the worker if there are enough workers in the pool. |
567 | | * |
568 | | * XXX Additionally, we also stop the worker if the leader apply worker |
569 | | * serialized part of the transaction data due to a send timeout. This is |
570 | | * because the message could be partially written to the queue and there |
571 | | * is no way to clean the queue other than resending the message until it |
572 | | * succeeds. Instead of trying to send the data which anyway would have |
573 | | * been serialized and then letting the parallel apply worker deal with |
574 | | * the spurious message, we stop the worker. |
575 | | */ |
576 | 0 | if (winfo->serialize_changes || |
577 | 0 | list_length(ParallelApplyWorkerPool) > |
578 | 0 | (max_parallel_apply_workers_per_subscription / 2)) |
579 | 0 | { |
580 | 0 | logicalrep_pa_worker_stop(winfo); |
581 | 0 | pa_free_worker_info(winfo); |
582 | |
|
583 | 0 | return; |
584 | 0 | } |
585 | | |
586 | 0 | winfo->in_use = false; |
587 | 0 | winfo->serialize_changes = false; |
588 | 0 | } |
589 | | |
590 | | /* |
591 | | * Free the parallel apply worker information and unlink the files with |
592 | | * serialized changes if any. |
593 | | */ |
594 | | static void |
595 | | pa_free_worker_info(ParallelApplyWorkerInfo *winfo) |
596 | 0 | { |
597 | 0 | Assert(winfo); |
598 | |
|
599 | 0 | if (winfo->mq_handle) |
600 | 0 | shm_mq_detach(winfo->mq_handle); |
601 | |
|
602 | 0 | if (winfo->error_mq_handle) |
603 | 0 | shm_mq_detach(winfo->error_mq_handle); |
604 | | |
605 | | /* Unlink the files with serialized changes. */ |
606 | 0 | if (winfo->serialize_changes) |
607 | 0 | stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid); |
608 | |
|
609 | 0 | if (winfo->dsm_seg) |
610 | 0 | dsm_detach(winfo->dsm_seg); |
611 | | |
612 | | /* Remove from the worker pool. */ |
613 | 0 | ParallelApplyWorkerPool = list_delete_ptr(ParallelApplyWorkerPool, winfo); |
614 | |
|
615 | 0 | pfree(winfo); |
616 | 0 | } |
617 | | |
618 | | /* |
619 | | * Detach the error queue for all parallel apply workers. |
620 | | */ |
621 | | void |
622 | | pa_detach_all_error_mq(void) |
623 | 0 | { |
624 | 0 | ListCell *lc; |
625 | |
|
626 | 0 | foreach(lc, ParallelApplyWorkerPool) |
627 | 0 | { |
628 | 0 | ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc); |
629 | |
|
630 | 0 | if (winfo->error_mq_handle) |
631 | 0 | { |
632 | 0 | shm_mq_detach(winfo->error_mq_handle); |
633 | 0 | winfo->error_mq_handle = NULL; |
634 | 0 | } |
635 | 0 | } |
636 | 0 | } |
637 | | |
638 | | /* |
639 | | * Check if there are any pending spooled messages. |
640 | | */ |
641 | | static bool |
642 | | pa_has_spooled_message_pending(void) |
643 | 0 | { |
644 | 0 | PartialFileSetState fileset_state; |
645 | |
|
646 | 0 | fileset_state = pa_get_fileset_state(); |
647 | |
|
648 | 0 | return (fileset_state != FS_EMPTY); |
649 | 0 | } |
650 | | |
651 | | /* |
652 | | * Replay the spooled messages once the leader apply worker has finished |
653 | | * serializing changes to the file. |
654 | | * |
655 | | * Returns false if there aren't any pending spooled messages, true otherwise. |
656 | | */ |
657 | | static bool |
658 | | pa_process_spooled_messages_if_required(void) |
659 | 0 | { |
660 | 0 | PartialFileSetState fileset_state; |
661 | |
|
662 | 0 | fileset_state = pa_get_fileset_state(); |
663 | |
|
664 | 0 | if (fileset_state == FS_EMPTY) |
665 | 0 | return false; |
666 | | |
667 | | /* |
668 | | * If the leader apply worker is busy serializing the partial changes then |
669 | | * acquire the stream lock now and wait for the leader worker to finish |
670 | | * serializing the changes. Otherwise, the parallel apply worker won't get |
671 | | * a chance to receive a STREAM_STOP (and acquire the stream lock) until |
672 | | * the leader has serialized all changes, which can lead to an undetected |
673 | | * deadlock. |
674 | | * |
675 | | * Note that the fileset state can be FS_SERIALIZE_DONE once the leader |
676 | | * worker has finished serializing the changes. |
677 | | */ |
678 | 0 | if (fileset_state == FS_SERIALIZE_IN_PROGRESS) |
679 | 0 | { |
680 | 0 | pa_lock_stream(MyParallelShared->xid, AccessShareLock); |
681 | 0 | pa_unlock_stream(MyParallelShared->xid, AccessShareLock); |
682 | |
|
683 | 0 | fileset_state = pa_get_fileset_state(); |
684 | 0 | } |
685 | | |
686 | | /* |
687 | | * We cannot read the file immediately after the leader has serialized all |
688 | | * changes to the file because there may still be messages in the memory |
689 | | * queue. We will apply all spooled messages the next time we call this |
690 | | * function and that will ensure there are no messages left in the memory |
691 | | * queue. |
692 | | */ |
693 | 0 | if (fileset_state == FS_SERIALIZE_DONE) |
694 | 0 | { |
695 | 0 | pa_set_fileset_state(MyParallelShared, FS_READY); |
696 | 0 | } |
697 | 0 | else if (fileset_state == FS_READY) |
698 | 0 | { |
699 | 0 | apply_spooled_messages(&MyParallelShared->fileset, |
700 | 0 | MyParallelShared->xid, |
701 | 0 | InvalidXLogRecPtr); |
702 | 0 | pa_set_fileset_state(MyParallelShared, FS_EMPTY); |
703 | 0 | } |
704 | |
|
705 | 0 | return true; |
706 | 0 | } |
707 | | |
708 | | /* |
709 | | * Interrupt handler for main loop of parallel apply worker. |
710 | | */ |
711 | | static void |
712 | | ProcessParallelApplyInterrupts(void) |
713 | 0 | { |
714 | 0 | CHECK_FOR_INTERRUPTS(); |
715 | |
|
716 | 0 | if (ShutdownRequestPending) |
717 | 0 | { |
718 | 0 | ereport(LOG, |
719 | 0 | (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished", |
720 | 0 | MySubscription->name))); |
721 | | |
722 | 0 | proc_exit(0); |
723 | 0 | } |
724 | | |
725 | 0 | if (ConfigReloadPending) |
726 | 0 | { |
727 | 0 | ConfigReloadPending = false; |
728 | 0 | ProcessConfigFile(PGC_SIGHUP); |
729 | 0 | } |
730 | 0 | } |
731 | | |
732 | | /* Parallel apply worker main loop. */ |
733 | | static void |
734 | | LogicalParallelApplyLoop(shm_mq_handle *mqh) |
735 | 0 | { |
736 | 0 | shm_mq_result shmq_res; |
737 | 0 | ErrorContextCallback errcallback; |
738 | 0 | MemoryContext oldcxt = CurrentMemoryContext; |
739 | | |
740 | | /* |
741 | | * Init the ApplyMessageContext which we clean up after each replication |
742 | | * protocol message. |
743 | | */ |
744 | 0 | ApplyMessageContext = AllocSetContextCreate(ApplyContext, |
745 | 0 | "ApplyMessageContext", |
746 | 0 | ALLOCSET_DEFAULT_SIZES); |
747 | | |
748 | | /* |
749 | | * Push apply error context callback. Fields will be filled while applying |
750 | | * a change. |
751 | | */ |
752 | 0 | errcallback.callback = apply_error_callback; |
753 | 0 | errcallback.previous = error_context_stack; |
754 | 0 | error_context_stack = &errcallback; |
755 | |
|
756 | 0 | for (;;) |
757 | 0 | { |
758 | 0 | void *data; |
759 | 0 | Size len; |
760 | |
|
761 | 0 | ProcessParallelApplyInterrupts(); |
762 | | |
763 | | /* Ensure we are reading the data into our memory context. */ |
764 | 0 | MemoryContextSwitchTo(ApplyMessageContext); |
765 | |
|
766 | 0 | shmq_res = shm_mq_receive(mqh, &len, &data, true); |
767 | |
|
768 | 0 | if (shmq_res == SHM_MQ_SUCCESS) |
769 | 0 | { |
770 | 0 | StringInfoData s; |
771 | 0 | int c; |
772 | |
|
773 | 0 | if (len == 0) |
774 | 0 | elog(ERROR, "invalid message length"); |
775 | | |
776 | 0 | initReadOnlyStringInfo(&s, data, len); |
777 | | |
778 | | /* |
779 | | * The first byte of messages sent from leader apply worker to |
780 | | * parallel apply workers can only be 'w'. |
781 | | */ |
782 | 0 | c = pq_getmsgbyte(&s); |
783 | 0 | if (c != 'w') |
784 | 0 | elog(ERROR, "unexpected message \"%c\"", c); |
785 | | |
786 | | /* |
787 | | * Ignore statistics fields that have been updated by the leader |
788 | | * apply worker. |
789 | | * |
790 | | * XXX We can avoid sending the statistics fields from the leader |
791 | | * apply worker but for that, it needs to rebuild the entire |
792 | | * message by removing these fields which could be more work than |
793 | | * simply ignoring these fields in the parallel apply worker. |
794 | | */ |
795 | 0 | s.cursor += SIZE_STATS_MESSAGE; |
796 | |
|
797 | 0 | apply_dispatch(&s); |
798 | 0 | } |
799 | 0 | else if (shmq_res == SHM_MQ_WOULD_BLOCK) |
800 | 0 | { |
801 | | /* Replay the changes from the file, if any. */ |
802 | 0 | if (!pa_process_spooled_messages_if_required()) |
803 | 0 | { |
804 | 0 | int rc; |
805 | | |
806 | | /* Wait for more work. */ |
807 | 0 | rc = WaitLatch(MyLatch, |
808 | 0 | WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, |
809 | 0 | 1000L, |
810 | 0 | WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN); |
811 | |
|
812 | 0 | if (rc & WL_LATCH_SET) |
813 | 0 | ResetLatch(MyLatch); |
814 | 0 | } |
815 | 0 | } |
816 | 0 | else |
817 | 0 | { |
818 | 0 | Assert(shmq_res == SHM_MQ_DETACHED); |
819 | |
|
820 | 0 | ereport(ERROR, |
821 | 0 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
822 | 0 | errmsg("lost connection to the logical replication apply worker"))); |
823 | 0 | } |
824 | | |
825 | 0 | MemoryContextReset(ApplyMessageContext); |
826 | 0 | MemoryContextSwitchTo(oldcxt); |
827 | 0 | } |
828 | | |
829 | | /* Pop the error context stack. */ |
830 | 0 | error_context_stack = errcallback.previous; |
831 | |
|
832 | 0 | MemoryContextSwitchTo(oldcxt); |
833 | 0 | } |
834 | | |
835 | | /* |
836 | | * Make sure the leader apply worker tries to read from our error queue one more |
837 | | * time. This guards against the case where we exit uncleanly without sending |
838 | | * an ErrorResponse, for example because some code calls proc_exit directly. |
839 | | * |
840 | | * Also explicitly detach from dsm segment to invoke on_dsm_detach callbacks, |
841 | | * if any. See ParallelWorkerShutdown for details. |
842 | | */ |
843 | | static void |
844 | | pa_shutdown(int code, Datum arg) |
845 | 0 | { |
846 | 0 | SendProcSignal(MyLogicalRepWorker->leader_pid, |
847 | 0 | PROCSIG_PARALLEL_APPLY_MESSAGE, |
848 | 0 | INVALID_PROC_NUMBER); |
849 | |
|
850 | 0 | dsm_detach((dsm_segment *) DatumGetPointer(arg)); |
851 | 0 | } |
852 | | |
853 | | /* |
854 | | * Parallel apply worker entry point. |
855 | | */ |
856 | | void |
857 | | ParallelApplyWorkerMain(Datum main_arg) |
858 | 0 | { |
859 | 0 | ParallelApplyWorkerShared *shared; |
860 | 0 | dsm_handle handle; |
861 | 0 | dsm_segment *seg; |
862 | 0 | shm_toc *toc; |
863 | 0 | shm_mq *mq; |
864 | 0 | shm_mq_handle *mqh; |
865 | 0 | shm_mq_handle *error_mqh; |
866 | 0 | RepOriginId originid; |
867 | 0 | int worker_slot = DatumGetInt32(main_arg); |
868 | 0 | char originname[NAMEDATALEN]; |
869 | |
|
870 | 0 | InitializingApplyWorker = true; |
871 | | |
872 | | /* Setup signal handling. */ |
873 | 0 | pqsignal(SIGHUP, SignalHandlerForConfigReload); |
874 | 0 | pqsignal(SIGINT, SignalHandlerForShutdownRequest); |
875 | 0 | pqsignal(SIGTERM, die); |
876 | 0 | BackgroundWorkerUnblockSignals(); |
877 | | |
878 | | /* |
879 | | * Attach to the dynamic shared memory segment for the parallel apply, and |
880 | | * find its table of contents. |
881 | | * |
882 | | * Like parallel query, we don't need a resource owner by this time. See |
883 | | * ParallelWorkerMain. |
884 | | */ |
885 | 0 | memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle)); |
886 | 0 | seg = dsm_attach(handle); |
887 | 0 | if (!seg) |
888 | 0 | ereport(ERROR, |
889 | 0 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
890 | 0 | errmsg("could not map dynamic shared memory segment"))); |
891 | | |
892 | 0 | toc = shm_toc_attach(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg)); |
893 | 0 | if (!toc) |
894 | 0 | ereport(ERROR, |
895 | 0 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
896 | 0 | errmsg("invalid magic number in dynamic shared memory segment"))); |
897 | | |
898 | | /* Look up the shared information. */ |
899 | 0 | shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false); |
900 | 0 | MyParallelShared = shared; |
901 | | |
902 | | /* |
903 | | * Attach to the message queue. |
904 | | */ |
905 | 0 | mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_MQ, false); |
906 | 0 | shm_mq_set_receiver(mq, MyProc); |
907 | 0 | mqh = shm_mq_attach(mq, seg, NULL); |
908 | | |
909 | | /* |
910 | | * Primary initialization is complete. Now, we can attach to our slot. |
911 | | * This is to ensure that the leader apply worker does not write data to |
912 | | * the uninitialized memory queue. |
913 | | */ |
914 | 0 | logicalrep_worker_attach(worker_slot); |
915 | | |
916 | | /* |
917 | | * Register the shutdown callback after we are attached to the worker |
918 | | * slot. This is to ensure that MyLogicalRepWorker remains valid when this |
919 | | * callback is invoked. |
920 | | */ |
921 | 0 | before_shmem_exit(pa_shutdown, PointerGetDatum(seg)); |
922 | |
|
923 | 0 | SpinLockAcquire(&MyParallelShared->mutex); |
924 | 0 | MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation; |
925 | 0 | MyParallelShared->logicalrep_worker_slot_no = worker_slot; |
926 | 0 | SpinLockRelease(&MyParallelShared->mutex); |
927 | | |
928 | | /* |
929 | | * Attach to the error queue. |
930 | | */ |
931 | 0 | mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, false); |
932 | 0 | shm_mq_set_sender(mq, MyProc); |
933 | 0 | error_mqh = shm_mq_attach(mq, seg, NULL); |
934 | |
|
935 | 0 | pq_redirect_to_shm_mq(seg, error_mqh); |
936 | 0 | pq_set_parallel_leader(MyLogicalRepWorker->leader_pid, |
937 | 0 | INVALID_PROC_NUMBER); |
938 | |
|
939 | 0 | MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time = |
940 | 0 | MyLogicalRepWorker->reply_time = 0; |
941 | |
|
942 | 0 | InitializeLogRepWorker(); |
943 | |
|
944 | 0 | InitializingApplyWorker = false; |
945 | | |
946 | | /* Setup replication origin tracking. */ |
947 | 0 | StartTransactionCommand(); |
948 | 0 | ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid, |
949 | 0 | originname, sizeof(originname)); |
950 | 0 | originid = replorigin_by_name(originname, false); |
951 | | |
952 | | /* |
953 | | * The parallel apply worker doesn't need to monopolize this replication |
954 | | * origin which was already acquired by its leader process. |
955 | | */ |
956 | 0 | replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid); |
957 | 0 | replorigin_session_origin = originid; |
958 | 0 | CommitTransactionCommand(); |
959 | | |
960 | | /* |
961 | | * Setup callback for syscache so that we know when something changes in |
962 | | * the subscription relation state. |
963 | | */ |
964 | 0 | CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP, |
965 | 0 | invalidate_syncing_table_states, |
966 | 0 | (Datum) 0); |
967 | |
|
968 | 0 | set_apply_error_context_origin(originname); |
969 | |
|
970 | 0 | LogicalParallelApplyLoop(mqh); |
971 | | |
972 | | /* |
973 | | * The parallel apply worker must not get here because it will only |
974 | | * stop when it receives a SIGTERM or SIGINT from the leader, or when |
975 | | * there is an error. None of these cases will allow the code to reach |
976 | | * here. |
977 | | */ |
978 | 0 | Assert(false); |
979 | 0 | } |
980 | | |
981 | | /* |
982 | | * Handle receipt of an interrupt indicating a parallel apply worker message. |
983 | | * |
984 | | * Note: this is called within a signal handler! All we can do is set a flag |
985 | | * that will cause the next CHECK_FOR_INTERRUPTS() to invoke |
986 | | * ProcessParallelApplyMessages(). |
987 | | */ |
988 | | void |
989 | | HandleParallelApplyMessageInterrupt(void) |
990 | 0 | { |
991 | 0 | InterruptPending = true; |
992 | 0 | ParallelApplyMessagePending = true; |
993 | 0 | SetLatch(MyLatch); |
994 | 0 | } |
995 | | |
996 | | /* |
997 | | * Process a single protocol message received from a single parallel apply |
998 | | * worker. |
999 | | */ |
1000 | | static void |
1001 | | ProcessParallelApplyMessage(StringInfo msg) |
1002 | 0 | { |
1003 | 0 | char msgtype; |
1004 | |
|
1005 | 0 | msgtype = pq_getmsgbyte(msg); |
1006 | |
|
1007 | 0 | switch (msgtype) |
1008 | 0 | { |
1009 | 0 | case 'E': /* ErrorResponse */ |
1010 | 0 | { |
1011 | 0 | ErrorData edata; |
1012 | | |
1013 | | /* Parse ErrorResponse. */ |
1014 | 0 | pq_parse_errornotice(msg, &edata); |
1015 | | |
1016 | | /* |
1017 | | * If desired, add a context line to show that this is a |
1018 | | * message propagated from a parallel apply worker. Otherwise, |
1019 | | * it can sometimes be confusing to understand what actually |
1020 | | * happened. |
1021 | | */ |
1022 | 0 | if (edata.context) |
1023 | 0 | edata.context = psprintf("%s\n%s", edata.context, |
1024 | 0 | _("logical replication parallel apply worker")); |
1025 | 0 | else |
1026 | 0 | edata.context = pstrdup(_("logical replication parallel apply worker")); |
1027 | | |
1028 | | /* |
1029 | | * Context beyond that should use the error context callbacks |
1030 | | * that were in effect in LogicalRepApplyLoop(). |
1031 | | */ |
1032 | 0 | error_context_stack = apply_error_context_stack; |
1033 | | |
1034 | | /* |
1035 | | * The actual error must have been reported by the parallel |
1036 | | * apply worker. |
1037 | | */ |
1038 | 0 | ereport(ERROR, |
1039 | 0 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
1040 | 0 | errmsg("logical replication parallel apply worker exited due to error"), |
1041 | 0 | errcontext("%s", edata.context))); |
1042 | 0 | } |
1043 | | |
1044 | | /* |
1045 | | * Don't need to do anything about NoticeResponse and |
1046 | | * NotifyResponse as the logical replication worker doesn't need |
1047 | | * to send messages to the client. |
1048 | | */ |
1049 | 0 | case 'N': |
1050 | 0 | case 'A': |
1051 | 0 | break; |
1052 | | |
1053 | 0 | default: |
1054 | 0 | elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)", |
1055 | 0 | msgtype, msg->len); |
1056 | 0 | } |
1057 | 0 | } |
1058 | | |
1059 | | /* |
1060 | | * Handle any queued protocol messages received from parallel apply workers. |
1061 | | */ |
1062 | | void |
1063 | | ProcessParallelApplyMessages(void) |
1064 | 0 | { |
1065 | 0 | ListCell *lc; |
1066 | 0 | MemoryContext oldcontext; |
1067 | |
|
1068 | 0 | static MemoryContext hpam_context = NULL; |
1069 | | |
1070 | | /* |
1071 | | * This is invoked from ProcessInterrupts(), and since some of the |
1072 | | * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential |
1073 | | * for recursive calls if more signals are received while this runs. It's |
1074 | | * unclear that recursive entry would be safe, and it doesn't seem useful |
1075 | | * even if it is safe, so let's block interrupts until done. |
1076 | | */ |
1077 | 0 | HOLD_INTERRUPTS(); |
1078 | | |
1079 | | /* |
1080 | | * Moreover, CurrentMemoryContext might be pointing almost anywhere. We |
1081 | | * don't want to risk leaking data into long-lived contexts, so let's do |
1082 | | * our work here in a private context that we can reset on each use. |
1083 | | */ |
1084 | 0 | if (!hpam_context) /* first time through? */ |
1085 | 0 | hpam_context = AllocSetContextCreate(TopMemoryContext, |
1086 | 0 | "ProcessParallelApplyMessages", |
1087 | 0 | ALLOCSET_DEFAULT_SIZES); |
1088 | 0 | else |
1089 | 0 | MemoryContextReset(hpam_context); |
1090 | |
|
1091 | 0 | oldcontext = MemoryContextSwitchTo(hpam_context); |
1092 | |
|
1093 | 0 | ParallelApplyMessagePending = false; |
1094 | |
|
1095 | 0 | foreach(lc, ParallelApplyWorkerPool) |
1096 | 0 | { |
1097 | 0 | shm_mq_result res; |
1098 | 0 | Size nbytes; |
1099 | 0 | void *data; |
1100 | 0 | ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc); |
1101 | | |
1102 | | /* |
1103 | | * The leader will detach from the error queue and set it to NULL |
1104 | | * before preparing to stop all parallel apply workers, so we don't |
1105 | | * need to handle error messages anymore. See |
1106 | | * logicalrep_worker_detach. |
1107 | | */ |
1108 | 0 | if (!winfo->error_mq_handle) |
1109 | 0 | continue; |
1110 | | |
1111 | 0 | res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true); |
1112 | |
|
1113 | 0 | if (res == SHM_MQ_WOULD_BLOCK) |
1114 | 0 | continue; |
1115 | 0 | else if (res == SHM_MQ_SUCCESS) |
1116 | 0 | { |
1117 | 0 | StringInfoData msg; |
1118 | |
|
1119 | 0 | initStringInfo(&msg); |
1120 | 0 | appendBinaryStringInfo(&msg, data, nbytes); |
1121 | 0 | ProcessParallelApplyMessage(&msg); |
1122 | 0 | pfree(msg.data); |
1123 | 0 | } |
1124 | 0 | else |
1125 | 0 | ereport(ERROR, |
1126 | 0 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
1127 | 0 | errmsg("lost connection to the logical replication parallel apply worker"))); |
1128 | 0 | } |
1129 | | |
1130 | 0 | MemoryContextSwitchTo(oldcontext); |
1131 | | |
1132 | | /* Might as well clear the context on our way out */ |
1133 | 0 | MemoryContextReset(hpam_context); |
1134 | |
|
1135 | 0 | RESUME_INTERRUPTS(); |
1136 | 0 | } |
1137 | | |
1138 | | /* |
1139 | | * Send the data to the specified parallel apply worker via shared-memory |
1140 | | * queue. |
1141 | | * |
1142 | | * Returns false if the attempt to send data via shared memory times out, true |
1143 | | * otherwise. |
1144 | | */ |
1145 | | bool |
1146 | | pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data) |
1147 | 0 | { |
1148 | 0 | int rc; |
1149 | 0 | shm_mq_result result; |
1150 | 0 | TimestampTz startTime = 0; |
1151 | |
|
1152 | 0 | Assert(!IsTransactionState()); |
1153 | 0 | Assert(!winfo->serialize_changes); |
1154 | | |
1155 | | /* |
1156 | | * We don't try to send data to the parallel worker in 'immediate' mode. |
1157 | | * This is primarily used for testing purposes. |
1158 | | */ |
1159 | 0 | if (unlikely(debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE)) |
1160 | 0 | return false; |
1161 | | |
1162 | | /* |
1163 | | * This timeout is a bit arbitrary but testing revealed that it is sufficient |
1164 | | * to send the message unless the parallel apply worker is waiting on some |
1165 | | * lock or there is a serious resource crunch. See the comments atop this file |
1166 | | * to know why we are using a non-blocking way to send the message. |
1167 | | */ |
1168 | 0 | #define SHM_SEND_RETRY_INTERVAL_MS 1000 |
1169 | 0 | #define SHM_SEND_TIMEOUT_MS (10000 - SHM_SEND_RETRY_INTERVAL_MS) |
1170 | | |
1171 | 0 | for (;;) |
1172 | 0 | { |
1173 | 0 | result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true); |
1174 | |
|
1175 | 0 | if (result == SHM_MQ_SUCCESS) |
1176 | 0 | return true; |
1177 | 0 | else if (result == SHM_MQ_DETACHED) |
1178 | 0 | ereport(ERROR, |
1179 | 0 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
1180 | 0 | errmsg("could not send data to shared-memory queue"))); |
1181 | | |
1182 | 0 | Assert(result == SHM_MQ_WOULD_BLOCK); |
1183 | | |
1184 | | /* Wait before retrying. */ |
1185 | 0 | rc = WaitLatch(MyLatch, |
1186 | 0 | WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, |
1187 | 0 | SHM_SEND_RETRY_INTERVAL_MS, |
1188 | 0 | WAIT_EVENT_LOGICAL_APPLY_SEND_DATA); |
1189 | |
|
1190 | 0 | if (rc & WL_LATCH_SET) |
1191 | 0 | { |
1192 | 0 | ResetLatch(MyLatch); |
1193 | 0 | CHECK_FOR_INTERRUPTS(); |
1194 | 0 | } |
1195 | |
|
1196 | 0 | if (startTime == 0) |
1197 | 0 | startTime = GetCurrentTimestamp(); |
1198 | 0 | else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(), |
1199 | 0 | SHM_SEND_TIMEOUT_MS)) |
1200 | 0 | return false; |
1201 | 0 | } |
1202 | 0 | } |
1203 | | |
1204 | | /* |
1205 | | * Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means |
1206 | | * that the current data and any subsequent data for this transaction will be |
1207 | | * serialized to a file. This is done to prevent possible deadlocks with |
1208 | | * another parallel apply worker (refer to the comments atop this file). |
1209 | | */ |
1210 | | void |
1211 | | pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo, |
1212 | | bool stream_locked) |
1213 | 0 | { |
1214 | 0 | ereport(LOG, |
1215 | 0 | (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file", |
1216 | 0 | winfo->shared->xid))); |
1217 | | |
1218 | | /* |
1219 | | * The parallel apply worker could be stuck for some reason (say, waiting |
1220 | | * on a lock held by another backend), so stop trying to send data directly to |
1221 | | * it and start serializing data to the file instead. |
1222 | | */ |
1223 | 0 | winfo->serialize_changes = true; |
1224 | | |
1225 | | /* Initialize the stream fileset. */ |
1226 | 0 | stream_start_internal(winfo->shared->xid, true); |
1227 | | |
1228 | | /* |
1229 | | * Acquire the stream lock if it is not already held, to make sure that the |
1230 | | * parallel apply worker will wait for the leader to release the stream lock |
1231 | | * until the end of the transaction. |
1232 | | */ |
1233 | 0 | if (!stream_locked) |
1234 | 0 | pa_lock_stream(winfo->shared->xid, AccessExclusiveLock); |
1235 | |
|
1236 | 0 | pa_set_fileset_state(winfo->shared, FS_SERIALIZE_IN_PROGRESS); |
1237 | 0 | } |
1238 | | |
1239 | | /* |
1240 | | * Wait until the parallel apply worker's transaction state has reached or |
1241 | | * exceeded the given xact_state. |
1242 | | */ |
1243 | | static void |
1244 | | pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo, |
1245 | | ParallelTransState xact_state) |
1246 | 0 | { |
1247 | 0 | for (;;) |
1248 | 0 | { |
1249 | | /* |
1250 | | * Stop if the transaction state has reached or exceeded the given |
1251 | | * xact_state. |
1252 | | */ |
1253 | 0 | if (pa_get_xact_state(winfo->shared) >= xact_state) |
1254 | 0 | break; |
1255 | | |
1256 | | /* Wait to be signalled. */ |
1257 | 0 | (void) WaitLatch(MyLatch, |
1258 | 0 | WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH, |
1259 | 0 | 10L, |
1260 | 0 | WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE); |
1261 | | |
1262 | | /* Reset the latch so we don't spin. */ |
1263 | 0 | ResetLatch(MyLatch); |
1264 | | |
1265 | | /* An interrupt may have occurred while we were waiting. */ |
1266 | 0 | CHECK_FOR_INTERRUPTS(); |
1267 | 0 | } |
1268 | 0 | } |
1269 | | |
1270 | | /* |
1271 | | * Wait until the parallel apply worker's transaction finishes. |
1272 | | */ |
1273 | | static void |
1274 | | pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo) |
1275 | 0 | { |
1276 | | /* |
1277 | | * Wait until the parallel apply worker sets the state to |
1278 | | * PARALLEL_TRANS_STARTED, which means it has acquired the transaction |
1279 | | * lock. This is to prevent the leader apply worker from acquiring the |
1280 | | * transaction lock earlier than the parallel apply worker. |
1281 | | */ |
1282 | 0 | pa_wait_for_xact_state(winfo, PARALLEL_TRANS_STARTED); |
1283 | | |
1284 | | /* |
1285 | | * Wait for the transaction lock to be released. This is required to |
1286 | | * detect deadlock among leader and parallel apply workers. Refer to the |
1287 | | * comments atop this file. |
1288 | | */ |
1289 | 0 | pa_lock_transaction(winfo->shared->xid, AccessShareLock); |
1290 | 0 | pa_unlock_transaction(winfo->shared->xid, AccessShareLock); |
1291 | | |
1292 | | /* |
1293 | | * Check if the state becomes PARALLEL_TRANS_FINISHED in case the parallel |
1294 | | * apply worker failed while applying changes, causing the lock to be |
1295 | | * released. |
1296 | | */ |
1297 | 0 | if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED) |
1298 | 0 | ereport(ERROR, |
1299 | 0 | (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), |
1300 | 0 | errmsg("lost connection to the logical replication parallel apply worker"))); |
1301 | 0 | } |
1302 | | |
1303 | | /* |
1304 | | * Set the transaction state for a given parallel apply worker. |
1305 | | */ |
1306 | | void |
1307 | | pa_set_xact_state(ParallelApplyWorkerShared *wshared, |
1308 | | ParallelTransState xact_state) |
1309 | 0 | { |
1310 | 0 | SpinLockAcquire(&wshared->mutex); |
1311 | 0 | wshared->xact_state = xact_state; |
1312 | 0 | SpinLockRelease(&wshared->mutex); |
1313 | 0 | } |
1314 | | |
1315 | | /* |
1316 | | * Get the transaction state for a given parallel apply worker. |
1317 | | */ |
1318 | | static ParallelTransState |
1319 | | pa_get_xact_state(ParallelApplyWorkerShared *wshared) |
1320 | 0 | { |
1321 | 0 | ParallelTransState xact_state; |
1322 | |
|
1323 | 0 | SpinLockAcquire(&wshared->mutex); |
1324 | 0 | xact_state = wshared->xact_state; |
1325 | 0 | SpinLockRelease(&wshared->mutex); |
1326 | |
|
1327 | 0 | return xact_state; |
1328 | 0 | } |
1329 | | |
1330 | | /* |
1331 | | * Cache the parallel apply worker information. |
1332 | | */ |
1333 | | void |
1334 | | pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo) |
1335 | 0 | { |
1336 | 0 | stream_apply_worker = winfo; |
1337 | 0 | } |
1338 | | |
1339 | | /* |
1340 | | * Form a unique savepoint name for the streaming transaction. |
1341 | | * |
1342 | | * Note that different subscriptions for publications on different nodes can |
1343 | | * receive the same remote xid, so the subscription id is used along with it. |
1344 | | * |
1345 | | * Returns the name in the supplied buffer. |
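| | * |
| | * For example, a subscription with OID 16394 applying remote xid 739 would |
| | * get the savepoint name "pg_sp_16394_739" (both values here are purely |
| | * illustrative). |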
1346 | | */ |
1347 | | static void |
1348 | | pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp) |
1349 | 0 | { |
1350 | 0 | snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid); |
1351 | 0 | } |
1352 | | |
1353 | | /* |
1354 | | * Define a savepoint for a subxact in parallel apply worker if needed. |
1355 | | * |
1356 | | * The parallel apply worker can figure out if a new subtransaction was |
1357 | | * started by checking if the new change arrived with a different xid. In that |
1358 | | * case, define a named savepoint so that we are able to roll back to it |
1359 | | * if required. |
1360 | | */ |
1361 | | void |
1362 | | pa_start_subtrans(TransactionId current_xid, TransactionId top_xid) |
1363 | 0 | { |
1364 | 0 | if (current_xid != top_xid && |
1365 | 0 | !list_member_xid(subxactlist, current_xid)) |
1366 | 0 | { |
1367 | 0 | MemoryContext oldctx; |
1368 | 0 | char spname[NAMEDATALEN]; |
1369 | |
|
1370 | 0 | pa_savepoint_name(MySubscription->oid, current_xid, |
1371 | 0 | spname, sizeof(spname)); |
1372 | |
|
1373 | 0 | elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname); |
1374 | | |
1375 | | /* We must be in a transaction block to define the SAVEPOINT. */ |
1376 | 0 | if (!IsTransactionBlock()) |
1377 | 0 | { |
1378 | 0 | if (!IsTransactionState()) |
1379 | 0 | StartTransactionCommand(); |
1380 | |
|
1381 | 0 | BeginTransactionBlock(); |
1382 | 0 | CommitTransactionCommand(); |
1383 | 0 | } |
1384 | |
|
1385 | 0 | DefineSavepoint(spname); |
1386 | | |
1387 | | /* |
1388 | | * CommitTransactionCommand is needed to start a subtransaction after |
1389 | | * issuing a SAVEPOINT inside a transaction block (see |
1390 | | * StartSubTransaction()). |
1391 | | */ |
1392 | 0 | CommitTransactionCommand(); |
1393 | | |
1394 | 0 | oldctx = MemoryContextSwitchTo(TopTransactionContext); |
1395 | 0 | subxactlist = lappend_xid(subxactlist, current_xid); |
1396 | 0 | MemoryContextSwitchTo(oldctx); |
1397 | 0 | } |
1398 | 0 | } |
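A minimal sketch of the resulting call pattern while applying streamed changes, using hypothetical remote xids (1000 for the toplevel transaction, 1001 for a subtransaction):

#ifdef NOT_USED
static void
pa_start_subtrans_sketch(void)
{
	TransactionId top_xid = 1000;

	/* First change of subxact 1001: defines savepoint pg_sp_<suboid>_1001. */
	pa_start_subtrans(1001, top_xid);

	/* Later changes with the same xid find it in subxactlist: no-op. */
	pa_start_subtrans(1001, top_xid);

	/* Changes carrying the toplevel xid never define a savepoint. */
	pa_start_subtrans(top_xid, top_xid);
}
#endif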
1399 | | |
1400 | | /* Reset the list that maintains subtransactions. */ |
1401 | | void |
1402 | | pa_reset_subtrans(void) |
1403 | 0 | { |
1404 | | /* |
1405 | | * We don't need to free this explicitly as the allocated memory will be |
1406 | | * freed at the transaction end. |
1407 | | */ |
1408 | 0 | subxactlist = NIL; |
1409 | 0 | } |
1410 | | |
1411 | | /* |
1412 | | * Handle STREAM ABORT message when the transaction was applied in a parallel |
1413 | | * apply worker. |
1414 | | */ |
1415 | | void |
1416 | | pa_stream_abort(LogicalRepStreamAbortData *abort_data) |
1417 | 0 | { |
1418 | 0 | TransactionId xid = abort_data->xid; |
1419 | 0 | TransactionId subxid = abort_data->subxid; |
1420 | | |
1421 | | /* |
1422 | | * Update origin state so we can restart streaming from correct position |
1423 | | * in case of crash. |
1424 | | */ |
1425 | 0 | replorigin_session_origin_lsn = abort_data->abort_lsn; |
1426 | 0 | replorigin_session_origin_timestamp = abort_data->abort_time; |
1427 | | |
1428 | | /* |
1429 | | * If the two XIDs are the same, it's in fact an abort of the toplevel xact, so |
1430 | | * just free the subxactlist. |
1431 | | */ |
1432 | 0 | if (subxid == xid) |
1433 | 0 | { |
1434 | 0 | pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED); |
1435 | | |
1436 | | /* |
1437 | | * Release the lock as we might be processing an empty streaming |
1438 | | * transaction in which case the lock won't be released during |
1439 | | * transaction rollback. |
1440 | | * |
1441 | | * Note that it's ok to release the transaction lock before aborting |
1442 | | * the transaction because even if the parallel apply worker dies due |
1443 | | * to crash or some other reason, such a transaction would still be |
1444 | | * considered aborted. |
1445 | | */ |
1446 | 0 | pa_unlock_transaction(xid, AccessExclusiveLock); |
1447 | | |
1448 | 0 | AbortCurrentTransaction(); |
1449 | | |
1450 | 0 | if (IsTransactionBlock()) |
1451 | 0 | { |
1452 | 0 | EndTransactionBlock(false); |
1453 | 0 | CommitTransactionCommand(); |
1454 | 0 | } |
1455 | | |
1456 | 0 | pa_reset_subtrans(); |
1457 | | |
1458 | 0 | pgstat_report_activity(STATE_IDLE, NULL); |
1459 | 0 | } |
1460 | 0 | else |
1461 | 0 | { |
1462 | | /* OK, so it's a subxact. Rollback to the savepoint. */ |
1463 | 0 | int i; |
1464 | 0 | char spname[NAMEDATALEN]; |
1465 | | |
1466 | 0 | pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname)); |
1467 | | |
1468 | 0 | elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname); |
1469 | | |
1470 | | /* |
1471 | | * Search the subxactlist, determine the offset tracked for the |
1472 | | * subxact, and truncate the list. |
1473 | | * |
1474 | | * Note that for an empty sub-transaction we won't find the subxid |
1475 | | * here. |
1476 | | */ |
1477 | 0 | for (i = list_length(subxactlist) - 1; i >= 0; i--) |
1478 | 0 | { |
1479 | 0 | TransactionId xid_tmp = lfirst_xid(list_nth_cell(subxactlist, i)); |
1480 | | |
1481 | 0 | if (xid_tmp == subxid) |
1482 | 0 | { |
1483 | 0 | RollbackToSavepoint(spname); |
1484 | 0 | CommitTransactionCommand(); |
1485 | 0 | subxactlist = list_truncate(subxactlist, i); |
1486 | 0 | break; |
1487 | 0 | } |
1488 | 0 | } |
1489 | 0 | } |
1490 | 0 | } |
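A minimal sketch of the two cases above, reusing the hypothetical xids from the previous sketch; abort_lsn and abort_time are left zeroed here, whereas a real STREAM ABORT message carries the abort position and timestamp:

#ifdef NOT_USED
static void
pa_stream_abort_sketch(void)
{
	LogicalRepStreamAbortData abort_data = {0};

	/* Subxact abort: subxid != xid rolls back to pg_sp_<suboid>_1001. */
	abort_data.xid = 1000;
	abort_data.subxid = 1001;
	pa_stream_abort(&abort_data);

	/* Toplevel abort: subxid == xid aborts the whole transaction. */
	abort_data.subxid = abort_data.xid;
	pa_stream_abort(&abort_data);
}
#endif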
1491 | | |
1492 | | /* |
1493 | | * Set the fileset state for a particular parallel apply worker. The fileset |
1494 | | * will be set once the leader worker has serialized all changes to the file |
1495 | | * so that it can be used by the parallel apply worker. |
1496 | | */ |
1497 | | void |
1498 | | pa_set_fileset_state(ParallelApplyWorkerShared *wshared, |
1499 | | PartialFileSetState fileset_state) |
1500 | 0 | { |
1501 | 0 | SpinLockAcquire(&wshared->mutex); |
1502 | 0 | wshared->fileset_state = fileset_state; |
1503 | | |
1504 | 0 | if (fileset_state == FS_SERIALIZE_DONE) |
1505 | 0 | { |
1506 | 0 | Assert(am_leader_apply_worker()); |
1507 | 0 | Assert(MyLogicalRepWorker->stream_fileset); |
1508 | 0 | wshared->fileset = *MyLogicalRepWorker->stream_fileset; |
1509 | 0 | } |
1510 | | |
1511 | 0 | SpinLockRelease(&wshared->mutex); |
1512 | 0 | } |
1513 | | |
1514 | | /* |
1515 | | * Get the fileset state for the current parallel apply worker. |
1516 | | */ |
1517 | | static PartialFileSetState |
1518 | | pa_get_fileset_state(void) |
1519 | 0 | { |
1520 | 0 | PartialFileSetState fileset_state; |
1521 | | |
1522 | 0 | Assert(am_parallel_apply_worker()); |
1523 | | |
1524 | 0 | SpinLockAcquire(&MyParallelShared->mutex); |
1525 | 0 | fileset_state = MyParallelShared->fileset_state; |
1526 | 0 | SpinLockRelease(&MyParallelShared->mutex); |
1527 | | |
1528 | 0 | return fileset_state; |
1529 | 0 | } |
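A minimal sketch of how the two sides use this state, assuming the leader has already spooled the remaining changes into MyLogicalRepWorker->stream_fileset; the worker-side polling loop is a simplification for illustration only:

#ifdef NOT_USED
/* Leader side: publish the completed fileset to the parallel apply worker. */
static void
pa_leader_publish_fileset_sketch(ParallelApplyWorkerInfo *winfo)
{
	pa_set_fileset_state(winfo->shared, FS_SERIALIZE_DONE);
}

/* Worker side: wait until the leader has finished serializing. */
static void
pa_worker_wait_for_fileset_sketch(void)
{
	while (pa_get_fileset_state() != FS_SERIALIZE_DONE)
		CHECK_FOR_INTERRUPTS();
}
#endif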
1530 | | |
1531 | | /* |
1532 | | * Helper functions to acquire and release a lock for each stream block. |
1533 | | * |
1534 | | * Set locktag_field4 to PARALLEL_APPLY_LOCK_STREAM to indicate that it's a |
1535 | | * stream lock. |
1536 | | * |
1537 | | * Refer to the comments atop this file to see how the stream lock is used. |
1538 | | */ |
1539 | | void |
1540 | | pa_lock_stream(TransactionId xid, LOCKMODE lockmode) |
1541 | 0 | { |
1542 | 0 | LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid, |
1543 | 0 | PARALLEL_APPLY_LOCK_STREAM, lockmode); |
1544 | 0 | } |
1545 | | |
1546 | | void |
1547 | | pa_unlock_stream(TransactionId xid, LOCKMODE lockmode) |
1548 | 0 | { |
1549 | 0 | UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid, |
1550 | 0 | PARALLEL_APPLY_LOCK_STREAM, lockmode); |
1551 | 0 | } |
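A minimal sketch of the stream-lock handshake for a hypothetical remote xid: the exclusive acquisition on the leader side mirrors the release in pa_xact_finish() below, and the share-mode acquire/release mirrors pa_decr_and_wait_stream_block():

#ifdef NOT_USED
/* Leader side: hold the stream lock while a stream chunk is in progress. */
static void
pa_leader_stream_lock_sketch(TransactionId remote_xid)
{
	pa_lock_stream(remote_xid, AccessExclusiveLock);
	/* ... send the chunk's changes to the parallel apply worker ... */
	pa_unlock_stream(remote_xid, AccessExclusiveLock);
}

/* Worker side: block until the leader has released the stream lock. */
static void
pa_worker_stream_wait_sketch(TransactionId remote_xid)
{
	pa_lock_stream(remote_xid, AccessShareLock);
	pa_unlock_stream(remote_xid, AccessShareLock);
}
#endif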
1552 | | |
1553 | | /* |
1554 | | * Helper functions to acquire and release a lock for each local transaction |
1555 | | * apply. |
1556 | | * |
1557 | | * Set locktag_field4 to PARALLEL_APPLY_LOCK_XACT to indicate that it's a |
1558 | | * transaction lock. |
1559 | | * |
1560 | | * Note that all the callers must pass a remote transaction ID instead of a |
1561 | | * local transaction ID as xid. This is because the local transaction ID will |
1562 | | * only be assigned while applying the first change in the parallel apply |
1563 | | * worker, and it's possible that this first change is blocked by a |
1564 | | * concurrently executing transaction in another parallel apply worker. Since |
1565 | | * the local transaction id can only be communicated to the leader after the |
1566 | | * first change has been applied, the leader would not be able to use this |
1567 | | * lock to wait after sending the xact finish command. |
1568 | | * |
1569 | | * Refer to the comments atop this file to see how the transaction lock is |
1570 | | * used. |
1571 | | */ |
1572 | | void |
1573 | | pa_lock_transaction(TransactionId xid, LOCKMODE lockmode) |
1574 | 0 | { |
1575 | 0 | LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid, |
1576 | 0 | PARALLEL_APPLY_LOCK_XACT, lockmode); |
1577 | 0 | } |
1578 | | |
1579 | | void |
1580 | | pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode) |
1581 | 0 | { |
1582 | 0 | UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid, |
1583 | 0 | PARALLEL_APPLY_LOCK_XACT, lockmode); |
1584 | 0 | } |
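A minimal sketch of the transaction-lock usage with the remote xid, consistent with the lock modes released elsewhere in this file (AccessExclusiveLock by the worker in pa_stream_abort(), AccessShareLock by the leader while waiting for the transaction to finish):

#ifdef NOT_USED
/* Worker side: hold the transaction lock for the duration of the apply. */
static void
pa_worker_transaction_lock_sketch(TransactionId remote_xid)
{
	pa_lock_transaction(remote_xid, AccessExclusiveLock);
	/* ... apply changes; released at commit or in pa_stream_abort() ... */
	pa_unlock_transaction(remote_xid, AccessExclusiveLock);
}

/* Leader side: block until the worker's transaction has finished. */
static void
pa_leader_transaction_wait_sketch(TransactionId remote_xid)
{
	pa_lock_transaction(remote_xid, AccessShareLock);
	pa_unlock_transaction(remote_xid, AccessShareLock);
}
#endif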
1585 | | |
1586 | | /* |
1587 | | * Decrement the number of pending streaming blocks and wait on the stream lock |
1588 | | * if there is no pending block available. |
1589 | | */ |
1590 | | void |
1591 | | pa_decr_and_wait_stream_block(void) |
1592 | 0 | { |
1593 | 0 | Assert(am_parallel_apply_worker()); |
1594 | | |
1595 | | /* |
1596 | | * The only case in which there are no pending stream chunks is when we |
1597 | | * are applying spooled messages. |
1598 | | */ |
1599 | 0 | if (pg_atomic_read_u32(&MyParallelShared->pending_stream_count) == 0) |
1600 | 0 | { |
1601 | 0 | if (pa_has_spooled_message_pending()) |
1602 | 0 | return; |
1603 | | |
1604 | 0 | elog(ERROR, "invalid pending streaming chunk 0"); |
1605 | 0 | } |
1606 | | |
1607 | 0 | if (pg_atomic_sub_fetch_u32(&MyParallelShared->pending_stream_count, 1) == 0) |
1608 | 0 | { |
1609 | 0 | pa_lock_stream(MyParallelShared->xid, AccessShareLock); |
1610 | 0 | pa_unlock_stream(MyParallelShared->xid, AccessShareLock); |
1611 | 0 | } |
1612 | 0 | } |
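The leader-side counterpart (not shown in this file excerpt) increments the same shared counter for every stream chunk it hands to the worker; a minimal sketch with a hypothetical helper name:

#ifdef NOT_USED
static void
pa_incr_stream_block_sketch(ParallelApplyWorkerInfo *winfo)
{
	/*
	 * One increment per forwarded chunk ensures the worker above only waits
	 * on the stream lock once it has consumed every pending chunk.
	 */
	pg_atomic_add_fetch_u32(&winfo->shared->pending_stream_count, 1);
}
#endif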
1613 | | |
1614 | | /* |
1615 | | * Finish processing the streaming transaction in the leader apply worker. |
1616 | | */ |
1617 | | void |
1618 | | pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn) |
1619 | 0 | { |
1620 | 0 | Assert(am_leader_apply_worker()); |
1621 | | |
1622 | | /* |
1623 | | * Release the stream lock so that the parallel apply worker can |
1624 | | * continue to receive and apply changes. |
1625 | | */ |
1626 | 0 | pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock); |
1627 | | |
1628 | | /* |
1629 | | * Wait for that worker to finish. This is necessary to maintain commit |
1630 | | * order, which avoids failures due to transaction dependencies and |
1631 | | * deadlocks. |
1632 | | */ |
1633 | 0 | pa_wait_for_xact_finish(winfo); |
1634 | | |
1635 | 0 | if (!XLogRecPtrIsInvalid(remote_lsn)) |
1636 | 0 | store_flush_position(remote_lsn, winfo->shared->last_commit_end); |
1637 | | |
1638 | 0 | pa_free_worker(winfo); |
1639 | 0 | } |
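A minimal sketch of the leader-side call once the remote end of a parallelized streaming transaction (e.g. its stream commit) has been received, with commit_lsn standing in for the end LSN taken from the remote message:

#ifdef NOT_USED
static void
pa_xact_finish_sketch(ParallelApplyWorkerInfo *winfo, XLogRecPtr commit_lsn)
{
	/*
	 * Releases the stream lock, waits for the worker (preserving commit
	 * order), records the flush position, and returns the worker to the
	 * pool or frees it.
	 */
	pa_xact_finish(winfo, commit_lsn);
}
#endif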