/src/postgres/src/backend/executor/nodeGatherMerge.c
/*-------------------------------------------------------------------------
 *
 * nodeGatherMerge.c
 *		Scan a plan in multiple workers, and do order-preserving merge.
 *
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeGatherMerge.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "executor/executor.h"
#include "executor/execParallel.h"
#include "executor/nodeGatherMerge.h"
#include "executor/tqueue.h"
#include "lib/binaryheap.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"

/*
 * When we read tuples from workers, it's a good idea to read several at once
 * for efficiency when possible: this minimizes context-switching overhead.
 * But reading too many at a time wastes memory without improving performance.
 * We'll read up to MAX_TUPLE_STORE tuples (in addition to the first one).
 */
#define MAX_TUPLE_STORE 10

/*
 * Pending-tuple array for each worker.  This holds additional tuples that
 * we were able to fetch from the worker, but can't process yet.  In addition,
 * this struct holds the "done" flag indicating the worker is known to have
 * no more tuples.  (We do not use this struct for the leader; we don't keep
 * any pending tuples for the leader, and the need_to_scan_locally flag serves
 * as its "done" indicator.)
 */
typedef struct GMReaderTupleBuffer
{
    MinimalTuple *tuple;        /* array of length MAX_TUPLE_STORE */
    int         nTuples;        /* number of tuples currently stored */
    int         readCounter;    /* index of next tuple to extract */
    bool        done;           /* true if reader is known exhausted */
} GMReaderTupleBuffer;
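
/*
 * A reader's tuple array is filled in nowait mode by load_tuple_array() and
 * drained one entry at a time by gather_merge_readnext() via readCounter;
 * once everything stored has been consumed, both counters are reset to zero
 * before the next refill.
 */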

static TupleTableSlot *ExecGatherMerge(PlanState *pstate);
static int32 heap_compare_slots(Datum a, Datum b, void *arg);
static TupleTableSlot *gather_merge_getnext(GatherMergeState *gm_state);
static MinimalTuple gm_readnext_tuple(GatherMergeState *gm_state, int nreader,
                                      bool nowait, bool *done);
static void ExecShutdownGatherMergeWorkers(GatherMergeState *node);
static void gather_merge_setup(GatherMergeState *gm_state);
static void gather_merge_init(GatherMergeState *gm_state);
static void gather_merge_clear_tuples(GatherMergeState *gm_state);
static bool gather_merge_readnext(GatherMergeState *gm_state, int reader,
                                  bool nowait);
static void load_tuple_array(GatherMergeState *gm_state, int reader);

/* ----------------------------------------------------------------
 *		ExecInitGatherMerge
 * ----------------------------------------------------------------
 */
GatherMergeState *
ExecInitGatherMerge(GatherMerge *node, EState *estate, int eflags)
{
    GatherMergeState *gm_state;
    Plan       *outerNode;
    TupleDesc   tupDesc;

    /* Gather merge node doesn't have innerPlan node. */
    Assert(innerPlan(node) == NULL);

    /*
     * create state structure
     */
    gm_state = makeNode(GatherMergeState);
    gm_state->ps.plan = (Plan *) node;
    gm_state->ps.state = estate;
    gm_state->ps.ExecProcNode = ExecGatherMerge;

    gm_state->initialized = false;
    gm_state->gm_initialized = false;
    gm_state->tuples_needed = -1;

    /*
     * Miscellaneous initialization
     *
     * create expression context for node
     */
    ExecAssignExprContext(estate, &gm_state->ps);

    /*
     * GatherMerge doesn't support checking a qual (it's always more efficient
     * to do it in the child node).
     */
    Assert(!node->plan.qual);

    /*
     * now initialize outer plan
     */
    outerNode = outerPlan(node);
    outerPlanState(gm_state) = ExecInitNode(outerNode, estate, eflags);

    /*
     * The leader may consume tuples either directly from ExecProcNode (if
     * need_to_scan_locally) or from the workers via tuple queues, so we can't
     * rely on the slot type being fixed for expressions evaluated within this
     * node.
     */
    gm_state->ps.outeropsset = true;
    gm_state->ps.outeropsfixed = false;

    /*
     * Store the tuple descriptor into gather merge state, so we can use it
     * while initializing the gather merge slots.
     */
    tupDesc = ExecGetResultType(outerPlanState(gm_state));
    gm_state->tupDesc = tupDesc;

    /*
     * Initialize result type and projection.
     */
    ExecInitResultTypeTL(&gm_state->ps);
    ExecConditionalAssignProjectionInfo(&gm_state->ps, tupDesc, OUTER_VAR);

    /*
     * Without a projection, the result slot type is not trivially known; see
     * the comment above.
     */
    if (gm_state->ps.ps_ProjInfo == NULL)
    {
        gm_state->ps.resultopsset = true;
        gm_state->ps.resultopsfixed = false;
    }

    /*
     * initialize sort-key information
     */
    if (node->numCols)
    {
        int         i;

        gm_state->gm_nkeys = node->numCols;
        gm_state->gm_sortkeys =
            palloc0(sizeof(SortSupportData) * node->numCols);

        for (i = 0; i < node->numCols; i++)
        {
            SortSupport sortKey = gm_state->gm_sortkeys + i;

            sortKey->ssup_cxt = CurrentMemoryContext;
            sortKey->ssup_collation = node->collations[i];
            sortKey->ssup_nulls_first = node->nullsFirst[i];
            sortKey->ssup_attno = node->sortColIdx[i];

            /*
             * We don't perform abbreviated key conversion here, for the same
             * reasons that it isn't used in MergeAppend
             */
            sortKey->abbreviate = false;

            PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey);
        }
    }

    /* Now allocate the workspace for gather merge */
    gather_merge_setup(gm_state);

    return gm_state;
}

/* ----------------------------------------------------------------
 *		ExecGatherMerge(node)
 *
 *		Scans the underlying plan via multiple workers and returns
 *		the next tuple in the merged (sorted) output order.
 * ----------------------------------------------------------------
 */
static TupleTableSlot *
ExecGatherMerge(PlanState *pstate)
{
    GatherMergeState *node = castNode(GatherMergeState, pstate);
    TupleTableSlot *slot;
    ExprContext *econtext;

    CHECK_FOR_INTERRUPTS();

    /*
     * As with Gather, we don't launch workers until this node is actually
     * executed.
     */
    if (!node->initialized)
    {
        EState     *estate = node->ps.state;
        GatherMerge *gm = castNode(GatherMerge, node->ps.plan);

        /*
         * Sometimes we might have to run without parallelism; but if parallel
         * mode is active then we can try to fire up some workers.
         */
        if (gm->num_workers > 0 && estate->es_use_parallel_mode)
        {
            ParallelContext *pcxt;

            /* Initialize, or re-initialize, shared state needed by workers. */
            if (!node->pei)
                node->pei = ExecInitParallelPlan(outerPlanState(node),
                                                 estate,
                                                 gm->initParam,
                                                 gm->num_workers,
                                                 node->tuples_needed);
            else
                ExecParallelReinitialize(outerPlanState(node),
                                         node->pei,
                                         gm->initParam);

            /* Try to launch workers. */
            pcxt = node->pei->pcxt;
            LaunchParallelWorkers(pcxt);
            /* We save # workers launched for the benefit of EXPLAIN */
            node->nworkers_launched = pcxt->nworkers_launched;

            /*
             * Count number of workers originally wanted and actually
             * launched.
             */
            estate->es_parallel_workers_to_launch += pcxt->nworkers_to_launch;
            estate->es_parallel_workers_launched += pcxt->nworkers_launched;

            /* Set up tuple queue readers to read the results. */
            if (pcxt->nworkers_launched > 0)
            {
                ExecParallelCreateReaders(node->pei);
                /* Make a working array showing the active readers */
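                /*
                 * reader[i] corresponds to gm_tuple_buffers[i] and to
                 * gm_slots[i + 1]; the array has no entry for the leader.
                 */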
                node->nreaders = pcxt->nworkers_launched;
                node->reader = (TupleQueueReader **)
                    palloc(node->nreaders * sizeof(TupleQueueReader *));
                memcpy(node->reader, node->pei->reader,
                       node->nreaders * sizeof(TupleQueueReader *));
            }
            else
            {
                /* No workers?  Then never mind. */
                node->nreaders = 0;
                node->reader = NULL;
            }
        }

        /*
         * Allow the leader to participate if that's enabled, or if we have no
         * workers (in which case there's no choice).
         */
        if (parallel_leader_participation || node->nreaders == 0)
            node->need_to_scan_locally = true;
        node->initialized = true;
    }

    /*
     * Reset per-tuple memory context to free any expression evaluation
     * storage allocated in the previous tuple cycle.
     */
    econtext = node->ps.ps_ExprContext;
    ResetExprContext(econtext);

    /*
     * Get next tuple, either from one of our workers, or by running the plan
     * ourselves.
     */
    slot = gather_merge_getnext(node);
    if (TupIsNull(slot))
        return NULL;

    /* If no projection is required, we're done. */
    if (node->ps.ps_ProjInfo == NULL)
        return slot;

    /*
     * Form the result tuple using ExecProject(), and return it.
     */
    econtext->ecxt_outertuple = slot;
    return ExecProject(node->ps.ps_ProjInfo);
}

/* ----------------------------------------------------------------
 *		ExecEndGatherMerge
 *
 *		frees any storage allocated through C routines.
 * ----------------------------------------------------------------
 */
void
ExecEndGatherMerge(GatherMergeState *node)
{
    ExecEndNode(outerPlanState(node));  /* let children clean up first */
    ExecShutdownGatherMerge(node);
}

/* ----------------------------------------------------------------
 *		ExecShutdownGatherMerge
 *
 *		Destroy the setup for parallel workers including parallel context.
 * ----------------------------------------------------------------
 */
void
ExecShutdownGatherMerge(GatherMergeState *node)
{
    ExecShutdownGatherMergeWorkers(node);

    /* Now destroy the parallel context. */
    if (node->pei != NULL)
    {
        ExecParallelCleanup(node->pei);
        node->pei = NULL;
    }
}

/* ----------------------------------------------------------------
 *		ExecShutdownGatherMergeWorkers
 *
 *		Stop all the parallel workers.
 * ----------------------------------------------------------------
 */
static void
ExecShutdownGatherMergeWorkers(GatherMergeState *node)
{
    if (node->pei != NULL)
        ExecParallelFinish(node->pei);

    /* Flush local copy of reader array */
    if (node->reader)
        pfree(node->reader);
    node->reader = NULL;
}

/* ----------------------------------------------------------------
 *		ExecReScanGatherMerge
 *
 *		Prepare to re-scan the result of a GatherMerge.
 * ----------------------------------------------------------------
 */
void
ExecReScanGatherMerge(GatherMergeState *node)
{
    GatherMerge *gm = (GatherMerge *) node->ps.plan;
    PlanState  *outerPlan = outerPlanState(node);

    /* Make sure any existing workers are gracefully shut down */
    ExecShutdownGatherMergeWorkers(node);

    /* Free any unused tuples, so we don't leak memory across rescans */
    gather_merge_clear_tuples(node);

    /* Mark node so that shared state will be rebuilt at next call */
    node->initialized = false;
    node->gm_initialized = false;

    /*
     * Set child node's chgParam to tell it that the next scan might deliver a
     * different set of rows within the leader process.  (The overall rowset
     * shouldn't change, but the leader process's subset might; hence nodes
     * between here and the parallel table scan node mustn't optimize on the
     * assumption of an unchanging rowset.)
     */
    if (gm->rescan_param >= 0)
        outerPlan->chgParam = bms_add_member(outerPlan->chgParam,
                                             gm->rescan_param);

    /*
     * If chgParam of subnode is not null then plan will be re-scanned by
     * first ExecProcNode.  Note: because this does nothing if we have a
     * rescan_param, it's currently guaranteed that parallel-aware child nodes
     * will not see a ReScan call until after they get a ReInitializeDSM call.
     * That ordering might not be something to rely on, though.  A good rule
     * of thumb is that ReInitializeDSM should reset only shared state, ReScan
     * should reset only local state, and anything that depends on both of
     * those steps being finished must wait until the first ExecProcNode call.
     */
    if (outerPlan->chgParam == NULL)
        ExecReScan(outerPlan);
}

/*
 * Set up the data structures that we'll need for Gather Merge.
 *
 * We allocate these once on the basis of gm->num_workers, which is an
 * upper bound for the number of workers we'll actually have.  During
 * a rescan, we reset the structures to empty.  This approach makes it
 * simple to avoid leaking memory across rescans.
 *
 * In the gm_slots[] array, index 0 is for the leader, and indexes 1 to n
 * are for workers.  The values placed into gm_heap correspond to indexes
 * in gm_slots[].  The gm_tuple_buffers[] array, however, is indexed from
 * 0 to n-1; it has no entry for the leader.
 */
static void
gather_merge_setup(GatherMergeState *gm_state)
{
    GatherMerge *gm = castNode(GatherMerge, gm_state->ps.plan);
    int         nreaders = gm->num_workers;
    int         i;

    /*
     * Allocate gm_slots for the number of workers + one more slot for leader.
     * Slot 0 is always for the leader.  Leader always calls ExecProcNode() to
     * read the tuple, and then stores it directly into its gm_slots entry.
     * For other slots, code below will call ExecInitExtraTupleSlot() to
     * create a slot for the worker's results.  Note that during any single
     * scan, we might have fewer than num_workers available workers, in which
     * case the extra array entries go unused.
     */
    gm_state->gm_slots = (TupleTableSlot **)
        palloc0((nreaders + 1) * sizeof(TupleTableSlot *));

    /* Allocate the tuple slot and tuple array for each worker */
    gm_state->gm_tuple_buffers = (GMReaderTupleBuffer *)
        palloc0(nreaders * sizeof(GMReaderTupleBuffer));

    for (i = 0; i < nreaders; i++)
    {
        /* Allocate the tuple array with length MAX_TUPLE_STORE */
        gm_state->gm_tuple_buffers[i].tuple =
            (MinimalTuple *) palloc0(sizeof(MinimalTuple) * MAX_TUPLE_STORE);

        /* Initialize tuple slot for worker */
        gm_state->gm_slots[i + 1] =
            ExecInitExtraTupleSlot(gm_state->ps.state, gm_state->tupDesc,
                                   &TTSOpsMinimalTuple);
    }

    /* Allocate the resources for the merge */
    gm_state->gm_heap = binaryheap_allocate(nreaders + 1,
                                            heap_compare_slots,
                                            gm_state);
}

/*
 * Initialize the Gather Merge.
 *
 * Reset data structures to ensure they're empty.  Then pull at least one
 * tuple from leader + each worker (or set its "done" indicator), and set up
 * the heap.
 */
static void
gather_merge_init(GatherMergeState *gm_state)
{
    int         nreaders = gm_state->nreaders;
    bool        nowait = true;
    int         i;

    /* Assert that gather_merge_setup made enough space */
    Assert(nreaders <= castNode(GatherMerge, gm_state->ps.plan)->num_workers);

    /* Reset leader's tuple slot to empty */
    gm_state->gm_slots[0] = NULL;

    /* Reset the tuple slot and tuple array for each worker */
    for (i = 0; i < nreaders; i++)
    {
        /* Reset tuple array to empty */
        gm_state->gm_tuple_buffers[i].nTuples = 0;
        gm_state->gm_tuple_buffers[i].readCounter = 0;
        /* Reset done flag to not-done */
        gm_state->gm_tuple_buffers[i].done = false;
        /* Ensure output slot is empty */
        ExecClearTuple(gm_state->gm_slots[i + 1]);
    }

    /* Reset binary heap to empty */
    binaryheap_reset(gm_state->gm_heap);

    /*
     * First, try to read a tuple from each worker (including leader) in
     * nowait mode.  After this, if not all workers were able to produce a
     * tuple (or a "done" indication), then re-read from remaining workers,
     * this time using wait mode.  Add all live readers (those producing at
     * least one tuple) to the heap.
     */
reread:
    for (i = 0; i <= nreaders; i++)
    {
        CHECK_FOR_INTERRUPTS();

        /* skip this source if already known done */
        if ((i == 0) ? gm_state->need_to_scan_locally :
            !gm_state->gm_tuple_buffers[i - 1].done)
        {
            if (TupIsNull(gm_state->gm_slots[i]))
            {
                /* Don't have a tuple yet, try to get one */
                if (gather_merge_readnext(gm_state, i, nowait))
                    binaryheap_add_unordered(gm_state->gm_heap,
                                             Int32GetDatum(i));
            }
            else
            {
                /*
                 * We already got at least one tuple from this worker, but
                 * might as well see if it has any more ready by now.
                 */
                load_tuple_array(gm_state, i);
            }
        }
    }

    /*
     * Need not recheck the leader: its read above ignores nowait, so by now
     * it has either supplied a tuple or been marked done.
     */
    for (i = 1; i <= nreaders; i++)
    {
        if (!gm_state->gm_tuple_buffers[i - 1].done &&
            TupIsNull(gm_state->gm_slots[i]))
        {
            nowait = false;
            goto reread;
        }
    }

    /* Now heapify the heap. */
    binaryheap_build(gm_state->gm_heap);

    gm_state->gm_initialized = true;
}

/*
 * Clear out the tuple table slot, and any unused pending tuples,
 * for each gather merge input.
 */
static void
gather_merge_clear_tuples(GatherMergeState *gm_state)
{
    int         i;

    for (i = 0; i < gm_state->nreaders; i++)
    {
        GMReaderTupleBuffer *tuple_buffer = &gm_state->gm_tuple_buffers[i];

        while (tuple_buffer->readCounter < tuple_buffer->nTuples)
            pfree(tuple_buffer->tuple[tuple_buffer->readCounter++]);

        ExecClearTuple(gm_state->gm_slots[i + 1]);
    }
}

/*
 * Read the next tuple for gather merge.
 *
 * The heap holds one entry per participant that currently has a tuple ready;
 * the entry at its top identifies the participant whose tuple sorts first,
 * and that tuple is the one to return next.
 */
static TupleTableSlot *
gather_merge_getnext(GatherMergeState *gm_state)
{
    int         i;

    if (!gm_state->gm_initialized)
    {
        /*
         * First time through: pull the first tuple from each participant, and
         * set up the heap.
         */
        gather_merge_init(gm_state);
    }
    else
    {
        /*
         * Otherwise, pull the next tuple from whichever participant we
         * returned from last time, and reinsert that participant's index into
         * the heap, because it might now compare differently against the
         * other elements of the heap.
         */
        i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));

        if (gather_merge_readnext(gm_state, i, false))
            binaryheap_replace_first(gm_state->gm_heap, Int32GetDatum(i));
        else
        {
            /* reader exhausted, remove it from heap */
            (void) binaryheap_remove_first(gm_state->gm_heap);
        }
    }

    if (binaryheap_empty(gm_state->gm_heap))
    {
        /* All the queues are exhausted, and so is the heap */
        gather_merge_clear_tuples(gm_state);
        return NULL;
    }
    else
    {
        /* Return next tuple from whichever participant has the leading one */
        i = DatumGetInt32(binaryheap_first(gm_state->gm_heap));
        return gm_state->gm_slots[i];
    }
}

/*
 * Read tuple(s) for given reader in nowait mode, and load into its tuple
 * array, until we have MAX_TUPLE_STORE of them or would have to block.
 */
static void
load_tuple_array(GatherMergeState *gm_state, int reader)
{
    GMReaderTupleBuffer *tuple_buffer;
    int         i;

    /* Don't do anything if this is the leader. */
    if (reader == 0)
        return;

    tuple_buffer = &gm_state->gm_tuple_buffers[reader - 1];

    /* If there's nothing in the array, reset the counters to zero. */
    if (tuple_buffer->nTuples == tuple_buffer->readCounter)
        tuple_buffer->nTuples = tuple_buffer->readCounter = 0;

    /* Try to fill additional slots in the array. */
    for (i = tuple_buffer->nTuples; i < MAX_TUPLE_STORE; i++)
    {
        MinimalTuple tuple;

        tuple = gm_readnext_tuple(gm_state,
                                  reader,
                                  true,
                                  &tuple_buffer->done);
        if (!tuple)
            break;
        tuple_buffer->tuple[i] = tuple;
        tuple_buffer->nTuples++;
    }
}

/*
 * Store the next tuple for a given reader into the appropriate slot.
 *
 * Returns true if successful, false if not (either reader is exhausted,
 * or we didn't want to wait for a tuple).  Sets done flag if reader
 * is found to be exhausted.
 */
static bool
gather_merge_readnext(GatherMergeState *gm_state, int reader, bool nowait)
{
    GMReaderTupleBuffer *tuple_buffer;
    MinimalTuple tup;

    /*
     * If we're being asked to generate a tuple from the leader, then we just
     * call ExecProcNode as normal to produce one.
     */
    if (reader == 0)
    {
        if (gm_state->need_to_scan_locally)
        {
            PlanState  *outerPlan = outerPlanState(gm_state);
            TupleTableSlot *outerTupleSlot;
            EState     *estate = gm_state->ps.state;

            /*
             * Install our DSA area while executing the plan, so that any
             * parallel-aware nodes run in the leader can access the query's
             * dynamic shared memory.
             */
            estate->es_query_dsa = gm_state->pei ? gm_state->pei->area : NULL;
            outerTupleSlot = ExecProcNode(outerPlan);
            estate->es_query_dsa = NULL;

            if (!TupIsNull(outerTupleSlot))
            {
                gm_state->gm_slots[0] = outerTupleSlot;
                return true;
            }
            /* need_to_scan_locally serves as "done" flag for leader */
            gm_state->need_to_scan_locally = false;
        }
        return false;
    }

    /* Otherwise, check the state of the relevant tuple buffer. */
    tuple_buffer = &gm_state->gm_tuple_buffers[reader - 1];

    if (tuple_buffer->nTuples > tuple_buffer->readCounter)
    {
        /* Return any tuple previously read that is still buffered. */
        tup = tuple_buffer->tuple[tuple_buffer->readCounter++];
    }
    else if (tuple_buffer->done)
    {
        /* Reader is known to be exhausted. */
        return false;
    }
    else
    {
        /* Read and buffer next tuple. */
        tup = gm_readnext_tuple(gm_state,
                                reader,
                                nowait,
                                &tuple_buffer->done);
        if (!tup)
            return false;

        /*
         * Attempt to read more tuples in nowait mode and store them in the
         * pending-tuple array for the reader.
         */
        load_tuple_array(gm_state, reader);
    }

    Assert(tup);

    /* Build the TupleTableSlot for the given tuple */
    ExecStoreMinimalTuple(tup,  /* tuple to store */
                          gm_state->gm_slots[reader],   /* slot in which to
                                                         * store the tuple */
                          true);    /* pfree tuple when done with it */

    return true;
}

/*
 * Attempt to read a tuple from the given worker's tuple queue.
 *
 * Returns a palloc'd copy of the tuple, or NULL if no tuple is currently
 * available; *done is set to true once the worker is known to have no more
 * tuples.
 */
static MinimalTuple
gm_readnext_tuple(GatherMergeState *gm_state, int nreader, bool nowait,
                  bool *done)
{
    TupleQueueReader *reader;
    MinimalTuple tup;

    /* Check for async events, particularly messages from workers. */
    CHECK_FOR_INTERRUPTS();

    /*
     * Attempt to read a tuple.
     *
     * Note that TupleQueueReaderNext will just return NULL for a worker which
     * fails to initialize.  We'll treat that worker as having produced no
     * tuples; WaitForParallelWorkersToFinish will error out when we get
     * there.
     */
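    /*
     * nreader is 1-based, matching gm_slots[] numbering; reader[] has no
     * entry for the leader, hence the "- 1".
     */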
    reader = gm_state->reader[nreader - 1];
    tup = TupleQueueReaderNext(reader, nowait, done);

    /*
     * Since we'll be buffering these across multiple calls, we need to make a
     * copy.
     */
    return tup ? heap_copy_minimal_tuple(tup, 0) : NULL;
}

/*
 * We have one slot for each item in the heap array.  We use SlotNumber
 * to store slot indexes.  This doesn't actually provide any formal
 * type-safety, but it makes the code more self-documenting.
 */
typedef int32 SlotNumber;

/*
 * Compare the tuples in the two given slots.
 */
static int32
heap_compare_slots(Datum a, Datum b, void *arg)
{
    GatherMergeState *node = (GatherMergeState *) arg;
    SlotNumber  slot1 = DatumGetInt32(a);
    SlotNumber  slot2 = DatumGetInt32(b);

    TupleTableSlot *s1 = node->gm_slots[slot1];
    TupleTableSlot *s2 = node->gm_slots[slot2];
    int         nkey;

    Assert(!TupIsNull(s1));
    Assert(!TupIsNull(s2));

    for (nkey = 0; nkey < node->gm_nkeys; nkey++)
    {
        SortSupport sortKey = node->gm_sortkeys + nkey;
        AttrNumber  attno = sortKey->ssup_attno;
        Datum       datum1,
                    datum2;
        bool        isNull1,
                    isNull2;
        int         compare;

        datum1 = slot_getattr(s1, attno, &isNull1);
        datum2 = slot_getattr(s2, attno, &isNull2);

        compare = ApplySortComparator(datum1, isNull1,
                                      datum2, isNull2,
                                      sortKey);
        if (compare != 0)
        {
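            /*
             * binaryheap keeps the entry that compares largest at the top, so
             * invert the comparison result to put the smallest sort key (the
             * next tuple in output order) at the top of the heap.
             */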
            INVERT_COMPARE_RESULT(compare);
            return compare;
        }
    }
    return 0;
}