Coverage Report

Created: 2025-06-15 06:31

/src/postgres/src/backend/executor/nodeAppend.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * nodeAppend.c
4
 *    routines to handle append nodes.
5
 *
6
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7
 * Portions Copyright (c) 1994, Regents of the University of California
8
 *
9
 *
10
 * IDENTIFICATION
11
 *    src/backend/executor/nodeAppend.c
12
 *
13
 *-------------------------------------------------------------------------
14
 */
15
/* INTERFACE ROUTINES
16
 *    ExecInitAppend  - initialize the append node
17
 *    ExecAppend    - retrieve the next tuple from the node
18
 *    ExecEndAppend - shut down the append node
19
 *    ExecReScanAppend - rescan the append node
20
 *
21
 *   NOTES
22
 *    Each append node contains a list of one or more subplans which
23
 *    must be iteratively processed (forwards or backwards).
24
 *    Tuples are retrieved by executing the 'whichplan'th subplan
25
 *    until the subplan stops returning tuples, at which point that
26
 *    plan is shut down and the next started up.
27
 *
28
 *    Append nodes don't make use of their left and right
29
 *    subtrees, rather they maintain a list of subplans so
30
 *    a typical append node looks like this in the plan tree:
31
 *
32
 *           ...
33
 *           /
34
 *        Append -------+------+------+--- nil
35
 *        / \     |    |    |
36
 *        nil nil    ...    ...    ...
37
 *                 subplans
38
 *
39
 *    Append nodes are currently used for unions, and to support
40
 *    inheritance queries, where several relations need to be scanned.
41
 *    For example, in our standard person/student/employee/student-emp
42
 *    example, where student and employee inherit from person
43
 *    and student-emp inherits from student and employee, the
44
 *    query:
45
 *
46
 *        select name from person
47
 *
48
 *    generates the plan:
49
 *
50
 *          |
51
 *        Append -------+-------+--------+--------+
52
 *        / \     |     |      |    |
53
 *        nil nil    Scan  Scan   Scan     Scan
54
 *                |     |      |    |
55
 *              person employee student student-emp
56
 */
57
58
#include "postgres.h"
59
60
#include "executor/execAsync.h"
61
#include "executor/execPartition.h"
62
#include "executor/executor.h"
63
#include "executor/nodeAppend.h"
64
#include "miscadmin.h"
65
#include "pgstat.h"
66
#include "storage/latch.h"
67
68
/*
 * Shared state for parallel-aware Append.
 *
 * Lives in the parallel query's DSM segment; one instance per parallel
 * Append node, shared by the leader and all workers.
 */
struct ParallelAppendState
{
	LWLock		pa_lock;		/* mutual exclusion to choose next subplan */
	int			pa_next_plan;	/* next plan to choose by any worker */

	/*
	 * pa_finished[i] should be true if no more workers should select subplan
	 * i.  for a non-partial plan, this should be set to true as soon as a
	 * worker selects the plan; for a partial plan, it remains false until
	 * some worker executes the plan to completion.
	 */
	bool		pa_finished[FLEXIBLE_ARRAY_MEMBER];
};
82
83
0
#define INVALID_SUBPLAN_INDEX   -1
84
0
#define EVENT_BUFFER_SIZE     16
85
86
static TupleTableSlot *ExecAppend(PlanState *pstate);
87
static bool choose_next_subplan_locally(AppendState *node);
88
static bool choose_next_subplan_for_leader(AppendState *node);
89
static bool choose_next_subplan_for_worker(AppendState *node);
90
static void mark_invalid_subplans_as_finished(AppendState *node);
91
static void ExecAppendAsyncBegin(AppendState *node);
92
static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
93
static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
94
static void ExecAppendAsyncEventWait(AppendState *node);
95
static void classify_matching_subplans(AppendState *node);
96
97
/* ----------------------------------------------------------------
 *		ExecInitAppend
 *
 *		Begin all of the subscans of the append node.
 *
 *	   (This is potentially wasteful, since the entire result of the
 *		append node may not be scanned, but this way all of the
 *		structures get allocated in the executor's top level memory
 *		block instead of that of the call to ExecAppend.)
 * ----------------------------------------------------------------
 */
