/src/postgres/src/backend/executor/nodeForeignscan.c
Line | Count | Source (jump to first uncovered line) |
1 | | /*------------------------------------------------------------------------- |
2 | | * |
3 | | * nodeForeignscan.c |
4 | | * Routines to support scans of foreign tables |
5 | | * |
6 | | * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group |
7 | | * Portions Copyright (c) 1994, Regents of the University of California |
8 | | * |
9 | | * |
10 | | * IDENTIFICATION |
11 | | * src/backend/executor/nodeForeignscan.c |
12 | | * |
13 | | *------------------------------------------------------------------------- |
14 | | */ |
15 | | /* |
16 | | * INTERFACE ROUTINES |
17 | | * |
18 | | * ExecForeignScan scans a foreign table. |
19 | | * ExecInitForeignScan creates and initializes state info. |
20 | | * ExecReScanForeignScan rescans the foreign relation. |
21 | | * ExecEndForeignScan releases any resources allocated. |
22 | | */ |
23 | | #include "postgres.h" |
24 | | |
25 | | #include "executor/executor.h" |
26 | | #include "executor/nodeForeignscan.h" |
27 | | #include "foreign/fdwapi.h" |
28 | | #include "utils/rel.h" |
29 | | |
30 | | static TupleTableSlot *ForeignNext(ForeignScanState *node); |
31 | | static bool ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot); |
32 | | |
33 | | |
34 | | /* ---------------------------------------------------------------- |
35 | | * ForeignNext |
36 | | * |
37 | | * This is a workhorse for ExecForeignScan |
38 | | * ---------------------------------------------------------------- |
39 | | */ |
40 | | static TupleTableSlot * |
41 | | ForeignNext(ForeignScanState *node) |
42 | 0 | { |
43 | 0 | TupleTableSlot *slot; |
44 | 0 | ForeignScan *plan = (ForeignScan *) node->ss.ps.plan; |
45 | 0 | ExprContext *econtext = node->ss.ps.ps_ExprContext; |
46 | 0 | MemoryContext oldcontext; |
47 | | |
48 | | /* Call the Iterate function in short-lived context */ |
49 | 0 | oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); |
50 | 0 | if (plan->operation != CMD_SELECT) |
51 | 0 | { |
52 | | /* |
53 | | * direct modifications cannot be re-evaluated, so shouldn't get here |
54 | | * during EvalPlanQual processing |
55 | | */ |
56 | 0 | Assert(node->ss.ps.state->es_epq_active == NULL); |
57 | |
|
58 | 0 | slot = node->fdwroutine->IterateDirectModify(node); |
59 | 0 | } |
60 | 0 | else |
61 | 0 | slot = node->fdwroutine->IterateForeignScan(node); |
62 | 0 | MemoryContextSwitchTo(oldcontext); |
63 | | |
64 | | /* |
65 | | * Insert valid value into tableoid, the only actually-useful system |
66 | | * column. |
67 | | */ |
68 | 0 | if (plan->fsSystemCol && !TupIsNull(slot)) |
69 | 0 | slot->tts_tableOid = RelationGetRelid(node->ss.ss_currentRelation); |
70 | |
|
71 | 0 | return slot; |
72 | 0 | } |
73 | | |
74 | | /* |
75 | | * ForeignRecheck -- access method routine to recheck a tuple in EvalPlanQual |
76 | | */ |
77 | | static bool |
78 | | ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot) |
79 | 0 | { |
80 | 0 | FdwRoutine *fdwroutine = node->fdwroutine; |
81 | 0 | ExprContext *econtext; |
82 | | |
83 | | /* |
84 | | * extract necessary information from foreign scan node |
85 | | */ |
86 | 0 | econtext = node->ss.ps.ps_ExprContext; |
87 | | |
88 | | /* Does the tuple meet the remote qual condition? */ |
89 | 0 | econtext->ecxt_scantuple = slot; |
90 | |
|
91 | 0 | ResetExprContext(econtext); |
92 | | |
93 | | /* |
94 | | * If an outer join is pushed down, RecheckForeignScan may need to store a |
95 | | * different tuple in the slot, because a different set of columns may go |
96 | | * to NULL upon recheck. Otherwise, it shouldn't need to change the slot |
97 | | * contents, just return true or false to indicate whether the quals still |
98 | | * pass. For simple cases, setting fdw_recheck_quals may be easier than |
99 | | * providing this callback. |
100 | | */ |
101 | 0 | if (fdwroutine->RecheckForeignScan && |
102 | 0 | !fdwroutine->RecheckForeignScan(node, slot)) |
103 | 0 | return false; |
104 | | |
105 | 0 | return ExecQual(node->fdw_recheck_quals, econtext); |
106 | 0 | } |
107 | | |
108 | | /* ---------------------------------------------------------------- |
109 | | * ExecForeignScan(node) |
110 | | * |
111 | | * Fetches the next tuple from the FDW, checks local quals, and |
112 | | * returns it. |
113 | | * We call the ExecScan() routine and pass it the appropriate |
114 | | * access method functions. |
115 | | * ---------------------------------------------------------------- |
116 | | */ |
117 | | static TupleTableSlot * |
118 | | ExecForeignScan(PlanState *pstate) |
119 | 0 | { |
120 | 0 | ForeignScanState *node = castNode(ForeignScanState, pstate); |
121 | 0 | ForeignScan *plan = (ForeignScan *) node->ss.ps.plan; |
122 | 0 | EState *estate = node->ss.ps.state; |
123 | | |
124 | | /* |
125 | | * Ignore direct modifications when EvalPlanQual is active --- they are |
126 | | * irrelevant for EvalPlanQual rechecking |
127 | | */ |
128 | 0 | if (estate->es_epq_active != NULL && plan->operation != CMD_SELECT) |
129 | 0 | return NULL; |
130 | | |
131 | 0 | return ExecScan(&node->ss, |
132 | 0 | (ExecScanAccessMtd) ForeignNext, |
133 | 0 | (ExecScanRecheckMtd) ForeignRecheck); |
134 | 0 | } |
135 | | |
136 | | |
137 | | /* ---------------------------------------------------------------- |
138 | | * ExecInitForeignScan |
139 | | * ---------------------------------------------------------------- |
140 | | */ |
141 | | ForeignScanState * |
142 | | ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags) |
143 | 0 | { |
144 | 0 | ForeignScanState *scanstate; |
145 | 0 | Relation currentRelation = NULL; |
146 | 0 | Index scanrelid = node->scan.scanrelid; |
147 | 0 | int tlistvarno; |
148 | 0 | FdwRoutine *fdwroutine; |
149 | | |
150 | | /* check for unsupported flags */ |
151 | 0 | Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); |
152 | | |
153 | | /* |
154 | | * create state structure |
155 | | */ |
156 | 0 | scanstate = makeNode(ForeignScanState); |
157 | 0 | scanstate->ss.ps.plan = (Plan *) node; |
158 | 0 | scanstate->ss.ps.state = estate; |
159 | 0 | scanstate->ss.ps.ExecProcNode = ExecForeignScan; |
160 | | |
161 | | /* |
162 | | * Miscellaneous initialization |
163 | | * |
164 | | * create expression context for node |
165 | | */ |
166 | 0 | ExecAssignExprContext(estate, &scanstate->ss.ps); |
167 | | |
168 | | /* |
169 | | * open the scan relation, if any; also acquire function pointers from the |
170 | | * FDW's handler |
171 | | */ |
172 | 0 | if (scanrelid > 0) |
173 | 0 | { |
174 | 0 | currentRelation = ExecOpenScanRelation(estate, scanrelid, eflags); |
175 | 0 | scanstate->ss.ss_currentRelation = currentRelation; |
176 | 0 | fdwroutine = GetFdwRoutineForRelation(currentRelation, true); |
177 | 0 | } |
178 | 0 | else |
179 | 0 | { |
180 | | /* We can't use the relcache, so get fdwroutine the hard way */ |
181 | 0 | fdwroutine = GetFdwRoutineByServerId(node->fs_server); |
182 | 0 | } |
183 | | |
184 | | /* |
185 | | * Determine the scan tuple type. If the FDW provided a targetlist |
186 | | * describing the scan tuples, use that; else use base relation's rowtype. |
187 | | */ |
188 | 0 | if (node->fdw_scan_tlist != NIL || currentRelation == NULL) |
189 | 0 | { |
190 | 0 | TupleDesc scan_tupdesc; |
191 | |
|
192 | 0 | scan_tupdesc = ExecTypeFromTL(node->fdw_scan_tlist); |
193 | 0 | ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc, |
194 | 0 | &TTSOpsHeapTuple); |
195 | | /* Node's targetlist will contain Vars with varno = INDEX_VAR */ |
196 | 0 | tlistvarno = INDEX_VAR; |
197 | 0 | } |
198 | 0 | else |
199 | 0 | { |
200 | 0 | TupleDesc scan_tupdesc; |
201 | | |
202 | | /* don't trust FDWs to return tuples fulfilling NOT NULL constraints */ |
203 | 0 | scan_tupdesc = CreateTupleDescCopy(RelationGetDescr(currentRelation)); |
204 | 0 | ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc, |
205 | 0 | &TTSOpsHeapTuple); |
206 | | /* Node's targetlist will contain Vars with varno = scanrelid */ |
207 | 0 | tlistvarno = scanrelid; |
208 | 0 | } |
209 | | |
210 | | /* Don't know what an FDW might return */ |
211 | 0 | scanstate->ss.ps.scanopsfixed = false; |
212 | 0 | scanstate->ss.ps.scanopsset = true; |
213 | | |
214 | | /* |
215 | | * Initialize result slot, type and projection. |
216 | | */ |
217 | 0 | ExecInitResultTypeTL(&scanstate->ss.ps); |
218 | 0 | ExecAssignScanProjectionInfoWithVarno(&scanstate->ss, tlistvarno); |
219 | | |
220 | | /* |
221 | | * initialize child expressions |
222 | | */ |
223 | 0 | scanstate->ss.ps.qual = |
224 | 0 | ExecInitQual(node->scan.plan.qual, (PlanState *) scanstate); |
225 | 0 | scanstate->fdw_recheck_quals = |
226 | 0 | ExecInitQual(node->fdw_recheck_quals, (PlanState *) scanstate); |
227 | | |
228 | | /* |
229 | | * Determine whether to scan the foreign relation asynchronously or not; |
230 | | * this has to be kept in sync with the code in ExecInitAppend(). |
231 | | */ |
232 | 0 | scanstate->ss.ps.async_capable = (((Plan *) node)->async_capable && |
233 | 0 | estate->es_epq_active == NULL); |
234 | | |
235 | | /* |
236 | | * Initialize FDW-related state. |
237 | | */ |
238 | 0 | scanstate->fdwroutine = fdwroutine; |
239 | 0 | scanstate->fdw_state = NULL; |
240 | | |
241 | | /* |
242 | | * For the FDW's convenience, look up the modification target relation's |
243 | | * ResultRelInfo. The ModifyTable node should have initialized it for us, |
244 | | * see ExecInitModifyTable. |
245 | | * |
246 | | * Don't try to look up the ResultRelInfo when EvalPlanQual is active, |
247 | | * though. Direct modifications cannot be re-evaluated as part of |
248 | | * EvalPlanQual. The lookup wouldn't work anyway because during |
249 | | * EvalPlanQual processing, EvalPlanQual only initializes the subtree |
250 | | * under the ModifyTable, and doesn't run ExecInitModifyTable. |
251 | | */ |
252 | 0 | if (node->resultRelation > 0 && estate->es_epq_active == NULL) |
253 | 0 | { |
254 | 0 | if (estate->es_result_relations == NULL || |
255 | 0 | estate->es_result_relations[node->resultRelation - 1] == NULL) |
256 | 0 | { |
257 | 0 | elog(ERROR, "result relation not initialized"); |
258 | 0 | } |
259 | 0 | scanstate->resultRelInfo = estate->es_result_relations[node->resultRelation - 1]; |
260 | 0 | } |
261 | | |
262 | | /* Initialize any outer plan. */ |
263 | 0 | if (outerPlan(node)) |
264 | 0 | outerPlanState(scanstate) = |
265 | 0 | ExecInitNode(outerPlan(node), estate, eflags); |
266 | | |
267 | | /* |
268 | | * Tell the FDW to initialize the scan. |
269 | | */ |
270 | 0 | if (node->operation != CMD_SELECT) |
271 | 0 | { |
272 | | /* |
273 | | * Direct modifications cannot be re-evaluated by EvalPlanQual, so |
274 | | * don't bother preparing the FDW. |
275 | | * |
276 | | * In case of an inherited UPDATE/DELETE with foreign targets there |
277 | | * can be direct-modify ForeignScan nodes in the EvalPlanQual subtree, |
278 | | * so we need to ignore such ForeignScan nodes during EvalPlanQual |
279 | | * processing. See also ExecForeignScan/ExecReScanForeignScan. |
280 | | */ |
281 | 0 | if (estate->es_epq_active == NULL) |
282 | 0 | fdwroutine->BeginDirectModify(scanstate, eflags); |
283 | 0 | } |
284 | 0 | else |
285 | 0 | fdwroutine->BeginForeignScan(scanstate, eflags); |
286 | |
|
287 | 0 | return scanstate; |
288 | 0 | } |
289 | | |
290 | | /* ---------------------------------------------------------------- |
291 | | * ExecEndForeignScan |
292 | | * |
293 | | * frees any storage allocated through C routines. |
294 | | * ---------------------------------------------------------------- |
295 | | */ |
296 | | void |
297 | | ExecEndForeignScan(ForeignScanState *node) |
298 | 0 | { |
299 | 0 | ForeignScan *plan = (ForeignScan *) node->ss.ps.plan; |
300 | 0 | EState *estate = node->ss.ps.state; |
301 | | |
302 | | /* Let the FDW shut down */ |
303 | 0 | if (plan->operation != CMD_SELECT) |
304 | 0 | { |
305 | 0 | if (estate->es_epq_active == NULL) |
306 | 0 | node->fdwroutine->EndDirectModify(node); |
307 | 0 | } |
308 | 0 | else |
309 | 0 | node->fdwroutine->EndForeignScan(node); |
310 | | |
311 | | /* Shut down any outer plan. */ |
312 | 0 | if (outerPlanState(node)) |
313 | 0 | ExecEndNode(outerPlanState(node)); |
314 | 0 | } |
315 | | |
316 | | /* ---------------------------------------------------------------- |
317 | | * ExecReScanForeignScan |
318 | | * |
319 | | * Rescans the relation. |
320 | | * ---------------------------------------------------------------- |
321 | | */ |
322 | | void |
323 | | ExecReScanForeignScan(ForeignScanState *node) |
324 | 0 | { |
325 | 0 | ForeignScan *plan = (ForeignScan *) node->ss.ps.plan; |
326 | 0 | EState *estate = node->ss.ps.state; |
327 | 0 | PlanState *outerPlan = outerPlanState(node); |
328 | | |
329 | | /* |
330 | | * Ignore direct modifications when EvalPlanQual is active --- they are |
331 | | * irrelevant for EvalPlanQual rechecking |
332 | | */ |
333 | 0 | if (estate->es_epq_active != NULL && plan->operation != CMD_SELECT) |
334 | 0 | return; |
335 | | |
336 | 0 | node->fdwroutine->ReScanForeignScan(node); |
337 | | |
338 | | /* |
339 | | * If chgParam of subnode is not null then plan will be re-scanned by |
340 | | * first ExecProcNode. outerPlan may also be NULL, in which case there is |
341 | | * nothing to rescan at all. |
342 | | */ |
343 | 0 | if (outerPlan != NULL && outerPlan->chgParam == NULL) |
344 | 0 | ExecReScan(outerPlan); |
345 | |
|
346 | 0 | ExecScanReScan(&node->ss); |
347 | 0 | } |
348 | | |
349 | | /* ---------------------------------------------------------------- |
350 | | * ExecForeignScanEstimate |
351 | | * |
352 | | * Informs size of the parallel coordination information, if any |
353 | | * ---------------------------------------------------------------- |
354 | | */ |
355 | | void |
356 | | ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt) |
357 | 0 | { |
358 | 0 | FdwRoutine *fdwroutine = node->fdwroutine; |
359 | |
|
360 | 0 | if (fdwroutine->EstimateDSMForeignScan) |
361 | 0 | { |
362 | 0 | node->pscan_len = fdwroutine->EstimateDSMForeignScan(node, pcxt); |
363 | 0 | shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len); |
364 | 0 | shm_toc_estimate_keys(&pcxt->estimator, 1); |
365 | 0 | } |
366 | 0 | } |
367 | | |
368 | | /* ---------------------------------------------------------------- |
369 | | * ExecForeignScanInitializeDSM |
370 | | * |
371 | | * Initialize the parallel coordination information |
372 | | * ---------------------------------------------------------------- |
373 | | */ |
374 | | void |
375 | | ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt) |
376 | 0 | { |
377 | 0 | FdwRoutine *fdwroutine = node->fdwroutine; |
378 | |
|
379 | 0 | if (fdwroutine->InitializeDSMForeignScan) |
380 | 0 | { |
381 | 0 | int plan_node_id = node->ss.ps.plan->plan_node_id; |
382 | 0 | void *coordinate; |
383 | |
|
384 | 0 | coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len); |
385 | 0 | fdwroutine->InitializeDSMForeignScan(node, pcxt, coordinate); |
386 | 0 | shm_toc_insert(pcxt->toc, plan_node_id, coordinate); |
387 | 0 | } |
388 | 0 | } |
389 | | |
390 | | /* ---------------------------------------------------------------- |
391 | | * ExecForeignScanReInitializeDSM |
392 | | * |
393 | | * Reset shared state before beginning a fresh scan. |
394 | | * ---------------------------------------------------------------- |
395 | | */ |
396 | | void |
397 | | ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt) |
398 | 0 | { |
399 | 0 | FdwRoutine *fdwroutine = node->fdwroutine; |
400 | |
|
401 | 0 | if (fdwroutine->ReInitializeDSMForeignScan) |
402 | 0 | { |
403 | 0 | int plan_node_id = node->ss.ps.plan->plan_node_id; |
404 | 0 | void *coordinate; |
405 | |
|
406 | 0 | coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false); |
407 | 0 | fdwroutine->ReInitializeDSMForeignScan(node, pcxt, coordinate); |
408 | 0 | } |
409 | 0 | } |
410 | | |
411 | | /* ---------------------------------------------------------------- |
412 | | * ExecForeignScanInitializeWorker |
413 | | * |
414 | | * Initialization according to the parallel coordination information |
415 | | * ---------------------------------------------------------------- |
416 | | */ |
417 | | void |
418 | | ExecForeignScanInitializeWorker(ForeignScanState *node, |
419 | | ParallelWorkerContext *pwcxt) |
420 | 0 | { |
421 | 0 | FdwRoutine *fdwroutine = node->fdwroutine; |
422 | |
|
423 | 0 | if (fdwroutine->InitializeWorkerForeignScan) |
424 | 0 | { |
425 | 0 | int plan_node_id = node->ss.ps.plan->plan_node_id; |
426 | 0 | void *coordinate; |
427 | |
|
428 | 0 | coordinate = shm_toc_lookup(pwcxt->toc, plan_node_id, false); |
429 | 0 | fdwroutine->InitializeWorkerForeignScan(node, pwcxt->toc, coordinate); |
430 | 0 | } |
431 | 0 | } |
432 | | |
433 | | /* ---------------------------------------------------------------- |
434 | | * ExecShutdownForeignScan |
435 | | * |
436 | | * Gives FDW chance to stop asynchronous resource consumption |
437 | | * and release any resources still held. |
438 | | * ---------------------------------------------------------------- |
439 | | */ |
440 | | void |
441 | | ExecShutdownForeignScan(ForeignScanState *node) |
442 | 0 | { |
443 | 0 | FdwRoutine *fdwroutine = node->fdwroutine; |
444 | |
|
445 | 0 | if (fdwroutine->ShutdownForeignScan) |
446 | 0 | fdwroutine->ShutdownForeignScan(node); |
447 | 0 | } |
448 | | |
449 | | /* ---------------------------------------------------------------- |
450 | | * ExecAsyncForeignScanRequest |
451 | | * |
452 | | * Asynchronously request a tuple from a designed async-capable node |
453 | | * ---------------------------------------------------------------- |
454 | | */ |
455 | | void |
456 | | ExecAsyncForeignScanRequest(AsyncRequest *areq) |
457 | 0 | { |
458 | 0 | ForeignScanState *node = (ForeignScanState *) areq->requestee; |
459 | 0 | FdwRoutine *fdwroutine = node->fdwroutine; |
460 | |
|
461 | 0 | Assert(fdwroutine->ForeignAsyncRequest != NULL); |
462 | 0 | fdwroutine->ForeignAsyncRequest(areq); |
463 | 0 | } |
464 | | |
465 | | /* ---------------------------------------------------------------- |
466 | | * ExecAsyncForeignScanConfigureWait |
467 | | * |
468 | | * In async mode, configure for a wait |
469 | | * ---------------------------------------------------------------- |
470 | | */ |
471 | | void |
472 | | ExecAsyncForeignScanConfigureWait(AsyncRequest *areq) |
473 | 0 | { |
474 | 0 | ForeignScanState *node = (ForeignScanState *) areq->requestee; |
475 | 0 | FdwRoutine *fdwroutine = node->fdwroutine; |
476 | |
|
477 | 0 | Assert(fdwroutine->ForeignAsyncConfigureWait != NULL); |
478 | 0 | fdwroutine->ForeignAsyncConfigureWait(areq); |
479 | 0 | } |
480 | | |
481 | | /* ---------------------------------------------------------------- |
482 | | * ExecAsyncForeignScanNotify |
483 | | * |
484 | | * Callback invoked when a relevant event has occurred |
485 | | * ---------------------------------------------------------------- |
486 | | */ |
487 | | void |
488 | | ExecAsyncForeignScanNotify(AsyncRequest *areq) |
489 | 0 | { |
490 | 0 | ForeignScanState *node = (ForeignScanState *) areq->requestee; |
491 | 0 | FdwRoutine *fdwroutine = node->fdwroutine; |
492 | |
|
493 | 0 | Assert(fdwroutine->ForeignAsyncNotify != NULL); |
494 | 0 | fdwroutine->ForeignAsyncNotify(areq); |
495 | 0 | } |