/src/postgres/src/backend/executor/nodeMergeAppend.c
Line | Count | Source (jump to first uncovered line) |
1 | | /*------------------------------------------------------------------------- |
2 | | * |
3 | | * nodeMergeAppend.c |
4 | | * routines to handle MergeAppend nodes. |
5 | | * |
6 | | * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group |
7 | | * Portions Copyright (c) 1994, Regents of the University of California |
8 | | * |
9 | | * |
10 | | * IDENTIFICATION |
11 | | * src/backend/executor/nodeMergeAppend.c |
12 | | * |
13 | | *------------------------------------------------------------------------- |
14 | | */ |
15 | | /* INTERFACE ROUTINES |
16 | | * ExecInitMergeAppend - initialize the MergeAppend node |
17 | | * ExecMergeAppend - retrieve the next tuple from the node |
18 | | * ExecEndMergeAppend - shut down the MergeAppend node |
19 | | * ExecReScanMergeAppend - rescan the MergeAppend node |
20 | | * |
21 | | * NOTES |
22 | | * A MergeAppend node contains a list of one or more subplans. |
23 | | * These are each expected to deliver tuples that are sorted according |
24 | | * to a common sort key. The MergeAppend node merges these streams |
25 | | * to produce output sorted the same way. |
26 | | * |
27 | | * MergeAppend nodes don't make use of their left and right |
28 | | * subtrees, rather they maintain a list of subplans so |
29 | | * a typical MergeAppend node looks like this in the plan tree: |
30 | | * |
31 | | * ... |
32 | | * / |
33 | | * MergeAppend---+------+------+--- nil |
34 | | * / \ | | | |
35 | | * nil nil ... ... ... |
36 | | * subplans |
37 | | */ |
38 | | |
39 | | #include "postgres.h" |
40 | | |
41 | | #include "executor/executor.h" |
42 | | #include "executor/execPartition.h" |
43 | | #include "executor/nodeMergeAppend.h" |
44 | | #include "lib/binaryheap.h" |
45 | | #include "miscadmin.h" |
46 | | |
47 | | /* |
48 | | * We have one slot for each item in the heap array. We use SlotNumber |
49 | | * to store slot indexes. This doesn't actually provide any formal |
50 | | * type-safety, but it makes the code more self-documenting. |
51 | | */ |
52 | | typedef int32 SlotNumber; |
53 | | |
54 | | static TupleTableSlot *ExecMergeAppend(PlanState *pstate); |
55 | | static int heap_compare_slots(Datum a, Datum b, void *arg); |
56 | | |
57 | | |
58 | | /* ---------------------------------------------------------------- |
59 | | * ExecInitMergeAppend |
60 | | * |
61 | | * Begin all of the subscans of the MergeAppend node. |
62 | | * ---------------------------------------------------------------- |
63 | | */ |
64 | | MergeAppendState * |
65 | | ExecInitMergeAppend(MergeAppend *node, EState *estate, int eflags) |
66 | 0 | { |
67 | 0 | MergeAppendState *mergestate = makeNode(MergeAppendState); |
68 | 0 | PlanState **mergeplanstates; |
69 | 0 | const TupleTableSlotOps *mergeops; |
70 | 0 | Bitmapset *validsubplans; |
71 | 0 | int nplans; |
72 | 0 | int i, |
73 | 0 | j; |
74 | | |
75 | | /* check for unsupported flags */ |
76 | 0 | Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); |
77 | | |
78 | | /* |
79 | | * create new MergeAppendState for our node |
80 | | */ |
81 | 0 | mergestate->ps.plan = (Plan *) node; |
82 | 0 | mergestate->ps.state = estate; |
83 | 0 | mergestate->ps.ExecProcNode = ExecMergeAppend; |
84 | | |
85 | | /* If run-time partition pruning is enabled, then set that up now */ |
86 | 0 | if (node->part_prune_index >= 0) |
87 | 0 | { |
88 | 0 | PartitionPruneState *prunestate; |
89 | | |
90 | | /* |
91 | | * Set up pruning data structure. This also initializes the set of |
92 | | * subplans to initialize (validsubplans) by taking into account the |
93 | | * result of performing initial pruning if any. |
94 | | */ |
95 | 0 | prunestate = ExecInitPartitionExecPruning(&mergestate->ps, |
96 | 0 | list_length(node->mergeplans), |
97 | 0 | node->part_prune_index, |
98 | 0 | node->apprelids, |
99 | 0 | &validsubplans); |
100 | 0 | mergestate->ms_prune_state = prunestate; |
101 | 0 | nplans = bms_num_members(validsubplans); |
102 | | |
103 | | /* |
104 | | * When no run-time pruning is required and there's at least one |
105 | | * subplan, we can fill ms_valid_subplans immediately, preventing |
106 | | * later calls to ExecFindMatchingSubPlans. |
107 | | */ |
108 | 0 | if (!prunestate->do_exec_prune && nplans > 0) |
109 | 0 | mergestate->ms_valid_subplans = bms_add_range(NULL, 0, nplans - 1); |
110 | 0 | } |
111 | 0 | else |
112 | 0 | { |
113 | 0 | nplans = list_length(node->mergeplans); |
114 | | |
115 | | /* |
116 | | * When run-time partition pruning is not enabled we can just mark all |
117 | | * subplans as valid; they must also all be initialized. |
118 | | */ |
119 | 0 | Assert(nplans > 0); |
120 | 0 | mergestate->ms_valid_subplans = validsubplans = |
121 | 0 | bms_add_range(NULL, 0, nplans - 1); |
122 | 0 | mergestate->ms_prune_state = NULL; |
123 | 0 | } |
124 | |
|
125 | 0 | mergeplanstates = (PlanState **) palloc(nplans * sizeof(PlanState *)); |
126 | 0 | mergestate->mergeplans = mergeplanstates; |
127 | 0 | mergestate->ms_nplans = nplans; |
128 | |
|
129 | 0 | mergestate->ms_slots = (TupleTableSlot **) palloc0(sizeof(TupleTableSlot *) * nplans); |
130 | 0 | mergestate->ms_heap = binaryheap_allocate(nplans, heap_compare_slots, |
131 | 0 | mergestate); |
132 | | |
133 | | /* |
134 | | * call ExecInitNode on each of the valid plans to be executed and save |
135 | | * the results into the mergeplanstates array. |
136 | | */ |
137 | 0 | j = 0; |
138 | 0 | i = -1; |
139 | 0 | while ((i = bms_next_member(validsubplans, i)) >= 0) |
140 | 0 | { |
141 | 0 | Plan *initNode = (Plan *) list_nth(node->mergeplans, i); |
142 | |
|
143 | 0 | mergeplanstates[j++] = ExecInitNode(initNode, estate, eflags); |
144 | 0 | } |
145 | | |
146 | | /* |
147 | | * Initialize MergeAppend's result tuple type and slot. If the child |
148 | | * plans all produce the same fixed slot type, we can use that slot type; |
149 | | * otherwise make a virtual slot. (Note that the result slot itself is |
150 | | * used only to return a null tuple at end of execution; real tuples are |
151 | | * returned to the caller in the children's own result slots. What we are |
152 | | * doing here is allowing the parent plan node to optimize if the |
153 | | * MergeAppend will return only one kind of slot.) |
154 | | */ |
155 | 0 | mergeops = ExecGetCommonSlotOps(mergeplanstates, j); |
156 | 0 | if (mergeops != NULL) |
157 | 0 | { |
158 | 0 | ExecInitResultTupleSlotTL(&mergestate->ps, mergeops); |
159 | 0 | } |
160 | 0 | else |
161 | 0 | { |
162 | 0 | ExecInitResultTupleSlotTL(&mergestate->ps, &TTSOpsVirtual); |
163 | | /* show that the output slot type is not fixed */ |
164 | 0 | mergestate->ps.resultopsset = true; |
165 | 0 | mergestate->ps.resultopsfixed = false; |
166 | 0 | } |
167 | | |
168 | | /* |
169 | | * Miscellaneous initialization |
170 | | */ |
171 | 0 | mergestate->ps.ps_ProjInfo = NULL; |
172 | | |
173 | | /* |
174 | | * initialize sort-key information |
175 | | */ |
176 | 0 | mergestate->ms_nkeys = node->numCols; |
177 | 0 | mergestate->ms_sortkeys = palloc0(sizeof(SortSupportData) * node->numCols); |
178 | |
|
179 | 0 | for (i = 0; i < node->numCols; i++) |
180 | 0 | { |
181 | 0 | SortSupport sortKey = mergestate->ms_sortkeys + i; |
182 | |
|
183 | 0 | sortKey->ssup_cxt = CurrentMemoryContext; |
184 | 0 | sortKey->ssup_collation = node->collations[i]; |
185 | 0 | sortKey->ssup_nulls_first = node->nullsFirst[i]; |
186 | 0 | sortKey->ssup_attno = node->sortColIdx[i]; |
187 | | |
188 | | /* |
189 | | * It isn't feasible to perform abbreviated key conversion, since |
190 | | * tuples are pulled into mergestate's binary heap as needed. It |
191 | | * would likely be counter-productive to convert tuples into an |
192 | | * abbreviated representation as they're pulled up, so opt out of that |
193 | | * additional optimization entirely. |
194 | | */ |
195 | 0 | sortKey->abbreviate = false; |
196 | |
|
197 | 0 | PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey); |
198 | 0 | } |
199 | | |
200 | | /* |
201 | | * initialize to show we have not run the subplans yet |
202 | | */ |
203 | 0 | mergestate->ms_initialized = false; |
204 | |
|
205 | 0 | return mergestate; |
206 | 0 | } |
207 | | |
208 | | /* ---------------------------------------------------------------- |
209 | | * ExecMergeAppend |
210 | | * |
211 | | * Handles iteration over multiple subplans. |
212 | | * ---------------------------------------------------------------- |
213 | | */ |
214 | | static TupleTableSlot * |
215 | | ExecMergeAppend(PlanState *pstate) |
216 | 0 | { |
217 | 0 | MergeAppendState *node = castNode(MergeAppendState, pstate); |
218 | 0 | TupleTableSlot *result; |
219 | 0 | SlotNumber i; |
220 | |
|
221 | 0 | CHECK_FOR_INTERRUPTS(); |
222 | |
|
223 | 0 | if (!node->ms_initialized) |
224 | 0 | { |
225 | | /* Nothing to do if all subplans were pruned */ |
226 | 0 | if (node->ms_nplans == 0) |
227 | 0 | return ExecClearTuple(node->ps.ps_ResultTupleSlot); |
228 | | |
229 | | /* |
230 | | * If we've yet to determine the valid subplans then do so now. If |
231 | | * run-time pruning is disabled then the valid subplans will always be |
232 | | * set to all subplans. |
233 | | */ |
234 | 0 | if (node->ms_valid_subplans == NULL) |
235 | 0 | node->ms_valid_subplans = |
236 | 0 | ExecFindMatchingSubPlans(node->ms_prune_state, false, NULL); |
237 | | |
238 | | /* |
239 | | * First time through: pull the first tuple from each valid subplan, |
240 | | * and set up the heap. |
241 | | */ |
242 | 0 | i = -1; |
243 | 0 | while ((i = bms_next_member(node->ms_valid_subplans, i)) >= 0) |
244 | 0 | { |
245 | 0 | node->ms_slots[i] = ExecProcNode(node->mergeplans[i]); |
246 | 0 | if (!TupIsNull(node->ms_slots[i])) |
247 | 0 | binaryheap_add_unordered(node->ms_heap, Int32GetDatum(i)); |
248 | 0 | } |
249 | 0 | binaryheap_build(node->ms_heap); |
250 | 0 | node->ms_initialized = true; |
251 | 0 | } |
252 | 0 | else |
253 | 0 | { |
254 | | /* |
255 | | * Otherwise, pull the next tuple from whichever subplan we returned |
256 | | * from last time, and reinsert the subplan index into the heap, |
257 | | * because it might now compare differently against the existing |
258 | | * elements of the heap. (We could perhaps simplify the logic a bit |
259 | | * by doing this before returning from the prior call, but it's better |
260 | | * to not pull tuples until necessary.) |
261 | | */ |
262 | 0 | i = DatumGetInt32(binaryheap_first(node->ms_heap)); |
263 | 0 | node->ms_slots[i] = ExecProcNode(node->mergeplans[i]); |
264 | 0 | if (!TupIsNull(node->ms_slots[i])) |
265 | 0 | binaryheap_replace_first(node->ms_heap, Int32GetDatum(i)); |
266 | 0 | else |
267 | 0 | (void) binaryheap_remove_first(node->ms_heap); |
268 | 0 | } |
269 | | |
270 | 0 | if (binaryheap_empty(node->ms_heap)) |
271 | 0 | { |
272 | | /* All the subplans are exhausted, and so is the heap */ |
273 | 0 | result = ExecClearTuple(node->ps.ps_ResultTupleSlot); |
274 | 0 | } |
275 | 0 | else |
276 | 0 | { |
277 | 0 | i = DatumGetInt32(binaryheap_first(node->ms_heap)); |
278 | 0 | result = node->ms_slots[i]; |
279 | 0 | } |
280 | |
|
281 | 0 | return result; |
282 | 0 | } |
283 | | |
284 | | /* |
285 | | * Compare the tuples in the two given slots. |
286 | | */ |
287 | | static int32 |
288 | | heap_compare_slots(Datum a, Datum b, void *arg) |
289 | 0 | { |
290 | 0 | MergeAppendState *node = (MergeAppendState *) arg; |
291 | 0 | SlotNumber slot1 = DatumGetInt32(a); |
292 | 0 | SlotNumber slot2 = DatumGetInt32(b); |
293 | |
|
294 | 0 | TupleTableSlot *s1 = node->ms_slots[slot1]; |
295 | 0 | TupleTableSlot *s2 = node->ms_slots[slot2]; |
296 | 0 | int nkey; |
297 | |
|
298 | 0 | Assert(!TupIsNull(s1)); |
299 | 0 | Assert(!TupIsNull(s2)); |
300 | |
|
301 | 0 | for (nkey = 0; nkey < node->ms_nkeys; nkey++) |
302 | 0 | { |
303 | 0 | SortSupport sortKey = node->ms_sortkeys + nkey; |
304 | 0 | AttrNumber attno = sortKey->ssup_attno; |
305 | 0 | Datum datum1, |
306 | 0 | datum2; |
307 | 0 | bool isNull1, |
308 | 0 | isNull2; |
309 | 0 | int compare; |
310 | |
|
311 | 0 | datum1 = slot_getattr(s1, attno, &isNull1); |
312 | 0 | datum2 = slot_getattr(s2, attno, &isNull2); |
313 | |
|
314 | 0 | compare = ApplySortComparator(datum1, isNull1, |
315 | 0 | datum2, isNull2, |
316 | 0 | sortKey); |
317 | 0 | if (compare != 0) |
318 | 0 | { |
319 | 0 | INVERT_COMPARE_RESULT(compare); |
320 | 0 | return compare; |
321 | 0 | } |
322 | 0 | } |
323 | 0 | return 0; |
324 | 0 | } |
325 | | |
326 | | /* ---------------------------------------------------------------- |
327 | | * ExecEndMergeAppend |
328 | | * |
329 | | * Shuts down the subscans of the MergeAppend node. |
330 | | * |
331 | | * Returns nothing of interest. |
332 | | * ---------------------------------------------------------------- |
333 | | */ |
334 | | void |
335 | | ExecEndMergeAppend(MergeAppendState *node) |
336 | 0 | { |
337 | 0 | PlanState **mergeplans; |
338 | 0 | int nplans; |
339 | 0 | int i; |
340 | | |
341 | | /* |
342 | | * get information from the node |
343 | | */ |
344 | 0 | mergeplans = node->mergeplans; |
345 | 0 | nplans = node->ms_nplans; |
346 | | |
347 | | /* |
348 | | * shut down each of the subscans |
349 | | */ |
350 | 0 | for (i = 0; i < nplans; i++) |
351 | 0 | ExecEndNode(mergeplans[i]); |
352 | 0 | } |
353 | | |
354 | | void |
355 | | ExecReScanMergeAppend(MergeAppendState *node) |
356 | 0 | { |
357 | 0 | int i; |
358 | | |
359 | | /* |
360 | | * If any PARAM_EXEC Params used in pruning expressions have changed, then |
361 | | * we'd better unset the valid subplans so that they are reselected for |
362 | | * the new parameter values. |
363 | | */ |
364 | 0 | if (node->ms_prune_state && |
365 | 0 | bms_overlap(node->ps.chgParam, |
366 | 0 | node->ms_prune_state->execparamids)) |
367 | 0 | { |
368 | 0 | bms_free(node->ms_valid_subplans); |
369 | 0 | node->ms_valid_subplans = NULL; |
370 | 0 | } |
371 | |
|
372 | 0 | for (i = 0; i < node->ms_nplans; i++) |
373 | 0 | { |
374 | 0 | PlanState *subnode = node->mergeplans[i]; |
375 | | |
376 | | /* |
377 | | * ExecReScan doesn't know about my subplans, so I have to do |
378 | | * changed-parameter signaling myself. |
379 | | */ |
380 | 0 | if (node->ps.chgParam != NULL) |
381 | 0 | UpdateChangedParamSet(subnode, node->ps.chgParam); |
382 | | |
383 | | /* |
384 | | * If chgParam of subnode is not null then plan will be re-scanned by |
385 | | * first ExecProcNode. |
386 | | */ |
387 | 0 | if (subnode->chgParam == NULL) |
388 | 0 | ExecReScan(subnode); |
389 | 0 | } |
390 | 0 | binaryheap_reset(node->ms_heap); |
391 | 0 | node->ms_initialized = false; |
392 | 0 | } |