Coverage Report

Created: 2025-06-15 06:31

/src/postgres/src/backend/replication/logical/applyparallelworker.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 * applyparallelworker.c
3
 *     Support routines for applying xact by parallel apply worker
4
 *
5
 * Copyright (c) 2023-2025, PostgreSQL Global Development Group
6
 *
7
 * IDENTIFICATION
8
 *    src/backend/replication/logical/applyparallelworker.c
9
 *
10
 * This file contains the code to launch, set up, and teardown a parallel apply
11
 * worker which receives the changes from the leader worker and invokes routines
12
 * to apply those on the subscriber database. Additionally, this file contains
13
 * routines that are intended to support setting up, using, and tearing down a
14
 * ParallelApplyWorkerInfo which is required so the leader worker and parallel
15
 * apply workers can communicate with each other.
16
 *
17
 * The parallel apply workers are assigned (if available) as soon as xact's
18
 * first stream is received for subscriptions that have set their 'streaming'
19
 * option as parallel. The leader apply worker will send changes to this new
20
 * worker via shared memory. We keep this worker assigned till the transaction
21
 * commit is received and also wait for the worker to finish at commit. This
22
 * preserves commit ordering and avoid file I/O in most cases, although we
23
 * still need to spill to a file if there is no worker available. See comments
24
 * atop logical/worker to know more about streamed xacts whose changes are
25
 * spilled to disk. It is important to maintain commit order to avoid failures
26
 * due to: (a) transaction dependencies - say if we insert a row in the first
27
 * transaction and update it in the second transaction on publisher then
28
 * allowing the subscriber to apply both in parallel can lead to failure in the
29
 * update; (b) deadlocks - allowing transactions that update the same set of
30
 * rows/tables in the opposite order to be applied in parallel can lead to
31
 * deadlocks.
32
 *
33
 * A worker pool is used to avoid restarting workers for each streaming
34
 * transaction. We maintain each worker's information (ParallelApplyWorkerInfo)
35
 * in the ParallelApplyWorkerPool. After successfully launching a new worker,
36
 * its information is added to the ParallelApplyWorkerPool. Once the worker
37
 * finishes applying the transaction, it is marked as available for re-use.
38
 * Now, before starting a new worker to apply the streaming transaction, we
39
 * check the list for any available worker. Note that we retain a maximum of
40
 * half the max_parallel_apply_workers_per_subscription workers in the pool and
41
 * after that, we simply exit the worker after applying the transaction.
42
 *
43
 * XXX This worker pool threshold is arbitrary and we can provide a GUC
44
 * variable for this in the future if required.
45
 *
46
 * The leader apply worker will create a separate dynamic shared memory segment
47
 * when each parallel apply worker starts. The reason for this design is that
48
 * we cannot predict how many workers will be needed. It may be possible to
49
 * allocate enough shared memory in one segment based on the maximum number of
50
 * parallel apply workers (max_parallel_apply_workers_per_subscription), but
51
 * this would waste memory if no process is actually started.
52
 *
53
 * The dynamic shared memory segment contains: (a) a shm_mq that is used to
54
 * send changes in the transaction from leader apply worker to parallel apply
55
 * worker; (b) another shm_mq that is used to send errors (and other messages
56
 * reported via elog/ereport) from the parallel apply worker to leader apply
57
 * worker; (c) necessary information to be shared among parallel apply workers
58
 * and the leader apply worker (i.e. members of ParallelApplyWorkerShared).
59
 *
60
 * Locking Considerations
61
 * ----------------------
62
 * We have a risk of deadlock due to concurrently applying the transactions in
63
 * parallel mode that were independent on the publisher side but became
64
 * dependent on the subscriber side due to the different database structures
65
 * (like schema of subscription tables, constraints, etc.) on each side. This
66
 * can happen even without parallel mode when there are concurrent operations
67
 * on the subscriber. In order to detect the deadlocks among leader (LA) and
68
 * parallel apply (PA) workers, we used lmgr locks when the PA waits for the
69
 * next stream (set of changes) and LA waits for PA to finish the transaction.
70
 * An alternative approach could be to not allow parallelism when the schema of
71
 * tables is different between the publisher and subscriber but that would be
72
 * too restrictive and would require the publisher to send much more
73
 * information than it is currently sending.
74
 *
75
 * Consider a case where the subscribed table does not have a unique key on the
76
 * publisher and has a unique key on the subscriber. The deadlock can happen in
77
 * the following ways:
78
 *
79
 * 1) Deadlock between the leader apply worker and a parallel apply worker
80
 *
81
 * Consider that the parallel apply worker (PA) is executing TX-1 and the
82
 * leader apply worker (LA) is executing TX-2 concurrently on the subscriber.
83
 * Now, LA is waiting for PA because of the unique key constraint of the
84
 * subscribed table while PA is waiting for LA to send the next stream of
85
 * changes or transaction finish command message.
86
 *
87
 * In order for lmgr to detect this, we have LA acquire a session lock on the
88
 * remote transaction (by pa_lock_stream()) and have PA wait on the lock before
89
 * trying to receive the next stream of changes. Specifically, LA will acquire
90
 * the lock in AccessExclusive mode before sending the STREAM_STOP and will
91
 * release it if already acquired after sending the STREAM_START, STREAM_ABORT
92
 * (for toplevel transaction), STREAM_PREPARE, and STREAM_COMMIT. The PA will
93
 * acquire the lock in AccessShare mode after processing STREAM_STOP and
94
 * STREAM_ABORT (for subtransaction) and then release the lock immediately
95
 * after acquiring it.
96
 *
97
 * The lock graph for the above example will look as follows:
98
 * LA (waiting to acquire the lock on the unique index) -> PA (waiting to
99
 * acquire the stream lock) -> LA
100
 *
101
 * This way, when PA is waiting for LA for the next stream of changes, we can
102
 * have a wait-edge from PA to LA in lmgr, which will make us detect the
103
 * deadlock between LA and PA.
104
 *
105
 * 2) Deadlock between the leader apply worker and parallel apply workers
106
 *
107
 * This scenario is similar to the first case but TX-1 and TX-2 are executed by
108
 * two parallel apply workers (PA-1 and PA-2 respectively). In this scenario,
109
 * PA-2 is waiting for PA-1 to complete its transaction while PA-1 is waiting
110
 * for subsequent input from LA. Also, LA is waiting for PA-2 to complete its
111
 * transaction in order to preserve the commit order. There is a deadlock among
112
 * the three processes.
113
 *
114
 * In order for lmgr to detect this, we have PA acquire a session lock (this is
115
 * a different lock than referred in the previous case, see
116
 * pa_lock_transaction()) on the transaction being applied and have LA wait on
117
 * the lock before proceeding in the transaction finish commands. Specifically,
118
 * PA will acquire this lock in AccessExclusive mode before executing the first
119
 * message of the transaction and release it at the xact end. LA will acquire
120
 * this lock in AccessShare mode at transaction finish commands and release it
121
 * immediately.
122
 *
123
 * The lock graph for the above example will look as follows:
124
 * LA (waiting to acquire the transaction lock) -> PA-2 (waiting to acquire the
125
 * lock due to unique index constraint) -> PA-1 (waiting to acquire the stream
126
 * lock) -> LA
127
 *
128
 * This way when LA is waiting to finish the transaction end command to preserve
129
 * the commit order, we will be able to detect deadlock, if any.
130
 *
131
 * One might think we can use XactLockTableWait(), but XactLockTableWait()
132
 * considers PREPARED TRANSACTION as still in progress which means the lock
133
 * won't be released even after the parallel apply worker has prepared the
134
 * transaction.
135
 *
136
 * 3) Deadlock when the shm_mq buffer is full
137
 *
138
 * In the previous scenario (ie. PA-1 and PA-2 are executing transactions
139
 * concurrently), if the shm_mq buffer between LA and PA-2 is full, LA has to
140
 * wait to send messages, and this wait doesn't appear in lmgr.
141
 *
142
 * To avoid this wait, we use a non-blocking write and wait with a timeout. If
143
 * the timeout is exceeded, the LA will serialize all the pending messages to
144
 * a file and indicate PA-2 that it needs to read that file for the remaining