AppendState *
ExecInitAppend(Append *node, EState *estate, int eflags)
{
	AppendState *appendstate = makeNode(AppendState);
	PlanState **appendplanstates;
	const TupleTableSlotOps *appendops;
	Bitmapset  *validsubplans;
	Bitmapset  *asyncplans;
	int			nplans;
	int			nasyncplans;
	int			firstvalid;
	int			i,
				j;

	/* check for unsupported flags */
	Assert(!(eflags & EXEC_FLAG_MARK));

	/*
	 * create new AppendState for our append node
	 */
	appendstate->ps.plan = (Plan *) node;
	appendstate->ps.state = estate;
	appendstate->ps.ExecProcNode = ExecAppend;

	/* Let choose_next_subplan_* function handle setting the first subplan */
	appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
	appendstate->as_syncdone = false;
	appendstate->as_begun = false;

	/* If run-time partition pruning is enabled, then set that up now */
	if (node->part_prune_index >= 0)
	{
		PartitionPruneState *prunestate;

		/*
		 * Set up pruning data structure.  This also initializes the set of
		 * subplans to initialize (validsubplans) by taking into account the
		 * result of performing initial pruning if any.
		 */
		prunestate = ExecInitPartitionExecPruning(&appendstate->ps,
												  list_length(node->appendplans),
												  node->part_prune_index,
												  node->apprelids,
												  &validsubplans);
		appendstate->as_prune_state = prunestate;
		nplans = bms_num_members(validsubplans);

		/*
		 * When no run-time pruning is required and there's at least one
		 * subplan, we can fill as_valid_subplans immediately, preventing
		 * later calls to ExecFindMatchingSubPlans.
		 */
		if (!prunestate->do_exec_prune && nplans > 0)
		{
			appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
			appendstate->as_valid_subplans_identified = true;
		}
	}
	else
	{
		nplans = list_length(node->appendplans);

		/*
		 * When run-time partition pruning is not enabled we can just mark all
		 * subplans as valid; they must also all be initialized.
		 */
		Assert(nplans > 0);
		appendstate->as_valid_subplans = validsubplans =
			bms_add_range(NULL, 0, nplans - 1);
		appendstate->as_valid_subplans_identified = true;
		appendstate->as_prune_state = NULL;
	}

	appendplanstates = (PlanState **) palloc(nplans *
											 sizeof(PlanState *));

	/*
	 * call ExecInitNode on each of the valid plans to be executed and save
	 * the results into the appendplanstates array.
	 *
	 * While at it, find out the first valid partial plan.
	 */
	j = 0;
	asyncplans = NULL;
	nasyncplans = 0;
	firstvalid = nplans;
	i = -1;
	while ((i = bms_next_member(validsubplans, i)) >= 0)
	{
		Plan	   *initNode = (Plan *) list_nth(node->appendplans, i);

		/*
		 * Record async subplans.  When executing EvalPlanQual, we treat them
		 * as sync ones; don't do this when initializing an EvalPlanQual plan
		 * tree.
		 */
		if (initNode->async_capable && estate->es_epq_active == NULL)
		{
			asyncplans = bms_add_member(asyncplans, j);
			nasyncplans++;
		}

		/*
		 * Record the lowest appendplans index which is a valid partial plan.
		 */
		if (i >= node->first_partial_plan && j < firstvalid)
			firstvalid = j;

		appendplanstates[j++] = ExecInitNode(initNode, estate, eflags);
	}

	appendstate->as_first_partial_plan = firstvalid;
	appendstate->appendplans = appendplanstates;
	appendstate->as_nplans = nplans;

	/*
	 * Initialize Append's result tuple type and slot.  If the child plans all
	 * produce the same fixed slot type, we can use that slot type; otherwise
	 * make a virtual slot.  (Note that the result slot itself is used only to
	 * return a null tuple at end of execution; real tuples are returned to
	 * the caller in the children's own result slots.  What we are doing here
	 * is allowing the parent plan node to optimize if the Append will return
	 * only one kind of slot.)
	 */
	appendops = ExecGetCommonSlotOps(appendplanstates, j);
	if (appendops != NULL)
	{
		ExecInitResultTupleSlotTL(&appendstate->ps, appendops);
	}
	else
	{
		ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual);
		/* show that the output slot type is not fixed */
		appendstate->ps.resultopsset = true;
		appendstate->ps.resultopsfixed = false;
	}

	/* Initialize async state */
	appendstate->as_asyncplans = asyncplans;
	appendstate->as_nasyncplans = nasyncplans;
	appendstate->as_asyncrequests = NULL;
	appendstate->as_asyncresults = NULL;
	appendstate->as_nasyncresults = 0;
	appendstate->as_nasyncremain = 0;
	appendstate->as_needrequest = NULL;
	appendstate->as_eventset = NULL;
	appendstate->as_valid_asyncplans = NULL;

	if (nasyncplans > 0)
	{
		/* Note: indexed by subplan index, so sized by nplans, not nasyncplans */
		appendstate->as_asyncrequests = (AsyncRequest **)
			palloc0(nplans * sizeof(AsyncRequest *));

		i = -1;
		while ((i = bms_next_member(asyncplans, i)) >= 0)
		{
			AsyncRequest *areq;

			areq = palloc(sizeof(AsyncRequest));
			areq->requestor = (PlanState *) appendstate;
			areq->requestee = appendplanstates[i];
			areq->request_index = i;
			areq->callback_pending = false;
			areq->request_complete = false;
			areq->result = NULL;

			appendstate->as_asyncrequests[i] = areq;
		}

		appendstate->as_asyncresults = (TupleTableSlot **)
			palloc0(nasyncplans * sizeof(TupleTableSlot *));

		if (appendstate->as_valid_subplans_identified)
			classify_matching_subplans(appendstate);
	}

	/*
	 * Miscellaneous initialization
	 */

	appendstate->ps.ps_ProjInfo = NULL;

	/* For parallel query, this will be overridden later. */
	appendstate->choose_next_subplan = choose_next_subplan_locally;

	return appendstate;
}
295
296
/* ----------------------------------------------------------------
 *	   ExecAppend
 *
 *		Handles iteration over multiple subplans.
 *
 *		Returns the next tuple from whichever subplan currently has
 *		one, or an empty slot when all sync and async subplans are
 *		exhausted.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecAppend(PlanState *pstate)
{
	AppendState *node = castNode(AppendState, pstate);
	TupleTableSlot *result;

	/*
	 * If this is the first call after Init or ReScan, we need to do the
	 * initialization work.
	 */
	if (!node->as_begun)
	{
		Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
		Assert(!node->as_syncdone);

		/* Nothing to do if there are no subplans */
		if (node->as_nplans == 0)
			return ExecClearTuple(node->ps.ps_ResultTupleSlot);

		/* If there are any async subplans, begin executing them. */
		if (node->as_nasyncplans > 0)
			ExecAppendAsyncBegin(node);

		/*
		 * If no sync subplan has been chosen, we must choose one before
		 * proceeding.
		 */
		if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
			return ExecClearTuple(node->ps.ps_ResultTupleSlot);

		Assert(node->as_syncdone ||
			   (node->as_whichplan >= 0 &&
				node->as_whichplan < node->as_nplans));

		/* And we're initialized. */
		node->as_begun = true;
	}

	for (;;)
	{
		PlanState  *subnode;

		CHECK_FOR_INTERRUPTS();

		/*
		 * try to get a tuple from an async subplan if any
		 */
		if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
		{
			if (ExecAppendAsyncGetNext(node, &result))
				return result;
			Assert(!node->as_syncdone);
			Assert(bms_is_empty(node->as_needrequest));
		}

		/*
		 * figure out which sync subplan we are currently processing
		 */
		Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
		subnode = node->appendplans[node->as_whichplan];

		/*
		 * get a tuple from the subplan
		 */
		result = ExecProcNode(subnode);

		if (!TupIsNull(result))
		{
			/*
			 * If the subplan gave us something then return it as-is. We do
			 * NOT make use of the result slot that was set up in
			 * ExecInitAppend; there's no need for it.
			 */
			return result;
		}

		/*
		 * wait or poll for async events if any. We do this before checking
		 * for the end of iteration, because it might drain the remaining
		 * async subplans.
		 */
		if (node->as_nasyncremain > 0)
			ExecAppendAsyncEventWait(node);

		/* choose new sync subplan; if no sync/async subplans, we're done */
		if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
			return ExecClearTuple(node->ps.ps_ResultTupleSlot);
	}
}
391
392
/* ----------------------------------------------------------------
393
 *    ExecEndAppend
394
 *
395
 *    Shuts down the subscans of the append node.
396
 *
397
 *    Returns nothing of interest.
398
 * ----------------------------------------------------------------
399
 */
