Coverage Report

Created: 2025-06-15 06:31

/src/postgres/src/backend/executor/nodeMergeAppend.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * nodeMergeAppend.c
4
 *    routines to handle MergeAppend nodes.
5
 *
6
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7
 * Portions Copyright (c) 1994, Regents of the University of California
8
 *
9
 *
10
 * IDENTIFICATION
11
 *    src/backend/executor/nodeMergeAppend.c
12
 *
13
 *-------------------------------------------------------------------------
14
 */
15
/* INTERFACE ROUTINES
16
 *    ExecInitMergeAppend   - initialize the MergeAppend node
17
 *    ExecMergeAppend     - retrieve the next tuple from the node
18
 *    ExecEndMergeAppend    - shut down the MergeAppend node
19
 *    ExecReScanMergeAppend - rescan the MergeAppend node
20
 *
21
 *   NOTES
22
 *    A MergeAppend node contains a list of one or more subplans.
23
 *    These are each expected to deliver tuples that are sorted according
24
 *    to a common sort key.  The MergeAppend node merges these streams
25
 *    to produce output sorted the same way.
26
 *
27
 *    MergeAppend nodes don't make use of their left and right
28
 *    subtrees, rather they maintain a list of subplans so
29
 *    a typical MergeAppend node looks like this in the plan tree:
30
 *
31
 *           ...
32
 *           /
33
 *        MergeAppend---+------+------+--- nil
34
 *        / \     |    |    |
35
 *        nil nil    ...    ...    ...
36
 *                 subplans
37
 */
38
39
#include "postgres.h"
40
41
#include "executor/executor.h"
42
#include "executor/execPartition.h"
43
#include "executor/nodeMergeAppend.h"
44
#include "lib/binaryheap.h"
45
#include "miscadmin.h"
46
47
/*
48
 * We have one slot for each item in the heap array.  We use SlotNumber
49
 * to store slot indexes.  This doesn't actually provide any formal
50
 * type-safety, but it makes the code more self-documenting.
51
 */
52
/* Index type for ms_slots[]; these values are what the binary heap stores. */
typedef int32 SlotNumber;
53
54
/*
 * Forward declarations of the file-local routines defined further down:
 * the node's ExecProcNode callback and the heap comparator.
 */
static TupleTableSlot *ExecMergeAppend(PlanState *pstate);
55
static int  heap_compare_slots(Datum a, Datum b, void *arg);
56
57
58
/* ----------------------------------------------------------------
59
 *    ExecInitMergeAppend
60
 *
61
 *    Begin all of the subscans of the MergeAppend node.
62
 * ----------------------------------------------------------------
63
 */
64
MergeAppendState *
65
ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags)
66
0
{
67
0
  MergeAppendState *mergestate = makeNode(MergeAppendState);
68
0
  PlanState **mergeplanstates;
69
0
  const TupleTableSlotOps *mergeops;
70
0
  Bitmapset  *validsubplans;
71
0
  int     nplans;
72
0
  int     i,
73
0
        j;
74
75
  /* check for unsupported flags */
76
0
  Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK)));
77
78
  /*
79
   * create new MergeAppendState for our node
80
   */
81
0
  mergestate->ps.plan = (Plan *) node;
82
0
  mergestate->ps.state = estate;
83
0
  mergestate->ps.ExecProcNode = ExecMergeAppend;
84
85
  /* If run-time partition pruning is enabled, then set that up now */
86
0
  if (node->part_prune_index >= 0)
87
0
  {
88
0
    PartitionPruneState *prunestate;
89
90
    /*
91
     * Set up pruning data structure.  This also initializes the set of
92
     * subplans to initialize (validsubplans) by taking into account the
93
     * result of performing initial pruning if any.
94
     */
95
0
    prunestate = ExecInitPartitionExecPruning(&mergestate->ps,
96
0
                          list_length(node->mergeplans),
97
0
                          node->part_prune_index,
98
0
                          node->apprelids,
99
0
                          &validsubplans);
100
0
    mergestate->ms_prune_state = prunestate;
101
0
    nplans = bms_num_members(validsubplans);
102
103
    /*
104
     * When no run-time pruning is required and there's at least one
105
     * subplan, we can fill ms_valid_subplans immediately, preventing
106
     * later calls to ExecFindMatchingSubPlans.
107
     */
108
0
    if (!prunestate->do_exec_prune && nplans > 0)
109
0
      mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1);
110
0
  }
111
0
  else
112
0
  {
113
0
    nplans = list_length(node->mergeplans);
114
115
    /*
116
     * When run-time partition pruning is not enabled we can just mark all
117
     * subplans as valid; they must also all be initialized.
118
     */
119
0
    Assert(nplans > 0);
120
0
    mergestate->ms_valid_subplans = validsubplans =
121
0
      bms_add_range(NULL, 0, nplans - 1);
122
0
    mergestate->ms_prune_state = NULL;
123
0
  }
124
125
0
  mergeplanstates = (PlanState **) palloc(nplans * sizeof(PlanState *));