145
 * messages. Then LA will start waiting for commit as in the previous case
146
 * which will detect deadlock if any. See pa_send_data() and
147
 * enum TransApplyAction.
148
 *
149
 * Lock types
150
 * ----------
151
 * Both the stream lock and the transaction lock mentioned above are
152
 * session-level locks because both locks could be acquired outside the
153
 * transaction, and the stream lock in the leader needs to persist across
154
 * transaction boundaries i.e. until the end of the streaming transaction.
155
 *-------------------------------------------------------------------------
156
 */
157
158
#include "postgres.h"
159
160
#include "libpq/pqformat.h"
161
#include "libpq/pqmq.h"
162
#include "pgstat.h"
163
#include "postmaster/interrupt.h"
164
#include "replication/logicallauncher.h"
165
#include "replication/logicalworker.h"
166
#include "replication/origin.h"
167
#include "replication/worker_internal.h"
168
#include "storage/ipc.h"
169
#include "storage/lmgr.h"
170
#include "tcop/tcopprot.h"
171
#include "utils/inval.h"
172
#include "utils/memutils.h"
173
#include "utils/syscache.h"
174
175
0
#define PG_LOGICAL_APPLY_SHM_MAGIC 0x787ca067
176
177
/*
178
 * DSM keys for parallel apply worker. Unlike other parallel execution code,
179
 * since we don't need to worry about DSM keys conflicting with plan_node_id we
180
 * can use small integers.
181
 */
182
0
#define PARALLEL_APPLY_KEY_SHARED   1
183
0
#define PARALLEL_APPLY_KEY_MQ     2
184
0
#define PARALLEL_APPLY_KEY_ERROR_QUEUE  3
185
186
/* Queue size of DSM, 16 MB for now. */
187
0
#define DSM_QUEUE_SIZE  (16 * 1024 * 1024)
188
189
/*
190
 * Error queue size of DSM. It is desirable to make it large enough that a
191
 * typical ErrorResponse can be sent without blocking. That way, a worker that
192
 * errors out can write the whole message into the queue and terminate without
193
 * waiting for the user backend.
194
 */
195
0
#define DSM_ERROR_QUEUE_SIZE      (16 * 1024)
196
197
/*
198
 * There are three fields in each message received by the parallel apply
199
 * worker: start_lsn, end_lsn and send_time. Because we have updated these
200
 * statistics in the leader apply worker, we can ignore these fields in the
201
 * parallel apply worker (see function LogicalRepApplyLoop).
202
 */
203
0
#define SIZE_STATS_MESSAGE (2 * sizeof(XLogRecPtr) + sizeof(TimestampTz))
204
205
/*
206
 * The type of session-level lock on a transaction being applied on a logical
207
 * replication subscriber.
208
 */
209
0
#define PARALLEL_APPLY_LOCK_STREAM  0
210
0
#define PARALLEL_APPLY_LOCK_XACT  1
211
212
/*
213
 * Hash table entry to map xid to the parallel apply worker state.
214
 */
215
typedef struct ParallelApplyWorkerEntry
216
{
217
  TransactionId xid;      /* Hash key -- must be first */
218
  ParallelApplyWorkerInfo *winfo;
219
} ParallelApplyWorkerEntry;
220
221
/*
222
 * A hash table used to cache the state of streaming transactions being applied
223
 * by the parallel apply workers.
224
 */
225
static HTAB *ParallelApplyTxnHash = NULL;
226
227
/*
228
* A list (pool) of active parallel apply workers. The information for
229
* the new worker is added to the list after successfully launching it. The
230
* list entry is removed if there are already enough workers in the worker
231
* pool at the end of the transaction. For more information about the worker
232
* pool, see comments atop this file.
233
 */
234
static List *ParallelApplyWorkerPool = NIL;
235
236
/*
237
 * Information shared between leader apply worker and parallel apply worker.
238
 */
239
ParallelApplyWorkerShared *MyParallelShared = NULL;
240
241
/*
242
 * Is there a message sent by a parallel apply worker that the leader apply
243
 * worker needs to receive?
244
 */
245
volatile sig_atomic_t ParallelApplyMessagePending = false;
246
247
/*
248
 * Cache the parallel apply worker information required for applying the
249
 * current streaming transaction. It is used to save the cost of searching the
250
 * hash table when applying the changes between STREAM_START and STREAM_STOP.
251
 */
252
static ParallelApplyWorkerInfo *stream_apply_worker = NULL;
253
254
/* A list to maintain subtransactions, if any. */
255
static List *subxactlist = NIL;
256
257
static void pa_free_worker_info(ParallelApplyWorkerInfo *winfo);
258
static ParallelTransState pa_get_xact_state(ParallelApplyWorkerShared *wshared);
259
static PartialFileSetState pa_get_fileset_state(void);
260
261
/*
262
 * Returns true if it is OK to start a parallel apply worker, false otherwise.
263
 */
264
static bool
265
pa_can_start(void)
266
0
{
267
  /* Only leader apply workers can start parallel apply workers. */
268
0
  if (!am_leader_apply_worker())
269
0
    return false;
270
271
  /*
272
   * It is good to check for any change in the subscription parameter to
273
   * avoid the case where for a very long time the change doesn't get
274
   * reflected. This can happen when there is a constant flow of streaming
275
   * transactions that are handled by parallel apply workers.
276
   *
277
   * It is better to do it before the below checks so that the latest values
278
   * of subscription can be used for the checks.
279
   */
280
0
  maybe_reread_subscription();
281
282
  /*
283
   * Don't start a new parallel apply worker if the subscription is not
284
   * using parallel streaming mode, or if the publisher does not support
285
   * parallel apply.
286
   */
287
0
  if (!MyLogicalRepWorker->parallel_apply)
288
0
    return false;
289
290
  /*
291
   * Don't start a new parallel worker if user has set skiplsn as it's
292
   * possible that they want to skip the streaming transaction. For
293
   * streaming transactions, we need to serialize the transaction to a file
294
   * so that we can get the last LSN of the transaction to judge whether to
295
   * skip before starting to apply the change.
296
   *
297
   * One might think that we could allow parallelism if the first lsn of the
298
   * transaction is greater than skiplsn, but we don't send it with the
299
   * STREAM START message, and it doesn't seem worth sending the extra eight
300
   * bytes with the STREAM START to enable parallelism for this case.
301
   */
302
0
  if (!XLogRecPtrIsInvalid(MySubscription->skiplsn))
303
0
    return false;
304
305
  /*
306
   * For streaming transactions that are being applied using a parallel
307
   * apply worker, we cannot decide whether to apply the change for a
308
   * relation that is not in the READY state (see
309
   * should_apply_changes_for_rel) as we won't know remote_final_lsn by that
310
   * time. So, we don't start the new parallel apply worker in this case.
311
   */
312
0
  if (!AllTablesyncsReady())
313
0
    return false;
314
315
0
  return true;
316
0
}
317
318
/*
319
 * Set up a dynamic shared memory segment.
320
 *
321
 * We set up a control region that contains a fixed-size worker info
322
 * (ParallelApplyWorkerShared), a message queue, and an error queue.
323
 *
324
 * Returns true on success, false on failure.
325
 */
326
static bool
327
pa_setup_dsm(ParallelApplyWorkerInfo *winfo)
328
0
{
329
0
  shm_toc_estimator e;
330
0
  Size    segsize;
331
0
  dsm_segment *seg;
332
0
  shm_toc    *toc;
333
0
  ParallelApplyWorkerShared *shared;
334
0
  shm_mq     *mq;
335
0
  Size    queue_size = DSM_QUEUE_SIZE;
336
0
  Size    error_queue_size = DSM_ERROR_QUEUE_SIZE;
337
338
  /*
339
   * Estimate how much shared memory we need.
340
   *
341
   * Because the TOC machinery may choose to insert padding of oddly-sized
342
   * requests, we must estimate each chunk separately.
343
   *
344
   * We need one key to register the location of the header, and two other
345
   * keys to track the locations of the message queue and the error message
346
   * queue.
347
   */
348
0
  shm_toc_initialize_estimator(&e);
349
0
  shm_toc_estimate_chunk(&e, sizeof(ParallelApplyWorkerShared));
350
0
  shm_toc_estimate_chunk(&e, queue_size);
351
0
  shm_toc_estimate_chunk(&e, error_queue_size);
352
353
0
  shm_toc_estimate_keys(&e, 3);
354
0
  segsize = shm_toc_estimate(&e);
355
356
  /* Create the shared memory segment and establish a table of contents. */
357
0
  seg = dsm_create(shm_toc_estimate(&e), 0);
358
0
  if (!seg)
359
0
    return false;
360
361
0
  toc = shm_toc_create(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg),
362
0
             segsize);
