Coverage Report

Created: 2025-08-12 06:43

/src/postgres/src/backend/access/transam/parallel.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * parallel.c
4
 *    Infrastructure for launching parallel workers
5
 *
6
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7
 * Portions Copyright (c) 1994, Regents of the University of California
8
 *
9
 * IDENTIFICATION
10
 *    src/backend/access/transam/parallel.c
11
 *
12
 *-------------------------------------------------------------------------
13
 */
14
15
#include "postgres.h"
16
17
#include "access/brin.h"
18
#include "access/gin.h"
19
#include "access/nbtree.h"
20
#include "access/parallel.h"
21
#include "access/session.h"
22
#include "access/xact.h"
23
#include "access/xlog.h"
24
#include "catalog/index.h"
25
#include "catalog/namespace.h"
26
#include "catalog/pg_enum.h"
27
#include "catalog/storage.h"
28
#include "commands/async.h"
29
#include "commands/vacuum.h"
30
#include "executor/execParallel.h"
31
#include "libpq/libpq.h"
32
#include "libpq/pqformat.h"
33
#include "libpq/pqmq.h"
34
#include "miscadmin.h"
35
#include "optimizer/optimizer.h"
36
#include "pgstat.h"
37
#include "storage/ipc.h"
38
#include "storage/predicate.h"
39
#include "storage/spin.h"
40
#include "tcop/tcopprot.h"
41
#include "utils/combocid.h"
42
#include "utils/guc.h"
43
#include "utils/inval.h"
44
#include "utils/memutils.h"
45
#include "utils/relmapper.h"
46
#include "utils/snapmgr.h"
47
48
/*
49
 * We don't want to waste a lot of memory on an error queue which, most of
50
 * the time, will process only a handful of small messages.  However, it is
51
 * desirable to make it large enough that a typical ErrorResponse can be sent
52
 * without blocking.  That way, a worker that errors out can write the whole
53
 * message into the queue and terminate without waiting for the user backend.
54
 */
55
0
#define PARALLEL_ERROR_QUEUE_SIZE     16384
56
57
/* Magic number for parallel context TOC. */
58
0
#define PARALLEL_MAGIC            0x50477c7c
59
60
/*
61
 * Magic numbers for per-context parallel state sharing.  Higher-level code
62
 * should use smaller values, leaving these very large ones for use by this
63
 * module.
64
 */
65
0
#define PARALLEL_KEY_FIXED          UINT64CONST(0xFFFFFFFFFFFF0001)
66
0
#define PARALLEL_KEY_ERROR_QUEUE      UINT64CONST(0xFFFFFFFFFFFF0002)
67
0
#define PARALLEL_KEY_LIBRARY        UINT64CONST(0xFFFFFFFFFFFF0003)
68
0
#define PARALLEL_KEY_GUC          UINT64CONST(0xFFFFFFFFFFFF0004)
69
0
#define PARALLEL_KEY_COMBO_CID        UINT64CONST(0xFFFFFFFFFFFF0005)
70
0
#define PARALLEL_KEY_TRANSACTION_SNAPSHOT UINT64CONST(0xFFFFFFFFFFFF0006)
71
0
#define PARALLEL_KEY_ACTIVE_SNAPSHOT    UINT64CONST(0xFFFFFFFFFFFF0007)
72
0
#define PARALLEL_KEY_TRANSACTION_STATE    UINT64CONST(0xFFFFFFFFFFFF0008)
73
0
#define PARALLEL_KEY_ENTRYPOINT       UINT64CONST(0xFFFFFFFFFFFF0009)
74
0
#define PARALLEL_KEY_SESSION_DSM      UINT64CONST(0xFFFFFFFFFFFF000A)
75
0
#define PARALLEL_KEY_PENDING_SYNCS      UINT64CONST(0xFFFFFFFFFFFF000B)
76
0
#define PARALLEL_KEY_REINDEX_STATE      UINT64CONST(0xFFFFFFFFFFFF000C)
77
0
#define PARALLEL_KEY_RELMAPPER_STATE    UINT64CONST(0xFFFFFFFFFFFF000D)
78
0
#define PARALLEL_KEY_UNCOMMITTEDENUMS   UINT64CONST(0xFFFFFFFFFFFF000E)
79
0
#define PARALLEL_KEY_CLIENTCONNINFO     UINT64CONST(0xFFFFFFFFFFFF000F)
80
81
/* Fixed-size parallel state. */
82
typedef struct FixedParallelState
83
{
84
  /* Fixed-size state that workers must restore. */
85
  Oid     database_id;
86
  Oid     authenticated_user_id;
87
  Oid     session_user_id;
88
  Oid     outer_user_id;
89
  Oid     current_user_id;
90
  Oid     temp_namespace_id;
91
  Oid     temp_toast_namespace_id;
92
  int     sec_context;
93
  bool    session_user_is_superuser;
94
  bool    role_is_superuser;
95
  PGPROC     *parallel_leader_pgproc;
96
  pid_t   parallel_leader_pid;
97
  ProcNumber  parallel_leader_proc_number;
98
  TimestampTz xact_ts;
99
  TimestampTz stmt_ts;
100
  SerializableXactHandle serializable_xact_handle;
101
102
  /* Mutex protects remaining fields. */
103
  slock_t   mutex;
104
105
  /* Maximum XactLastRecEnd of any worker. */
106
  XLogRecPtr  last_xlog_end;
107
} FixedParallelState;
108
109
/*
110
 * Our parallel worker number.  We initialize this to -1, meaning that we are
111
 * not a parallel worker.  In parallel workers, it will be set to a value >= 0
112
 * and < the number of workers before any user code is invoked; each parallel
113
 * worker will get a different parallel worker number.
114
 */
115
int     ParallelWorkerNumber = -1;
116
117
/* Is there a parallel message pending which we need to receive? */
118
volatile sig_atomic_t ParallelMessagePending = false;
119
120
/* Are we initializing a parallel worker? */
121
bool    InitializingParallelWorker = false;
122
123
/* Pointer to our fixed parallel state. */
124
static FixedParallelState *MyFixedParallelState;
125
126
/* List of active parallel contexts. */
127
static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
128
129
/* Backend-local copy of data from FixedParallelState. */
130
static pid_t ParallelLeaderPid;
131
132
/*
133
 * List of internal parallel worker entry points.  We need this for
134
 * reasons explained in LookupParallelWorkerFunction(), below.
135
 */
136
static const struct
137
{
138
  const char *fn_name;
139
  parallel_worker_main_type fn_addr;
140
}     InternalParallelWorkers[] =
141
142
{
143
  {
144
    "ParallelQueryMain", ParallelQueryMain
145
  },
146
  {
147
    "_bt_parallel_build_main", _bt_parallel_build_main
148
  },
149
  {
150
    "_brin_parallel_build_main", _brin_parallel_build_main
151
  },
152
  {
153
    "_gin_parallel_build_main", _gin_parallel_build_main
154
  },
155
  {
156
    "parallel_vacuum_main", parallel_vacuum_main
157
  }
158
};
159
160
/* Private functions. */
161
static void ProcessParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
162
static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
163
static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
164
static void ParallelWorkerShutdown(int code, Datum arg);
165
166
167
/*
168
 * Establish a new parallel context.  This should be done after entering
169
 * parallel mode, and (unless there is an error) the context should be
170
 * destroyed before exiting the current subtransaction.
171
 */
172
ParallelContext *
173
CreateParallelContext(const char *library_name, const char *function_name,
174
            int nworkers)
