Coverage Report

Created: 2025-06-13 06:06

/src/postgres/src/backend/executor/nodeGather.c
Every instrumented line in this file has an execution count of 0 (the file is entirely uncovered); per-line counts and line numbers are omitted from the listing below.
/*-------------------------------------------------------------------------
 *
 * nodeGather.c
 *    Support routines for scanning a plan via multiple workers.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * A Gather executor launches parallel workers to run multiple copies of a
 * plan.  It can also run the plan itself, if the workers are not available
 * or have not started up yet.  It then merges all of the results it produces
 * and the results from the workers into a single output stream.  Therefore,
 * it will normally be used with a plan where running multiple copies of the
 * same plan does not produce duplicate output, such as parallel-aware
 * SeqScan.
 *
 * Alternatively, a Gather node can be configured to use just one worker
 * and the single-copy flag can be set.  In this case, the Gather node will
 * run the plan in one worker and will not execute the plan itself.  In
 * this case, it simply returns whatever tuples were returned by the worker.
 * If a worker cannot be obtained, then it will run the plan itself and
 * return the results.  Therefore, a plan used with a single-copy Gather
 * node need not be parallel-aware.
 *
 * IDENTIFICATION
 *    src/backend/executor/nodeGather.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "executor/execParallel.h"
#include "executor/executor.h"
#include "executor/nodeGather.h"
#include "executor/tqueue.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "utils/wait_event.h"


static TupleTableSlot *ExecGather(PlanState *pstate);
static TupleTableSlot *gather_getnext(GatherState *gatherstate);
static MinimalTuple gather_readnext(GatherState *gatherstate);
static void ExecShutdownGatherWorkers(GatherState *node);


/* ----------------------------------------------------------------
 *    ExecInitGather
 * ----------------------------------------------------------------
 */
GatherState *
ExecInitGather(Gather *node, EState *estate, int eflags)
{
  GatherState *gatherstate;
  Plan     *outerNode;
  TupleDesc tupDesc;

  /* Gather node doesn't have innerPlan node. */
  Assert(innerPlan(node) == NULL);

  /*
   * create state structure
   */
  gatherstate = makeNode(GatherState);
  gatherstate->ps.plan = (Plan *) node;
  gatherstate->ps.state = estate;
  gatherstate->ps.ExecProcNode = ExecGather;

  gatherstate->initialized = false;
  gatherstate->need_to_scan_locally =
    !node->single_copy && parallel_leader_participation;
  gatherstate->tuples_needed = -1;

  /*
   * Miscellaneous initialization
   *
   * create expression context for node
   */
  ExecAssignExprContext(estate, &gatherstate->ps);

  /*
   * now initialize outer plan
   */
  outerNode = outerPlan(node);
  outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags);
  tupDesc = ExecGetResultType(outerPlanState(gatherstate));

  /*
   * Leader may access ExecProcNode result directly (if
   * need_to_scan_locally), or from workers via tuple queue.  So we can't
   * trivially rely on the slot type being fixed for expressions evaluated
   * within this node.
   */
  gatherstate->ps.outeropsset = true;
  gatherstate->ps.outeropsfixed = false;

  /*
   * Initialize result type and projection.
   */
  ExecInitResultTypeTL(&gatherstate->ps);
  ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR);

  /*
   * Without projections result slot type is not trivially known, see
   * comment above.
   */
  if (gatherstate->ps.ps_ProjInfo == NULL)
  {
    gatherstate->ps.resultopsset = true;
    gatherstate->ps.resultopsfixed = false;
  }

  /*
   * Initialize funnel slot to same tuple descriptor as outer plan.
   */
  gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc,
                            &TTSOpsMinimalTuple);

  /*
   * Gather doesn't support checking a qual (it's always more efficient to
   * do it in the child node).
   */
  Assert(!node->plan.qual);

  return gatherstate;
}

/* ----------------------------------------------------------------
 *    ExecGather(node)
 *
 *    Scans the relation via multiple workers and returns
 *    the next qualifying tuple.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecGather(PlanState *pstate)
{
  GatherState *node = castNode(GatherState, pstate);
  TupleTableSlot *slot;
  ExprContext *econtext;

  CHECK_FOR_INTERRUPTS();

  /*
   * Initialize the parallel context and workers on first execution. We do
   * this on first execution rather than during node initialization, as it
   * needs to allocate a large dynamic segment, so it is better to do it
   * only if it is really needed.
   */
  if (!node->initialized)
  {
    EState     *estate = node->ps.state;
    Gather     *gather = (Gather *) node->ps.plan;

    /*
     * Sometimes we might have to run without parallelism; but if parallel
     * mode is active then we can try to fire up some workers.
     */
    if (gather->num_workers > 0 && estate->es_use_parallel_mode)
    {
      ParallelContext *pcxt;

      /* Initialize, or re-initialize, shared state needed by workers. */
      if (!node->pei)
        node->pei = ExecInitParallelPlan(outerPlanState(node),
                         estate,
                         gather->initParam,
                         gather->num_workers,
                         node->tuples_needed);
      else
        ExecParallelReinitialize(outerPlanState(node),
                     node->pei,
                     gather->initParam);

      /*
       * Register backend workers. We might not get as many as we
       * requested, or indeed any at all.
       */
      pcxt = node->pei->pcxt;
      LaunchParallelWorkers(pcxt);
      /* We save # workers launched for the benefit of EXPLAIN */
      node->nworkers_launched = pcxt->nworkers_launched;

      /*
       * Count number of workers originally wanted and actually
       * launched.
       */
      estate->es_parallel_workers_to_launch += pcxt->nworkers_to_launch;
      estate->es_parallel_workers_launched += pcxt->nworkers_launched;

      /* Set up tuple queue readers to read the results. */
      if (pcxt->nworkers_launched > 0)
      {
        ExecParallelCreateReaders(node->pei);
        /* Make a working array showing the active readers */
        node->nreaders = pcxt->nworkers_launched;
        node->reader = (TupleQueueReader **)
          palloc(node->nreaders * sizeof(TupleQueueReader *));
        memcpy(node->reader, node->pei->reader,
             node->nreaders * sizeof(TupleQueueReader *));
      }
      else
      {
        /* No workers?  Then never mind. */
        node->nreaders = 0;
        node->reader = NULL;
      }
      node->nextreader = 0;
    }

    /* Run plan locally if no workers or enabled and not single-copy. */
    node->need_to_scan_locally = (node->nreaders == 0)
      || (!gather->single_copy && parallel_leader_participation);
    node->initialized = true;
  }

  /*
   * Reset per-tuple memory context to free any expression evaluation
   * storage allocated in the previous tuple cycle.
   */
  econtext = node->ps.ps_ExprContext;
  ResetExprContext(econtext);

  /*
   * Get next tuple, either from one of our workers, or by running the plan
   * ourselves.
   */
  slot = gather_getnext(node);
  if (TupIsNull(slot))
    return NULL;

  /* If no projection is required, we're done. */
  if (node->ps.ps_ProjInfo == NULL)
    return slot;

  /*
   * Form the result tuple using ExecProject(), and return it.
   */
  econtext->ecxt_outertuple = slot;
  return ExecProject(node->ps.ps_ProjInfo);
}
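
/*
 * Illustrative sketch (not part of nodeGather.c): the "initialize on first
 * execution" pattern ExecGather uses above.  The parallel context, DSM
 * segment, and workers are set up lazily, on the first call, so the cost is
 * paid only if the node is actually executed.  Everything below (DemoState,
 * demo_setup_expensive_state, demo_exec) is a hypothetical stand-in, not
 * PostgreSQL executor API.
 */
#include <stdbool.h>
#include <stdio.h>

typedef struct DemoState
{
  bool    initialized;      /* has the expensive setup run yet? */
  int     shared_resource;  /* stand-in for the DSM segment / workers */
} DemoState;

static void
demo_setup_expensive_state(DemoState *state)
{
  /* In Gather, this is roughly where the parallel plan would be set up
   * and workers launched. */
  state->shared_resource = 42;
}

static int
demo_exec(DemoState *state)
{
  /* Pay the setup cost only on the first execution, as ExecGather does. */
  if (!state->initialized)
  {
    demo_setup_expensive_state(state);
    state->initialized = true;
  }
  return state->shared_resource;
}

int
main(void)
{
  DemoState state = {false, 0};

  printf("%d\n", demo_exec(&state)); /* setup happens here */
  printf("%d\n", demo_exec(&state)); /* already initialized */
  return 0;
}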

/* ----------------------------------------------------------------
 *    ExecEndGather
 *
 *    frees any storage allocated through C routines.
 * ----------------------------------------------------------------
 */
void
ExecEndGather(GatherState *node)
{
  ExecEndNode(outerPlanState(node));  /* let children clean up first */
  ExecShutdownGather(node);
}

/*
 * Read the next tuple.  We might fetch a tuple from one of the tuple queues
 * using gather_readnext, or if no tuple queue contains a tuple and the
 * single_copy flag is not set, we might generate one locally instead.
 */
static TupleTableSlot *
gather_getnext(GatherState *gatherstate)
{
  PlanState  *outerPlan = outerPlanState(gatherstate);
  TupleTableSlot *outerTupleSlot;
  TupleTableSlot *fslot = gatherstate->funnel_slot;
  MinimalTuple tup;

  while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally)
  {
    CHECK_FOR_INTERRUPTS();

    if (gatherstate->nreaders > 0)
    {
      tup = gather_readnext(gatherstate);

      if (HeapTupleIsValid(tup))
      {
        ExecStoreMinimalTuple(tup,  /* tuple to store */
                    fslot,  /* slot to store the tuple */
                    false); /* don't pfree tuple  */
        return fslot;
      }
    }

    if (gatherstate->need_to_scan_locally)
    {
      EState     *estate = gatherstate->ps.state;

      /* Install our DSA area while executing the plan. */
      estate->es_query_dsa =
        gatherstate->pei ? gatherstate->pei->area : NULL;
      outerTupleSlot = ExecProcNode(outerPlan);
      estate->es_query_dsa = NULL;

      if (!TupIsNull(outerTupleSlot))
        return outerTupleSlot;

      gatherstate->need_to_scan_locally = false;
    }
  }

  return ExecClearTuple(fslot);
}
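
/*
 * Illustrative sketch (not part of nodeGather.c): the shape of the
 * gather_getnext() loop above, reduced to plain C.  Values are preferred
 * from a simulated worker queue; once the queue has nothing more to offer,
 * values are produced "locally" until that source is exhausted too.  All
 * names (DemoGather, demo_getnext) are hypothetical, and the simulation is
 * deliberately simpler than the real non-blocking tuple queues.
 */
#include <stdbool.h>
#include <stdio.h>

typedef struct DemoGather
{
  const int  *queue;            /* stand-in for the worker tuple queues */
  int     queue_len;
  int     queue_pos;
  int     local_remaining;      /* stand-in for the leader's copy of the plan */
  bool    need_to_scan_locally;
} DemoGather;

/* Returns true and sets *value while either source can still produce. */
static bool
demo_getnext(DemoGather *g, int *value)
{
  while (g->queue_pos < g->queue_len || g->need_to_scan_locally)
  {
    /* First preference: something from the "workers". */
    if (g->queue_pos < g->queue_len)
    {
      *value = g->queue[g->queue_pos++];
      return true;
    }

    /* Otherwise run the "plan" locally, if that is still allowed. */
    if (g->need_to_scan_locally)
    {
      if (g->local_remaining > 0)
      {
        *value = 1000 + g->local_remaining--;
        return true;
      }
      /* Local source exhausted; don't consult it again. */
      g->need_to_scan_locally = false;
    }
  }
  return false;                 /* both sources exhausted */
}

int
main(void)
{
  static const int queue[] = {1, 2, 3};
  DemoGather  g = {queue, 3, 0, 2, true};
  int     v;

  while (demo_getnext(&g, &v))
    printf("%d\n", v);
  return 0;
}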

/*
 * Attempt to read a tuple from one of our parallel workers.
 */
static MinimalTuple
gather_readnext(GatherState *gatherstate)
{
  int     nvisited = 0;

  for (;;)
  {
    TupleQueueReader *reader;
    MinimalTuple tup;
    bool    readerdone;

    /* Check for async events, particularly messages from workers. */
    CHECK_FOR_INTERRUPTS();

    /*
     * Attempt to read a tuple, but don't block if none is available.
     *
     * Note that TupleQueueReaderNext will just return NULL for a worker
     * which fails to initialize.  We'll treat that worker as having
     * produced no tuples; WaitForParallelWorkersToFinish will error out
     * when we get there.
     */
    Assert(gatherstate->nextreader < gatherstate->nreaders);
    reader = gatherstate->reader[gatherstate->nextreader];
    tup = TupleQueueReaderNext(reader, true, &readerdone);

    /*
     * If this reader is done, remove it from our working array of active
     * readers.  If all readers are done, we're outta here.
     */
    if (readerdone)
    {
      Assert(!tup);
      --gatherstate->nreaders;
      if (gatherstate->nreaders == 0)
      {
        ExecShutdownGatherWorkers(gatherstate);
        return NULL;
      }
      memmove(&gatherstate->reader[gatherstate->nextreader],
          &gatherstate->reader[gatherstate->nextreader + 1],
          sizeof(TupleQueueReader *)
          * (gatherstate->nreaders - gatherstate->nextreader));
      if (gatherstate->nextreader >= gatherstate->nreaders)
        gatherstate->nextreader = 0;
      continue;
    }

    /* If we got a tuple, return it. */
    if (tup)
      return tup;

    /*
     * Advance nextreader pointer in round-robin fashion.  Note that we
     * only reach this code if we weren't able to get a tuple from the
     * current worker.  We used to advance the nextreader pointer after
     * every tuple, but it turns out to be much more efficient to keep
     * reading from the same queue until that would require blocking.
     */
    gatherstate->nextreader++;
    if (gatherstate->nextreader >= gatherstate->nreaders)
      gatherstate->nextreader = 0;

    /* Have we visited every (surviving) TupleQueueReader? */
    nvisited++;
    if (nvisited >= gatherstate->nreaders)
    {
      /*
       * If (still) running plan locally, return NULL so caller can
       * generate another tuple from the local copy of the plan.
       */
      if (gatherstate->need_to_scan_locally)
        return NULL;

      /* Nothing to do except wait for developments. */
      (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
               WAIT_EVENT_EXECUTE_GATHER);
      ResetLatch(MyLatch);
      nvisited = 0;
    }
  }
}
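
/*
 * Illustrative sketch (not part of nodeGather.c): the reader-scheduling
 * policy of gather_readnext() above, as a standalone simulation.  The
 * consumer keeps draining the current source until a non-blocking read
 * comes up empty, only then advances round-robin, compacts the array with
 * memmove() when a source reports it is finished, and gives up after a
 * full unproductive sweep (where the real leader would produce a tuple
 * locally or sleep on its latch).  All names (DemoSource, demo_source_next,
 * demo_readnext) are hypothetical.
 */
#include <stdbool.h>
#include <stdio.h>
#include <string.h>

typedef struct DemoSource
{
  int     remaining;    /* items this source will still produce */
  bool    stalled;      /* simulate "nothing ready right now" */
} DemoSource;

/* Non-blocking read: true and *item set only if something was ready. */
static bool
demo_source_next(DemoSource *src, int *item, bool *done)
{
  *done = false;
  if (src->remaining <= 0)
  {
    *done = true;       /* this source is finished for good */
    return false;
  }
  if (src->stalled)
  {
    src->stalled = false;   /* nothing ready on this attempt */
    return false;
  }
  src->stalled = true;
  *item = src->remaining--;
  return true;
}

static bool
demo_readnext(DemoSource **sources, int *nsources, int *next, int *item)
{
  int     nvisited = 0;

  while (*nsources > 0)
  {
    bool    done;

    /* Stick with the current source until it has nothing ready. */
    if (demo_source_next(sources[*next], item, &done))
      return true;

    if (done)
    {
      /* Remove the finished source, keeping the array dense. */
      --*nsources;
      memmove(&sources[*next], &sources[*next + 1],
          sizeof(DemoSource *) * (*nsources - *next));
      if (*next >= *nsources)
        *next = 0;
      nvisited = 0;
      continue;
    }

    /* Nothing ready here: advance round-robin. */
    if (++*next >= *nsources)
      *next = 0;
    if (++nvisited >= *nsources)
      return false;     /* full sweep without progress */
  }
  return false;         /* every source is finished */
}

int
main(void)
{
  DemoSource  a = {2, false};
  DemoSource  b = {3, false};
  DemoSource *sources[] = {&a, &b};
  int     nsources = 2;
  int     next = 0;

  /* The caller retries after an unproductive sweep; the real leader would
   * wait on its latch or run the plan locally instead. */
  while (nsources > 0)
  {
    int     item;

    if (demo_readnext(sources, &nsources, &next, &item))
      printf("%d\n", item);
  }
  return 0;
}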

/* ----------------------------------------------------------------
 *    ExecShutdownGatherWorkers
 *
 *    Stop all the parallel workers.
 * ----------------------------------------------------------------
 */
static void
ExecShutdownGatherWorkers(GatherState *node)
{
  if (node->pei != NULL)
    ExecParallelFinish(node->pei);

  /* Flush local copy of reader array */
  if (node->reader)
    pfree(node->reader);
  node->reader = NULL;
}

/* ----------------------------------------------------------------
 *    ExecShutdownGather
 *
 *    Destroy the setup for parallel workers including parallel context.
 * ----------------------------------------------------------------
 */
void
ExecShutdownGather(GatherState *node)
{
  ExecShutdownGatherWorkers(node);

  /* Now destroy the parallel context. */
  if (node->pei != NULL)
  {
    ExecParallelCleanup(node->pei);
    node->pei = NULL;
  }
}

/* ----------------------------------------------------------------
 *            Join Support
 * ----------------------------------------------------------------
 */

/* ----------------------------------------------------------------
 *    ExecReScanGather
 *
 *    Prepare to re-scan the result of a Gather.
 * ----------------------------------------------------------------
 */
void
ExecReScanGather(GatherState *node)
{
  Gather     *gather = (Gather *) node->ps.plan;
  PlanState  *outerPlan = outerPlanState(node);

  /* Make sure any existing workers are gracefully shut down */
  ExecShutdownGatherWorkers(node);

  /* Mark node so that shared state will be rebuilt at next call */
  node->initialized = false;

  /*
   * Set child node's chgParam to tell it that the next scan might deliver a
   * different set of rows within the leader process.  (The overall rowset
   * shouldn't change, but the leader process's subset might; hence nodes
   * between here and the parallel table scan node mustn't optimize on the
   * assumption of an unchanging rowset.)
   */
  if (gather->rescan_param >= 0)
    outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
                       gather->rescan_param);

  /*
   * If chgParam of subnode is not null then plan will be re-scanned by
   * first ExecProcNode.  Note: because this does nothing if we have a
   * rescan_param, it's currently guaranteed that parallel-aware child nodes
   * will not see a ReScan call until after they get a ReInitializeDSM call.
   * That ordering might not be something to rely on, though.  A good rule
   * of thumb is that ReInitializeDSM should reset only shared state, ReScan
   * should reset only local state, and anything that depends on both of
   * those steps being finished must wait until the first ExecProcNode call.
   */
  if (outerPlan->chgParam == NULL)
    ExecReScan(outerPlan);
}