363
364
  /* Set up the header region. */
365
0
  shared = shm_toc_allocate(toc, sizeof(ParallelApplyWorkerShared));
366
0
  SpinLockInit(&shared->mutex);
367
368
0
  shared->xact_state = PARALLEL_TRANS_UNKNOWN;
369
0
  pg_atomic_init_u32(&(shared->pending_stream_count), 0);
370
0
  shared->last_commit_end = InvalidXLogRecPtr;
371
0
  shared->fileset_state = FS_EMPTY;
372
373
0
  shm_toc_insert(toc, PARALLEL_APPLY_KEY_SHARED, shared);
374
375
  /* Set up message queue for the worker. */
376
0
  mq = shm_mq_create(shm_toc_allocate(toc, queue_size), queue_size);
377
0
  shm_toc_insert(toc, PARALLEL_APPLY_KEY_MQ, mq);
378
0
  shm_mq_set_sender(mq, MyProc);
379
380
  /* Attach the queue. */
381
0
  winfo->mq_handle = shm_mq_attach(mq, seg, NULL);
382
383
  /* Set up error queue for the worker. */
384
0
  mq = shm_mq_create(shm_toc_allocate(toc, error_queue_size),
385
0
             error_queue_size);
386
0
  shm_toc_insert(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, mq);
387
0
  shm_mq_set_receiver(mq, MyProc);
388
389
  /* Attach the queue. */
390
0
  winfo->error_mq_handle = shm_mq_attach(mq, seg, NULL);
391
392
  /* Return results to caller. */
393
0
  winfo->dsm_seg = seg;
394
0
  winfo->shared = shared;
395
396
0
  return true;
397
0
}
398
399
/*
400
 * Try to get a parallel apply worker from the pool. If none is available then
401
 * start a new one.
402
 */
403
static ParallelApplyWorkerInfo *
404
pa_launch_parallel_worker(void)
405
0
{
406
0
  MemoryContext oldcontext;
407
0
  bool    launched;
408
0
  ParallelApplyWorkerInfo *winfo;
409
0
  ListCell   *lc;
410
411
  /* Try to get an available parallel apply worker from the worker pool. */
412
0
  foreach(lc, ParallelApplyWorkerPool)
413
0
  {
414
0
    winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
415
416
0
    if (!winfo->in_use)
417
0
      return winfo;
418
0
  }
419
420
  /*
421
   * Start a new parallel apply worker.
422
   *
423
   * The worker info can be used for the lifetime of the worker process, so
424
   * create it in a permanent context.
425
   */
426
0
  oldcontext = MemoryContextSwitchTo(ApplyContext);
427
428
0
  winfo = (ParallelApplyWorkerInfo *) palloc0(sizeof(ParallelApplyWorkerInfo));
429
430
  /* Setup shared memory. */
431
0
  if (!pa_setup_dsm(winfo))
432
0
  {
433
0
    MemoryContextSwitchTo(oldcontext);
434
0
    pfree(winfo);
435
0
    return NULL;
436
0
  }
437
438
0
  launched = logicalrep_worker_launch(WORKERTYPE_PARALLEL_APPLY,
439
0
                    MyLogicalRepWorker->dbid,
440
0
                    MySubscription->oid,
441
0
                    MySubscription->name,
442
0
                    MyLogicalRepWorker->userid,
443
0
                    InvalidOid,
444
0
                    dsm_segment_handle(winfo->dsm_seg));
445
446
0
  if (launched)
447
0
  {
448
0
    ParallelApplyWorkerPool = lappend(ParallelApplyWorkerPool, winfo);
449
0
  }
450
0
  else
451
0
  {
452
0
    pa_free_worker_info(winfo);
453
0
    winfo = NULL;
454
0
  }
455
456
0
  MemoryContextSwitchTo(oldcontext);
457
458
0
  return winfo;
459
0
}
460
461
/*
462
 * Allocate a parallel apply worker that will be used for the specified xid.
463
 *
464
 * We first try to get an available worker from the pool, if any and then try
465
 * to launch a new worker. On successful allocation, remember the worker
466
 * information in the hash table so that we can get it later for processing the
467
 * streaming changes.
468
 */
