/src/postgres/src/backend/executor/nodeAppend.c
Line | Count | Source (jump to first uncovered line) |
1 | | /*------------------------------------------------------------------------- |
2 | | * |
3 | | * nodeAppend.c |
4 | | * routines to handle append nodes. |
5 | | * |
6 | | * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group |
7 | | * Portions Copyright (c) 1994, Regents of the University of California |
8 | | * |
9 | | * |
10 | | * IDENTIFICATION |
11 | | * src/backend/executor/nodeAppend.c |
12 | | * |
13 | | *------------------------------------------------------------------------- |
14 | | */ |
15 | | /* INTERFACE ROUTINES |
16 | | * ExecInitAppend - initialize the append node |
17 | | * ExecAppend - retrieve the next tuple from the node |
18 | | * ExecEndAppend - shut down the append node |
19 | | * ExecReScanAppend - rescan the append node |
20 | | * |
21 | | * NOTES |
22 | | * Each append node contains a list of one or more subplans which |
23 | | * must be iteratively processed (forwards or backwards). |
24 | | * Tuples are retrieved by executing the 'whichplan'th subplan |
25 | | * until the subplan stops returning tuples, at which point that |
26 | | * plan is shut down and the next started up. |
27 | | * |
28 | | * Append nodes don't make use of their left and right |
29 | | * subtrees, rather they maintain a list of subplans so |
30 | | * a typical append node looks like this in the plan tree: |
31 | | * |
32 | | * ... |
33 | | * / |
34 | | * Append -------+------+------+--- nil |
35 | | * / \ | | | |
36 | | * nil nil ... ... ... |
37 | | * subplans |
38 | | * |
39 | | * Append nodes are currently used for unions, and to support |
40 | | * inheritance queries, where several relations need to be scanned. |
41 | | * For example, in our standard person/student/employee/student-emp |
42 | | * example, where student and employee inherit from person |
43 | | * and student-emp inherits from student and employee, the |
44 | | * query: |
45 | | * |
46 | | * select name from person |
47 | | * |
48 | | * generates the plan: |
49 | | * |
50 | | * | |
51 | | * Append -------+-------+--------+--------+ |
52 | | * / \ | | | | |
53 | | * nil nil Scan Scan Scan Scan |
54 | | * | | | | |
55 | | * person employee student student-emp |
56 | | */ |
57 | | |
58 | | #include "postgres.h" |
59 | | |
60 | | #include "executor/execAsync.h" |
61 | | #include "executor/execPartition.h" |
62 | | #include "executor/executor.h" |
63 | | #include "executor/nodeAppend.h" |
64 | | #include "miscadmin.h" |
65 | | #include "pgstat.h" |
66 | | #include "storage/latch.h" |
67 | | |
68 | | /* Shared state for parallel-aware Append; allocated from the parallel context's shm_toc by ExecAppendInitializeDSM and looked up by workers in ExecAppendInitializeWorker. */ |
69 | | struct ParallelAppendState |
70 | | { |
71 | | LWLock pa_lock; /* mutual exclusion to choose next subplan; acquired in LW_EXCLUSIVE mode by the leader and by workers */ |
72 | | int pa_next_plan; /* next plan to choose by any worker, or INVALID_SUBPLAN_INDEX once nothing is left to hand out */ |
73 | | |
74 | | /* |
75 | | * pa_finished[i] should be true if no more workers should select subplan |
76 | | * i.  For a non-partial plan, this should be set to true as soon as a |
77 | | * worker selects the plan; for a partial plan, it remains false until |
78 | | * some worker executes the plan to completion.  Subplans ruled out by run-time pruning are flagged here as well (see mark_invalid_subplans_as_finished).  Space is reserved for as_nplans entries (see ExecAppendEstimate). |
79 | | */ |
80 | | bool pa_finished[FLEXIBLE_ARRAY_MEMBER]; |
81 | | }; |
82 | | |
83 | 0 | #define INVALID_SUBPLAN_INDEX -1 /* sentinel stored in as_whichplan and pa_next_plan when no subplan is selected */ |
84 | 0 | #define EVENT_BUFFER_SIZE 16 /* NOTE(review): not referenced in the part of the file visible here; presumably sizes the async wait-event buffer -- confirm */ |
85 | | |
86 | | static TupleTableSlot *ExecAppend(PlanState *pstate); |
87 | | static bool choose_next_subplan_locally(AppendState *node); |
88 | | static bool choose_next_subplan_for_leader(AppendState *node); |
89 | | static bool choose_next_subplan_for_worker(AppendState *node); |
90 | | static void mark_invalid_subplans_as_finished(AppendState *node); |
91 | | static void ExecAppendAsyncBegin(AppendState *node); |
92 | | static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result); |
93 | | static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result); |
94 | | static void ExecAppendAsyncEventWait(AppendState *node); |
95 | | static void classify_matching_subplans(AppendState *node); |
96 | | |
97 | | /* ---------------------------------------------------------------- |
98 | | * ExecInitAppend |
99 | | * Build and return the AppendState for 'node'; 'eflags' must not include EXEC_FLAG_MARK. |
100 | | * Begin all of the subscans of the append node, i.e. run ExecInitNode on every subplan that is still valid after initial pruning. |
101 | | * The result slot and, when async-capable subplans exist, the per-subplan AsyncRequest array are set up here too. |
102 | | * (This is potentially wasteful, since the entire result of the |
103 | | * append node may not be scanned, but this way all of the |
104 | | * structures get allocated in the executor's top level memory |
105 | | * block instead of that of the call to ExecAppend.) |
106 | | * ---------------------------------------------------------------- |
107 | | */ |
108 | | AppendState * |
109 | | ExecInitAppend(Append *node, EState *estate, int eflags) |
110 | 0 | { |
111 | 0 | AppendState *appendstate = makeNode(AppendState); |
112 | 0 | PlanState **appendplanstates; |
113 | 0 | const TupleTableSlotOps *appendops; |
114 | 0 | Bitmapset *validsubplans; |
115 | 0 | Bitmapset *asyncplans; |
116 | 0 | int nplans; |
117 | 0 | int nasyncplans; |
118 | 0 | int firstvalid; |
119 | 0 | int i, |
120 | 0 | j; |
121 | | |
122 | | /* check for unsupported flags: we assert that EXEC_FLAG_MARK was not requested */ |
123 | 0 | Assert(!(eflags & EXEC_FLAG_MARK)); |
124 | | |
125 | | /* |
126 | | * create new AppendState for our append node |
127 | | */ |
128 | 0 | appendstate->ps.plan = (Plan *) node; |
129 | 0 | appendstate->ps.state = estate; |
130 | 0 | appendstate->ps.ExecProcNode = ExecAppend; |
131 | | |
132 | | /* Let choose_next_subplan_* function handle setting the first subplan; until then as_whichplan is INVALID_SUBPLAN_INDEX */ |
133 | 0 | appendstate->as_whichplan = INVALID_SUBPLAN_INDEX; |
134 | 0 | appendstate->as_syncdone = false; |
135 | 0 | appendstate->as_begun = false; |
136 | | |
137 | | /* If run-time partition pruning is enabled, then set that up now */ |
138 | 0 | if (node->part_prune_index >= 0) |
139 | 0 | { |
140 | 0 | PartitionPruneState *prunestate; |
141 | | |
142 | | /* |
143 | | * Set up pruning data structure. This also initializes the set of |
144 | | * subplans to initialize (validsubplans) by taking into account the |
145 | | * result of performing initial pruning if any. |
146 | | */ |
147 | 0 | prunestate = ExecInitPartitionExecPruning(&appendstate->ps, |
148 | 0 | list_length(node->appendplans), |
149 | 0 | node->part_prune_index, |
150 | 0 | node->apprelids, |
151 | 0 | &validsubplans); |
152 | 0 | appendstate->as_prune_state = prunestate; |
153 | 0 | nplans = bms_num_members(validsubplans); |
154 | | |
155 | | /* |
156 | | * When no run-time pruning is required and there's at least one |
157 | | * subplan, we can fill as_valid_subplans immediately, preventing |
158 | | * later calls to ExecFindMatchingSubPlans.  The set is built as the range 0..nplans-1, i.e. it indexes the compacted appendplanstates array rather than node->appendplans. |
159 | | */ |
160 | 0 | if (!prunestate->do_exec_prune && nplans > 0) |
161 | 0 | { |
162 | 0 | appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1); |
163 | 0 | appendstate->as_valid_subplans_identified = true; |
164 | 0 | } |
165 | 0 | } |
166 | 0 | else |
167 | 0 | { |
168 | 0 | nplans = list_length(node->appendplans); |
169 | | |
170 | | /* |
171 | | * When run-time partition pruning is not enabled we can just mark all |
172 | | * subplans as valid; they must also all be initialized. |
173 | | */ |
174 | 0 | Assert(nplans > 0); |
175 | 0 | appendstate->as_valid_subplans = validsubplans = |
176 | 0 | bms_add_range(NULL, 0, nplans - 1); |
177 | 0 | appendstate->as_valid_subplans_identified = true; |
178 | 0 | appendstate->as_prune_state = NULL; |
179 | 0 | } |
180 | |
|
181 | 0 | appendplanstates = (PlanState **) palloc(nplans * |
182 | 0 | sizeof(PlanState *)); |
183 | | |
184 | | /* |
185 | | * call ExecInitNode on each of the valid plans to be executed and save |
186 | | * the results into the appendplanstates array.  Here i walks the members of validsubplans (indexes into node->appendplans) while j counts the plans initialized so far (indexes into appendplanstates). |
187 | | * |
188 | | * While at it, find out the first valid partial plan. |
189 | | */ |
190 | 0 | j = 0; |
191 | 0 | asyncplans = NULL; |
192 | 0 | nasyncplans = 0; |
193 | 0 | firstvalid = nplans; |
194 | 0 | i = -1; |
195 | 0 | while ((i = bms_next_member(validsubplans, i)) >= 0) |
196 | 0 | { |
197 | 0 | Plan *initNode = (Plan *) list_nth(node->appendplans, i); |
198 | | |
199 | | /* |
200 | | * Record async subplans. When executing EvalPlanQual, we treat them |
201 | | * as sync ones; don't do this when initializing an EvalPlanQual plan |
202 | | * tree.  The set is keyed by j, the position in appendplanstates. |
203 | | */ |
204 | 0 | if (initNode->async_capable && estate->es_epq_active == NULL) |
205 | 0 | { |
206 | 0 | asyncplans = bms_add_member(asyncplans, j); |
207 | 0 | nasyncplans++; |
208 | 0 | } |
209 | | |
210 | | /* |
211 | | * Record the lowest appendplanstates index (j) whose plan sits at or after first_partial_plan in node->appendplans; firstvalid stays at nplans if there is no such plan. |
212 | | */ |
213 | 0 | if (i >= node->first_partial_plan && j < firstvalid) |
214 | 0 | firstvalid = j; |
215 | |
|
216 | 0 | appendplanstates[j++] = ExecInitNode(initNode, estate, eflags); |
217 | 0 | } |
218 | |
|
219 | 0 | appendstate->as_first_partial_plan = firstvalid; |
220 | 0 | appendstate->appendplans = appendplanstates; |
221 | 0 | appendstate->as_nplans = nplans; |
222 | | |
223 | | /* |
224 | | * Initialize Append's result tuple type and slot. If the child plans all |
225 | | * produce the same fixed slot type, we can use that slot type; otherwise |
226 | | * make a virtual slot. (Note that the result slot itself is used only to |
227 | | * return a null tuple at end of execution; real tuples are returned to |
228 | | * the caller in the children's own result slots. What we are doing here |
229 | | * is allowing the parent plan node to optimize if the Append will return |
230 | | * only one kind of slot.) |
231 | | */ |
232 | 0 | appendops = ExecGetCommonSlotOps(appendplanstates, j); |
233 | 0 | if (appendops != NULL) |
234 | 0 | { |
235 | 0 | ExecInitResultTupleSlotTL(&appendstate->ps, appendops); |
236 | 0 | } |
237 | 0 | else |
238 | 0 | { |
239 | 0 | ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual); |
240 | | /* show that the output slot type is not fixed */ |
241 | 0 | appendstate->ps.resultopsset = true; |
242 | 0 | appendstate->ps.resultopsfixed = false; |
243 | 0 | } |
244 | | |
245 | | /* Initialize async state; the request and result arrays are allocated below only when async subplans exist */ |
246 | 0 | appendstate->as_asyncplans = asyncplans; |
247 | 0 | appendstate->as_nasyncplans = nasyncplans; |
248 | 0 | appendstate->as_asyncrequests = NULL; |
249 | 0 | appendstate->as_asyncresults = NULL; |
250 | 0 | appendstate->as_nasyncresults = 0; |
251 | 0 | appendstate->as_nasyncremain = 0; |
252 | 0 | appendstate->as_needrequest = NULL; |
253 | 0 | appendstate->as_eventset = NULL; |
254 | 0 | appendstate->as_valid_asyncplans = NULL; |
255 | |
|
256 | 0 | if (nasyncplans > 0) |
257 | 0 | { |
258 | 0 | appendstate->as_asyncrequests = (AsyncRequest **) |
259 | 0 | palloc0(nplans * sizeof(AsyncRequest *)); /* indexed by subplan number; entries of sync subplans stay NULL */ |
260 | |
|
261 | 0 | i = -1; |
262 | 0 | while ((i = bms_next_member(asyncplans, i)) >= 0) |
263 | 0 | { |
264 | 0 | AsyncRequest *areq; |
265 | |
|
266 | 0 | areq = palloc(sizeof(AsyncRequest)); |
267 | 0 | areq->requestor = (PlanState *) appendstate; |
268 | 0 | areq->requestee = appendplanstates[i]; |
269 | 0 | areq->request_index = i; |
270 | 0 | areq->callback_pending = false; |
271 | 0 | areq->request_complete = false; |
272 | 0 | areq->result = NULL; |
273 | |
|
274 | 0 | appendstate->as_asyncrequests[i] = areq; |
275 | 0 | } |
276 | |
|
277 | 0 | appendstate->as_asyncresults = (TupleTableSlot **) |
278 | 0 | palloc0(nasyncplans * sizeof(TupleTableSlot *)); |
279 | |
|
280 | 0 | if (appendstate->as_valid_subplans_identified) |
281 | 0 | classify_matching_subplans(appendstate); |
282 | 0 | } |
283 | | |
284 | | /* |
285 | | * Miscellaneous initialization: no projection info is set up for the Append node itself |
286 | | */ |
287 | |
|
288 | 0 | appendstate->ps.ps_ProjInfo = NULL; |
289 | | |
290 | | /* For parallel query, this will be overridden later (see ExecAppendInitializeDSM and ExecAppendInitializeWorker). */ |
291 | 0 | appendstate->choose_next_subplan = choose_next_subplan_locally; |
292 | |
|
293 | 0 | return appendstate; |
294 | 0 | } |
295 | | |
296 | | /* ---------------------------------------------------------------- |
297 | | * ExecAppend |
298 | | * Return the next tuple produced by a subplan, or the cleared result slot once no sync or async subplan has anything left. |
299 | | * Handles iteration over multiple subplans. |
300 | | * ---------------------------------------------------------------- |
301 | | */ |
302 | | static TupleTableSlot * |
303 | | ExecAppend(PlanState *pstate) |
304 | 0 | { |
305 | 0 | AppendState *node = castNode(AppendState, pstate); |
306 | 0 | TupleTableSlot *result; |
307 | | |
308 | | /* |
309 | | * If this is the first call after Init or ReScan, we need to do the |
310 | | * initialization work. |
311 | | */ |
312 | 0 | if (!node->as_begun) |
313 | 0 | { |
314 | 0 | Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX); |
315 | 0 | Assert(!node->as_syncdone); |
316 | | |
317 | | /* Nothing to do if there are no subplans */ |
318 | 0 | if (node->as_nplans == 0) |
319 | 0 | return ExecClearTuple(node->ps.ps_ResultTupleSlot); |
320 | | |
321 | | /* If there are any async subplans, begin executing them. */ |
322 | 0 | if (node->as_nasyncplans > 0) |
323 | 0 | ExecAppendAsyncBegin(node); |
324 | | |
325 | | /* |
326 | | * If no sync subplan has been chosen, we must choose one before |
327 | | * proceeding.  If none can be chosen and no async subplan remains, the Append is empty; as_begun is left false in that case. |
328 | | */ |
329 | 0 | if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0) |
330 | 0 | return ExecClearTuple(node->ps.ps_ResultTupleSlot); |
331 | | |
332 | 0 | Assert(node->as_syncdone || |
333 | 0 | (node->as_whichplan >= 0 && |
334 | 0 | node->as_whichplan < node->as_nplans)); |
335 | | |
336 | | /* And we're initialized. */ |
337 | 0 | node->as_begun = true; |
338 | 0 | } |
339 | | |
340 | 0 | for (;;) |
341 | 0 | { |
342 | 0 | PlanState *subnode; |
343 | |
|
344 | 0 | CHECK_FOR_INTERRUPTS(); |
345 | | |
346 | | /* |
347 | | * try to get a tuple from an async subplan if any; this is attempted when the sync subplans are done or when some async subplan needs a new request |
348 | | */ |
349 | 0 | if (node->as_syncdone || !bms_is_empty(node->as_needrequest)) |
350 | 0 | { |
351 | 0 | if (ExecAppendAsyncGetNext(node, &result)) |
352 | 0 | return result; |
353 | 0 | Assert(!node->as_syncdone); |
354 | 0 | Assert(bms_is_empty(node->as_needrequest)); |
355 | 0 | } |
356 | | |
357 | | /* |
358 | | * figure out which sync subplan we are currently processing |
359 | | */ |
360 | 0 | Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans); |
361 | 0 | subnode = node->appendplans[node->as_whichplan]; |
362 | | |
363 | | /* |
364 | | * get a tuple from the subplan |
365 | | */ |
366 | 0 | result = ExecProcNode(subnode); |
367 | |
|
368 | 0 | if (!TupIsNull(result)) |
369 | 0 | { |
370 | | /* |
371 | | * If the subplan gave us something then return it as-is. We do |
372 | | * NOT make use of the result slot that was set up in |
373 | | * ExecInitAppend; there's no need for it. |
374 | | */ |
375 | 0 | return result; |
376 | 0 | } |
377 | | |
378 | | /* |
379 | | * wait or poll for async events if any. We do this before checking |
380 | | * for the end of iteration, because it might drain the remaining |
381 | | * async subplans. |
382 | | */ |
383 | 0 | if (node->as_nasyncremain > 0) |
384 | 0 | ExecAppendAsyncEventWait(node); |
385 | | |
386 | | /* choose new sync subplan; if no sync/async subplans, we're done */ |
387 | 0 | if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0) |
388 | 0 | return ExecClearTuple(node->ps.ps_ResultTupleSlot); |
389 | 0 | } |
390 | 0 | } |
391 | | |
392 | | /* ---------------------------------------------------------------- |
393 | | * ExecEndAppend |
394 | | * |
395 | | * Shuts down the subscans of the append node by calling ExecEndNode on each of the as_nplans entries of appendplans[]. |
396 | | * |
397 | | * Returns nothing of interest. |
398 | | * ---------------------------------------------------------------- |
399 | | */ |
400 | | void |
401 | | ExecEndAppend(AppendState *node) |
402 | 0 | { |
403 | 0 | PlanState **appendplans; |
404 | 0 | int nplans; |
405 | 0 | int i; |
406 | | |
407 | | /* |
408 | | * get information from the node |
409 | | */ |
410 | 0 | appendplans = node->appendplans; |
411 | 0 | nplans = node->as_nplans; |
412 | | |
413 | | /* |
414 | | * shut down each of the subscans; appendplans[] holds only the subplans that ExecInitAppend initialized |
415 | | */ |
416 | 0 | for (i = 0; i < nplans; i++) |
417 | 0 | ExecEndNode(appendplans[i]); |
418 | 0 | } |
419 | | |
420 | | void |
421 | | ExecReScanAppend(AppendState *node) |
422 | 0 | { |
423 | 0 | int nasyncplans = node->as_nasyncplans; |
424 | 0 | int i; |
425 | | |
426 | | /* |
427 | | * Reset the node so that the next ExecAppend call starts the scan over.  First, if any PARAM_EXEC Params used in pruning expressions have changed, then |
428 | | * we'd better unset the valid subplans so that they are reselected for |
429 | | * the new parameter values. |
430 | | */ |
431 | 0 | if (node->as_prune_state && |
432 | 0 | bms_overlap(node->ps.chgParam, |
433 | 0 | node->as_prune_state->execparamids)) |
434 | 0 | { |
435 | 0 | node->as_valid_subplans_identified = false; |
436 | 0 | bms_free(node->as_valid_subplans); |
437 | 0 | node->as_valid_subplans = NULL; |
438 | 0 | bms_free(node->as_valid_asyncplans); |
439 | 0 | node->as_valid_asyncplans = NULL; |
440 | 0 | } |
441 | |
|
442 | 0 | for (i = 0; i < node->as_nplans; i++) |
443 | 0 | { |
444 | 0 | PlanState *subnode = node->appendplans[i]; |
445 | | |
446 | | /* |
447 | | * ExecReScan doesn't know about my subplans, so I have to do |
448 | | * changed-parameter signaling myself. |
449 | | */ |
450 | 0 | if (node->ps.chgParam != NULL) |
451 | 0 | UpdateChangedParamSet(subnode, node->ps.chgParam); |
452 | | |
453 | | /* |
454 | | * If chgParam of subnode is not null then plan will be re-scanned by |
455 | | * first ExecProcNode or by first ExecAsyncRequest. |
456 | | */ |
457 | 0 | if (subnode->chgParam == NULL) |
458 | 0 | ExecReScan(subnode); |
459 | 0 | } |
460 | | |
461 | | /* Reset async state: clear the flags and result of every async request, then forget pending results and outstanding requests */ |
462 | 0 | if (nasyncplans > 0) |
463 | 0 | { |
464 | 0 | i = -1; |
465 | 0 | while ((i = bms_next_member(node->as_asyncplans, i)) >= 0) |
466 | 0 | { |
467 | 0 | AsyncRequest *areq = node->as_asyncrequests[i]; |
468 | |
|
469 | 0 | areq->callback_pending = false; |
470 | 0 | areq->request_complete = false; |
471 | 0 | areq->result = NULL; |
472 | 0 | } |
473 | |
|
474 | 0 | node->as_nasyncresults = 0; |
475 | 0 | node->as_nasyncremain = 0; |
476 | 0 | bms_free(node->as_needrequest); |
477 | 0 | node->as_needrequest = NULL; |
478 | 0 | } |
479 | | |
480 | | /* Let choose_next_subplan_* function handle setting the first subplan; this is the same starting state that ExecInitAppend sets up */ |
481 | 0 | node->as_whichplan = INVALID_SUBPLAN_INDEX; |
482 | 0 | node->as_syncdone = false; |
483 | 0 | node->as_begun = false; |
484 | 0 | } |
485 | | |
486 | | /* ---------------------------------------------------------------- |
487 | | * Parallel Append Support |
488 | | * ---------------------------------------------------------------- |
489 | | */ |
490 | | |
491 | | /* ---------------------------------------------------------------- |
492 | | * ExecAppendEstimate |
493 | | * |
494 | | * Compute the amount of space we'll need in the parallel |
495 | | * query DSM, and inform pcxt->estimator about our needs.  The size is the fixed part of ParallelAppendState plus one pa_finished flag per subplan; it is saved in node->pstate_len, and one TOC key is requested. |
496 | | * ---------------------------------------------------------------- |
497 | | */ |
498 | | void |
499 | | ExecAppendEstimate(AppendState *node, |
500 | | ParallelContext *pcxt) |
501 | 0 | { |
502 | 0 | node->pstate_len = |
503 | 0 | add_size(offsetof(ParallelAppendState, pa_finished), |
504 | 0 | sizeof(bool) * node->as_nplans); |
505 | |
|
506 | 0 | shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len); |
507 | 0 | shm_toc_estimate_keys(&pcxt->estimator, 1); |
508 | 0 | } |
509 | | |
510 | | |
511 | | /* ---------------------------------------------------------------- |
512 | | * ExecAppendInitializeDSM |
513 | | * |
514 | | * Set up shared state for Parallel Append: allocate and zero pstate_len bytes, initialize pa_lock, insert the state into the TOC under the plan node id, and make this process use choose_next_subplan_for_leader. |
515 | | * ---------------------------------------------------------------- |
516 | | */ |
517 | | void |
518 | | ExecAppendInitializeDSM(AppendState *node, |
519 | | ParallelContext *pcxt) |
520 | 0 | { |
521 | 0 | ParallelAppendState *pstate; |
522 | |
|
523 | 0 | pstate = shm_toc_allocate(pcxt->toc, node->pstate_len); |
524 | 0 | memset(pstate, 0, node->pstate_len); |
525 | 0 | LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND); |
526 | 0 | shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate); |
527 | |
|
528 | 0 | node->as_pstate = pstate; |
529 | 0 | node->choose_next_subplan = choose_next_subplan_for_leader; |
530 | 0 | } |
531 | | |
532 | | /* ---------------------------------------------------------------- |
533 | | * ExecAppendReInitializeDSM |
534 | | * |
535 | | * Reset shared state before beginning a fresh scan: pa_next_plan goes back to 0 and every pa_finished flag is cleared.  The pcxt argument is not used here. |
536 | | * ---------------------------------------------------------------- |
537 | | */ |
538 | | void |
539 | | ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt) |
540 | 0 | { |
541 | 0 | ParallelAppendState *pstate = node->as_pstate; |
542 | |
|
543 | 0 | pstate->pa_next_plan = 0; |
544 | 0 | memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans); |
545 | 0 | } |
546 | | |
547 | | /* ---------------------------------------------------------------- |
548 | | * ExecAppendInitializeWorker |
549 | | * |
550 | | * Copy relevant information from TOC into planstate, and initialize |
551 | | * whatever is required to choose and execute the optimal subplan.  Concretely: look up the shared ParallelAppendState by plan node id and install choose_next_subplan_for_worker. |
552 | | * ---------------------------------------------------------------- |
553 | | */ |
554 | | void |
555 | | ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt) |
556 | 0 | { |
557 | 0 | node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false); |
558 | 0 | node->choose_next_subplan = choose_next_subplan_for_worker; |
559 | 0 | } |
560 | | |
561 | | /* ---------------------------------------------------------------- |
562 | | * choose_next_subplan_locally |
563 | | * |
564 | | * Choose next sync subplan for a non-parallel-aware Append, |
565 | | * returning false if there are no more.  On success as_whichplan is set to the chosen subplan; the search follows es_direction (forward or backward). |
566 | | * ---------------------------------------------------------------- |
567 | | */ |
568 | | static bool |
569 | | choose_next_subplan_locally(AppendState *node) |
570 | 0 | { |
571 | 0 | int whichplan = node->as_whichplan; |
572 | 0 | int nextplan; |
573 | | |
574 | | /* We should never be called when there are no subplans */ |
575 | 0 | Assert(node->as_nplans > 0); |
576 | | |
577 | | /* Nothing to do if syncdone */ |
578 | 0 | if (node->as_syncdone) |
579 | 0 | return false; |
580 | | |
581 | | /* |
582 | | * If first call then have the bms member function choose the first valid |
583 | | * sync subplan by initializing whichplan to -1. If there happen to be no |
584 | | * valid sync subplans then the bms member function will handle that by |
585 | | * returning a negative number which will allow us to exit returning a |
586 | | * false value. |
587 | | */ |
588 | 0 | if (whichplan == INVALID_SUBPLAN_INDEX) |
589 | 0 | { |
590 | 0 | if (node->as_nasyncplans > 0) |
591 | 0 | { |
592 | | /* We'd have filled as_valid_subplans already */ |
593 | 0 | Assert(node->as_valid_subplans_identified); |
594 | 0 | } |
595 | 0 | else if (!node->as_valid_subplans_identified) |
596 | 0 | { |
597 | 0 | node->as_valid_subplans = |
598 | 0 | ExecFindMatchingSubPlans(node->as_prune_state, false, NULL); |
599 | 0 | node->as_valid_subplans_identified = true; |
600 | 0 | } |
601 | |
|
602 | 0 | whichplan = -1; |
603 | 0 | } |
604 | | |
605 | | /* Ensure whichplan is within the expected range */ |
606 | 0 | Assert(whichplan >= -1 && whichplan <= node->as_nplans); |
607 | |
|
608 | 0 | if (ScanDirectionIsForward(node->ps.state->es_direction)) |
609 | 0 | nextplan = bms_next_member(node->as_valid_subplans, whichplan); |
610 | 0 | else |
611 | 0 | nextplan = bms_prev_member(node->as_valid_subplans, whichplan); |
612 | |
|
613 | 0 | if (nextplan < 0) |
614 | 0 | { |
615 | | /* Set as_syncdone if in async mode; ExecAppend tests this flag to keep fetching from the async subplans */ |
616 | 0 | if (node->as_nasyncplans > 0) |
617 | 0 | node->as_syncdone = true; |
618 | 0 | return false; |
619 | 0 | } |
620 | | |
621 | 0 | node->as_whichplan = nextplan; |
622 | |
|
623 | 0 | return true; |
624 | 0 | } |
625 | | |
626 | | /* ---------------------------------------------------------------- |
627 | | * choose_next_subplan_for_leader |
628 | | * |
629 | | * Try to pick a plan which doesn't commit us to doing much |
630 | | * work locally, so that as much work as possible is done in |
631 | | * the workers. Cheapest subplans are at the end.  Returns false, with as_whichplan and pa_next_plan set to INVALID_SUBPLAN_INDEX, once every subplan is marked finished. |
632 | | * ---------------------------------------------------------------- |
633 | | */ |
634 | | static bool |
635 | | choose_next_subplan_for_leader(AppendState *node) |
636 | 0 | { |
637 | 0 | ParallelAppendState *pstate = node->as_pstate; |
638 | | |
639 | | /* Backward scan is not supported by parallel-aware plans */ |
640 | 0 | Assert(ScanDirectionIsForward(node->ps.state->es_direction)); |
641 | | |
642 | | /* We should never be called when there are no subplans */ |
643 | 0 | Assert(node->as_nplans > 0); |
644 | |
|
645 | 0 | LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE); |
646 | |
|
647 | 0 | if (node->as_whichplan != INVALID_SUBPLAN_INDEX) |
648 | 0 | { |
649 | | /* Mark just-completed subplan as finished. */ |
650 | 0 | node->as_pstate->pa_finished[node->as_whichplan] = true; |
651 | 0 | } |
652 | 0 | else |
653 | 0 | { |
654 | | /* Start with last subplan. */ |
655 | 0 | node->as_whichplan = node->as_nplans - 1; |
656 | | |
657 | | /* |
658 | | * If we've yet to determine the valid subplans then do so now. If |
659 | | * run-time pruning is disabled then the valid subplans will always be |
660 | | * set to all subplans. |
661 | | */ |
662 | 0 | if (!node->as_valid_subplans_identified) |
663 | 0 | { |
664 | 0 | node->as_valid_subplans = |
665 | 0 | ExecFindMatchingSubPlans(node->as_prune_state, false, NULL); |
666 | 0 | node->as_valid_subplans_identified = true; |
667 | | |
668 | | /* |
669 | | * Mark each invalid plan as finished to allow the loop below to |
670 | | * select the first valid subplan. |
671 | | */ |
672 | 0 | mark_invalid_subplans_as_finished(node); |
673 | 0 | } |
674 | 0 | } |
675 | | |
676 | | /* Loop until we find a subplan to execute, stepping downward from as_whichplan toward subplan 0. */ |
677 | 0 | while (pstate->pa_finished[node->as_whichplan]) |
678 | 0 | { |
679 | 0 | if (node->as_whichplan == 0) |
680 | 0 | { |
681 | 0 | pstate->pa_next_plan = INVALID_SUBPLAN_INDEX; |
682 | 0 | node->as_whichplan = INVALID_SUBPLAN_INDEX; |
683 | 0 | LWLockRelease(&pstate->pa_lock); |
684 | 0 | return false; |
685 | 0 | } |
686 | | |
687 | | /* |
688 | | * We needn't pay attention to as_valid_subplans here as all invalid |
689 | | * plans have been marked as finished. |
690 | | */ |
691 | 0 | node->as_whichplan--; |
692 | 0 | } |
693 | | |
694 | | /* If non-partial, immediately mark as finished. */ |
695 | 0 | if (node->as_whichplan < node->as_first_partial_plan) |
696 | 0 | node->as_pstate->pa_finished[node->as_whichplan] = true; |
697 | |
|
698 | 0 | LWLockRelease(&pstate->pa_lock); |
699 | |
|
700 | 0 | return true; |
701 | 0 | } |
702 | | |
703 | | /* ---------------------------------------------------------------- |
704 | | * choose_next_subplan_for_worker |
705 | | * |
706 | | * Choose next subplan for a parallel-aware Append, returning |
707 | | * false if there are no more. |
708 | | * |
709 | | * We start from the first plan and advance through the list; |
710 | | * when we reach the end, we loop back to the first |
711 | | * partial plan. This assigns the non-partial plans first in |
712 | | * order of descending cost and then spreads out the workers |
713 | | * as evenly as possible across the remaining partial plans.  All reads and writes of the shared state happen while holding pa_lock exclusively. |
714 | | * ---------------------------------------------------------------- |
715 | | */ |
716 | | static bool |
717 | | choose_next_subplan_for_worker(AppendState *node) |
718 | 0 | { |
719 | 0 | ParallelAppendState *pstate = node->as_pstate; |
720 | | |
721 | | /* Backward scan is not supported by parallel-aware plans */ |
722 | 0 | Assert(ScanDirectionIsForward(node->ps.state->es_direction)); |
723 | | |
724 | | /* We should never be called when there are no subplans */ |
725 | 0 | Assert(node->as_nplans > 0); |
726 | |
|
727 | 0 | LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE); |
728 | | |
729 | | /* Mark just-completed subplan as finished. */ |
730 | 0 | if (node->as_whichplan != INVALID_SUBPLAN_INDEX) |
731 | 0 | node->as_pstate->pa_finished[node->as_whichplan] = true; |
732 | | |
733 | | /* |
734 | | * If we've yet to determine the valid subplans then do so now. If |
735 | | * run-time pruning is disabled then the valid subplans will always be set |
736 | | * to all subplans. |
737 | | */ |
738 | 0 | else if (!node->as_valid_subplans_identified) |
739 | 0 | { |
740 | 0 | node->as_valid_subplans = |
741 | 0 | ExecFindMatchingSubPlans(node->as_prune_state, false, NULL); |
742 | 0 | node->as_valid_subplans_identified = true; |
743 | |
|
744 | 0 | mark_invalid_subplans_as_finished(node); |
745 | 0 | } |
746 | | |
747 | | /* If all the plans are already done, we have nothing to do */ |
748 | 0 | if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX) |
749 | 0 | { |
750 | 0 | LWLockRelease(&pstate->pa_lock); |
751 | 0 | return false; |
752 | 0 | } |
753 | | |
754 | | /* Save the plan from which we are starting the search; arriving back at it means every candidate has been tried. */ |
755 | 0 | node->as_whichplan = pstate->pa_next_plan; |
756 | | |
757 | | /* Loop until we find a valid subplan to execute. */ |
758 | 0 | while (pstate->pa_finished[pstate->pa_next_plan]) |
759 | 0 | { |
760 | 0 | int nextplan; |
761 | |
|
762 | 0 | nextplan = bms_next_member(node->as_valid_subplans, |
763 | 0 | pstate->pa_next_plan); |
764 | 0 | if (nextplan >= 0) |
765 | 0 | { |
766 | | /* Advance to the next valid plan. */ |
767 | 0 | pstate->pa_next_plan = nextplan; |
768 | 0 | } |
769 | 0 | else if (node->as_whichplan > node->as_first_partial_plan) |
770 | 0 | { |
771 | | /* |
772 | | * Try looping back to the first valid partial plan, if there is |
773 | | * one. If there isn't, arrange to bail out below. |
774 | | */ |
775 | 0 | nextplan = bms_next_member(node->as_valid_subplans, |
776 | 0 | node->as_first_partial_plan - 1); |
777 | 0 | pstate->pa_next_plan = |
778 | 0 | nextplan < 0 ? node->as_whichplan : nextplan; |
779 | 0 | } |
780 | 0 | else |
781 | 0 | { |
782 | | /* |
783 | | * At last plan, and either there are no partial plans or we've |
784 | | * tried them all. Arrange to bail out. |
785 | | */ |
786 | 0 | pstate->pa_next_plan = node->as_whichplan; |
787 | 0 | } |
788 | |
|
789 | 0 | if (pstate->pa_next_plan == node->as_whichplan) |
790 | 0 | { |
791 | | /* We've tried everything! */ |
792 | 0 | pstate->pa_next_plan = INVALID_SUBPLAN_INDEX; |
793 | 0 | LWLockRelease(&pstate->pa_lock); |
794 | 0 | return false; |
795 | 0 | } |
796 | 0 | } |
797 | | |
798 | | /* Pick the plan we found, and advance pa_next_plan one more time. */ |
799 | 0 | node->as_whichplan = pstate->pa_next_plan; |
800 | 0 | pstate->pa_next_plan = bms_next_member(node->as_valid_subplans, |
801 | 0 | pstate->pa_next_plan); |
802 | | |
803 | | /* |
804 | | * If there are no more valid plans then try setting the next plan to the |
805 | | * first valid partial plan. |
806 | | */ |
807 | 0 | if (pstate->pa_next_plan < 0) |
808 | 0 | { |
809 | 0 | int nextplan = bms_next_member(node->as_valid_subplans, |
810 | 0 | node->as_first_partial_plan - 1); |
811 | |
|
812 | 0 | if (nextplan >= 0) |
813 | 0 | pstate->pa_next_plan = nextplan; |
814 | 0 | else |
815 | 0 | { |
816 | | /* |
817 | | * There are no valid partial plans, and we already chose the last |
818 | | * non-partial plan; so flag that there's nothing more for our |
819 | | * fellow workers to do. |
820 | | */ |
821 | 0 | pstate->pa_next_plan = INVALID_SUBPLAN_INDEX; |
822 | 0 | } |
823 | 0 | } |
824 | | |
825 | | /* If non-partial, immediately mark as finished. */ |
826 | 0 | if (node->as_whichplan < node->as_first_partial_plan) |
827 | 0 | node->as_pstate->pa_finished[node->as_whichplan] = true; |
828 | |
|
829 | 0 | LWLockRelease(&pstate->pa_lock); |
830 | |
|
831 | 0 | return true; |
832 | 0 | } |
833 | | |
834 | | /* |
835 | | * mark_invalid_subplans_as_finished |
836 | | * Marks the ParallelAppendState's pa_finished as true for each invalid |
837 | | * subplan. |
838 | | * A subplan is invalid when it is not a member of as_valid_subplans.  Both callers in this file invoke this while holding pa_lock. |
839 | | * This function should only be called for parallel Append with run-time |
840 | | * pruning enabled. |
841 | | */ |
842 | | static void |
843 | | mark_invalid_subplans_as_finished(AppendState *node) |
844 | 0 | { |
845 | 0 | int i; |
846 | | |
847 | | /* Only valid to call this while in parallel Append mode */ |
848 | 0 | Assert(node->as_pstate); |
849 | | |
850 | | /* Shouldn't have been called when run-time pruning is not enabled */ |
851 | 0 | Assert(node->as_prune_state); |
852 | | |
853 | | /* Nothing to do if all plans are valid */ |
854 | 0 | if (bms_num_members(node->as_valid_subplans) == node->as_nplans) |
855 | 0 | return; |
856 | | |
857 | | /* Mark all non-valid plans as finished */ |
858 | 0 | for (i = 0; i < node->as_nplans; i++) |
859 | 0 | { |
860 | 0 | if (!bms_is_member(i, node->as_valid_subplans)) |
861 | 0 | node->as_pstate->pa_finished[i] = true; |
862 | 0 | } |
863 | 0 | } |
864 | | |
865 | | /* ---------------------------------------------------------------- |
866 | | * Asynchronous Append Support |
867 | | * ---------------------------------------------------------------- |
868 | | */ |
869 | | |
870 | | /* ---------------------------------------------------------------- |
871 | | * ExecAppendAsyncBegin |
872 | | * |
873 | | * Begin executing designed async-capable subplans. |
874 | | * ---------------------------------------------------------------- |
875 | | */ |
876 | | static void |
877 | | ExecAppendAsyncBegin(AppendState *node) |
878 | 0 | { |
879 | 0 | int i; |
880 | | |
881 | | /* Backward scan is not supported by async-aware Appends. */ |
882 | 0 | Assert(ScanDirectionIsForward(node->ps.state->es_direction)); |
883 | | |
884 | | /* We should never be called when there are no subplans */ |
885 | 0 | Assert(node->as_nplans > 0); |
886 | | |
887 | | /* We should never be called when there are no async subplans. */ |
888 | 0 | Assert(node->as_nasyncplans > 0); |
889 | | |
890 | | /* If we've yet to determine the valid subplans then do so now. */ |
891 | 0 | if (!node->as_valid_subplans_identified) |
892 | 0 | { |
893 | 0 | node->as_valid_subplans = |
894 | 0 | ExecFindMatchingSubPlans(node->as_prune_state, false, NULL); |
895 | 0 | node->as_valid_subplans_identified = true; |
896 | |
|
897 | 0 | classify_matching_subplans(node); |
898 | 0 | } |
899 | | |
900 | | /* Initialize state variables. */ |
901 | 0 | node->as_syncdone = bms_is_empty(node->as_valid_subplans); |
902 | 0 | node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans); |
903 | | |
904 | | /* Nothing to do if there are no valid async subplans. */ |
905 | 0 | if (node->as_nasyncremain == 0) |
906 | 0 | return; |
907 | | |
908 | | /* Make a request for each of the valid async subplans. */ |
909 | 0 | i = -1; |
910 | 0 | while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0) |
911 | 0 | { |
912 | 0 | AsyncRequest *areq = node->as_asyncrequests[i]; |
913 | |
|
914 | 0 | Assert(areq->request_index == i); |
915 | 0 | Assert(!areq->callback_pending); |
916 | | |
917 | | /* Do the actual work. */ |
918 | 0 | ExecAsyncRequest(areq); |
919 | 0 | } |
920 | 0 | } |
921 | | |
922 | | /* ---------------------------------------------------------------- |
923 | | * ExecAppendAsyncGetNext |
924 | | * |
925 | | * Get the next tuple from any of the asynchronous subplans. |
926 | | * ---------------------------------------------------------------- |
927 | | */ |
928 | | static bool |
929 | | ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result) |
930 | 0 | { |
931 | 0 | *result = NULL; |
932 | | |
933 | | /* We should never be called when there are no valid async subplans. */ |
934 | 0 | Assert(node->as_nasyncremain > 0); |
935 | | |
936 | | /* Request a tuple asynchronously. */ |
937 | 0 | if (ExecAppendAsyncRequest(node, result)) |
938 | 0 | return true; |
939 | | |
940 | 0 | while (node->as_nasyncremain > 0) |
941 | 0 | { |
942 | 0 | CHECK_FOR_INTERRUPTS(); |
943 | | |
944 | | /* Wait or poll for async events. */ |
945 | 0 | ExecAppendAsyncEventWait(node); |
946 | | |
947 | | /* Request a tuple asynchronously. */ |
948 | 0 | if (ExecAppendAsyncRequest(node, result)) |
949 | 0 | return true; |
950 | | |
951 | | /* Break from loop if there's any sync subplan that isn't complete. */ |
952 | 0 | if (!node->as_syncdone) |
953 | 0 | break; |
954 | 0 | } |
955 | | |
956 | | /* |
957 | | * If all sync subplans are complete, we're totally done scanning the |
958 | | * given node. Otherwise, we're done with the asynchronous stuff but must |
959 | | * continue scanning the sync subplans. |
960 | | */ |
961 | 0 | if (node->as_syncdone) |
962 | 0 | { |
963 | 0 | Assert(node->as_nasyncremain == 0); |
964 | 0 | *result = ExecClearTuple(node->ps.ps_ResultTupleSlot); |
965 | 0 | return true; |
966 | 0 | } |
967 | | |
968 | 0 | return false; |
969 | 0 | } |
970 | | |
971 | | /* ---------------------------------------------------------------- |
972 | | * ExecAppendAsyncRequest |
973 | | * |
974 | | * Request a tuple asynchronously. |
975 | | * ---------------------------------------------------------------- |
976 | | */ |
977 | | static bool |
978 | | ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result) |
979 | 0 | { |
980 | 0 | Bitmapset *needrequest; |
981 | 0 | int i; |
982 | | |
983 | | /* Nothing to do if there are no async subplans needing a new request. */ |
984 | 0 | if (bms_is_empty(node->as_needrequest)) |
985 | 0 | { |
986 | 0 | Assert(node->as_nasyncresults == 0); |
987 | 0 | return false; |
988 | 0 | } |
989 | | |
990 | | /* |
991 | | * If there are any asynchronously-generated results that have not yet |
992 | | * been returned, we have nothing to do; just return one of them. |
993 | | */ |
994 | 0 | if (node->as_nasyncresults > 0) |
995 | 0 | { |
996 | 0 | --node->as_nasyncresults; |
997 | 0 | *result = node->as_asyncresults[node->as_nasyncresults]; |
998 | 0 | return true; |
999 | 0 | } |
1000 | | |
1001 | | /* Make a new request for each of the async subplans that need it. */ |
1002 | 0 | needrequest = node->as_needrequest; |
1003 | 0 | node->as_needrequest = NULL; |
1004 | 0 | i = -1; |
1005 | 0 | while ((i = bms_next_member(needrequest, i)) >= 0) |
1006 | 0 | { |
1007 | 0 | AsyncRequest *areq = node->as_asyncrequests[i]; |
1008 | | |
1009 | | /* Do the actual work. */ |
1010 | 0 | ExecAsyncRequest(areq); |
1011 | 0 | } |
1012 | 0 | bms_free(needrequest); |
1013 | | |
1014 | | /* Return one of the asynchronously-generated results if any. */ |
1015 | 0 | if (node->as_nasyncresults > 0) |
1016 | 0 | { |
1017 | 0 | --node->as_nasyncresults; |
1018 | 0 | *result = node->as_asyncresults[node->as_nasyncresults]; |
1019 | 0 | return true; |
1020 | 0 | } |
1021 | | |
1022 | 0 | return false; |
1023 | 0 | } |
1024 | | |
1025 | | /* ---------------------------------------------------------------- |
1026 | | * ExecAppendAsyncEventWait |
1027 | | * |
1028 | | * Wait or poll for file descriptor events and fire callbacks. |
1029 | | * ---------------------------------------------------------------- |
1030 | | */ |
1031 | | static void |
1032 | | ExecAppendAsyncEventWait(AppendState *node) |
1033 | 0 | { |
1034 | 0 | int nevents = node->as_nasyncplans + 2; |
1035 | 0 | long timeout = node->as_syncdone ? -1 : 0; |
1036 | 0 | WaitEvent occurred_event[EVENT_BUFFER_SIZE]; |
1037 | 0 | int noccurred; |
1038 | 0 | int i; |
1039 | | |
1040 | | /* We should never be called when there are no valid async subplans. */ |
1041 | 0 | Assert(node->as_nasyncremain > 0); |
1042 | |
|
1043 | 0 | Assert(node->as_eventset == NULL); |
1044 | 0 | node->as_eventset = CreateWaitEventSet(CurrentResourceOwner, nevents); |
1045 | 0 | AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, |
1046 | 0 | NULL, NULL); |
1047 | | |
1048 | | /* Give each waiting subplan a chance to add an event. */ |
1049 | 0 | i = -1; |
1050 | 0 | while ((i = bms_next_member(node->as_asyncplans, i)) >= 0) |
1051 | 0 | { |
1052 | 0 | AsyncRequest *areq = node->as_asyncrequests[i]; |
1053 | |
|
1054 | 0 | if (areq->callback_pending) |
1055 | 0 | ExecAsyncConfigureWait(areq); |
1056 | 0 | } |
1057 | | |
1058 | | /* |
1059 | | * No need for further processing if none of the subplans configured any |
1060 | | * events. |
1061 | | */ |
1062 | 0 | if (GetNumRegisteredWaitEvents(node->as_eventset) == 1) |
1063 | 0 | { |
1064 | 0 | FreeWaitEventSet(node->as_eventset); |
1065 | 0 | node->as_eventset = NULL; |
1066 | 0 | return; |
1067 | 0 | } |
1068 | | |
1069 | | /* |
1070 | | * Add the process latch to the set, so that we wake up to process the |
1071 | | * standard interrupts with CHECK_FOR_INTERRUPTS(). |
1072 | | * |
1073 | | * NOTE: For historical reasons, it's important that this is added to the |
1074 | | * WaitEventSet after the ExecAsyncConfigureWait() calls. Namely, |
1075 | | * postgres_fdw calls "GetNumRegisteredWaitEvents(set) == 1" to check if |
1076 | | * any other events are in the set. That's a poor design, it's |
1077 | | * questionable for postgres_fdw to be doing that in the first place, but |
1078 | | * we cannot change it now. The pattern has possibly been copied to other |
1079 | | * extensions too. |
1080 | | */ |
1081 | 0 | AddWaitEventToSet(node->as_eventset, WL_LATCH_SET, PGINVALID_SOCKET, |
1082 | 0 | MyLatch, NULL); |
1083 | | |
1084 | | /* Return at most EVENT_BUFFER_SIZE events in one call. */ |
1085 | 0 | if (nevents > EVENT_BUFFER_SIZE) |
1086 | 0 | nevents = EVENT_BUFFER_SIZE; |
1087 | | |
1088 | | /* |
1089 | | * If the timeout is -1, wait until at least one event occurs. If the |
1090 | | * timeout is 0, poll for events, but do not wait at all. |
1091 | | */ |
1092 | 0 | noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event, |
1093 | 0 | nevents, WAIT_EVENT_APPEND_READY); |
1094 | 0 | FreeWaitEventSet(node->as_eventset); |
1095 | 0 | node->as_eventset = NULL; |
1096 | 0 | if (noccurred == 0) |
1097 | 0 | return; |
1098 | | |
1099 | | /* Deliver notifications. */ |
1100 | 0 | for (i = 0; i < noccurred; i++) |
1101 | 0 | { |
1102 | 0 | WaitEvent *w = &occurred_event[i]; |
1103 | | |
1104 | | /* |
1105 | | * Each waiting subplan should have registered its wait event with |
1106 | | * user_data pointing back to its AsyncRequest. |
1107 | | */ |
1108 | 0 | if ((w->events & WL_SOCKET_READABLE) != 0) |
1109 | 0 | { |
1110 | 0 | AsyncRequest *areq = (AsyncRequest *) w->user_data; |
1111 | |
|
1112 | 0 | if (areq->callback_pending) |
1113 | 0 | { |
1114 | | /* |
1115 | | * Mark it as no longer needing a callback. We must do this |
1116 | | * before dispatching the callback in case the callback resets |
1117 | | * the flag. |
1118 | | */ |
1119 | 0 | areq->callback_pending = false; |
1120 | | |
1121 | | /* Do the actual work. */ |
1122 | 0 | ExecAsyncNotify(areq); |
1123 | 0 | } |
1124 | 0 | } |
1125 | | |
1126 | | /* Handle standard interrupts */ |
1127 | 0 | if ((w->events & WL_LATCH_SET) != 0) |
1128 | 0 | { |
1129 | 0 | ResetLatch(MyLatch); |
1130 | 0 | CHECK_FOR_INTERRUPTS(); |
1131 | 0 | } |
1132 | 0 | } |
1133 | 0 | } |
1134 | | |
1135 | | /* ---------------------------------------------------------------- |
1136 | | * ExecAsyncAppendResponse |
1137 | | * |
1138 | | * Receive a response from an asynchronous request we made. |
1139 | | * ---------------------------------------------------------------- |
1140 | | */ |
1141 | | void |
1142 | | ExecAsyncAppendResponse(AsyncRequest *areq) |
1143 | 0 | { |
1144 | 0 | AppendState *node = (AppendState *) areq->requestor; |
1145 | 0 | TupleTableSlot *slot = areq->result; |
1146 | | |
1147 | | /* The result should be a TupleTableSlot or NULL. */ |
1148 | 0 | Assert(slot == NULL || IsA(slot, TupleTableSlot)); |
1149 | | |
1150 | | /* Nothing to do if the request is pending. */ |
1151 | 0 | if (!areq->request_complete) |
1152 | 0 | { |
1153 | | /* The request would have been pending for a callback. */ |
1154 | 0 | Assert(areq->callback_pending); |
1155 | 0 | return; |
1156 | 0 | } |
1157 | | |
1158 | | /* If the result is NULL or an empty slot, there's nothing more to do. */ |
1159 | 0 | if (TupIsNull(slot)) |
1160 | 0 | { |
1161 | | /* The ending subplan wouldn't have been pending for a callback. */ |
1162 | 0 | Assert(!areq->callback_pending); |
1163 | 0 | --node->as_nasyncremain; |
1164 | 0 | return; |
1165 | 0 | } |
1166 | | |
1167 | | /* Save result so we can return it. */ |
1168 | 0 | Assert(node->as_nasyncresults < node->as_nasyncplans); |
1169 | 0 | node->as_asyncresults[node->as_nasyncresults++] = slot; |
1170 | | |
1171 | | /* |
1172 | | * Mark the subplan that returned a result as ready for a new request. We |
1173 | | * don't launch another one here immediately because it might complete. |
1174 | | */ |
1175 | 0 | node->as_needrequest = bms_add_member(node->as_needrequest, |
1176 | 0 | areq->request_index); |
1177 | 0 | } |
1178 | | |
1179 | | /* ---------------------------------------------------------------- |
1180 | | * classify_matching_subplans |
1181 | | * |
1182 | | * Classify the node's as_valid_subplans into sync ones and |
1183 | | * async ones, adjust it to contain sync ones only, and save |
1184 | | * async ones in the node's as_valid_asyncplans. |
1185 | | * ---------------------------------------------------------------- |
1186 | | */ |
1187 | | static void |
1188 | | classify_matching_subplans(AppendState *node) |
1189 | 0 | { |
1190 | 0 | Bitmapset *valid_asyncplans; |
1191 | |
|
1192 | 0 | Assert(node->as_valid_subplans_identified); |
1193 | 0 | Assert(node->as_valid_asyncplans == NULL); |
1194 | | |
1195 | | /* Nothing to do if there are no valid subplans. */ |
1196 | 0 | if (bms_is_empty(node->as_valid_subplans)) |
1197 | 0 | { |
1198 | 0 | node->as_syncdone = true; |
1199 | 0 | node->as_nasyncremain = 0; |
1200 | 0 | return; |
1201 | 0 | } |
1202 | | |
1203 | | /* Nothing to do if there are no valid async subplans. */ |
1204 | 0 | if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans)) |
1205 | 0 | { |
1206 | 0 | node->as_nasyncremain = 0; |
1207 | 0 | return; |
1208 | 0 | } |
1209 | | |
1210 | | /* Get valid async subplans. */ |
1211 | 0 | valid_asyncplans = bms_intersect(node->as_asyncplans, |
1212 | 0 | node->as_valid_subplans); |
1213 | | |
1214 | | /* Adjust the valid subplans to contain sync subplans only. */ |
1215 | 0 | node->as_valid_subplans = bms_del_members(node->as_valid_subplans, |
1216 | 0 | valid_asyncplans); |
1217 | | |
1218 | | /* Save valid async subplans. */ |
1219 | 0 | node->as_valid_asyncplans = valid_asyncplans; |
1220 | 0 | } |