400
void
401
ExecEndAppend(AppendState *node)
402
0
{
403
0
  PlanState **appendplans;
404
0
  int     nplans;
405
0
  int     i;
406
407
  /*
408
   * get information from the node
409
   */
410
0
  appendplans = node->appendplans;
411
0
  nplans = node->as_nplans;
412
413
  /*
414
   * shut down each of the subscans
415
   */
416
0
  for (i = 0; i < nplans; i++)
417
0
    ExecEndNode(appendplans[i]);
418
0
}
419
420
/*
 * ExecReScanAppend
 *		Prepare the Append node and its subplans for a fresh scan.
 */
void
ExecReScanAppend(AppendState *node)
{
	int			nasyncplans = node->as_nasyncplans;
	int			i;

	/*
	 * If any PARAM_EXEC Params used in pruning expressions have changed, then
	 * we'd better unset the valid subplans so that they are reselected for
	 * the new parameter values.
	 */
	if (node->as_prune_state &&
		bms_overlap(node->ps.chgParam,
					node->as_prune_state->execparamids))
	{
		node->as_valid_subplans_identified = false;
		bms_free(node->as_valid_subplans);
		node->as_valid_subplans = NULL;
		bms_free(node->as_valid_asyncplans);
		node->as_valid_asyncplans = NULL;
	}

	for (i = 0; i < node->as_nplans; i++)
	{
		PlanState  *subnode = node->appendplans[i];

		/*
		 * ExecReScan doesn't know about my subplans, so I have to do
		 * changed-parameter signaling myself.
		 */
		if (node->ps.chgParam != NULL)
			UpdateChangedParamSet(subnode, node->ps.chgParam);

		/*
		 * If chgParam of subnode is not null then plan will be re-scanned by
		 * first ExecProcNode or by first ExecAsyncRequest.
		 */
		if (subnode->chgParam == NULL)
			ExecReScan(subnode);
	}

	/* Reset async state */
	if (nasyncplans > 0)
	{
		i = -1;
		while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
		{
			AsyncRequest *areq = node->as_asyncrequests[i];

			areq->callback_pending = false;
			areq->request_complete = false;
			areq->result = NULL;
		}

		node->as_nasyncresults = 0;
		node->as_nasyncremain = 0;
		bms_free(node->as_needrequest);
		node->as_needrequest = NULL;
	}

	/* Let choose_next_subplan_* function handle setting the first subplan */
	node->as_whichplan = INVALID_SUBPLAN_INDEX;
	node->as_syncdone = false;
	node->as_begun = false;
}
485
486
/* ----------------------------------------------------------------
487
 *            Parallel Append Support
488
 * ----------------------------------------------------------------
489
 */