469
void
470
pa_allocate_worker(TransactionId xid)
471
0
{
472
0
  bool    found;
473
0
  ParallelApplyWorkerInfo *winfo = NULL;
474
0
  ParallelApplyWorkerEntry *entry;
475
476
0
  if (!pa_can_start())
477
0
    return;
478
479
0
  winfo = pa_launch_parallel_worker();
480
0
  if (!winfo)
481
0
    return;
482
483
  /* First time through, initialize parallel apply worker state hashtable. */
484
0
  if (!ParallelApplyTxnHash)
485
0
  {
486
0
    HASHCTL   ctl;
487
488
0
    MemSet(&ctl, 0, sizeof(ctl));
489
0
    ctl.keysize = sizeof(TransactionId);
490
0
    ctl.entrysize = sizeof(ParallelApplyWorkerEntry);
491
0
    ctl.hcxt = ApplyContext;
492
493
0
    ParallelApplyTxnHash = hash_create("logical replication parallel apply workers hash",
494
0
                       16, &ctl,
495
0
                       HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
496
0
  }
497
498
  /* Create an entry for the requested transaction. */
499
0
  entry = hash_search(ParallelApplyTxnHash, &xid, HASH_ENTER, &found);
500
0
  if (found)
501
0
    elog(ERROR, "hash table corrupted");
502
503
  /* Update the transaction information in shared memory. */
504
0
  SpinLockAcquire(&winfo->shared->mutex);
505
0
  winfo->shared->xact_state = PARALLEL_TRANS_UNKNOWN;
506
0
  winfo->shared->xid = xid;
507
0
  SpinLockRelease(&winfo->shared->mutex);
508
509
0
  winfo->in_use = true;
510
0
  winfo->serialize_changes = false;
511
0
  entry->winfo = winfo;
512
0
}
513
514
/*
515
 * Find the assigned worker for the given transaction, if any.
516
 */
517
ParallelApplyWorkerInfo *
518
pa_find_worker(TransactionId xid)
519
0
{
520
0
  bool    found;
521
0
  ParallelApplyWorkerEntry *entry;
522
523
0
  if (!TransactionIdIsValid(xid))
524
0
    return NULL;
525
526
0
  if (!ParallelApplyTxnHash)
527
0
    return NULL;
528
529
  /* Return the cached parallel apply worker if valid. */
530
0
  if (stream_apply_worker)
531
0
    return stream_apply_worker;
532
533
  /* Find an entry for the requested transaction. */
534
0
  entry = hash_search(ParallelApplyTxnHash, &xid, HASH_FIND, &found);
535
0
  if (found)
536
0
  {
537
    /* The worker must not have exited.  */
538
0
    Assert(entry->winfo->in_use);
539
0
    return entry->winfo;
540
0
  }
541
542
0
  return NULL;
543
0
}
544
545
/*
546
 * Makes the worker available for reuse.
547
 *
548
 * This removes the parallel apply worker entry from the hash table so that it
549
 * can't be used. If there are enough workers in the pool, it stops the worker
550
 * and frees the corresponding info. Otherwise it just marks the worker as
551
 * available for reuse.
552
 *
553
 * For more information about the worker pool, see comments atop this file.
554
 */
555
static void
556
pa_free_worker(ParallelApplyWorkerInfo *winfo)
557
0
{
558
0
  Assert(!am_parallel_apply_worker());
559
0
  Assert(winfo->in_use);
560
0
  Assert(pa_get_xact_state(winfo->shared) == PARALLEL_TRANS_FINISHED);
561
562
0
  if (!hash_search(ParallelApplyTxnHash, &winfo->shared->xid, HASH_REMOVE, NULL))
563
0
    elog(ERROR, "hash table corrupted");
564
565
  /*
566
   * Stop the worker if there are enough workers in the pool.
567
   *
568
   * XXX Additionally, we also stop the worker if the leader apply worker
569
   * serialize part of the transaction data due to a send timeout. This is
570
   * because the message could be partially written to the queue and there
571
   * is no way to clean the queue other than resending the message until it
572
   * succeeds. Instead of trying to send the data which anyway would have
573
   * been serialized and then letting the parallel apply worker deal with
574
   * the spurious message, we stop the worker.
575
   */
576
0
  if (winfo->serialize_changes ||
577
0
    list_length(ParallelApplyWorkerPool) >
578
0
    (max_parallel_apply_workers_per_subscription / 2))
579
0
  {
580
0
    logicalrep_pa_worker_stop(winfo);
581
0
    pa_free_worker_info(winfo);
582
583
0
    return;
584
0
  }
585
586
0
  winfo->in_use = false;
587
0
  winfo->serialize_changes = false;
588
0
}
589
590
/*
591
 * Free the parallel apply worker information and unlink the files with
592
 * serialized changes if any.
593
 */
594
static void
595
pa_free_worker_info(ParallelApplyWorkerInfo *winfo)
596
0
{
597
0
  Assert(winfo);
598
599
0
  if (winfo->mq_handle)
600
0
    shm_mq_detach(winfo->mq_handle);
601
602
0
  if (winfo->error_mq_handle)
603
0
    shm_mq_detach(winfo->error_mq_handle);
604
605
  /* Unlink the files with serialized changes. */
606
0
  if (winfo->serialize_changes)
607
0
    stream_cleanup_files(MyLogicalRepWorker->subid, winfo->shared->xid);
608
609
0
  if (winfo->dsm_seg)
610
0
    dsm_detach(winfo->dsm_seg);
611
612
  /* Remove from the worker pool. */
613
0
  ParallelApplyWorkerPool = list_delete_ptr(ParallelApplyWorkerPool, winfo);
614
615
0
  pfree(winfo);
616
0
}
617
618
/*
619
 * Detach the error queue for all parallel apply workers.
620
 */
621
void
622
pa_detach_all_error_mq(void)
623
0
{
624
0
  ListCell   *lc;
625
626
0
  foreach(lc, ParallelApplyWorkerPool)
627
0
  {
628
0
    ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
629
630
0
    if (winfo->error_mq_handle)
631
0
    {
632
0
      shm_mq_detach(winfo->error_mq_handle);
633
0
      winfo->error_mq_handle = NULL;
634
0
    }
635
0
  }
636
0
}
637
638
/*
639
 * Check if there are any pending spooled messages.
640
 */
641
static bool
642
pa_has_spooled_message_pending()
643
0
{
644
0
  PartialFileSetState fileset_state;
645
646
0
  fileset_state = pa_get_fileset_state();
647
648
0
  return (fileset_state != FS_EMPTY);
649
0
}
650
651
/*
652
 * Replay the spooled messages once the leader apply worker has finished
653
 * serializing changes to the file.
654
 *
655
 * Returns false if there aren't any pending spooled messages, true otherwise.
656
 */
657
static bool
658
pa_process_spooled_messages_if_required(void)
659
0
{
660
0
  PartialFileSetState fileset_state;
661
662
0
  fileset_state = pa_get_fileset_state();
663
664
0
  if (fileset_state == FS_EMPTY)
665
0
    return false;
666
667
  /*
668
   * If the leader apply worker is busy serializing the partial changes then
669
   * acquire the stream lock now and wait for the leader worker to finish
670
   * serializing the changes. Otherwise, the parallel apply worker won't get
671
   * a chance to receive a STREAM_STOP (and acquire the stream lock) until
672
   * the leader had serialized all changes which can lead to undetected
673
   * deadlock.
674
   *
675
   * Note that the fileset state can be FS_SERIALIZE_DONE once the leader
676
   * worker has finished serializing the changes.
677
   */
678
0
  if (fileset_state == FS_SERIALIZE_IN_PROGRESS)
679
0
  {
680
0
    pa_lock_stream(MyParallelShared->xid, AccessShareLock);
681
0
    pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
682
683
0
    fileset_state = pa_get_fileset_state();
684
0
  }
685
686
  /*
687
   * We cannot read the file immediately after the leader has serialized all
688
   * changes to the file because there may still be messages in the memory
689
   * queue. We will apply all spooled messages the next time we call this
690
   * function and that will ensure there are no messages left in the memory
691
   * queue.
692
   */
693
0
  if (fileset_state == FS_SERIALIZE_DONE)
694
0
  {
695
0
    pa_set_fileset_state(MyParallelShared, FS_READY);
696
0
  }
697
0
  else if (fileset_state == FS_READY)
698
0
  {
699
0
    apply_spooled_messages(&MyParallelShared->fileset,
700
0
                 MyParallelShared->xid,
701
0
                 InvalidXLogRecPtr);
702
0
    pa_set_fileset_state(MyParallelShared, FS_EMPTY);
703
0
  }
704
705
0
  return true;
706
0
}
707
708
/*
709
 * Interrupt handler for main loop of parallel apply worker.
710
 */
711
static void
712
ProcessParallelApplyInterrupts(void)
713
0
{
714
0
  CHECK_FOR_INTERRUPTS();
715
716
0
  if (ShutdownRequestPending)
717
0
  {
718
0
    ereport(LOG,
719
0
        (errmsg("logical replication parallel apply worker for subscription \"%s\" has finished",
720
0
            MySubscription->name)));
721
722
0
    proc_exit(0);
723
0
  }
724
725
0
  if (ConfigReloadPending)
726
0
  {
727
0
    ConfigReloadPending = false;
728
0
    ProcessConfigFile(PGC_SIGHUP);
729
0
  }
730
0
}
731
732
/* Parallel apply worker main loop. */
733
static void
734
LogicalParallelApplyLoop(shm_mq_handle *mqh)
735
0
{
736
0
  shm_mq_result shmq_res;
737
0
  ErrorContextCallback errcallback;
738
0
  MemoryContext oldcxt = CurrentMemoryContext;
739
740
  /*
741
   * Init the ApplyMessageContext which we clean up after each replication
742
   * protocol message.
743
   */
744
0
  ApplyMessageContext = AllocSetContextCreate(ApplyContext,
745
0
                        "ApplyMessageContext",
746
0
                        ALLOCSET_DEFAULT_SIZES);
747
748
  /*
749
   * Push apply error context callback. Fields will be filled while applying
750
   * a change.
751
   */
752
0
  errcallback.callback = apply_error_callback;
753
0
  errcallback.previous = error_context_stack;
754
0
  error_context_stack = &errcallback;
755
756
0
  for (;;)
757
0
  {
758
0
    void     *data;
759
0
    Size    len;
760
761
0
    ProcessParallelApplyInterrupts();
762
763
    /* Ensure we are reading the data into our memory context. */
764
0
    MemoryContextSwitchTo(ApplyMessageContext);
765
766
0
    shmq_res = shm_mq_receive(mqh, &len, &data, true);
767
768
0
    if (shmq_res == SHM_MQ_SUCCESS)
769
0
    {
770
0
      StringInfoData s;
771
0
      int     c;
772
773
0
      if (len == 0)
774
0
        elog(ERROR, "invalid message length");
775
776
0
      initReadOnlyStringInfo(&s, data, len);
777
778
      /*
779
       * The first byte of messages sent from leader apply worker to
780
       * parallel apply workers can only be 'w'.
781
       */
782
0
      c = pq_getmsgbyte(&s);
783
0
      if (c != 'w')
784
0
        elog(ERROR, "unexpected message \"%c\"", c);
785
786
      /*
787
       * Ignore statistics fields that have been updated by the leader
788
       * apply worker.
789
       *
790
       * XXX We can avoid sending the statistics fields from the leader
791
       * apply worker but for that, it needs to rebuild the entire
792
       * message by removing these fields which could be more work than
793
       * simply ignoring these fields in the parallel apply worker.
794
       */
795
0
      s.cursor += SIZE_STATS_MESSAGE;
796
797
0
      apply_dispatch(&s);
798
0
    }
799
0
    else if (shmq_res == SHM_MQ_WOULD_BLOCK)
800
0
    {
801
      /* Replay the changes from the file, if any. */
802
0
      if (!pa_process_spooled_messages_if_required())
803
0
      {
804
0
        int     rc;
805
806
        /* Wait for more work. */
807
0
        rc = WaitLatch(MyLatch,
808
0
                 WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
809
0
                 1000L,
810
0
                 WAIT_EVENT_LOGICAL_PARALLEL_APPLY_MAIN);
811
812
0
        if (rc & WL_LATCH_SET)
813
0
          ResetLatch(MyLatch);
814
0
      }
815
0
    }
816
0
    else
817
0
    {
818
0
      Assert(shmq_res == SHM_MQ_DETACHED);
819
820
0
      ereport(ERROR,
821
0
          (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
822
0
           errmsg("lost connection to the logical replication apply worker")));
823
0
    }
824
825
0
    MemoryContextReset(ApplyMessageContext);
826
0
    MemoryContextSwitchTo(oldcxt);
827
0
  }
828
829
  /* Pop the error context stack. */
830
0
  error_context_stack = errcallback.previous;
831
832
0
  MemoryContextSwitchTo(oldcxt);
833
0
}
834
835
/*
836
 * Make sure the leader apply worker tries to read from our error queue one more
837
 * time. This guards against the case where we exit uncleanly without sending
838
 * an ErrorResponse, for example because some code calls proc_exit directly.
839
 *
840
 * Also explicitly detach from dsm segment to invoke on_dsm_detach callbacks,
841
 * if any. See ParallelWorkerShutdown for details.
842
 */
843
static void
844
pa_shutdown(int code, Datum arg)
845
0
{
846
0
  SendProcSignal(MyLogicalRepWorker->leader_pid,
847
0
           PROCSIG_PARALLEL_APPLY_MESSAGE,
848
0
           INVALID_PROC_NUMBER);
849
850
0
  dsm_detach((dsm_segment *) DatumGetPointer(arg));
851
0
}
852
853
/*
854
 * Parallel apply worker entry point.
855
 */
856
void
857
ParallelApplyWorkerMain(Datum main_arg)
858
0
{
859
0
  ParallelApplyWorkerShared *shared;
860
0
  dsm_handle  handle;
861
0
  dsm_segment *seg;
862
0
  shm_toc    *toc;
863
0
  shm_mq     *mq;
864
0
  shm_mq_handle *mqh;
865
0
  shm_mq_handle *error_mqh;
866
0
  RepOriginId originid;
867
0
  int     worker_slot = DatumGetInt32(main_arg);
868
0
  char    originname[NAMEDATALEN];
869
870
0
  InitializingApplyWorker = true;
871
872
  /* Setup signal handling. */
873
0
  pqsignal(SIGHUP, SignalHandlerForConfigReload);
874
0
  pqsignal(SIGINT, SignalHandlerForShutdownRequest);
875
0
  pqsignal(SIGTERM, die);
876
0
  BackgroundWorkerUnblockSignals();
877
878
  /*
879
   * Attach to the dynamic shared memory segment for the parallel apply, and
880
   * find its table of contents.
881
   *
882
   * Like parallel query, we don't need resource owner by this time. See
883
   * ParallelWorkerMain.
884
   */
885
0
  memcpy(&handle, MyBgworkerEntry->bgw_extra, sizeof(dsm_handle));
886
0
  seg = dsm_attach(handle);
887
0
  if (!seg)
888
0
    ereport(ERROR,
889
0
        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
890
0
         errmsg("could not map dynamic shared memory segment")));
891
892
0
  toc = shm_toc_attach(PG_LOGICAL_APPLY_SHM_MAGIC, dsm_segment_address(seg));
893
0
  if (!toc)
894
0
    ereport(ERROR,
895
0
        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
896
0
         errmsg("invalid magic number in dynamic shared memory segment")));