126
0
  mergestate->mergeplans = mergeplanstates;
127
0
  mergestate->ms_nplans = nplans;
128
129
0
  mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans);
130
0
  mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots,
131
0
                        mergestate);
132
133
  /*
134
   * call ExecInitNode on each of the valid plans to be executed and save
135
   * the results into the mergeplanstates array.
136
   */
137
0
  j = 0;
138
0
  i = -1;
139
0
  while ((i = bms_next_member(validsubplans, i)) >= 0)
140
0
  {
141
0
    Plan     *initNode = (Plan *) list_nth(node->mergeplans, i);
142
143
0
    mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags);
144
0
  }
145
146
  /*
147
   * Initialize MergeAppend's result tuple type and slot.  If the child
148
   * plans all produce the same fixed slot type, we can use that slot type;
149
   * otherwise make a virtual slot.  (Note that the result slot itself is
150
   * used only to return a null tuple at end of execution; real tuples are
151
   * returned to the caller in the children's own result slots.  What we are
152
   * doing here is allowing the parent plan node to optimize if the
153
   * MergeAppend will return only one kind of slot.)
154
   */
155
0
  mergeops = ExecGetCommonSlotOps(mergeplanstates, j);
156
0
  if (mergeops != NULL)
157
0
  {
158
0
    ExecInitResultTupleSlotTL(&mergestate->ps, mergeops);
159
0
  }
160
0
  else
161
0
  {
162
0
    ExecInitResultTupleSlotTL(&mergestate->ps, &TTSOpsVirtual);
163
    /* show that the output slot type is not fixed */
164
0
    mergestate->ps.resultopsset = true;
165
0
    mergestate->ps.resultopsfixed = false;
166
0
  }
167
168
  /*
169
   * Miscellaneous initialization
170
   */
171
0
  mergestate->ps.ps_ProjInfo = NULL;
172
173
  /*
174
   * initialize sort-key information
175
   */
176
0
  mergestate->ms_nkeys = node->numCols;
177
0
  mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols);
178
179
0
  for (i = 0; i < node->numCols; i++)
180
0
  {
181
0
    SortSupport sortKey = mergestate->ms_sortkeys + i;
182
183
0
    sortKey->ssup_cxt = CurrentMemoryContext;
184
0
    sortKey->ssup_collation = node->collations[i];
185
0
    sortKey->ssup_nulls_first = node->nullsFirst[i];
186
0
    sortKey->ssup_attno = node->sortColIdx[i];
187
188
    /*
189
     * It isn't feasible to perform abbreviated key conversion, since
190
     * tuples are pulled into mergestate's binary heap as needed.  It
191
     * would likely be counter-productive to convert tuples into an
192
     * abbreviated representation as they're pulled up, so opt out of that
193
     * additional optimization entirely.
194
     */
195
0
    sortKey->abbreviate = false;
196
197
0
    PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
198
0
  }
199
200
  /*
201
   * initialize to show we have not run the subplans yet
202
   */
203
0
  mergestate->ms_initialized = false;
204
205
0
  return mergestate;
206
0
}
207
208
/* ----------------------------------------------------------------
209
 *     ExecMergeAppend
210
 *
211
 *    Handles iteration over multiple subplans.
212
 * ----------------------------------------------------------------
213
 */
214
static TupleTableSlot *
215
ExecMergeAppend(PlanState *pstate)
216
0
{
217
0
  MergeAppendState *node = castNode(MergeAppendState, pstate);
218
0
  TupleTableSlot *result;
219
0
  SlotNumber  i;
220
221
0
  CHECK_FOR_INTERRUPTS();
222
223
0
  if (!node->ms_initialized)
224
0
  {
225
    /* Nothing to do if all subplans were pruned */
226
0
    if (node->ms_nplans == 0)
227
0
      return ExecClearTuple(node->ps.ps_ResultTupleSlot);
228
229
    /*
230
     * If we've yet to determine the valid subplans then do so now.  If
231
     * run-time pruning is disabled then the valid subplans will always be
232
     * set to all subplans.
233
     */
234
0
    if (node->ms_valid_subplans == NULL)
235
0
      node->ms_valid_subplans =
236
0
        ExecFindMatchingSubPlans(node->ms_prune_state, false, NULL);
237
238
    /*
239
     * First time through: pull the first tuple from each valid subplan,
240
     * and set up the heap.
241
     */
242
0
    i = -1;
243
0
    while ((i = bms_next_member(node->ms_valid_subplans, i)) >= 0)
244
0
    {
245
0
      node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
246
0
      if (!TupIsNull(node->ms_slots[i]))
247
0
        binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i));
248
0
    }
249
0
    binaryheap_build(node->ms_heap);
250
0
    node->ms_initialized = true;
251
0
  }
252
0
  else