490
491
/* ----------------------------------------------------------------
492
 *    ExecAppendEstimate
493
 *
494
 *    Compute the amount of space we'll need in the parallel
495
 *    query DSM, and inform pcxt->estimator about our needs.
496
 * ----------------------------------------------------------------
497
 */
498
void
499
ExecAppendEstimate(AppendState *node,
500
           ParallelContext *pcxt)
501
0
{
502
0
  node->pstate_len =
503
0
    add_size(offsetof(ParallelAppendState, pa_finished),
504
0
         sizeof(bool) * node->as_nplans);
505
506
0
  shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len);
507
0
  shm_toc_estimate_keys(&pcxt->estimator, 1);
508
0
}
509
510
511
/* ----------------------------------------------------------------
512
 *    ExecAppendInitializeDSM
513
 *
514
 *    Set up shared state for Parallel Append.
515
 * ----------------------------------------------------------------
516
 */
517
void
518
ExecAppendInitializeDSM(AppendState *node,
519
            ParallelContext *pcxt)
520
0
{
521
0
  ParallelAppendState *pstate;
522
523
0
  pstate = shm_toc_allocate(pcxt->toc, node->pstate_len);
524
0
  memset(pstate, 0, node->pstate_len);
525
0
  LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND);
526
0
  shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate);
527
528
0
  node->as_pstate = pstate;
529
0
  node->choose_next_subplan = choose_next_subplan_for_leader;
530
0
}
531
532
/* ----------------------------------------------------------------
533
 *    ExecAppendReInitializeDSM
534
 *
535
 *    Reset shared state before beginning a fresh scan.
536
 * ----------------------------------------------------------------
537
 */
538
void
539
ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt)
540
0
{
541
0
  ParallelAppendState *pstate = node->as_pstate;
542
543
0
  pstate->pa_next_plan = 0;
544
0
  memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans);
545
0
}
546
547
/* ----------------------------------------------------------------
548
 *    ExecAppendInitializeWorker
549
 *
550
 *    Copy relevant information from TOC into planstate, and initialize
551
 *    whatever is required to choose and execute the optimal subplan.
552
 * ----------------------------------------------------------------
553
 */
554
void
555
ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
556
0
{
557
0
  node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false);
558
0
  node->choose_next_subplan = choose_next_subplan_for_worker;
559
0
}
560
561
/* ----------------------------------------------------------------
 *		choose_next_subplan_locally
 *
 *		Choose next sync subplan for a non-parallel-aware Append,
 *		returning false if there are no more.
 * ----------------------------------------------------------------
 */
static bool
choose_next_subplan_locally(AppendState *node)
{
	int			whichplan = node->as_whichplan;
	int			nextplan;

	/* We should never be called when there are no subplans */
	Assert(node->as_nplans > 0);

	/* Nothing to do if syncdone */
	if (node->as_syncdone)
		return false;

	/*
	 * If first call then have the bms member function choose the first valid
	 * sync subplan by initializing whichplan to -1.  If there happen to be no
	 * valid sync subplans then the bms member function will handle that by
	 * returning a negative number which will allow us to exit returning a
	 * false value.
	 */
	if (whichplan == INVALID_SUBPLAN_INDEX)
	{
		if (node->as_nasyncplans > 0)
		{
			/* We'd have filled as_valid_subplans already */
			Assert(node->as_valid_subplans_identified);
		}
		else if (!node->as_valid_subplans_identified)
		{
			node->as_valid_subplans =
				ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
			node->as_valid_subplans_identified = true;
		}

		whichplan = -1;
	}

	/* Ensure whichplan is within the expected range */
	Assert(whichplan >= -1 && whichplan <= node->as_nplans);

	/* Honor scan direction: iterate the valid-subplan set either way. */
	if (ScanDirectionIsForward(node->ps.state->es_direction))
		nextplan = bms_next_member(node->as_valid_subplans, whichplan);
	else
		nextplan = bms_prev_member(node->as_valid_subplans, whichplan);

	if (nextplan < 0)
	{
		/* Set as_syncdone if in async mode */
		if (node->as_nasyncplans > 0)
			node->as_syncdone = true;
		return false;
	}

	node->as_whichplan = nextplan;

	return true;
}
625
626
/* ----------------------------------------------------------------
 *		choose_next_subplan_for_leader
 *
 *      Try to pick a plan which doesn't commit us to doing much
 *      work locally, so that as much work as possible is done in
 *      the workers.  Cheapest subplans are at the end.
 * ----------------------------------------------------------------
 */