897
898
  /* Look up the shared information. */
899
0
  shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
900
0
  MyParallelShared = shared;
901
902
  /*
903
   * Attach to the message queue.
904
   */
905
0
  mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_MQ, false);
906
0
  shm_mq_set_receiver(mq, MyProc);
907
0
  mqh = shm_mq_attach(mq, seg, NULL);
908
909
  /*
910
   * Primary initialization is complete. Now, we can attach to our slot.
911
   * This is to ensure that the leader apply worker does not write data to
912
   * the uninitialized memory queue.
913
   */
914
0
  logicalrep_worker_attach(worker_slot);
915
916
  /*
917
   * Register the shutdown callback after we are attached to the worker
918
   * slot. This is to ensure that MyLogicalRepWorker remains valid when this
919
   * callback is invoked.
920
   */
921
0
  before_shmem_exit(pa_shutdown, PointerGetDatum(seg));
922
923
0
  SpinLockAcquire(&MyParallelShared->mutex);
924
0
  MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation;
925
0
  MyParallelShared->logicalrep_worker_slot_no = worker_slot;
926
0
  SpinLockRelease(&MyParallelShared->mutex);
927
928
  /*
929
   * Attach to the error queue.
930
   */
931
0
  mq = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_ERROR_QUEUE, false);
932
0
  shm_mq_set_sender(mq, MyProc);
933
0
  error_mqh = shm_mq_attach(mq, seg, NULL);
934
935
0
  pq_redirect_to_shm_mq(seg, error_mqh);
936
0
  pq_set_parallel_leader(MyLogicalRepWorker->leader_pid,
937
0
               INVALID_PROC_NUMBER);
938
939
0
  MyLogicalRepWorker->last_send_time = MyLogicalRepWorker->last_recv_time =
940
0
    MyLogicalRepWorker->reply_time = 0;
941
942
0
  InitializeLogRepWorker();
943
944
0
  InitializingApplyWorker = false;
945
946
  /* Setup replication origin tracking. */
947
0
  StartTransactionCommand();
948
0
  ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
949
0
                     originname, sizeof(originname));
950
0
  originid = replorigin_by_name(originname, false);
951
952
  /*
953
   * The parallel apply worker doesn't need to monopolize this replication
954
   * origin which was already acquired by its leader process.
955
   */
956
0
  replorigin_session_setup(originid, MyLogicalRepWorker->leader_pid);
957
0
  replorigin_session_origin = originid;
958
0
  CommitTransactionCommand();
959
960
  /*
961
   * Setup callback for syscache so that we know when something changes in
962
   * the subscription relation state.
963
   */
964
0
  CacheRegisterSyscacheCallback(SUBSCRIPTIONRELMAP,
965
0
                  invalidate_syncing_table_states,
966
0
                  (Datum) 0);
967
968
0
  set_apply_error_context_origin(originname);
969
970
0
  LogicalParallelApplyLoop(mqh);
971
972
  /*
973
   * The parallel apply worker must not get here because the parallel apply
974
   * worker will only stop when it receives a SIGTERM or SIGINT from the
975
   * leader, or when there is an error. None of these cases will allow the
976
   * code to reach here.
977
   */
978
0
  Assert(false);
979
0
}
980
981
/*
982
 * Handle receipt of an interrupt indicating a parallel apply worker message.
983
 *
984
 * Note: this is called within a signal handler! All we can do is set a flag
985
 * that will cause the next CHECK_FOR_INTERRUPTS() to invoke
986
 * ProcessParallelApplyMessages().
987
 */
988
void
989
HandleParallelApplyMessageInterrupt(void)
990
0
{
991
0
  InterruptPending = true;
992
0
  ParallelApplyMessagePending = true;
993
0
  SetLatch(MyLatch);
994
0
}
995
996
/*
997
 * Process a single protocol message received from a single parallel apply
998
 * worker.
999
 */