253
0
  {
254
    /*
255
     * Otherwise, pull the next tuple from whichever subplan we returned
256
     * from last time, and reinsert the subplan index into the heap,
257
     * because it might now compare differently against the existing
258
     * elements of the heap.  (We could perhaps simplify the logic a bit
259
     * by doing this before returning from the prior call, but it's better
260
     * to not pull tuples until necessary.)
261
     */
262
0
    i = DatumGetInt32(binaryheap_first(node->ms_heap));
263
0
    node->ms_slots[i] = ExecProcNode(node->mergeplans[i]);
264
0
    if (!TupIsNull(node->ms_slots[i]))
265
0
      binaryheap_replace_first(node->ms_heap, Int32GetDatum(i));
266
0
    else
267
0
      (void) binaryheap_remove_first(node->ms_heap);
268
0
  }
269
270
0
  if (binaryheap_empty(node->ms_heap))
271
0
  {
272
    /* All the subplans are exhausted, and so is the heap */
273
0
    result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
274
0
  }
275
0
  else
276
0
  {
277
0
    i = DatumGetInt32(binaryheap_first(node->ms_heap));
278
0
    result = node->ms_slots[i];
279
0
  }
280
281
0
  return result;
282
0
}
283
284
/*
285
 * Compare the tuples in the two given slots.
286
 */
287
static int32
288
heap_compare_slots(Datum a, Datum b, void *arg)
289
0
{
290
0
  MergeAppendState *node = (MergeAppendState *) arg;
291
0
  SlotNumber  slot1 = DatumGetInt32(a);
292
0
  SlotNumber  slot2 = DatumGetInt32(b);
293
294
0
  TupleTableSlot *s1 = node->ms_slots[slot1];
295
0
  TupleTableSlot *s2 = node->ms_slots[slot2];
296
0
  int     nkey;
297
298
0
  Assert(!TupIsNull(s1));
299
0
  Assert(!TupIsNull(s2));
300
301
0
  for (nkey = 0; nkey < node->ms_nkeys; nkey++)
302
0
  {
303
0
    SortSupport sortKey = node->ms_sortkeys + nkey;
304
0
    AttrNumber  attno = sortKey->ssup_attno;
305
0
    Datum   datum1,
306
0
          datum2;
307
0
    bool    isNull1,
308
0
          isNull2;
309
0
    int     compare;
310
311
0
    datum1 = slot_getattr(s1, attno, &isNull1);
312
0
    datum2 = slot_getattr(s2, attno, &isNull2);
313
314
0
    compare = ApplySortComparator(datum1, isNull1,
315
0
                    datum2, isNull2,
316
0
                    sortKey);
317
0
    if (compare != 0)
318
0
    {
319
0
      INVERT_COMPARE_RESULT(compare);
320
0
      return compare;
321
0
    }
322
0
  }
323
0
  return 0;
324
0
}
325
326
/* ----------------------------------------------------------------
327
 *    ExecEndMergeAppend
328
 *
329
 *    Shuts down the subscans of the MergeAppend node.
330
 *
331
 *    Returns nothing of interest.
332
 * ----------------------------------------------------------------
333
 */
334
void
335
ExecEndMergeAppend(MergeAppendState *node)
336
0
{
337
0
  PlanState **mergeplans;
338
0
  int     nplans;
339
0
  int     i;
340
341
  /*
342
   * get information from the node
343
   */
344
0
  mergeplans = node->mergeplans;
345
0
  nplans = node->ms_nplans;
346
347
  /*
348
   * shut down each of the subscans
349
   */
350
0
  for (i = 0; i < nplans; i++)
351
0
    ExecEndNode(mergeplans[i]);
352
0
}
353
354
void
355
ExecReScanMergeAppend(MergeAppendState *node)
356
0
{
357
0
  int     i;
358
359
  /*
360
   * If any PARAM_EXEC Params used in pruning expressions have changed, then
361
   * we'd better unset the valid subplans so that they are reselected for
362
   * the new parameter values.
363
   */
364
0
  if (node->ms_prune_state &&
365
0
    bms_overlap(node->ps.chgParam,
366
0
          node->ms_prune_state->execparamids))
367
0
  {
368
0
    bms_free(node->ms_valid_subplans);
369
0
    node->ms_valid_subplans = NULL;
370
0
  }
371
372
0
  for (i = 0; i < node->ms_nplans; i++)
373
0
  {
374
0
    PlanState  *subnode = node->mergeplans[i];
375
376
    /*
377
     * ExecReScan doesn't know about my subplans, so I have to do
378
     * changed-parameter signaling myself.
379
     */
380
0
    if (node->ps.chgParam != NULL)
381
0
      UpdateChangedParamSet(subnode, node->ps.chgParam);
382
383
    /*
384
     * If chgParam of subnode is not null then plan will be re-scanned by
385
     * first ExecProcNode.
386
     */
387
0
    if (subnode->chgParam == NULL)
388
0
      ExecReScan(subnode);
389
0
  }
390
0
  binaryheap_reset(node->ms_heap);
391
0
  node->ms_initialized = false;
392
0
}