static bool
choose_next_subplan_for_leader(AppendState *node)
{
	ParallelAppendState *pstate = node->as_pstate;

	/* Backward scan is not supported by parallel-aware plans */
	Assert(ScanDirectionIsForward(node->ps.state->es_direction));

	/* We should never be called when there are no subplans */
	Assert(node->as_nplans > 0);

	LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);

	if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
	{
		/* Mark just-completed subplan as finished. */
		node->as_pstate->pa_finished[node->as_whichplan] = true;
	}
	else
	{
		/* Start with last subplan. */
		node->as_whichplan = node->as_nplans - 1;

		/*
		 * If we've yet to determine the valid subplans then do so now.  If
		 * run-time pruning is disabled then the valid subplans will always be
		 * set to all subplans.
		 */
		if (!node->as_valid_subplans_identified)
		{
			node->as_valid_subplans =
				ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
			node->as_valid_subplans_identified = true;

			/*
			 * Mark each invalid plan as finished to allow the loop below to
			 * select the first valid subplan.
			 */
			mark_invalid_subplans_as_finished(node);
		}
	}

	/* Loop until we find a subplan to execute. */
	while (pstate->pa_finished[node->as_whichplan])
	{
		if (node->as_whichplan == 0)
		{
			/* Scanned down past plan 0 with nothing left to run. */
			pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
			node->as_whichplan = INVALID_SUBPLAN_INDEX;
			LWLockRelease(&pstate->pa_lock);
			return false;
		}

		/*
		 * We needn't pay attention to as_valid_subplans here as all invalid
		 * plans have been marked as finished.
		 */
		node->as_whichplan--;
	}

	/* If non-partial, immediately mark as finished. */
	if (node->as_whichplan < node->as_first_partial_plan)
		node->as_pstate->pa_finished[node->as_whichplan] = true;

	LWLockRelease(&pstate->pa_lock);

	return true;
}
702
703
/* ----------------------------------------------------------------
 *		choose_next_subplan_for_worker
 *
 *		Choose next subplan for a parallel-aware Append, returning
 *		false if there are no more.
 *
 *		We start from the first plan and advance through the list;
 *		when we get back to the end, we loop back to the first
 *		partial plan.  This assigns the non-partial plans first in
 *		order of descending cost and then spreads out the workers
 *		as evenly as possible across the remaining partial plans.
 * ----------------------------------------------------------------
 */