175
0
{
176
0
  MemoryContext oldcontext;
177
0
  ParallelContext *pcxt;
178
179
  /* It is unsafe to create a parallel context if not in parallel mode. */
180
0
  Assert(IsInParallelMode());
181
182
  /* Number of workers should be non-negative. */
183
0
  Assert(nworkers >= 0);
184
185
  /* We might be running in a short-lived memory context. */
186
0
  oldcontext = MemoryContextSwitchTo(TopTransactionContext);
187
188
  /* Initialize a new ParallelContext. */
189
0
  pcxt = palloc0(sizeof(ParallelContext));
190
0
  pcxt->subid = GetCurrentSubTransactionId();
191
0
  pcxt->nworkers = nworkers;
192
0
  pcxt->nworkers_to_launch = nworkers;
193
0
  pcxt->library_name = pstrdup(library_name);
194
0
  pcxt->function_name = pstrdup(function_name);
195
0
  pcxt->error_context_stack = error_context_stack;
196
0
  shm_toc_initialize_estimator(&pcxt->estimator);
197
0
  dlist_push_head(&pcxt_list, &pcxt->node);
198
199
  /* Restore previous memory context. */
200
0
  MemoryContextSwitchTo(oldcontext);
201
202
0
  return pcxt;
203
0
}
204
205
/*
206
 * Establish the dynamic shared memory segment for a parallel context and
207
 * copy state and other bookkeeping information that will be needed by
208
 * parallel workers into it.
209
 */
210
void
211
InitializeParallelDSM(ParallelContext *pcxt)
212
0
{
213
0
  MemoryContext oldcontext;
214
0
  Size    library_len = 0;
215
0
  Size    guc_len = 0;
216
0
  Size    combocidlen = 0;
217
0
  Size    tsnaplen = 0;
218
0
  Size    asnaplen = 0;
219
0
  Size    tstatelen = 0;
220
0
  Size    pendingsyncslen = 0;
221
0
  Size    reindexlen = 0;
222
0
  Size    relmapperlen = 0;
223
0
  Size    uncommittedenumslen = 0;
224
0
  Size    clientconninfolen = 0;
225
0
  Size    segsize = 0;
226
0
  int     i;
227
0
  FixedParallelState *fps;
228
0
  dsm_handle  session_dsm_handle = DSM_HANDLE_INVALID;
229
0
  Snapshot  transaction_snapshot = GetTransactionSnapshot();
230
0
  Snapshot  active_snapshot = GetActiveSnapshot();
231
232
  /* We might be running in a very short-lived memory context. */
233
0
  oldcontext = MemoryContextSwitchTo(TopTransactionContext);
234
235
  /* Allow space to store the fixed-size parallel state. */
236
0
  shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
237
0
  shm_toc_estimate_keys(&pcxt->estimator, 1);
238
239
  /*
240
   * If we manage to reach here while non-interruptible, it's unsafe to
241
   * launch any workers: we would fail to process interrupts sent by them.
242
   * We can deal with that edge case by pretending no workers were
243
   * requested.
244
   */
245
0
  if (!INTERRUPTS_CAN_BE_PROCESSED())
246
0
    pcxt->nworkers = 0;
247
248
  /*
249
   * Normally, the user will have requested at least one worker process, but
250
   * if by chance they have not, we can skip a bunch of things here.
251
   */
252
0
  if (pcxt->nworkers > 0)
253
0
  {
254
    /* Get (or create) the per-session DSM segment's handle. */
255
0
    session_dsm_handle = GetSessionDsmHandle();
256
257
    /*
258
     * If we weren't able to create a per-session DSM segment, then we can
259
     * continue but we can't safely launch any workers because their
260
     * record typmods would be incompatible so they couldn't exchange
261
     * tuples.
262
     */
263
0
    if (session_dsm_handle == DSM_HANDLE_INVALID)
264
0
      pcxt->nworkers = 0;
265
0
  }
266
267
0
  if (pcxt->nworkers > 0)
268
0
  {
269
    /* Estimate space for various kinds of state sharing. */
270
0
    library_len = EstimateLibraryStateSpace();
271
0
    shm_toc_estimate_chunk(&pcxt->estimator, library_len);
272
0
    guc_len = EstimateGUCStateSpace();
273
0
    shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
274
0
    combocidlen = EstimateComboCIDStateSpace();
275
0
    shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
276
0
    if (IsolationUsesXactSnapshot())
277
0
    {
278
0
      tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
279
0
      shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
280
0
    }
281
0
    asnaplen = EstimateSnapshotSpace(active_snapshot);
282
0
    shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
283
0
    tstatelen = EstimateTransactionStateSpace();
284
0
    shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
285
0
    shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
286
0
    pendingsyncslen = EstimatePendingSyncsSpace();
287
0
    shm_toc_estimate_chunk(&pcxt->estimator, pendingsyncslen);
288
0
    reindexlen = EstimateReindexStateSpace();
289
0
    shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
290
0
    relmapperlen = EstimateRelationMapSpace();
291
0
    shm_toc_estimate_chunk(&pcxt->estimator, relmapperlen);
292
0
    uncommittedenumslen = EstimateUncommittedEnumsSpace();
293
0
    shm_toc_estimate_chunk(&pcxt->estimator, uncommittedenumslen);
294
0
    clientconninfolen = EstimateClientConnectionInfoSpace();
295
0
    shm_toc_estimate_chunk(&pcxt->estimator, clientconninfolen);
296
    /* If you add more chunks here, you probably need to add keys. */
297
0
    shm_toc_estimate_keys(&pcxt->estimator, 12);
298
299
    /* Estimate space need for error queues. */
300
0
    StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
301
0
             PARALLEL_ERROR_QUEUE_SIZE,
302
0
             "parallel error queue size not buffer-aligned");
303
0
    shm_toc_estimate_chunk(&pcxt->estimator,
304
0
                 mul_size(PARALLEL_ERROR_QUEUE_SIZE,
305
0
                    pcxt->nworkers));
306
0
    shm_toc_estimate_keys(&pcxt->estimator, 1);
307
308
    /* Estimate how much we'll need for the entrypoint info. */
309
0
    shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) +
310
0
                 strlen(pcxt->function_name) + 2);
311
0
    shm_toc_estimate_keys(&pcxt->estimator, 1);
312
0
  }
313
314
  /*
315
   * Create DSM and initialize with new table of contents.  But if the user
316
   * didn't request any workers, then don't bother creating a dynamic shared
317
   * memory segment; instead, just use backend-private memory.
318
   *
319
   * Also, if we can't create a dynamic shared memory segment because the
320
   * maximum number of segments have already been created, then fall back to
321
   * backend-private memory, and plan not to use any workers.  We hope this
322
   * won't happen very often, but it's better to abandon the use of
323
   * parallelism than to fail outright.
324
   */
325
0
  segsize = shm_toc_estimate(&pcxt->estimator);
326
0
  if (pcxt->nworkers > 0)
327
0
    pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
328
0
  if (pcxt->seg != NULL)
329
0
    pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
330
0
                   dsm_segment_address(pcxt->seg),
331
0
                   segsize);
332
0
  else
333
0
  {
334
0
    pcxt->nworkers = 0;
335
0
    pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
336
0
    pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private_memory,
337
0
                   segsize);
338
0
  }
339
340
  /* Initialize fixed-size state in shared memory. */
341
0
  fps = (FixedParallelState *)
342
0
    shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
343
0
  fps->database_id = MyDatabaseId;
344
0
  fps->authenticated_user_id = GetAuthenticatedUserId();
345
0
  fps->session_user_id = GetSessionUserId();
346
0
  fps->outer_user_id = GetCurrentRoleId();
347
0
  GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
348
0
  fps->session_user_is_superuser = GetSessionUserIsSuperuser();
349
0
  fps->role_is_superuser = current_role_is_superuser;
350
0
  GetTempNamespaceState(&fps->temp_namespace_id,
351
0
              &fps->temp_toast_namespace_id);
352
0
  fps->parallel_leader_pgproc = MyProc;
353
0
  fps->parallel_leader_pid = MyProcPid;
354
0
  fps->parallel_leader_proc_number = MyProcNumber;
