/src/postgres/src/backend/executor/nodeGather.c
Line | Count | Source |
1 | | /*------------------------------------------------------------------------- |
2 | | * |
3 | | * nodeGather.c |
4 | | * Support routines for scanning a plan via multiple workers. |
5 | | * |
6 | | * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group |
7 | | * Portions Copyright (c) 1994, Regents of the University of California |
8 | | * |
9 | | * A Gather executor launches parallel workers to run multiple copies of a |
10 | | * plan. It can also run the plan itself, if the workers are not available |
11 | | * or have not started up yet. It then merges all of the results it produces |
12 | | * and the results from the workers into a single output stream. Therefore, |
13 | | * it will normally be used with a plan where running multiple copies of the |
14 | | * same plan does not produce duplicate output, such as parallel-aware |
15 | | * SeqScan. |
16 | | * |
17 | | * Alternatively, a Gather node can be configured to use just one worker |
18 | | * with the single-copy flag set. In that case, the Gather node runs the |
19 | | * plan in that single worker rather than executing it itself, and simply |
20 | | * returns whatever tuples the worker returns. If a worker cannot be |
21 | | * obtained, the Gather node runs the plan itself and returns the results. |
22 | | * Therefore, a plan used with a single-copy Gather node need not be |
23 | | * parallel-aware. |
24 | | * |
25 | | * IDENTIFICATION |
26 | | * src/backend/executor/nodeGather.c |
27 | | * |
28 | | *------------------------------------------------------------------------- |
29 | | */ |
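/*
 * Illustrative sketch only, not part of nodeGather.c: roughly how the
 * executor drives the entry points defined in this file over a Gather
 * node's lifetime.  The real call sites are the generic executor dispatch
 * (ExecInitNode/ExecProcNode/ExecEndNode), so the helper below is purely
 * hypothetical and never compiled.
 */
#ifdef GATHER_LIFECYCLE_SKETCH
static void
gather_lifecycle_sketch(Gather *plan, EState *estate, int eflags)
{
	GatherState *gs = ExecInitGather(plan, estate, eflags);

	for (;;)
	{
		/* ExecProcNode dispatches to ExecGather via gs->ps.ExecProcNode */
		TupleTableSlot *slot = ExecProcNode(&gs->ps);

		if (TupIsNull(slot))
			break;				/* local and worker streams are exhausted */
		/* ... hand the slot up to the parent node ... */
	}

	ExecShutdownGather(gs);		/* stop workers, destroy parallel context */
	ExecEndGather(gs);			/* recursively clean up the child plan */
}
#endif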
30 | | |
31 | | #include "postgres.h" |
32 | | |
33 | | #include "executor/execParallel.h" |
34 | | #include "executor/executor.h" |
35 | | #include "executor/nodeGather.h" |
36 | | #include "executor/tqueue.h" |
37 | | #include "miscadmin.h" |
38 | | #include "optimizer/optimizer.h" |
39 | | #include "utils/wait_event.h" |
40 | | |
41 | | |
42 | | static TupleTableSlot *ExecGather(PlanState *pstate); |
43 | | static TupleTableSlot *gather_getnext(GatherState *gatherstate); |
44 | | static MinimalTuple gather_readnext(GatherState *gatherstate); |
45 | | static void ExecShutdownGatherWorkers(GatherState *node); |
46 | | |
47 | | |
48 | | /* ---------------------------------------------------------------- |
49 | | * ExecInitGather |
50 | | * ---------------------------------------------------------------- |
51 | | */ |
52 | | GatherState * |
53 | | ExecInitGather(Gather *node, EState *estate, int eflags) |
54 | 0 | { |
55 | 0 | GatherState *gatherstate; |
56 | 0 | Plan *outerNode; |
57 | 0 | TupleDesc tupDesc; |
58 | | |
59 | | /* Gather node doesn't have innerPlan node. */ |
60 | 0 | Assert(innerPlan(node) == NULL); |
61 | | |
62 | | /* |
63 | | * create state structure |
64 | | */ |
65 | 0 | gatherstate = makeNode(GatherState); |
66 | 0 | gatherstate->ps.plan = (Plan *) node; |
67 | 0 | gatherstate->ps.state = estate; |
68 | 0 | gatherstate->ps.ExecProcNode = ExecGather; |
69 | |
70 | 0 | gatherstate->initialized = false; |
71 | 0 | gatherstate->need_to_scan_locally = |
72 | 0 | !node->single_copy && parallel_leader_participation; |
73 | 0 | gatherstate->tuples_needed = -1; |
74 | | |
75 | | /* |
76 | | * Miscellaneous initialization |
77 | | * |
78 | | * create expression context for node |
79 | | */ |
80 | 0 | ExecAssignExprContext(estate, &gatherstate->ps); |
81 | | |
82 | | /* |
83 | | * now initialize outer plan |
84 | | */ |
85 | 0 | outerNode = outerPlan(node); |
86 | 0 | outerPlanState(gatherstate) = ExecInitNode(outerNode, estate, eflags); |
87 | 0 | tupDesc = ExecGetResultType(outerPlanState(gatherstate)); |
88 | | |
89 | | /* |
90 | | * Leader may access ExecProcNode result directly (if |
91 | | * need_to_scan_locally), or from workers via tuple queue. So we can't |
92 | | * trivially rely on the slot type being fixed for expressions evaluated |
93 | | * within this node. |
94 | | */ |
95 | 0 | gatherstate->ps.outeropsset = true; |
96 | 0 | gatherstate->ps.outeropsfixed = false; |
97 | | |
98 | | /* |
99 | | * Initialize result type and projection. |
100 | | */ |
101 | 0 | ExecInitResultTypeTL(&gatherstate->ps); |
102 | 0 | ExecConditionalAssignProjectionInfo(&gatherstate->ps, tupDesc, OUTER_VAR); |
103 | | |
104 | | /* |
105 | | * Without projections result slot type is not trivially known, see |
106 | | * comment above. |
107 | | */ |
108 | 0 | if (gatherstate->ps.ps_ProjInfo == NULL) |
109 | 0 | { |
110 | 0 | gatherstate->ps.resultopsset = true; |
111 | 0 | gatherstate->ps.resultopsfixed = false; |
112 | 0 | } |
113 | | |
114 | | /* |
115 | | * Initialize funnel slot to same tuple descriptor as outer plan. |
116 | | */ |
117 | 0 | gatherstate->funnel_slot = ExecInitExtraTupleSlot(estate, tupDesc, |
118 | 0 | &TTSOpsMinimalTuple); |
119 | | |
120 | | /* |
121 | | * Gather doesn't support checking a qual (it's always more efficient to |
122 | | * do it in the child node). |
123 | | */ |
124 | 0 | Assert(!node->plan.qual); |
125 | |
126 | 0 | return gatherstate; |
127 | 0 | } |
128 | | |
129 | | /* ---------------------------------------------------------------- |
130 | | * ExecGather(node) |
131 | | * |
132 | | * Scans the relation via multiple workers and returns |
133 | | * the next qualifying tuple. |
134 | | * ---------------------------------------------------------------- |
135 | | */ |
136 | | static TupleTableSlot * |
137 | | ExecGather(PlanState *pstate) |
138 | 0 | { |
139 | 0 | GatherState *node = castNode(GatherState, pstate); |
140 | 0 | TupleTableSlot *slot; |
141 | 0 | ExprContext *econtext; |
142 | |
143 | 0 | CHECK_FOR_INTERRUPTS(); |
144 | | |
145 | | /* |
146 | | * Initialize the parallel context and workers on first execution. We do |
147 | | * this on first execution rather than during node initialization, as it |
148 | | * needs to allocate a large dynamic segment, so it is better to do it |
149 | | * only if it is really needed. |
150 | | */ |
151 | 0 | if (!node->initialized) |
152 | 0 | { |
153 | 0 | EState *estate = node->ps.state; |
154 | 0 | Gather *gather = (Gather *) node->ps.plan; |
155 | | |
156 | | /* |
157 | | * Sometimes we might have to run without parallelism; but if parallel |
158 | | * mode is active then we can try to fire up some workers. |
159 | | */ |
160 | 0 | if (gather->num_workers > 0 && estate->es_use_parallel_mode) |
161 | 0 | { |
162 | 0 | ParallelContext *pcxt; |
163 | | |
164 | | /* Initialize, or re-initialize, shared state needed by workers. */ |
165 | 0 | if (!node->pei) |
166 | 0 | node->pei = ExecInitParallelPlan(outerPlanState(node), |
167 | 0 | estate, |
168 | 0 | gather->initParam, |
169 | 0 | gather->num_workers, |
170 | 0 | node->tuples_needed); |
171 | 0 | else |
172 | 0 | ExecParallelReinitialize(outerPlanState(node), |
173 | 0 | node->pei, |
174 | 0 | gather->initParam); |
175 | | |
176 | | /* |
177 | | * Register backend workers. We might not get as many as we |
178 | | * requested, or indeed any at all. |
179 | | */ |
180 | 0 | pcxt = node->pei->pcxt; |
181 | 0 | LaunchParallelWorkers(pcxt); |
182 | | /* We save # workers launched for the benefit of EXPLAIN */ |
183 | 0 | node->nworkers_launched = pcxt->nworkers_launched; |
184 | | |
185 | | /* |
186 | | * Count number of workers originally wanted and actually |
187 | | * launched. |
188 | | */ |
189 | 0 | estate->es_parallel_workers_to_launch += pcxt->nworkers_to_launch; |
190 | 0 | estate->es_parallel_workers_launched += pcxt->nworkers_launched; |
191 | | |
192 | | /* Set up tuple queue readers to read the results. */ |
193 | 0 | if (pcxt->nworkers_launched > 0) |
194 | 0 | { |
195 | 0 | ExecParallelCreateReaders(node->pei); |
196 | | /* Make a working array showing the active readers */ |
197 | 0 | node->nreaders = pcxt->nworkers_launched; |
198 | 0 | node->reader = (TupleQueueReader **) |
199 | 0 | palloc(node->nreaders * sizeof(TupleQueueReader *)); |
200 | 0 | memcpy(node->reader, node->pei->reader, |
201 | 0 | node->nreaders * sizeof(TupleQueueReader *)); |
202 | 0 | } |
203 | 0 | else |
204 | 0 | { |
205 | | /* No workers? Then never mind. */ |
206 | 0 | node->nreaders = 0; |
207 | 0 | node->reader = NULL; |
208 | 0 | } |
209 | 0 | node->nextreader = 0; |
210 | 0 | } |
211 | | |
212 | | /* Run the plan locally if no workers were launched, or if leader participation is enabled and this isn't single-copy. */ |
213 | 0 | node->need_to_scan_locally = (node->nreaders == 0) |
214 | 0 | || (!gather->single_copy && parallel_leader_participation); |
215 | 0 | node->initialized = true; |
216 | 0 | } |
217 | | |
218 | | /* |
219 | | * Reset per-tuple memory context to free any expression evaluation |
220 | | * storage allocated in the previous tuple cycle. |
221 | | */ |
222 | 0 | econtext = node->ps.ps_ExprContext; |
223 | 0 | ResetExprContext(econtext); |
224 | | |
225 | | /* |
226 | | * Get next tuple, either from one of our workers, or by running the plan |
227 | | * ourselves. |
228 | | */ |
229 | 0 | slot = gather_getnext(node); |
230 | 0 | if (TupIsNull(slot)) |
231 | 0 | return NULL; |
232 | | |
233 | | /* If no projection is required, we're done. */ |
234 | 0 | if (node->ps.ps_ProjInfo == NULL) |
235 | 0 | return slot; |
236 | | |
237 | | /* |
238 | | * Form the result tuple using ExecProject(), and return it. |
239 | | */ |
240 | 0 | econtext->ecxt_outertuple = slot; |
241 | 0 | return ExecProject(node->ps.ps_ProjInfo); |
242 | 0 | } |
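/*
 * Illustrative sketch only, not from the PostgreSQL sources: the leader-
 * participation decision made in ExecGather above, pulled out as a
 * hypothetical helper.  The parameters mirror node->nreaders,
 * gather->single_copy, and the parallel_leader_participation GUC.
 */
#ifdef GATHER_LEADER_SKETCH
static bool
leader_should_scan_locally(int nreaders, bool single_copy,
						   bool leader_participation)
{
	/* With no worker queues to read, the leader must run the plan itself. */
	if (nreaders == 0)
		return true;

	/* Otherwise the leader joins in unless single-copy or participation is off. */
	return !single_copy && leader_participation;
}
#endif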
243 | | |
244 | | /* ---------------------------------------------------------------- |
245 | | * ExecEndGather |
246 | | * |
247 | | * frees any storage allocated through C routines. |
248 | | * ---------------------------------------------------------------- |
249 | | */ |
250 | | void |
251 | | ExecEndGather(GatherState *node) |
252 | 0 | { |
253 | 0 | ExecEndNode(outerPlanState(node)); /* let children clean up first */ |
254 | 0 | ExecShutdownGather(node); |
255 | 0 | } |
256 | | |
257 | | /* |
258 | | * Read the next tuple. We might fetch a tuple from one of the tuple queues |
259 | | * using gather_readnext, or if no tuple queue contains a tuple and the |
260 | | * single_copy flag is not set, we might generate one locally instead. |
261 | | */ |
262 | | static TupleTableSlot * |
263 | | gather_getnext(GatherState *gatherstate) |
264 | 0 | { |
265 | 0 | PlanState *outerPlan = outerPlanState(gatherstate); |
266 | 0 | TupleTableSlot *outerTupleSlot; |
267 | 0 | TupleTableSlot *fslot = gatherstate->funnel_slot; |
268 | 0 | MinimalTuple tup; |
269 | |
270 | 0 | while (gatherstate->nreaders > 0 || gatherstate->need_to_scan_locally) |
271 | 0 | { |
272 | 0 | CHECK_FOR_INTERRUPTS(); |
273 | |
274 | 0 | if (gatherstate->nreaders > 0) |
275 | 0 | { |
276 | 0 | tup = gather_readnext(gatherstate); |
277 | |
278 | 0 | if (HeapTupleIsValid(tup)) |
279 | 0 | { |
280 | 0 | ExecStoreMinimalTuple(tup, /* tuple to store */ |
281 | 0 | fslot, /* slot to store the tuple */ |
282 | 0 | false); /* don't pfree tuple */ |
283 | 0 | return fslot; |
284 | 0 | } |
285 | 0 | } |
286 | | |
287 | 0 | if (gatherstate->need_to_scan_locally) |
288 | 0 | { |
289 | 0 | EState *estate = gatherstate->ps.state; |
290 | | |
291 | | /* Install our DSA area while executing the plan. */ |
292 | 0 | estate->es_query_dsa = |
293 | 0 | gatherstate->pei ? gatherstate->pei->area : NULL; |
294 | 0 | outerTupleSlot = ExecProcNode(outerPlan); |
295 | 0 | estate->es_query_dsa = NULL; |
296 | |
297 | 0 | if (!TupIsNull(outerTupleSlot)) |
298 | 0 | return outerTupleSlot; |
299 | | |
300 | 0 | gatherstate->need_to_scan_locally = false; |
301 | 0 | } |
302 | 0 | } |
303 | | |
304 | 0 | return ExecClearTuple(fslot); |
305 | 0 | } |
306 | | |
307 | | /* |
308 | | * Attempt to read a tuple from one of our parallel workers. |
309 | | */ |
310 | | static MinimalTuple |
311 | | gather_readnext(GatherState *gatherstate) |
312 | 0 | { |
313 | 0 | int nvisited = 0; |
314 | |
315 | 0 | for (;;) |
316 | 0 | { |
317 | 0 | TupleQueueReader *reader; |
318 | 0 | MinimalTuple tup; |
319 | 0 | bool readerdone; |
320 | | |
321 | | /* Check for async events, particularly messages from workers. */ |
322 | 0 | CHECK_FOR_INTERRUPTS(); |
323 | | |
324 | | /* |
325 | | * Attempt to read a tuple, but don't block if none is available. |
326 | | * |
327 | | * Note that TupleQueueReaderNext will just return NULL for a worker |
328 | | * which fails to initialize. We'll treat that worker as having |
329 | | * produced no tuples; WaitForParallelWorkersToFinish will error out |
330 | | * when we get there. |
331 | | */ |
332 | 0 | Assert(gatherstate->nextreader < gatherstate->nreaders); |
333 | 0 | reader = gatherstate->reader[gatherstate->nextreader]; |
334 | 0 | tup = TupleQueueReaderNext(reader, true, &readerdone); |
335 | | |
336 | | /* |
337 | | * If this reader is done, remove it from our working array of active |
338 | | * readers. If all readers are done, we're outta here. |
339 | | */ |
340 | 0 | if (readerdone) |
341 | 0 | { |
342 | 0 | Assert(!tup); |
343 | 0 | --gatherstate->nreaders; |
344 | 0 | if (gatherstate->nreaders == 0) |
345 | 0 | { |
346 | 0 | ExecShutdownGatherWorkers(gatherstate); |
347 | 0 | return NULL; |
348 | 0 | } |
349 | 0 | memmove(&gatherstate->reader[gatherstate->nextreader], |
350 | 0 | &gatherstate->reader[gatherstate->nextreader + 1], |
351 | 0 | sizeof(TupleQueueReader *) |
352 | 0 | * (gatherstate->nreaders - gatherstate->nextreader)); |
353 | 0 | if (gatherstate->nextreader >= gatherstate->nreaders) |
354 | 0 | gatherstate->nextreader = 0; |
355 | 0 | continue; |
356 | 0 | } |
357 | | |
358 | | /* If we got a tuple, return it. */ |
359 | 0 | if (tup) |
360 | 0 | return tup; |
361 | | |
362 | | /* |
363 | | * Advance nextreader pointer in round-robin fashion. Note that we |
364 | | * only reach this code if we weren't able to get a tuple from the |
365 | | * current worker. We used to advance the nextreader pointer after |
366 | | * every tuple, but it turns out to be much more efficient to keep |
367 | | * reading from the same queue until that would require blocking. |
368 | | */ |
369 | 0 | gatherstate->nextreader++; |
370 | 0 | if (gatherstate->nextreader >= gatherstate->nreaders) |
371 | 0 | gatherstate->nextreader = 0; |
372 | | |
373 | | /* Have we visited every (surviving) TupleQueueReader? */ |
374 | 0 | nvisited++; |
375 | 0 | if (nvisited >= gatherstate->nreaders) |
376 | 0 | { |
377 | | /* |
378 | | * If (still) running plan locally, return NULL so caller can |
379 | | * generate another tuple from the local copy of the plan. |
380 | | */ |
381 | 0 | if (gatherstate->need_to_scan_locally) |
382 | 0 | return NULL; |
383 | | |
384 | | /* Nothing to do except wait for developments. */ |
385 | 0 | (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0, |
386 | 0 | WAIT_EVENT_EXECUTE_GATHER); |
387 | 0 | ResetLatch(MyLatch); |
388 | 0 | nvisited = 0; |
389 | 0 | } |
390 | 0 | } |
391 | 0 | } |
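/*
 * Illustrative sketch only, not from the PostgreSQL sources: the reader-
 * array compaction gather_readnext performs above when one worker's queue
 * is exhausted.  Keeping the surviving readers contiguous is what lets the
 * round-robin index simply wrap back to zero.
 */
#ifdef GATHER_COMPACT_SKETCH
static void
remove_finished_reader(TupleQueueReader **readers, int *nreaders, int which)
{
	Assert(which >= 0 && which < *nreaders);

	/* Shrink the count first, then slide the tail of the array down. */
	(*nreaders)--;
	memmove(&readers[which], &readers[which + 1],
			sizeof(TupleQueueReader *) * (*nreaders - which));
}
#endif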
392 | | |
393 | | /* ---------------------------------------------------------------- |
394 | | * ExecShutdownGatherWorkers |
395 | | * |
396 | | * Stop all the parallel workers. |
397 | | * ---------------------------------------------------------------- |
398 | | */ |
399 | | static void |
400 | | ExecShutdownGatherWorkers(GatherState *node) |
401 | 0 | { |
402 | 0 | if (node->pei != NULL) |
403 | 0 | ExecParallelFinish(node->pei); |
404 | | |
405 | | /* Flush local copy of reader array */ |
406 | 0 | if (node->reader) |
407 | 0 | pfree(node->reader); |
408 | 0 | node->reader = NULL; |
409 | 0 | } |
410 | | |
411 | | /* ---------------------------------------------------------------- |
412 | | * ExecShutdownGather |
413 | | * |
414 | | * Destroy the setup for parallel workers including parallel context. |
415 | | * ---------------------------------------------------------------- |
416 | | */ |
417 | | void |
418 | | ExecShutdownGather(GatherState *node) |
419 | 0 | { |
420 | 0 | ExecShutdownGatherWorkers(node); |
421 | | |
422 | | /* Now destroy the parallel context. */ |
423 | 0 | if (node->pei != NULL) |
424 | 0 | { |
425 | 0 | ExecParallelCleanup(node->pei); |
426 | 0 | node->pei = NULL; |
427 | 0 | } |
428 | 0 | } |
429 | | |
430 | | /* ---------------------------------------------------------------- |
431 | | * Join Support |
432 | | * ---------------------------------------------------------------- |
433 | | */ |
434 | | |
435 | | /* ---------------------------------------------------------------- |
436 | | * ExecReScanGather |
437 | | * |
438 | | * Prepare to re-scan the result of a Gather. |
439 | | * ---------------------------------------------------------------- |
440 | | */ |
441 | | void |
442 | | ExecReScanGather(GatherState *node) |
443 | 0 | { |
444 | 0 | Gather *gather = (Gather *) node->ps.plan; |
445 | 0 | PlanState *outerPlan = outerPlanState(node); |
446 | | |
447 | | /* Make sure any existing workers are gracefully shut down */ |
448 | 0 | ExecShutdownGatherWorkers(node); |
449 | | |
450 | | /* Mark node so that shared state will be rebuilt at next call */ |
451 | 0 | node->initialized = false; |
452 | | |
453 | | /* |
454 | | * Set child node's chgParam to tell it that the next scan might deliver a |
455 | | * different set of rows within the leader process. (The overall rowset |
456 | | * shouldn't change, but the leader process's subset might; hence nodes |
457 | | * between here and the parallel table scan node mustn't optimize on the |
458 | | * assumption of an unchanging rowset.) |
459 | | */ |
460 | 0 | if (gather->rescan_param >= 0) |
461 | 0 | outerPlan->chgParam = bms_add_member(outerPlan->chgParam, |
462 | 0 | gather->rescan_param); |
463 | | |
464 | | /* |
465 | | * If chgParam of subnode is not null then plan will be re-scanned by |
466 | | * first ExecProcNode. Note: because this does nothing if we have a |
467 | | * rescan_param, it's currently guaranteed that parallel-aware child nodes |
468 | | * will not see a ReScan call until after they get a ReInitializeDSM call. |
469 | | * That ordering might not be something to rely on, though. A good rule |
470 | | * of thumb is that ReInitializeDSM should reset only shared state, ReScan |
471 | | * should reset only local state, and anything that depends on both of |
472 | | * those steps being finished must wait until the first ExecProcNode call. |
473 | | */ |
474 | 0 | if (outerPlan->chgParam == NULL) |
475 | 0 | ExecReScan(outerPlan); |
476 | 0 | } |
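/*
 * Illustrative sketch only, not from the PostgreSQL sources: the rule of
 * thumb in the comment above, seen from a hypothetical parallel-aware child
 * node.  The struct and function names below are invented for illustration.
 */
#ifdef GATHER_RESCAN_SKETCH
typedef struct ChildSharedSketch
{
	pg_atomic_uint64 next_item;		/* lives in the DSM segment */
} ChildSharedSketch;

typedef struct ChildLocalSketch
{
	uint64		current_item;		/* backend-local only */
} ChildLocalSketch;

/* Reached via ExecParallelReinitialize(): reset only shared state. */
static void
child_reinitialize_dsm_sketch(ChildSharedSketch *shared)
{
	pg_atomic_write_u64(&shared->next_item, 0);
}

/* Reached via ExecReScan(): reset only backend-local state. */
static void
child_rescan_sketch(ChildLocalSketch *local)
{
	local->current_item = 0;
}
#endif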