1000
static void
1001
ProcessParallelApplyMessage(StringInfo msg)
1002
0
{
1003
0
  char    msgtype;
1004
1005
0
  msgtype = pq_getmsgbyte(msg);
1006
1007
0
  switch (msgtype)
1008
0
  {
1009
0
    case 'E':       /* ErrorResponse */
1010
0
      {
1011
0
        ErrorData edata;
1012
1013
        /* Parse ErrorResponse. */
1014
0
        pq_parse_errornotice(msg, &edata);
1015
1016
        /*
1017
         * If desired, add a context line to show that this is a
1018
         * message propagated from a parallel apply worker. Otherwise,
1019
         * it can sometimes be confusing to understand what actually
1020
         * happened.
1021
         */
1022
0
        if (edata.context)
1023
0
          edata.context = psprintf("%s\n%s", edata.context,
1024
0
                       _("logical replication parallel apply worker"));
1025
0
        else
1026
0
          edata.context = pstrdup(_("logical replication parallel apply worker"));
1027
1028
        /*
1029
         * Context beyond that should use the error context callbacks
1030
         * that were in effect in LogicalRepApplyLoop().
1031
         */
1032
0
        error_context_stack = apply_error_context_stack;
1033
1034
        /*
1035
         * The actual error must have been reported by the parallel
1036
         * apply worker.
1037
         */
1038
0
        ereport(ERROR,
1039
0
            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1040
0
             errmsg("logical replication parallel apply worker exited due to error"),
1041
0
             errcontext("%s", edata.context)));
1042
0
      }
1043
1044
      /*
1045
       * Don't need to do anything about NoticeResponse and
1046
       * NotifyResponse as the logical replication worker doesn't need
1047
       * to send messages to the client.
1048
       */
1049
0
    case 'N':
1050
0
    case 'A':
1051
0
      break;
1052
1053
0
    default:
1054
0
      elog(ERROR, "unrecognized message type received from logical replication parallel apply worker: %c (message length %d bytes)",
1055
0
         msgtype, msg->len);
1056
0
  }
1057
0
}
1058
1059
/*
1060
 * Handle any queued protocol messages received from parallel apply workers.
1061
 */
1062
void
1063
ProcessParallelApplyMessages(void)
1064
0
{
1065
0
  ListCell   *lc;
1066
0
  MemoryContext oldcontext;
1067
1068
0
  static MemoryContext hpam_context = NULL;
1069
1070
  /*
1071
   * This is invoked from ProcessInterrupts(), and since some of the
1072
   * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
1073
   * for recursive calls if more signals are received while this runs. It's
1074
   * unclear that recursive entry would be safe, and it doesn't seem useful
1075
   * even if it is safe, so let's block interrupts until done.
1076
   */
1077
0
  HOLD_INTERRUPTS();
1078
1079
  /*
1080
   * Moreover, CurrentMemoryContext might be pointing almost anywhere. We
1081
   * don't want to risk leaking data into long-lived contexts, so let's do
1082
   * our work here in a private context that we can reset on each use.
1083
   */
1084
0
  if (!hpam_context)     /* first time through? */
1085
0
    hpam_context = AllocSetContextCreate(TopMemoryContext,
1086
0
                       "ProcessParallelApplyMessages",
1087
0
                       ALLOCSET_DEFAULT_SIZES);
1088
0
  else
1089
0
    MemoryContextReset(hpam_context);
1090
1091
0
  oldcontext = MemoryContextSwitchTo(hpam_context);
1092
1093
0
  ParallelApplyMessagePending = false;
1094
1095
0
  foreach(lc, ParallelApplyWorkerPool)
1096
0
  {
1097
0
    shm_mq_result res;
1098
0
    Size    nbytes;
1099
0
    void     *data;
1100
0
    ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
1101
1102
    /*
1103
     * The leader will detach from the error queue and set it to NULL
1104
     * before preparing to stop all parallel apply workers, so we don't
1105
     * need to handle error messages anymore. See
1106
     * logicalrep_worker_detach.
1107
     */
1108
0
    if (!winfo->error_mq_handle)
1109
0
      continue;
1110
1111
0
    res = shm_mq_receive(winfo->error_mq_handle, &nbytes, &data, true);
1112
1113
0
    if (res == SHM_MQ_WOULD_BLOCK)
1114
0
      continue;
1115
0
    else if (res == SHM_MQ_SUCCESS)
1116
0
    {
1117
0
      StringInfoData msg;
1118
1119
0
      initStringInfo(&msg);
1120
0
      appendBinaryStringInfo(&msg, data, nbytes);
1121
0
      ProcessParallelApplyMessage(&msg);
1122
0
      pfree(msg.data);
1123
0
    }
1124
0
    else
1125
0
      ereport(ERROR,
1126
0
          (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1127
0
           errmsg("lost connection to the logical replication parallel apply worker")));
1128
0
  }
1129
1130
0
  MemoryContextSwitchTo(oldcontext);
1131
1132
  /* Might as well clear the context on our way out */
1133
0
  MemoryContextReset(hpam_context);
1134
1135
0
  RESUME_INTERRUPTS();
1136
0
}
1137
1138
/*
1139
 * Send the data to the specified parallel apply worker via shared-memory
1140
 * queue.
1141
 *
1142
 * Returns false if the attempt to send data via shared memory times out, true
1143
 * otherwise.
1144
 */
1145
bool
1146
pa_send_data(ParallelApplyWorkerInfo *winfo, Size nbytes, const void *data)
1147
0
{
1148
0
  int     rc;
1149
0
  shm_mq_result result;
1150
0
  TimestampTz startTime = 0;
1151
1152
0
  Assert(!IsTransactionState());
1153
0
  Assert(!winfo->serialize_changes);
1154
1155
  /*
1156
   * We don't try to send data to parallel worker for 'immediate' mode. This
1157
   * is primarily used for testing purposes.
1158
   */
1159
0
  if (unlikely(debug_logical_replication_streaming == DEBUG_LOGICAL_REP_STREAMING_IMMEDIATE))
1160
0
    return false;
1161
1162
/*
1163
 * This timeout is a bit arbitrary but testing revealed that it is sufficient
1164
 * to send the message unless the parallel apply worker is waiting on some
1165
 * lock or there is a serious resource crunch. See the comments atop this file
1166
 * to know why we are using a non-blocking way to send the message.
1167
 */
1168
0
#define SHM_SEND_RETRY_INTERVAL_MS 1000
1169
0
#define SHM_SEND_TIMEOUT_MS   (10000 - SHM_SEND_RETRY_INTERVAL_MS)
1170
1171
0
  for (;;)
1172
0
  {
1173
0
    result = shm_mq_send(winfo->mq_handle, nbytes, data, true, true);
1174
1175
0
    if (result == SHM_MQ_SUCCESS)
1176
0
      return true;
1177
0
    else if (result == SHM_MQ_DETACHED)
1178
0
      ereport(ERROR,
1179
0
          (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1180
0
           errmsg("could not send data to shared-memory queue")));
1181
1182
0
    Assert(result == SHM_MQ_WOULD_BLOCK);
1183
1184
    /* Wait before retrying. */
1185
0
    rc = WaitLatch(MyLatch,
1186
0
             WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1187
0
             SHM_SEND_RETRY_INTERVAL_MS,
1188
0
             WAIT_EVENT_LOGICAL_APPLY_SEND_DATA);
1189
1190
0
    if (rc & WL_LATCH_SET)
1191
0
    {
1192
0
      ResetLatch(MyLatch);
1193
0
      CHECK_FOR_INTERRUPTS();
1194
0
    }
1195
1196
0
    if (startTime == 0)
1197
0
      startTime = GetCurrentTimestamp();
1198
0
    else if (TimestampDifferenceExceeds(startTime, GetCurrentTimestamp(),
1199
0
                      SHM_SEND_TIMEOUT_MS))
1200
0
      return false;
1201
0
  }
1202
0
}
1203
1204
/*
1205
 * Switch to PARTIAL_SERIALIZE mode for the current transaction -- this means
1206
 * that the current data and any subsequent data for this transaction will be
1207
 * serialized to a file. This is done to prevent possible deadlocks with
1208
 * another parallel apply worker (refer to the comments atop this file).
1209
 */
1210
void
1211
pa_switch_to_partial_serialize(ParallelApplyWorkerInfo *winfo,
1212
                 bool stream_locked)