355
0
  fps->xact_ts = GetCurrentTransactionStartTimestamp();
356
0
  fps->stmt_ts = GetCurrentStatementStartTimestamp();
357
0
  fps->serializable_xact_handle = ShareSerializableXact();
358
0
  SpinLockInit(&fps->mutex);
359
0
  fps->last_xlog_end = 0;
360
0
  shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);
361
362
  /* We can skip the rest of this if we're not budgeting for any workers. */
363
0
  if (pcxt->nworkers > 0)
364
0
  {
365
0
    char     *libraryspace;
366
0
    char     *gucspace;
367
0
    char     *combocidspace;
368
0
    char     *tsnapspace;
369
0
    char     *asnapspace;
370
0
    char     *tstatespace;
371
0
    char     *pendingsyncsspace;
372
0
    char     *reindexspace;
373
0
    char     *relmapperspace;
374
0
    char     *error_queue_space;
375
0
    char     *session_dsm_handle_space;
376
0
    char     *entrypointstate;
377
0
    char     *uncommittedenumsspace;
378
0
    char     *clientconninfospace;
379
0
    Size    lnamelen;
380
381
    /* Serialize shared libraries we have loaded. */
382
0
    libraryspace = shm_toc_allocate(pcxt->toc, library_len);
383
0
    SerializeLibraryState(library_len, libraryspace);
384
0
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);
385
386
    /* Serialize GUC settings. */
387
0
    gucspace = shm_toc_allocate(pcxt->toc, guc_len);
388
0
    SerializeGUCState(guc_len, gucspace);
389
0
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);
390
391
    /* Serialize combo CID state. */
392
0
    combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
393
0
    SerializeComboCIDState(combocidlen, combocidspace);
394
0
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);
395
396
    /*
397
     * Serialize the transaction snapshot if the transaction isolation
398
     * level uses a transaction snapshot.
399
     */
400
0
    if (IsolationUsesXactSnapshot())
401
0
    {
402
0
      tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
403
0
      SerializeSnapshot(transaction_snapshot, tsnapspace);
404
0
      shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
405
0
               tsnapspace);
406
0
    }
407
408
    /* Serialize the active snapshot. */
409
0
    asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
410
0
    SerializeSnapshot(active_snapshot, asnapspace);
411
0
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);
412
413
    /* Provide the handle for per-session segment. */
414
0
    session_dsm_handle_space = shm_toc_allocate(pcxt->toc,
415
0
                          sizeof(dsm_handle));
416
0
    *(dsm_handle *) session_dsm_handle_space = session_dsm_handle;
417
0
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_DSM,
418
0
             session_dsm_handle_space);
419
420
    /* Serialize transaction state. */
421
0
    tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
422
0
    SerializeTransactionState(tstatelen, tstatespace);
423
0
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);
424
425
    /* Serialize pending syncs. */
426
0
    pendingsyncsspace = shm_toc_allocate(pcxt->toc, pendingsyncslen);
427
0
    SerializePendingSyncs(pendingsyncslen, pendingsyncsspace);
428
0
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_PENDING_SYNCS,
429
0
             pendingsyncsspace);
430
431
    /* Serialize reindex state. */
432
0
    reindexspace = shm_toc_allocate(pcxt->toc, reindexlen);
433
0
    SerializeReindexState(reindexlen, reindexspace);
434
0
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);
435
436
    /* Serialize relmapper state. */
437
0
    relmapperspace = shm_toc_allocate(pcxt->toc, relmapperlen);
438
0
    SerializeRelationMap(relmapperlen, relmapperspace);
439
0
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_RELMAPPER_STATE,
440
0
             relmapperspace);
441
442
    /* Serialize uncommitted enum state. */
443
0
    uncommittedenumsspace = shm_toc_allocate(pcxt->toc,
444
0
                         uncommittedenumslen);
445
0
    SerializeUncommittedEnums(uncommittedenumsspace, uncommittedenumslen);
446
0
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
447
0
             uncommittedenumsspace);
448
449
    /* Serialize our ClientConnectionInfo. */
450
0
    clientconninfospace = shm_toc_allocate(pcxt->toc, clientconninfolen);
451
0
    SerializeClientConnectionInfo(clientconninfolen, clientconninfospace);
452
0
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_CLIENTCONNINFO,
453
0
             clientconninfospace);
454
455
    /* Allocate space for worker information. */
456
0
    pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);
457
458
    /*
459
     * Establish error queues in dynamic shared memory.
460
     *
461
     * These queues should be used only for transmitting ErrorResponse,
462
     * NoticeResponse, and NotifyResponse protocol messages.  Tuple data
463
     * should be transmitted via separate (possibly larger?) queues.
464
     */
465
0
    error_queue_space =
466
0
      shm_toc_allocate(pcxt->toc,
467
0
               mul_size(PARALLEL_ERROR_QUEUE_SIZE,
468
0
                    pcxt->nworkers));
469
0
    for (i = 0; i < pcxt->nworkers; ++i)
470
0
    {
471
0
      char     *start;
472
0
      shm_mq     *mq;
473
474
0
      start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
475
0
      mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
476
0
      shm_mq_set_receiver(mq, MyProc);
477
0
      pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
478
0
    }
479
0
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);
480
481
    /*
482
     * Serialize entrypoint information.  It's unsafe to pass function
483
     * pointers across processes, as the function pointer may be different
484
     * in each process in EXEC_BACKEND builds, so we always pass library
485
     * and function name.  (We use library name "postgres" for functions
486
     * in the core backend.)
487
     */
488
0
    lnamelen = strlen(pcxt->library_name);
489
0
    entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen +
490
0
                       strlen(pcxt->function_name) + 2);
491
0
    strcpy(entrypointstate, pcxt->library_name);
492
0
    strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
493
0
    shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
494
0
  }
495
496
  /* Update nworkers_to_launch, in case we changed nworkers above. */
497
0
  pcxt->nworkers_to_launch = pcxt->nworkers;
498
499
  /* Restore previous memory context. */
500
0
  MemoryContextSwitchTo(oldcontext);
501
0
}
502
503
/*
504
 * Reinitialize the dynamic shared memory segment for a parallel context such
505
 * that we could launch workers for it again.
506
 */
507
void
508
ReinitializeParallelDSM(ParallelContext *pcxt)
509
0
{
510
0
  FixedParallelState *fps;
511
512
  /* Wait for any old workers to exit. */
513
0
  if (pcxt->nworkers_launched > 0)
514
0
  {
515
0
    WaitForParallelWorkersToFinish(pcxt);
516
0
    WaitForParallelWorkersToExit(pcxt);
517
0
    pcxt->nworkers_launched = 0;
518
0
    if (pcxt->known_attached_workers)
519
0
    {
520
0
      pfree(pcxt->known_attached_workers);
521
0
      pcxt->known_attached_workers = NULL;
522
0
      pcxt->nknown_attached_workers = 0;
523
0
    }
524
0
  }
525
526
  /* Reset a few bits of fixed parallel state to a clean state. */
527
0
  fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
528
0
  fps->last_xlog_end = 0;
529
530
  /* Recreate error queues (if they exist). */
531
0
  if (pcxt->nworkers > 0)
532
0
  {
533
0
    char     *error_queue_space;
534
0
    int     i;
535
536
0
    error_queue_space =
537
0
      shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
538
0
    for (i = 0; i < pcxt->nworkers; ++i)
539
0
    {
540
0
      char     *start;
541
0
      shm_mq     *mq;
542
543
0
      start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
544
0
      mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
545
0
      shm_mq_set_receiver(mq, MyProc);
546
0
      pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
547
0
    }
548
0
  }
549
0
}
550
551
/*
552
 * Reinitialize parallel workers for a parallel context such that we could
553
 * launch a different number of workers.  This is required for cases where
554
 * we need to reuse the same DSM segment, but the number of workers can
555
 * vary from run-to-run.
556
 */