static bool
choose_next_subplan_for_worker(AppendState *node)
{
	ParallelAppendState *pstate = node->as_pstate;

	/* Backward scan is not supported by parallel-aware plans */
	Assert(ScanDirectionIsForward(node->ps.state->es_direction));

	/* We should never be called when there are no subplans */
	Assert(node->as_nplans > 0);

	LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE);

	/* Mark just-completed subplan as finished. */
	if (node->as_whichplan != INVALID_SUBPLAN_INDEX)
		node->as_pstate->pa_finished[node->as_whichplan] = true;

	/*
	 * If we've yet to determine the valid subplans then do so now.  If
	 * run-time pruning is disabled then the valid subplans will always be set
	 * to all subplans.
	 */
	else if (!node->as_valid_subplans_identified)
	{
		node->as_valid_subplans =
			ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
		node->as_valid_subplans_identified = true;

		mark_invalid_subplans_as_finished(node);
	}

	/* If all the plans are already done, we have nothing to do */
	if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX)
	{
		LWLockRelease(&pstate->pa_lock);
		return false;
	}

	/* Save the plan from which we are starting the search. */
	node->as_whichplan = pstate->pa_next_plan;

	/* Loop until we find a valid subplan to execute. */
	while (pstate->pa_finished[pstate->pa_next_plan])
	{
		int			nextplan;

		nextplan = bms_next_member(node->as_valid_subplans,
								   pstate->pa_next_plan);
		if (nextplan >= 0)
		{
			/* Advance to the next valid plan. */
			pstate->pa_next_plan = nextplan;
		}
		else if (node->as_whichplan > node->as_first_partial_plan)
		{
			/*
			 * Try looping back to the first valid partial plan, if there is
			 * one.  If there isn't, arrange to bail out below.
			 */
			nextplan = bms_next_member(node->as_valid_subplans,
									   node->as_first_partial_plan - 1);
			pstate->pa_next_plan =
				nextplan < 0 ? node->as_whichplan : nextplan;
		}
		else
		{
			/*
			 * At last plan, and either there are no partial plans or we've
			 * tried them all.  Arrange to bail out.
			 */
			pstate->pa_next_plan = node->as_whichplan;
		}

		if (pstate->pa_next_plan == node->as_whichplan)
		{
			/* We've tried everything! */
			pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
			LWLockRelease(&pstate->pa_lock);
			return false;
		}
	}

	/* Pick the plan we found, and advance pa_next_plan one more time. */
	node->as_whichplan = pstate->pa_next_plan;
	pstate->pa_next_plan = bms_next_member(node->as_valid_subplans,
										   pstate->pa_next_plan);

	/*
	 * If there are no more valid plans then try setting the next plan to the
	 * first valid partial plan.
	 */
	if (pstate->pa_next_plan < 0)
	{
		int			nextplan = bms_next_member(node->as_valid_subplans,
											   node->as_first_partial_plan - 1);

		if (nextplan >= 0)
			pstate->pa_next_plan = nextplan;
		else
		{
			/*
			 * There are no valid partial plans, and we already chose the last
			 * non-partial plan; so flag that there's nothing more for our
			 * fellow workers to do.
			 */
			pstate->pa_next_plan = INVALID_SUBPLAN_INDEX;
		}
	}

	/* If non-partial, immediately mark as finished. */
	if (node->as_whichplan < node->as_first_partial_plan)
		node->as_pstate->pa_finished[node->as_whichplan] = true;

	LWLockRelease(&pstate->pa_lock);

	return true;
}
833
834
/*
835
 * mark_invalid_subplans_as_finished
836
 *    Marks the ParallelAppendState's pa_finished as true for each invalid
837
 *    subplan.
838
 *
839
 * This function should only be called for parallel Append with run-time
840
 * pruning enabled.
841
 */
842
static void
843
mark_invalid_subplans_as_finished(AppendState *node)
844
0
{
845
0
  int     i;
846
847
  /* Only valid to call this while in parallel Append mode */
848
0
  Assert(node->as_pstate);
849
850
  /* Shouldn't have been called when run-time pruning is not enabled */
851
0
  Assert(node->as_prune_state);
852
853
  /* Nothing to do if all plans are valid */
854
0
  if (bms_num_members(node->as_valid_subplans) == node->as_nplans)
855
0
    return;
856
857
  /* Mark all non-valid plans as finished */
858
0
  for (i = 0; i < node->as_nplans; i++)
859
0
  {
860
0
    if (!bms_is_member(i, node->as_valid_subplans))
861
0
      node->as_pstate->pa_finished[i] = true;
862
0
  }
863
0
}
864
865
/* ----------------------------------------------------------------
866
 *            Asynchronous Append Support
867
 * ----------------------------------------------------------------
868
 */
869
870
/* ----------------------------------------------------------------
 *		ExecAppendAsyncBegin
 *
 *		Begin executing designed async-capable subplans.
 * ----------------------------------------------------------------
 */
static void
ExecAppendAsyncBegin(AppendState *node)
{
	int			i;

	/* Backward scan is not supported by async-aware Appends. */
	Assert(ScanDirectionIsForward(node->ps.state->es_direction));

	/* We should never be called when there are no subplans */
	Assert(node->as_nplans > 0);

	/* We should never be called when there are no async subplans. */
	Assert(node->as_nasyncplans > 0);

	/* If we've yet to determine the valid subplans then do so now. */
	if (!node->as_valid_subplans_identified)
	{
		node->as_valid_subplans =
			ExecFindMatchingSubPlans(node->as_prune_state, false, NULL);
		node->as_valid_subplans_identified = true;

		/* Split the valid set into sync and async subsets. */
		classify_matching_subplans(node);
	}

	/* Initialize state variables. */
	node->as_syncdone = bms_is_empty(node->as_valid_subplans);
	node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans);

	/* Nothing to do if there are no valid async subplans. */
	if (node->as_nasyncremain == 0)
		return;

	/* Make a request for each of the valid async subplans. */
	i = -1;
	while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
	{
		AsyncRequest *areq = node->as_asyncrequests[i];

		Assert(areq->request_index == i);
		Assert(!areq->callback_pending);

		/* Do the actual work. */
		ExecAsyncRequest(areq);
	}
}
921
922
/* ----------------------------------------------------------------
 *		ExecAppendAsyncGetNext
 *
 *		Get the next tuple from any of the asynchronous subplans.
 *
 *		Returns true with *result set when a tuple (or the final
 *		empty slot) is available; returns false when the caller
 *		should continue scanning the sync subplans.
 * ----------------------------------------------------------------
 */