1213
0
{
1214
0
  ereport(LOG,
1215
0
      (errmsg("logical replication apply worker will serialize the remaining changes of remote transaction %u to a file",
1216
0
          winfo->shared->xid)));
1217
1218
  /*
1219
   * The parallel apply worker could be stuck for some reason (say waiting
1220
   * on some lock by other backend), so stop trying to send data directly to
1221
   * it and start serializing data to the file instead.
1222
   */
1223
0
  winfo->serialize_changes = true;
1224
1225
  /* Initialize the stream fileset. */
1226
0
  stream_start_internal(winfo->shared->xid, true);
1227
1228
  /*
1229
   * Acquires the stream lock if not already to make sure that the parallel
1230
   * apply worker will wait for the leader to release the stream lock until
1231
   * the end of the transaction.
1232
   */
1233
0
  if (!stream_locked)
1234
0
    pa_lock_stream(winfo->shared->xid, AccessExclusiveLock);
1235
1236
0
  pa_set_fileset_state(winfo->shared, FS_SERIALIZE_IN_PROGRESS);
1237
0
}
1238
1239
/*
1240
 * Wait until the parallel apply worker's transaction state has reached or
1241
 * exceeded the given xact_state.
1242
 */
1243
static void
1244
pa_wait_for_xact_state(ParallelApplyWorkerInfo *winfo,
1245
             ParallelTransState xact_state)
1246
0
{
1247
0
  for (;;)
1248
0
  {
1249
    /*
1250
     * Stop if the transaction state has reached or exceeded the given
1251
     * xact_state.
1252
     */
1253
0
    if (pa_get_xact_state(winfo->shared) >= xact_state)
1254
0
      break;
1255
1256
    /* Wait to be signalled. */
1257
0
    (void) WaitLatch(MyLatch,
1258
0
             WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1259
0
             10L,
1260
0
             WAIT_EVENT_LOGICAL_PARALLEL_APPLY_STATE_CHANGE);
1261
1262
    /* Reset the latch so we don't spin. */
1263
0
    ResetLatch(MyLatch);
1264
1265
    /* An interrupt may have occurred while we were waiting. */
1266
0
    CHECK_FOR_INTERRUPTS();
1267
0
  }
1268
0
}
1269
1270
/*
1271
 * Wait until the parallel apply worker's transaction finishes.
1272
 */
1273
static void
1274
pa_wait_for_xact_finish(ParallelApplyWorkerInfo *winfo)
1275
0
{
1276
  /*
1277
   * Wait until the parallel apply worker set the state to
1278
   * PARALLEL_TRANS_STARTED which means it has acquired the transaction
1279
   * lock. This is to prevent leader apply worker from acquiring the
1280
   * transaction lock earlier than the parallel apply worker.
1281
   */
1282
0
  pa_wait_for_xact_state(winfo, PARALLEL_TRANS_STARTED);
1283
1284
  /*
1285
   * Wait for the transaction lock to be released. This is required to
1286
   * detect deadlock among leader and parallel apply workers. Refer to the
1287
   * comments atop this file.
1288
   */
1289
0
  pa_lock_transaction(winfo->shared->xid, AccessShareLock);
1290
0
  pa_unlock_transaction(winfo->shared->xid, AccessShareLock);
1291
1292
  /*
1293
   * Check if the state becomes PARALLEL_TRANS_FINISHED in case the parallel
1294
   * apply worker failed while applying changes causing the lock to be
1295
   * released.
1296
   */
1297
0
  if (pa_get_xact_state(winfo->shared) != PARALLEL_TRANS_FINISHED)
1298
0
    ereport(ERROR,
1299
0
        (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1300
0
         errmsg("lost connection to the logical replication parallel apply worker")));
1301
0
}
1302
1303
/*
1304
 * Set the transaction state for a given parallel apply worker.
1305
 */
1306
void
1307
pa_set_xact_state(ParallelApplyWorkerShared *wshared,
1308
          ParallelTransState xact_state)
1309
0
{
1310
0
  SpinLockAcquire(&wshared->mutex);
1311
0
  wshared->xact_state = xact_state;
1312
0
  SpinLockRelease(&wshared->mutex);
1313
0
}
1314
1315
/*
1316
 * Get the transaction state for a given parallel apply worker.
1317
 */
1318
static ParallelTransState
1319
pa_get_xact_state(ParallelApplyWorkerShared *wshared)
1320
0
{
1321
0
  ParallelTransState xact_state;
1322
1323
0
  SpinLockAcquire(&wshared->mutex);
1324
0
  xact_state = wshared->xact_state;
1325
0
  SpinLockRelease(&wshared->mutex);
1326
1327
0
  return xact_state;
1328
0
}
1329
1330
/*
1331
 * Cache the parallel apply worker information.
1332
 */
1333
void
1334
pa_set_stream_apply_worker(ParallelApplyWorkerInfo *winfo)
1335
0
{
1336
0
  stream_apply_worker = winfo;
1337
0
}
1338
1339
/*
1340
 * Form a unique savepoint name for the streaming transaction.
1341
 *
1342
 * Note that different subscriptions for publications on different nodes can
1343
 * receive same remote xid, so we need to use subscription id along with it.
1344
 *
1345
 * Returns the name in the supplied buffer.
1346
 */
1347
static void
1348
pa_savepoint_name(Oid suboid, TransactionId xid, char *spname, Size szsp)
1349
0
{
1350
0
  snprintf(spname, szsp, "pg_sp_%u_%u", suboid, xid);
1351
0
}
1352
1353
/*
1354
 * Define a savepoint for a subxact in parallel apply worker if needed.
1355
 *
1356
 * The parallel apply worker can figure out if a new subtransaction was
1357
 * started by checking if the new change arrived with a different xid. In that
1358
 * case define a named savepoint, so that we are able to rollback to it
1359
 * if required.
1360
 */
1361
void
1362
pa_start_subtrans(TransactionId current_xid, TransactionId top_xid)
1363
0
{
1364
0
  if (current_xid != top_xid &&
1365
0
    !list_member_xid(subxactlist, current_xid))
1366
0
  {
1367
0
    MemoryContext oldctx;
1368
0
    char    spname[NAMEDATALEN];
1369
1370
0
    pa_savepoint_name(MySubscription->oid, current_xid,
1371
0
              spname, sizeof(spname));
1372
1373
0
    elog(DEBUG1, "defining savepoint %s in logical replication parallel apply worker", spname);
1374
1375
    /* We must be in transaction block to define the SAVEPOINT. */
1376
0
    if (!IsTransactionBlock())
1377
0
    {
1378
0
      if (!IsTransactionState())
1379
0
        StartTransactionCommand();
1380
1381
0
      BeginTransactionBlock();
1382
0
      CommitTransactionCommand();
1383
0
    }
1384
1385
0
    DefineSavepoint(spname);
1386
1387
    /*
1388
     * CommitTransactionCommand is needed to start a subtransaction after
1389
     * issuing a SAVEPOINT inside a transaction block (see
1390
     * StartSubTransaction()).
1391
     */
1392
0
    CommitTransactionCommand();
1393
1394
0
    oldctx = MemoryContextSwitchTo(TopTransactionContext);
1395
0
    subxactlist = lappend_xid(subxactlist, current_xid);
1396
0
    MemoryContextSwitchTo(oldctx);
1397
0
  }
1398
0
}
1399
1400
/* Reset the list that maintains subtransactions. */
1401
void
1402
pa_reset_subtrans(void)
1403
0
{
1404
  /*
1405
   * We don't need to free this explicitly as the allocated memory will be
1406
   * freed at the transaction end.
1407
   */
1408
0
  subxactlist = NIL;
1409
0
}
1410
1411
/*
1412
 * Handle STREAM ABORT message when the transaction was applied in a parallel
1413
 * apply worker.
1414
 */