557
void
558
ReinitializeParallelWorkers(ParallelContext *pcxt, int nworkers_to_launch)
559
0
{
560
  /*
561
   * The number of workers that need to be launched must be less than the
562
   * number of workers with which the parallel context is initialized.  But
563
   * the caller might not know that InitializeParallelDSM reduced nworkers,
564
   * so just silently trim the request.
565
   */
566
0
  pcxt->nworkers_to_launch = Min(pcxt->nworkers, nworkers_to_launch);
567
0
}
568
569
/*
570
 * Launch parallel workers.
571
 */
572
void
573
LaunchParallelWorkers(ParallelContext *pcxt)
574
0
{
575
0
  MemoryContext oldcontext;
576
0
  BackgroundWorker worker;
577
0
  int     i;
578
0
  bool    any_registrations_failed = false;
579
580
  /* Skip this if we have no workers. */
581
0
  if (pcxt->nworkers == 0 || pcxt->nworkers_to_launch == 0)
582
0
    return;
583
584
  /* We need to be a lock group leader. */
585
0
  BecomeLockGroupLeader();
586
587
  /* If we do have workers, we'd better have a DSM segment. */
588
0
  Assert(pcxt->seg != NULL);
589
590
  /* We might be running in a short-lived memory context. */
591
0
  oldcontext = MemoryContextSwitchTo(TopTransactionContext);
592
593
  /* Configure a worker. */
594
0
  memset(&worker, 0, sizeof(worker));
595
0
  snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
596
0
       MyProcPid);
597
0
  snprintf(worker.bgw_type, BGW_MAXLEN, "parallel worker");
598
0
  worker.bgw_flags =
599
0
    BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
600
0
    | BGWORKER_CLASS_PARALLEL;
601
0
  worker.bgw_start_time = BgWorkerStart_ConsistentState;
602
0
  worker.bgw_restart_time = BGW_NEVER_RESTART;
603
0
  sprintf(worker.bgw_library_name, "postgres");
604
0
  sprintf(worker.bgw_function_name, "ParallelWorkerMain");
605
0
  worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
606
0
  worker.bgw_notify_pid = MyProcPid;
607
608
  /*
609
   * Start workers.
610
   *
611
   * The caller must be able to tolerate ending up with fewer workers than
612
   * expected, so there is no need to throw an error here if registration
613
   * fails.  It wouldn't help much anyway, because registering the worker in
614
   * no way guarantees that it will start up and initialize successfully.
615
   */
616
0
  for (i = 0; i < pcxt->nworkers_to_launch; ++i)
617
0
  {
618
0
    memcpy(worker.bgw_extra, &i, sizeof(int));
619
0
    if (!any_registrations_failed &&
620
0
      RegisterDynamicBackgroundWorker(&worker,
621
0
                      &pcxt->worker[i].bgwhandle))
622
0
    {
623
0
      shm_mq_set_handle(pcxt->worker[i].error_mqh,
624
0
                pcxt->worker[i].bgwhandle);
625
0
      pcxt->nworkers_launched++;
626
0
    }
627
0
    else
628
0
    {
629
      /*
630
       * If we weren't able to register the worker, then we've bumped up
631
       * against the max_worker_processes limit, and future
632
       * registrations will probably fail too, so arrange to skip them.
633
       * But we still have to execute this code for the remaining slots
634
       * to make sure that we forget about the error queues we budgeted
635
       * for those workers.  Otherwise, we'll wait for them to start,
636
       * but they never will.
637
       */
638
0
      any_registrations_failed = true;
639
0
      pcxt->worker[i].bgwhandle = NULL;
640
0
      shm_mq_detach(pcxt->worker[i].error_mqh);
641
0
      pcxt->worker[i].error_mqh = NULL;
642
0
    }
643
0
  }
644
645
  /*
646
   * Now that nworkers_launched has taken its final value, we can initialize
647
   * known_attached_workers.
648
   */
649
0
  if (pcxt->nworkers_launched > 0)
650
0
  {
651
0
    pcxt->known_attached_workers =
652
0
      palloc0(sizeof(bool) * pcxt->nworkers_launched);
653
0
    pcxt->nknown_attached_workers = 0;
654
0
  }
655
656
  /* Restore previous memory context. */
657
0
  MemoryContextSwitchTo(oldcontext);
658
0
}
659
660
/*
661
 * Wait for all workers to attach to their error queues, and throw an error if
662
 * any worker fails to do this.
663
 *
664
 * Callers can assume that if this function returns successfully, then the
665
 * number of workers given by pcxt->nworkers_launched have initialized and
666
 * attached to their error queues.  Whether or not these workers are guaranteed
667
 * to still be running depends on what code the caller asked them to run;
668
 * this function does not guarantee that they have not exited.  However, it
669
 * does guarantee that any workers which exited must have done so cleanly and
670
 * after successfully performing the work with which they were tasked.
671
 *
672
 * If this function is not called, then some of the workers that were launched
673
 * may not have been started due to a fork() failure, or may have exited during
674
 * early startup prior to attaching to the error queue, so nworkers_launched
675
 * cannot be viewed as completely reliable.  It will never be less than the
676
 * number of workers which actually started, but it might be more.  Any workers
677
 * that failed to start will still be discovered by
678
 * WaitForParallelWorkersToFinish and an error will be thrown at that time,
679
 * provided that function is eventually reached.
680
 *
681
 * In general, the leader process should do as much work as possible before
682
 * calling this function.  fork() failures and other early-startup failures
683
 * are very uncommon, and having the leader sit idle when it could be doing
684
 * useful work is undesirable.  However, if the leader needs to wait for
685
 * all of its workers or for a specific worker, it may want to call this
686
 * function before doing so.  If not, it must make some other provision for
687
 * the failure-to-start case, lest it wait forever.  On the other hand, a
688
 * leader which never waits for a worker that might not be started yet, or
689
 * at least never does so prior to WaitForParallelWorkersToFinish(), need not
690
 * call this function at all.
691
 */
692
void
693
WaitForParallelWorkersToAttach(ParallelContext *pcxt)
694
0
{
695
0
  int     i;
696
697
  /* Skip this if we have no launched workers. */
698
0
  if (pcxt->nworkers_launched == 0)
699
0
    return;
700
701
0
  for (;;)
702
0
  {
703
    /*
704
     * This will process any parallel messages that are pending and it may
705
     * also throw an error propagated from a worker.
706
     */
707
0
    CHECK_FOR_INTERRUPTS();
708
709
0
    for (i = 0; i < pcxt->nworkers_launched; ++i)
710
0
    {
711
0
      BgwHandleStatus status;
712
0
      shm_mq     *mq;
713
0
      int     rc;
714
0
      pid_t   pid;
715
716
0
      if (pcxt->known_attached_workers[i])
717
0
        continue;
718
719
      /*
720
       * If error_mqh is NULL, then the worker has already exited
721
       * cleanly.
722
       */
723
0
      if (pcxt->worker[i].error_mqh == NULL)
724
0
      {
725
0
        pcxt->known_attached_workers[i] = true;
726
0
        ++pcxt->nknown_attached_workers;
727
0
        continue;
728
0
      }
729
730
0
      status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
731
0
      if (status == BGWH_STARTED)
732
0
      {
733
        /* Has the worker attached to the error queue? */
734
0
        mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
735
0
        if (shm_mq_get_sender(mq) != NULL)
736
0
        {
737
          /* Yes, so it is known to be attached. */
738
0
          pcxt->known_attached_workers[i] = true;
739
0
          ++pcxt->nknown_attached_workers;
740
0
        }
741
0
      }
742
0
      else if (status == BGWH_STOPPED)
743
0
      {
744
        /*
745
         * If the worker stopped without attaching to the error queue,
746
         * throw an error.
747
         */
748
0
        mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
749
0
        if (shm_mq_get_sender(mq) == NULL)
750
0
          ereport(ERROR,
751
0
              (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
752
0
               errmsg("parallel worker failed to initialize"),
753
0
               errhint("More details may be available in the server log.")));
754
755
0
        pcxt->known_attached_workers[i] = true;
756
0
        ++pcxt->nknown_attached_workers;
757
0
      }
758
0
      else
759
0
      {
760
        /*
761
         * Worker not yet started, so we must wait.  The postmaster
762
         * will notify us if the worker's state changes.  Our latch
763
         * might also get set for some other reason, but if so we'll
764
         * just end up waiting for the same worker again.
765
         */
766
0
        rc = WaitLatch(MyLatch,
767
0
                 WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
768
0
                 -1, WAIT_EVENT_BGWORKER_STARTUP);
769
770
0
        if (rc & WL_LATCH_SET)
771
0
          ResetLatch(MyLatch);
772
0
      }
773
0
    }
774
775
    /* If all workers are known to have started, we're done. */
776
0
    if (pcxt->nknown_attached_workers >= pcxt->nworkers_launched)
777
0
    {
778
0
      Assert(pcxt->nknown_attached_workers == pcxt->nworkers_launched);
779
0
      break;
780
0
    }
781
0
  }
782
0
}
783
784
/*
 * Wait for all workers to finish computing.
 *
 * Even if the parallel operation seems to have completed successfully, it's
 * important to call this function afterwards.  We must not miss any errors
 * the workers may have thrown during the parallel operation, or any that they
 * may yet throw while shutting down.
 *
 * Also, we want to update our notion of XactLastRecEnd based on worker
 * feedback.
 */