static bool
ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
{
	*result = NULL;

	/* We should never be called when there are no valid async subplans. */
	Assert(node->as_nasyncremain > 0);

	/* Request a tuple asynchronously. */
	if (ExecAppendAsyncRequest(node, result))
		return true;

	while (node->as_nasyncremain > 0)
	{
		CHECK_FOR_INTERRUPTS();

		/* Wait or poll for async events. */
		ExecAppendAsyncEventWait(node);

		/* Request a tuple asynchronously. */
		if (ExecAppendAsyncRequest(node, result))
			return true;

		/* Break from loop if there's any sync subplan that isn't complete. */
		if (!node->as_syncdone)
			break;
	}

	/*
	 * If all sync subplans are complete, we're totally done scanning the
	 * given node.  Otherwise, we're done with the asynchronous stuff but must
	 * continue scanning the sync subplans.
	 */
	if (node->as_syncdone)
	{
		Assert(node->as_nasyncremain == 0);
		*result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
		return true;
	}

	return false;
}
970
971
/* ----------------------------------------------------------------
972
 *    ExecAppendAsyncRequest
973
 *
974
 *    Request a tuple asynchronously.
975
 * ----------------------------------------------------------------
976
 */
977
static bool
978
ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
979
0
{
980
0
  Bitmapset  *needrequest;
981
0
  int     i;
982
983
  /* Nothing to do if there are no async subplans needing a new request. */
984
0
  if (bms_is_empty(node->as_needrequest))
985
0
  {
986
0
    Assert(node->as_nasyncresults == 0);
987
0
    return false;
988
0
  }
989
990
  /*
991
   * If there are any asynchronously-generated results that have not yet
992
   * been returned, we have nothing to do; just return one of them.
993
   */
994
0
  if (node->as_nasyncresults > 0)
995
0
  {
996
0
    --node->as_nasyncresults;
997
0
    *result = node->as_asyncresults[node->as_nasyncresults];
998
0
    return true;
999
0
  }
1000
1001
  /* Make a new request for each of the async subplans that need it. */
1002
0
  needrequest = node->as_needrequest;
1003
0
  node->as_needrequest = NULL;
1004
0
  i = -1;
1005
0
  while ((i = bms_next_member(needrequest, i)) >= 0)
1006
0
  {
1007
0
    AsyncRequest *areq = node->as_asyncrequests[i];
1008
1009
    /* Do the actual work. */
1010
0
    ExecAsyncRequest(areq);
1011
0
  }
1012
0
  bms_free(needrequest);
1013
1014
  /* Return one of the asynchronously-generated results if any. */
1015
0
  if (node->as_nasyncresults > 0)
1016
0
  {
1017
0
    --node->as_nasyncresults;
1018
0
    *result = node->as_asyncresults[node->as_nasyncresults];
1019
0
    return true;
1020
0
  }
1021
1022
0
  return false;
1023
0
}
1024
1025
/* ----------------------------------------------------------------
 *		ExecAppendAsyncEventWait
 *
 *		Wait or poll for file descriptor events and fire callbacks.
 * ----------------------------------------------------------------
 */
static void
ExecAppendAsyncEventWait(AppendState *node)
{
	/* +2 reserves room for the postmaster-death and latch events below. */
	int			nevents = node->as_nasyncplans + 2;
	/* Block indefinitely only when no sync subplan can supply tuples. */
	long		timeout = node->as_syncdone ? -1 : 0;
	WaitEvent	occurred_event[EVENT_BUFFER_SIZE];
	int			noccurred;
	int			i;

	/* We should never be called when there are no valid async subplans. */
	Assert(node->as_nasyncremain > 0);

	/* The event set is transient: built here and freed before returning. */
	Assert(node->as_eventset == NULL);
	node->as_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents);
	AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
					  NULL, NULL);

	/* Give each waiting subplan a chance to add an event. */
	i = -1;
	while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
	{
		AsyncRequest *areq = node->as_asyncrequests[i];

		if (areq->callback_pending)
			ExecAsyncConfigureWait(areq);
	}

	/*
	 * No need for further processing if none of the subplans configured any
	 * events.
	 */
	if (GetNumRegisteredWaitEvents(node->as_eventset) == 1)
	{
		FreeWaitEventSet(node->as_eventset);
		node->as_eventset = NULL;
		return;
	}

	/*
	 * Add the process latch to the set, so that we wake up to process the
	 * standard interrupts with CHECK_FOR_INTERRUPTS().
	 *
	 * NOTE: For historical reasons, it's important that this is added to the
	 * WaitEventSet after the ExecAsyncConfigureWait() calls.  Namely,
	 * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if
	 * any other events are in the set.  That's a poor design, it's
	 * questionable for postgres_fdw to be doing that in the first place, but
	 * we cannot change it now.  The pattern has possibly been copied to other
	 * extensions too.
	 */
	AddWaitEventToSet(node->as_eventset, WL_LATCH_SET, PGINVALID_SOCKET,
					  MyLatch, NULL);

	/* Return at most EVENT_BUFFER_SIZE events in one call. */
	if (nevents > EVENT_BUFFER_SIZE)
		nevents = EVENT_BUFFER_SIZE;

	/*
	 * If the timeout is -1, wait until at least one event occurs.  If the
	 * timeout is 0, poll for events, but do not wait at all.
	 */
	noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
								 nevents, WAIT_EVENT_APPEND_READY);
	/* Free the event set before dispatching callbacks, which may re-enter. */
	FreeWaitEventSet(node->as_eventset);
	node->as_eventset = NULL;
	if (noccurred == 0)
		return;

	/* Deliver notifications. */
	for (i = 0; i < noccurred; i++)
	{
		WaitEvent  *w = &occurred_event[i];

		/*
		 * Each waiting subplan should have registered its wait event with
		 * user_data pointing back to its AsyncRequest.
		 */
		if ((w->events & WL_SOCKET_READABLE) != 0)
		{
			AsyncRequest *areq = (AsyncRequest *) w->user_data;

			if (areq->callback_pending)
			{
				/*
				 * Mark it as no longer needing a callback.  We must do this
				 * before dispatching the callback in case the callback resets
				 * the flag.
				 */
				areq->callback_pending = false;

				/* Do the actual work. */
				ExecAsyncNotify(areq);
			}
		}

		/* Handle standard interrupts */
		if ((w->events & WL_LATCH_SET) != 0)
		{
			ResetLatch(MyLatch);
			CHECK_FOR_INTERRUPTS();
		}
	}
}
/* ----------------------------------------------------------------
1136
 *    ExecAsyncAppendResponse
1137
 *
1138
 *    Receive a response from an asynchronous request we made.
1139
 * ----------------------------------------------------------------
1140
 */