1415
void
1416
pa_stream_abort(LogicalRepStreamAbortData *abort_data)
1417
0
{
1418
0
  TransactionId xid = abort_data->xid;
1419
0
  TransactionId subxid = abort_data->subxid;
1420
1421
  /*
1422
   * Update origin state so we can restart streaming from correct position
1423
   * in case of crash.
1424
   */
1425
0
  replorigin_session_origin_lsn = abort_data->abort_lsn;
1426
0
  replorigin_session_origin_timestamp = abort_data->abort_time;
1427
1428
  /*
1429
   * If the two XIDs are the same, it's in fact abort of toplevel xact, so
1430
   * just free the subxactlist.
1431
   */
1432
0
  if (subxid == xid)
1433
0
  {
1434
0
    pa_set_xact_state(MyParallelShared, PARALLEL_TRANS_FINISHED);
1435
1436
    /*
1437
     * Release the lock as we might be processing an empty streaming
1438
     * transaction in which case the lock won't be released during
1439
     * transaction rollback.
1440
     *
1441
     * Note that it's ok to release the transaction lock before aborting
1442
     * the transaction because even if the parallel apply worker dies due
1443
     * to crash or some other reason, such a transaction would still be
1444
     * considered aborted.
1445
     */
1446
0
    pa_unlock_transaction(xid, AccessExclusiveLock);
1447
1448
0
    AbortCurrentTransaction();
1449
1450
0
    if (IsTransactionBlock())
1451
0
    {
1452
0
      EndTransactionBlock(false);
1453
0
      CommitTransactionCommand();
1454
0
    }
1455
1456
0
    pa_reset_subtrans();
1457
1458
0
    pgstat_report_activity(STATE_IDLE, NULL);
1459
0
  }
1460
0
  else
1461
0
  {
1462
    /* OK, so it's a subxact. Rollback to the savepoint. */
1463
0
    int     i;
1464
0
    char    spname[NAMEDATALEN];
1465
1466
0
    pa_savepoint_name(MySubscription->oid, subxid, spname, sizeof(spname));
1467
1468
0
    elog(DEBUG1, "rolling back to savepoint %s in logical replication parallel apply worker", spname);
1469
1470
    /*
1471
     * Search the subxactlist, determine the offset tracked for the
1472
     * subxact, and truncate the list.
1473
     *
1474
     * Note that for an empty sub-transaction we won't find the subxid
1475
     * here.
1476
     */
1477
0
    for (i = list_length(subxactlist) - 1; i >= 0; i--)
1478
0
    {
1479
0
      TransactionId xid_tmp = lfirst_xid(list_nth_cell(subxactlist, i));
1480
1481
0
      if (xid_tmp == subxid)
1482
0
      {
1483
0
        RollbackToSavepoint(spname);
1484
0
        CommitTransactionCommand();
1485
0
        subxactlist = list_truncate(subxactlist, i);
1486
0
        break;
1487
0
      }
1488
0
    }
1489
0
  }
1490
0
}
1491
1492
/*
1493
 * Set the fileset state for a particular parallel apply worker. The fileset
1494
 * will be set once the leader worker serialized all changes to the file
1495
 * so that it can be used by parallel apply worker.
1496
 */
1497
void
1498
pa_set_fileset_state(ParallelApplyWorkerShared *wshared,
1499
           PartialFileSetState fileset_state)
1500
0
{
1501
0
  SpinLockAcquire(&wshared->mutex);
1502
0
  wshared->fileset_state = fileset_state;
1503
1504
0
  if (fileset_state == FS_SERIALIZE_DONE)
1505
0
  {
1506
0
    Assert(am_leader_apply_worker());
1507
0
    Assert(MyLogicalRepWorker->stream_fileset);
1508
0
    wshared->fileset = *MyLogicalRepWorker->stream_fileset;
1509
0
  }
1510
1511
0
  SpinLockRelease(&wshared->mutex);
1512
0
}
1513
1514
/*
1515
 * Get the fileset state for the current parallel apply worker.
1516
 */
1517
static PartialFileSetState
1518
pa_get_fileset_state(void)
1519
0
{
1520
0
  PartialFileSetState fileset_state;
1521
1522
0
  Assert(am_parallel_apply_worker());
1523
1524
0
  SpinLockAcquire(&MyParallelShared->mutex);
1525
0
  fileset_state = MyParallelShared->fileset_state;
1526
0
  SpinLockRelease(&MyParallelShared->mutex);
1527
1528
0
  return fileset_state;
1529
0
}
1530
1531
/*
1532
 * Helper functions to acquire and release a lock for each stream block.
1533
 *
1534
 * Set locktag_field4 to PARALLEL_APPLY_LOCK_STREAM to indicate that it's a
1535
 * stream lock.
1536
 *
1537
 * Refer to the comments atop this file to see how the stream lock is used.
1538
 */
1539
void
1540
pa_lock_stream(TransactionId xid, LOCKMODE lockmode)
1541
0
{
1542
0
  LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
1543
0
                   PARALLEL_APPLY_LOCK_STREAM, lockmode);
1544
0
}
1545
1546
void
1547
pa_unlock_stream(TransactionId xid, LOCKMODE lockmode)
1548
0
{
1549
0
  UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
1550
0
                   PARALLEL_APPLY_LOCK_STREAM, lockmode);
1551
0
}
1552
1553
/*
1554
 * Helper functions to acquire and release a lock for each local transaction
1555
 * apply.
1556
 *
1557
 * Set locktag_field4 to PARALLEL_APPLY_LOCK_XACT to indicate that it's a
1558
 * transaction lock.
1559
 *
1560
 * Note that all the callers must pass a remote transaction ID instead of a
1561
 * local transaction ID as xid. This is because the local transaction ID will
1562
 * only be assigned while applying the first change in the parallel apply but
1563
 * it's possible that the first change in the parallel apply worker is blocked
1564
 * by a concurrently executing transaction in another parallel apply worker. We
1565
 * can only communicate the local transaction id to the leader after applying
1566
 * the first change so it won't be able to wait after sending the xact finish
1567
 * command using this lock.
1568
 *
1569
 * Refer to the comments atop this file to see how the transaction lock is
1570
 * used.
1571
 */
1572
void
1573
pa_lock_transaction(TransactionId xid, LOCKMODE lockmode)
1574
0
{
1575
0
  LockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
1576
0
                   PARALLEL_APPLY_LOCK_XACT, lockmode);
1577
0
}
1578
1579
void
1580
pa_unlock_transaction(TransactionId xid, LOCKMODE lockmode)
1581
0
{
1582
0
  UnlockApplyTransactionForSession(MyLogicalRepWorker->subid, xid,
1583
0
                   PARALLEL_APPLY_LOCK_XACT, lockmode);
1584
0
}
1585
1586
/*
1587
 * Decrement the number of pending streaming blocks and wait on the stream lock
1588
 * if there is no pending block available.
1589
 */
1590
void
1591
pa_decr_and_wait_stream_block(void)
1592
0
{
1593
0
  Assert(am_parallel_apply_worker());
1594
1595
  /*
1596
   * It is only possible to not have any pending stream chunks when we are
1597
   * applying spooled messages.
1598
   */
1599
0
  if (pg_atomic_read_u32(&MyParallelShared->pending_stream_count) == 0)
1600
0
  {
1601
0
    if (pa_has_spooled_message_pending())
1602
0
      return;
1603
1604
0
    elog(ERROR, "invalid pending streaming chunk 0");
1605
0
  }
1606
1607
0
  if (pg_atomic_sub_fetch_u32(&MyParallelShared->pending_stream_count, 1) == 0)
1608
0
  {
1609
0
    pa_lock_stream(MyParallelShared->xid, AccessShareLock);
1610
0
    pa_unlock_stream(MyParallelShared->xid, AccessShareLock);
1611
0
  }
1612
0
}
1613
1614
/*
1615
 * Finish processing the streaming transaction in the leader apply worker.
1616
 */
1617
void
1618
pa_xact_finish(ParallelApplyWorkerInfo *winfo, XLogRecPtr remote_lsn)
1619
0
{
1620
0
  Assert(am_leader_apply_worker());
1621
1622
  /*
1623
   * Unlock the shared object lock so that parallel apply worker can
1624
   * continue to receive and apply changes.
1625
   */
1626
0
  pa_unlock_stream(winfo->shared->xid, AccessExclusiveLock);
1627
1628
  /*
1629
   * Wait for that worker to finish. This is necessary to maintain commit
1630
   * order which avoids failures due to transaction dependencies and
1631
   * deadlocks.
1632
   */
1633
0
  pa_wait_for_xact_finish(winfo);
1634
1635
0
  if (!XLogRecPtrIsInvalid(remote_lsn))
1636
0
    store_flush_position(remote_lsn, winfo->shared->last_commit_end);
1637
1638
0
  pa_free_worker(winfo);
1639
0
}