void
WaitForParallelWorkersToFinish(ParallelContext *pcxt)
{
	for (;;)
	{
		bool		anyone_alive = false;
		int			nfinished = 0;
		int			i;

		/*
		 * This will process any parallel messages that are pending, which may
		 * change the outcome of the loop that follows.  It may also throw an
		 * error propagated from a worker.
		 */
		CHECK_FOR_INTERRUPTS();

		for (i = 0; i < pcxt->nworkers_launched; ++i)
		{
			/*
			 * If error_mqh is NULL, then the worker has already exited
			 * cleanly.  If we have received a message through error_mqh from
			 * the worker, we know it started up cleanly, and therefore we're
			 * certain to be notified when it exits.
			 */
			if (pcxt->worker[i].error_mqh == NULL)
				++nfinished;
			else if (pcxt->known_attached_workers[i])
			{
				anyone_alive = true;
				break;
			}
		}

		if (!anyone_alive)
		{
			/* If all workers are known to have finished, we're done. */
			if (nfinished >= pcxt->nworkers_launched)
			{
				Assert(nfinished == pcxt->nworkers_launched);
				break;
			}

			/*
			 * We didn't detect any living workers, but not all workers are
			 * known to have exited cleanly.  Either not all workers have
			 * launched yet, or maybe some of them failed to start or
			 * terminated abnormally.
			 */
			for (i = 0; i < pcxt->nworkers_launched; ++i)
			{
				pid_t		pid;
				shm_mq	   *mq;

				/*
				 * If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
				 * should just keep waiting.  If it is BGWH_STOPPED, then
				 * further investigation is needed.
				 */
				if (pcxt->worker[i].error_mqh == NULL ||
					pcxt->worker[i].bgwhandle == NULL ||
					GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
										   &pid) != BGWH_STOPPED)
					continue;

				/*
				 * Check whether the worker ended up stopped without ever
				 * attaching to the error queue.  If so, the postmaster was
				 * unable to fork the worker or it exited without initializing
				 * properly.  We must throw an error, since the caller may
				 * have been expecting the worker to do some work before
				 * exiting.
				 */
				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
				if (shm_mq_get_sender(mq) == NULL)
					ereport(ERROR,
							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							 errmsg("parallel worker failed to initialize"),
							 errhint("More details may be available in the server log.")));

				/*
				 * The worker is stopped, but is attached to the error queue.
				 * Unless there's a bug somewhere, this will only happen when
				 * the worker writes messages and terminates after the
				 * CHECK_FOR_INTERRUPTS() near the top of this function and
				 * before the call to GetBackgroundWorkerPid().  In that case,
				 * our latch should have been set as well and the right things
				 * will happen on the next pass through the loop.
				 */
			}
		}

		/* Sleep until some worker state changes (or another latch-setter). */
		(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
						 WAIT_EVENT_PARALLEL_FINISH);
		ResetLatch(MyLatch);
	}

	/* Fold the workers' reported WAL end position into our own. */
	if (pcxt->toc != NULL)
	{
		FixedParallelState *fps;

		fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
		if (fps->last_xlog_end > XactLastRecEnd)
			XactLastRecEnd = fps->last_xlog_end;
	}
}
900
901
/*
902
 * Wait for all workers to exit.
903
 *
904
 * This function ensures that workers have been completely shutdown.  The
905
 * difference between WaitForParallelWorkersToFinish and this function is
906
 * that the former just ensures that last message sent by a worker backend is
907
 * received by the leader backend whereas this ensures the complete shutdown.
908
 */
909
static void
910
WaitForParallelWorkersToExit(ParallelContext *pcxt)
911
0
{
912
0
  int     i;
913
914
  /* Wait until the workers actually die. */
915
0
  for (i = 0; i < pcxt->nworkers_launched; ++i)
916
0
  {
917
0
    BgwHandleStatus status;
918
919
0
    if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
920
0
      continue;
921
922
0
    status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
923
924
    /*
925
     * If the postmaster kicked the bucket, we have no chance of cleaning
926
     * up safely -- we won't be able to tell when our workers are actually
927
     * dead.  This doesn't necessitate a PANIC since they will all abort
928
     * eventually, but we can't safely continue this session.
929
     */
930
0
    if (status == BGWH_POSTMASTER_DIED)
931
0
      ereport(FATAL,
932
0
          (errcode(ERRCODE_ADMIN_SHUTDOWN),
933
0
           errmsg("postmaster exited during a parallel transaction")));
934
935
    /* Release memory. */
936
0
    pfree(pcxt->worker[i].bgwhandle);
937
0
    pcxt->worker[i].bgwhandle = NULL;
938
0
  }
939
0
}
940
941
/*
 * Destroy a parallel context.
 *
 * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
 * first, before calling this function.  When this function is invoked, any
 * remaining workers are forcibly killed; the dynamic shared memory segment
 * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
 */
void
DestroyParallelContext(ParallelContext *pcxt)
{
	int			i;

	/*
	 * Be careful about order of operations here!  We remove the parallel
	 * context from the list before we do anything else; otherwise, if an
	 * error occurs during a subsequent step, we might try to nuke it again
	 * from AtEOXact_Parallel or AtEOSubXact_Parallel.
	 */
	dlist_delete(&pcxt->node);

	/* Kill each worker in turn, and forget their error queues. */
	if (pcxt->worker != NULL)
	{
		for (i = 0; i < pcxt->nworkers_launched; ++i)
		{
			/* A NULL error_mqh means the worker already exited cleanly. */
			if (pcxt->worker[i].error_mqh != NULL)
			{
				TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);

				shm_mq_detach(pcxt->worker[i].error_mqh);
				pcxt->worker[i].error_mqh = NULL;
			}
		}
	}

	/*
	 * If we have allocated a shared memory segment, detach it.  This will
	 * implicitly detach the error queues, and any other shared memory queues,
	 * stored there.
	 */
	if (pcxt->seg != NULL)
	{
		dsm_detach(pcxt->seg);
		pcxt->seg = NULL;
	}

	/*
	 * If this parallel context is actually in backend-private memory rather
	 * than shared memory, free that memory instead.
	 */
	if (pcxt->private_memory != NULL)
	{
		pfree(pcxt->private_memory);
		pcxt->private_memory = NULL;
	}

	/*
	 * We can't finish transaction commit or abort until all of the workers
	 * have exited.  This means, in particular, that we can't respond to
	 * interrupts at this stage.
	 */
	HOLD_INTERRUPTS();
	WaitForParallelWorkersToExit(pcxt);
	RESUME_INTERRUPTS();

	/*
	 * Free the worker array itself.  (WaitForParallelWorkersToExit has
	 * already freed and NULLed each per-worker bgwhandle.)
	 */
	if (pcxt->worker != NULL)
	{
		pfree(pcxt->worker);
		pcxt->worker = NULL;
	}

	/* Free memory. */
	pfree(pcxt->library_name);
	pfree(pcxt->function_name);
	pfree(pcxt);
}
1019
1020
/*
1021
 * Are there any parallel contexts currently active?
1022
 */