1141
void
1142
ExecAsyncAppendResponse(AsyncRequest *areq)
1143
0
{
1144
0
  AppendState *node = (AppendState *) areq->requestor;
1145
0
  TupleTableSlot *slot = areq->result;
1146
1147
  /* The result should be a TupleTableSlot or NULL. */
1148
0
  Assert(slot == NULL || IsA(slot, TupleTableSlot));
1149
1150
  /* Nothing to do if the request is pending. */
1151
0
  if (!areq->request_complete)
1152
0
  {
1153
    /* The request would have been pending for a callback. */
1154
0
    Assert(areq->callback_pending);
1155
0
    return;
1156
0
  }
1157
1158
  /* If the result is NULL or an empty slot, there's nothing more to do. */
1159
0
  if (TupIsNull(slot))
1160
0
  {
1161
    /* The ending subplan wouldn't have been pending for a callback. */
1162
0
    Assert(!areq->callback_pending);
1163
0
    --node->as_nasyncremain;
1164
0
    return;
1165
0
  }
1166
1167
  /* Save result so we can return it. */
1168
0
  Assert(node->as_nasyncresults < node->as_nasyncplans);
1169
0
  node->as_asyncresults[node->as_nasyncresults++] = slot;
1170
1171
  /*
1172
   * Mark the subplan that returned a result as ready for a new request.  We
1173
   * don't launch another one here immediately because it might complete.
1174
   */
1175
0
  node->as_needrequest = bms_add_member(node->as_needrequest,
1176
0
                      areq->request_index);
1177
0
}
1178
1179
/* ----------------------------------------------------------------
1180
 *    classify_matching_subplans
1181
 *
1182
 *    Classify the node's as_valid_subplans into sync ones and
1183
 *    async ones, adjust it to contain sync ones only, and save
1184
 *    async ones in the node's as_valid_asyncplans.
1185
 * ----------------------------------------------------------------
1186
 */
1187
static void
1188
classify_matching_subplans(AppendState *node)
1189
0
{
1190
0
  Bitmapset  *valid_asyncplans;
1191
1192
0
  Assert(node->as_valid_subplans_identified);
1193
0
  Assert(node->as_valid_asyncplans == NULL);
1194
1195
  /* Nothing to do if there are no valid subplans. */
1196
0
  if (bms_is_empty(node->as_valid_subplans))
1197
0
  {
1198
0
    node->as_syncdone = true;
1199
0
    node->as_nasyncremain = 0;
1200
0
    return;
1201
0
  }
1202
1203
  /* Nothing to do if there are no valid async subplans. */
1204
0
  if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
1205
0
  {
1206
0
    node->as_nasyncremain = 0;
1207
0
    return;
1208
0
  }
1209
1210
  /* Get valid async subplans. */
1211
0
  valid_asyncplans = bms_intersect(node->as_asyncplans,
1212
0
                   node->as_valid_subplans);
1213
1214
  /* Adjust the valid subplans to contain sync subplans only. */
1215
0
  node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
1216
0
                        valid_asyncplans);
1217
1218
  /* Save valid async subplans. */
1219
0
  node->as_valid_asyncplans = valid_asyncplans;
1220
0
}