1023
bool
1024
ParallelContextActive(void)
1025
0
{
1026
0
  return !dlist_is_empty(&pcxt_list);
1027
0
}
1028
1029
/*
 * Handle receipt of an interrupt indicating a parallel worker message.
 *
 * Note: this is called within a signal handler!  All we can do is set
 * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
 * ProcessParallelMessages().
 */
void
HandleParallelMessageInterrupt(void)
{
	/* Request generic interrupt processing ... */
	InterruptPending = true;
	/* ... and record that the specific reason is a parallel message. */
	ParallelMessagePending = true;
	/* Wake the mainline code if it is blocked in WaitLatch(). */
	SetLatch(MyLatch);
}
1043
1044
/*
 * Process any queued protocol messages received from parallel workers.
 */
void
ProcessParallelMessages(void)
{
	dlist_iter	iter;
	MemoryContext oldcontext;

	/* Private memory context, kept across calls and reset on each use. */
	static MemoryContext hpm_context = NULL;

	/*
	 * This is invoked from ProcessInterrupts(), and since some of the
	 * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
	 * for recursive calls if more signals are received while this runs.  It's
	 * unclear that recursive entry would be safe, and it doesn't seem useful
	 * even if it is safe, so let's block interrupts until done.
	 */
	HOLD_INTERRUPTS();

	/*
	 * Moreover, CurrentMemoryContext might be pointing almost anywhere.  We
	 * don't want to risk leaking data into long-lived contexts, so let's do
	 * our work here in a private context that we can reset on each use.
	 */
	if (hpm_context == NULL)	/* first time through? */
		hpm_context = AllocSetContextCreate(TopMemoryContext,
											"ProcessParallelMessages",
											ALLOCSET_DEFAULT_SIZES);
	else
		MemoryContextReset(hpm_context);

	oldcontext = MemoryContextSwitchTo(hpm_context);

	/* OK to process messages.  Reset the flag saying there are more to do. */
	ParallelMessagePending = false;

	dlist_foreach(iter, &pcxt_list)
	{
		ParallelContext *pcxt;
		int			i;

		pcxt = dlist_container(ParallelContext, node, iter.cur);
		if (pcxt->worker == NULL)
			continue;

		for (i = 0; i < pcxt->nworkers_launched; ++i)
		{
			/*
			 * Read as many messages as we can from each worker, but stop when
			 * either (1) the worker's error queue goes away, which can happen
			 * if we receive a Terminate message from the worker; or (2) no
			 * more messages can be read from the worker without blocking.
			 */
			while (pcxt->worker[i].error_mqh != NULL)
			{
				shm_mq_result res;
				Size		nbytes;
				void	   *data;

				/* Non-blocking receive: nowait == true. */
				res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
									 &data, true);
				if (res == SHM_MQ_WOULD_BLOCK)
					break;
				else if (res == SHM_MQ_SUCCESS)
				{
					StringInfoData msg;

					/* Copy the message out of the queue, then dispatch it. */
					initStringInfo(&msg);
					appendBinaryStringInfo(&msg, data, nbytes);
					ProcessParallelMessage(pcxt, i, &msg);
					pfree(msg.data);
				}
				else
					ereport(ERROR,
							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							 errmsg("lost connection to parallel worker")));
			}
		}
	}

	MemoryContextSwitchTo(oldcontext);

	/* Might as well clear the context on our way out */
	MemoryContextReset(hpm_context);

	RESUME_INTERRUPTS();
}
1132
1133
/*
 * Process a single protocol message received from a single parallel worker.
 *
 * 'i' is the index of the sending worker within pcxt->worker[]; 'msg' holds
 * the raw protocol message (type byte followed by the payload).
 */
static void
ProcessParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
{
	char		msgtype;

	/* Receiving any message proves this worker attached to its queue. */
	if (pcxt->known_attached_workers != NULL &&
		!pcxt->known_attached_workers[i])
	{
		pcxt->known_attached_workers[i] = true;
		pcxt->nknown_attached_workers++;
	}

	msgtype = pq_getmsgbyte(msg);

	switch (msgtype)
	{
		case PqMsg_ErrorResponse:
		case PqMsg_NoticeResponse:
			{
				ErrorData	edata;
				ErrorContextCallback *save_error_context_stack;

				/* Parse ErrorResponse or NoticeResponse. */
				pq_parse_errornotice(msg, &edata);

				/* Death of a worker isn't enough justification for suicide. */
				edata.elevel = Min(edata.elevel, ERROR);

				/*
				 * If desired, add a context line to show that this is a
				 * message propagated from a parallel worker.  Otherwise, it
				 * can sometimes be confusing to understand what actually
				 * happened.  (We don't do this in DEBUG_PARALLEL_REGRESS mode
				 * because it causes test-result instability depending on
				 * whether a parallel worker is actually used or not.)
				 */
				if (debug_parallel_query != DEBUG_PARALLEL_REGRESS)
				{
					if (edata.context)
						edata.context = psprintf("%s\n%s", edata.context,
												 _("parallel worker"));
					else
						edata.context = pstrdup(_("parallel worker"));
				}

				/*
				 * Context beyond that should use the error context callbacks
				 * that were in effect when the ParallelContext was created,
				 * not the current ones.
				 */
				save_error_context_stack = error_context_stack;
				error_context_stack = pcxt->error_context_stack;

				/* Rethrow error or print notice. */
				ThrowErrorData(&edata);

				/* Not an error, so restore previous context stack. */
				error_context_stack = save_error_context_stack;

				break;
			}

		case PqMsg_NotificationResponse:
			{
				/* Propagate NotifyResponse. */
				int32		pid;
				const char *channel;
				const char *payload;

				pid = pq_getmsgint(msg, 4);
				channel = pq_getmsgrawstring(msg);
				payload = pq_getmsgrawstring(msg);
				pq_endmessage(msg);

				NotifyMyFrontEnd(channel, payload, pid);

				break;
			}

		case PqMsg_Progress:
			{
				/*
				 * Only incremental progress reporting is currently supported.
				 * However, it's possible to add more fields to the message to
				 * allow for handling of other backend progress APIs.
				 */
				int			index = pq_getmsgint(msg, 4);
				int64		incr = pq_getmsgint64(msg);

				pq_getmsgend(msg);

				pgstat_progress_incr_param(index, incr);

				break;
			}

		case PqMsg_Terminate:
			{
				/* Worker is done; detach and forget its error queue. */
				shm_mq_detach(pcxt->worker[i].error_mqh);
				pcxt->worker[i].error_mqh = NULL;
				break;
			}

		default:
			{
				elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
					 msgtype, msg->len);
			}
	}
}
1246
1247
/*
1248
 * End-of-subtransaction cleanup for parallel contexts.
1249
 *
1250
 * Here we remove only parallel contexts initiated within the current
1251
 * subtransaction.
1252
 */
1253
void
1254
AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
1255
0
{
1256
0
  while (!dlist_is_empty(&pcxt_list))
1257
0
  {
1258
0
    ParallelContext *pcxt;
1259
1260
0
    pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
1261
0
    if (pcxt->subid != mySubId)
1262
0
      break;
1263
0
    if (isCommit)
1264
0
      elog(WARNING, "leaked parallel context");
1265
0
    DestroyParallelContext(pcxt);
1266
0
  }
1267
0
}
1268
1269
/*
1270
 * End-of-transaction cleanup for parallel contexts.
1271
 *
1272
 * We nuke all remaining parallel contexts.
1273
 */
1274
void
1275
AtEOXact_Parallel(bool isCommit)
1276
0
{
1277
0
  while (!dlist_is_empty(&pcxt_list))
1278
0
  {
1279
0
    ParallelContext *pcxt;
1280
1281
0
    pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
1282
0
    if (isCommit)
1283
0
      elog(WARNING, "leaked parallel context");
1284
0
    DestroyParallelContext(pcxt);
1285
0
  }
1286
0
}
1287
1288
/*
 * Main entrypoint for parallel workers.
 *
 * The worker attaches to the DSM segment created by the leader, restores
 * the leader's state (libraries, transaction, snapshots, GUCs, user IDs,
 * and so on) in a carefully chosen order, then invokes the caller-supplied
 * entry point.  On normal return, it tears the state down again and sends
 * a Terminate message to the leader.
 */
void
ParallelWorkerMain(Datum main_arg)
{
	dsm_segment *seg;
	shm_toc    *toc;
	FixedParallelState *fps;
	char	   *error_queue_space;
	shm_mq	   *mq;
	shm_mq_handle *mqh;
	char	   *libraryspace;
	char	   *entrypointstate;
	char	   *library_name;
	char	   *function_name;
	parallel_worker_main_type entrypt;
	char	   *gucspace;
	char	   *combocidspace;
	char	   *tsnapspace;
	char	   *asnapspace;
	char	   *tstatespace;
	char	   *pendingsyncsspace;
	char	   *reindexspace;
	char	   *relmapperspace;
	char	   *uncommittedenumsspace;
	char	   *clientconninfospace;
	char	   *session_dsm_handle_space;
	Snapshot	tsnapshot;
	Snapshot	asnapshot;

	/* Set flag to indicate that we're initializing a parallel worker. */
	InitializingParallelWorker = true;

	/* Establish signal handlers. */
	pqsignal(SIGTERM, die);
	BackgroundWorkerUnblockSignals();

	/*
	 * Determine and set our parallel worker number.  The leader stashed it
	 * in bgw_extra when registering the worker.
	 */
	Assert(ParallelWorkerNumber == -1);
	memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));

	/* Set up a memory context to work in, just for cleanliness. */
	CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
												 "Parallel worker",
												 ALLOCSET_DEFAULT_SIZES);

	/*
	 * Attach to the dynamic shared memory segment for the parallel query, and
	 * find its table of contents.
	 *
	 * Note: at this point, we have not created any ResourceOwner in this
	 * process.  This will result in our DSM mapping surviving until process
	 * exit, which is fine.  If there were a ResourceOwner, it would acquire
	 * ownership of the mapping, but we have no need for that.
	 */
	seg = dsm_attach(DatumGetUInt32(main_arg));
	if (seg == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("could not map dynamic shared memory segment")));
	toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
	if (toc == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("invalid magic number in dynamic shared memory segment")));

	/* Look up fixed parallel state. */
	fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
	MyFixedParallelState = fps;

	/* Arrange to signal the leader if we exit. */
	ParallelLeaderPid = fps->parallel_leader_pid;
	ParallelLeaderProcNumber = fps->parallel_leader_proc_number;
	before_shmem_exit(ParallelWorkerShutdown, PointerGetDatum(seg));

	/*
	 * Now we can find and attach to the error queue provided for us.  That's
	 * good, because until we do that, any errors that happen here will not be
	 * reported back to the process that requested that this worker be
	 * launched.
	 */
	error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
	mq = (shm_mq *) (error_queue_space +
					 ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
	shm_mq_set_sender(mq, MyProc);
	mqh = shm_mq_attach(mq, seg, NULL);
	pq_redirect_to_shm_mq(seg, mqh);
	pq_set_parallel_leader(fps->parallel_leader_pid,
						   fps->parallel_leader_proc_number);

	/*
	 * Hooray! Primary initialization is complete.  Now, we need to set up our
	 * backend-local state to match the original backend.
	 */

	/*
	 * Join locking group.  We must do this before anything that could try to
	 * acquire a heavyweight lock, because any heavyweight locks acquired to
	 * this point could block either directly against the parallel group
	 * leader or against some process which in turn waits for a lock that
	 * conflicts with the parallel group leader, causing an undetected
	 * deadlock.  (If we can't join the lock group, the leader has gone away,
	 * so just exit quietly.)
	 */
	if (!BecomeLockGroupMember(fps->parallel_leader_pgproc,
							   fps->parallel_leader_pid))
		return;

	/*
	 * Restore transaction and statement start-time timestamps.  This must
	 * happen before anything that would start a transaction, else asserts in
	 * xact.c will fire.
	 */
	SetParallelStartTimestamps(fps->xact_ts, fps->stmt_ts);

	/*
	 * Identify the entry point to be called.  In theory this could result in
	 * loading an additional library, though most likely the entry point is in
	 * the core backend or in a library we just loaded.
	 */
	entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
	library_name = entrypointstate;
	/* Two NUL-terminated strings are packed back to back. */
	function_name = entrypointstate + strlen(library_name) + 1;

	entrypt = LookupParallelWorkerFunction(library_name, function_name);

	/*
	 * Restore current session authorization and role id.  No verification
	 * happens here, we just blindly adopt the leader's state.  Note that this
	 * has to happen before InitPostgres, since InitializeSessionUserId will
	 * not set these variables.
	 */
	SetAuthenticatedUserId(fps->authenticated_user_id);
	SetSessionAuthorization(fps->session_user_id,
							fps->session_user_is_superuser);
	SetCurrentRoleId(fps->outer_user_id, fps->role_is_superuser);

	/*
	 * Restore database connection.  We skip connection authorization checks,
	 * reasoning that (a) the leader checked these things when it started, and
	 * (b) we do not want parallel mode to cause these failures, because that
	 * would make use of parallel query plans not transparent to applications.
	 */
	BackgroundWorkerInitializeConnectionByOid(fps->database_id,
											  fps->authenticated_user_id,
											  BGWORKER_BYPASS_ALLOWCONN |
											  BGWORKER_BYPASS_ROLELOGINCHECK);

	/*
	 * Set the client encoding to the database encoding, since that is what
	 * the leader will expect.  (We're cheating a bit by not calling
	 * PrepareClientEncoding first.  It's okay because this call will always
	 * result in installing a no-op conversion.  No error should be possible,
	 * but check anyway.)
	 */
	if (SetClientEncoding(GetDatabaseEncoding()) < 0)
		elog(ERROR, "SetClientEncoding(%d) failed", GetDatabaseEncoding());

	/*
	 * Load libraries that were loaded by original backend.  We want to do
	 * this before restoring GUCs, because the libraries might define custom
	 * variables.
	 */
	libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
	StartTransactionCommand();
	RestoreLibraryState(libraryspace);
	CommitTransactionCommand();

	/* Crank up a transaction state appropriate to a parallel worker. */
	tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false);
	StartParallelWorkerTransaction(tstatespace);

	/*
	 * Restore state that affects catalog access.  Ideally we'd do this even
	 * before calling InitPostgres, but that has order-of-initialization
	 * problems, and also the relmapper would get confused during the
	 * CommitTransactionCommand call above.
	 */
	pendingsyncsspace = shm_toc_lookup(toc, PARALLEL_KEY_PENDING_SYNCS,
									   false);
	RestorePendingSyncs(pendingsyncsspace);
	relmapperspace = shm_toc_lookup(toc, PARALLEL_KEY_RELMAPPER_STATE, false);
	RestoreRelationMap(relmapperspace);
	reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
	RestoreReindexState(reindexspace);
	combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
	RestoreComboCIDState(combocidspace);

	/* Attach to the per-session DSM segment and contained objects. */
	session_dsm_handle_space =
		shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
	AttachSession(*(dsm_handle *) session_dsm_handle_space);

	/*
	 * If the transaction isolation level is REPEATABLE READ or SERIALIZABLE,
	 * the leader has serialized the transaction snapshot and we must restore
	 * it. At lower isolation levels, there is no transaction-lifetime
	 * snapshot, but we need TransactionXmin to get set to a value which is
	 * less than or equal to the xmin of every snapshot that will be used by
	 * this worker. The easiest way to accomplish that is to install the
	 * active snapshot as the transaction snapshot. Code running in this
	 * parallel worker might take new snapshots via GetTransactionSnapshot()
	 * or GetLatestSnapshot(), but it shouldn't have any way of acquiring a
	 * snapshot older than the active snapshot.
	 */
	asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
	/* The transaction snapshot is optional; see above. */
	tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, true);
	asnapshot = RestoreSnapshot(asnapspace);
	tsnapshot = tsnapspace ? RestoreSnapshot(tsnapspace) : asnapshot;
	RestoreTransactionSnapshot(tsnapshot,
							   fps->parallel_leader_pgproc);
	PushActiveSnapshot(asnapshot);

	/*
	 * We've changed which tuples we can see, and must therefore invalidate
	 * system caches.
	 */
	InvalidateSystemCaches();

	/*
	 * Restore GUC values from launching backend.  We can't do this earlier,
	 * because GUC check hooks that do catalog lookups need to see the same
	 * database state as the leader.  Also, the check hooks for
	 * session_authorization and role assume we already set the correct role
	 * OIDs.
	 */
	gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
	RestoreGUCState(gucspace);

	/*
	 * Restore current user ID and security context.  No verification happens
	 * here, we just blindly adopt the leader's state.  We can't do this till
	 * after restoring GUCs, else we'll get complaints about restoring
	 * session_authorization and role.  (In effect, we're assuming that all
	 * the restored values are okay to set, even if we are now inside a
	 * restricted context.)
	 */
	SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);

	/* Restore temp-namespace state to ensure search path matches leader's. */
	SetTempNamespaceState(fps->temp_namespace_id,
						  fps->temp_toast_namespace_id);

	/* Restore uncommitted enums. */
	uncommittedenumsspace = shm_toc_lookup(toc, PARALLEL_KEY_UNCOMMITTEDENUMS,
										   false);
	RestoreUncommittedEnums(uncommittedenumsspace);

	/* Restore the ClientConnectionInfo. */
	clientconninfospace = shm_toc_lookup(toc, PARALLEL_KEY_CLIENTCONNINFO,
										 false);
	RestoreClientConnectionInfo(clientconninfospace);

	/*
	 * Initialize SystemUser now that MyClientConnectionInfo is restored. Also
	 * ensure that auth_method is actually valid, aka authn_id is not NULL.
	 */
	if (MyClientConnectionInfo.authn_id)
		InitializeSystemUser(MyClientConnectionInfo.authn_id,
							 hba_authname(MyClientConnectionInfo.auth_method));

	/* Attach to the leader's serializable transaction, if SERIALIZABLE. */
	AttachSerializableXact(fps->serializable_xact_handle);

	/*
	 * We've initialized all of our state now; nothing should change
	 * hereafter.
	 */
	InitializingParallelWorker = false;
	EnterParallelMode();

	/*
	 * Time to do the real work: invoke the caller-supplied code.
	 */
	entrypt(seg, toc);

	/* Must exit parallel mode to pop active snapshot. */
	ExitParallelMode();

	/* Must pop active snapshot so snapmgr.c doesn't complain. */
	PopActiveSnapshot();

	/* Shut down the parallel-worker transaction. */
	EndParallelWorkerTransaction();

	/* Detach from the per-session DSM segment. */
	DetachSession();

	/* Report success. */
	pq_putmessage(PqMsg_Terminate, NULL, 0);
}
1580
1581
/*
1582
 * Update shared memory with the ending location of the last WAL record we
1583
 * wrote, if it's greater than the value already stored there.
1584
 */
1585
void
1586
ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
1587
0
{
1588
0
  FixedParallelState *fps = MyFixedParallelState;
1589
1590
0
  Assert(fps != NULL);
1591
0
  SpinLockAcquire(&fps->mutex);
1592
0
  if (fps->last_xlog_end < last_xlog_end)
1593
0
    fps->last_xlog_end = last_xlog_end;
1594
0
  SpinLockRelease(&fps->mutex);
1595
0
}
1596
1597
/*
1598
 * Make sure the leader tries to read from our error queue one more time.
1599
 * This guards against the case where we exit uncleanly without sending an
1600
 * ErrorResponse to the leader, for example because some code calls proc_exit
1601
 * directly.
1602
 *
1603
 * Also explicitly detach from dsm segment so that subsystems using
1604
 * on_dsm_detach() have a chance to send stats before the stats subsystem is
1605
 * shut down as part of a before_shmem_exit() hook.
1606
 *
1607
 * One might think this could instead be solved by carefully ordering the
1608
 * attaching to dsm segments, so that the pgstats segments get detached from
1609
 * later than the parallel query one. That turns out to not work because the
1610
 * stats hash might need to grow which can cause new segments to be allocated,
1611
 * which then will be detached from earlier.
1612
 */
1613
static void
1614
ParallelWorkerShutdown(int code, Datum arg)
1615
0
{
1616
0
  SendProcSignal(ParallelLeaderPid,
1617
0
           PROCSIG_PARALLEL_MESSAGE,
1618
0
           ParallelLeaderProcNumber);
1619
1620
0
  dsm_detach((dsm_segment *) DatumGetPointer(arg));
1621
0
}
1622
1623
/*
1624
 * Look up (and possibly load) a parallel worker entry point function.
1625
 *
1626
 * For functions contained in the core code, we use library name "postgres"
1627
 * and consult the InternalParallelWorkers array.  External functions are
1628
 * looked up, and loaded if necessary, using load_external_function().
1629
 *
1630
 * The point of this is to pass function names as strings across process
1631
 * boundaries.  We can't pass actual function addresses because of the
1632
 * possibility that the function has been loaded at a different address
1633
 * in a different process.  This is obviously a hazard for functions in
1634
 * loadable libraries, but it can happen even for functions in the core code
1635
 * on platforms using EXEC_BACKEND (e.g., Windows).
1636
 *
1637
 * At some point it might be worthwhile to get rid of InternalParallelWorkers[]
1638
 * in favor of applying load_external_function() for core functions too;
1639
 * but that raises portability issues that are not worth addressing now.
1640
 */
1641
static parallel_worker_main_type
1642
LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
1643
0
{
1644
  /*
1645
   * If the function is to be loaded from postgres itself, search the
1646
   * InternalParallelWorkers array.
1647
   */
1648
0
  if (strcmp(libraryname, "postgres") == 0)
1649
0
  {
1650
0
    int     i;
1651
1652
0
    for (i = 0; i < lengthof(InternalParallelWorkers); i++)
1653
0
    {
1654
0
      if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0)
1655
0
        return InternalParallelWorkers[i].fn_addr;
1656
0
    }
1657
1658
    /* We can only reach this by programming error. */
1659
0
    elog(ERROR, "internal function \"%s\" not found", funcname);
1660
0
  }
1661
1662
  /* Otherwise load from external library. */
1663
0
  return (parallel_worker_main_type)
1664
0
    load_external_function(libraryname, funcname, true, NULL);
1665
0
}