/src/postgres/src/backend/executor/execAsync.c
Line | Count | Source (jump to first uncovered line) |
1 | | /*------------------------------------------------------------------------- |
2 | | * |
3 | | * execAsync.c |
4 | | * Support routines for asynchronous execution |
5 | | * |
6 | | * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group |
7 | | * Portions Copyright (c) 1994, Regents of the University of California |
8 | | * |
9 | | * IDENTIFICATION |
10 | | * src/backend/executor/execAsync.c |
11 | | * |
12 | | *------------------------------------------------------------------------- |
13 | | */ |
14 | | |
15 | | #include "postgres.h" |
16 | | |
17 | | #include "executor/execAsync.h" |
18 | | #include "executor/executor.h" |
19 | | #include "executor/nodeAppend.h" |
20 | | #include "executor/nodeForeignscan.h" |
21 | | |
22 | | /* |
23 | | * Asynchronously request a tuple from a designed async-capable node. |
24 | | */ |
25 | | void |
26 | | ExecAsyncRequest(AsyncRequest *areq) |
27 | 0 | { |
28 | 0 | if (areq->requestee->chgParam != NULL) /* something changed? */ |
29 | 0 | ExecReScan(areq->requestee); /* let ReScan handle this */ |
30 | | |
31 | | /* must provide our own instrumentation support */ |
32 | 0 | if (areq->requestee->instrument) |
33 | 0 | InstrStartNode(areq->requestee->instrument); |
34 | |
|
35 | 0 | switch (nodeTag(areq->requestee)) |
36 | 0 | { |
37 | 0 | case T_ForeignScanState: |
38 | 0 | ExecAsyncForeignScanRequest(areq); |
39 | 0 | break; |
40 | 0 | default: |
41 | | /* If the node doesn't support async, caller messed up. */ |
42 | 0 | elog(ERROR, "unrecognized node type: %d", |
43 | 0 | (int) nodeTag(areq->requestee)); |
44 | 0 | } |
45 | | |
46 | 0 | ExecAsyncResponse(areq); |
47 | | |
48 | | /* must provide our own instrumentation support */ |
49 | 0 | if (areq->requestee->instrument) |
50 | 0 | InstrStopNode(areq->requestee->instrument, |
51 | 0 | TupIsNull(areq->result) ? 0.0 : 1.0); |
52 | 0 | } |
53 | | |
54 | | /* |
55 | | * Give the asynchronous node a chance to configure the file descriptor event |
56 | | * for which it wishes to wait. We expect the node-type specific callback to |
57 | | * make a single call of the following form: |
58 | | * |
59 | | * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq); |
60 | | */ |
61 | | void |
62 | | ExecAsyncConfigureWait(AsyncRequest *areq) |
63 | 0 | { |
64 | | /* must provide our own instrumentation support */ |
65 | 0 | if (areq->requestee->instrument) |
66 | 0 | InstrStartNode(areq->requestee->instrument); |
67 | |
|
68 | 0 | switch (nodeTag(areq->requestee)) |
69 | 0 | { |
70 | 0 | case T_ForeignScanState: |
71 | 0 | ExecAsyncForeignScanConfigureWait(areq); |
72 | 0 | break; |
73 | 0 | default: |
74 | | /* If the node doesn't support async, caller messed up. */ |
75 | 0 | elog(ERROR, "unrecognized node type: %d", |
76 | 0 | (int) nodeTag(areq->requestee)); |
77 | 0 | } |
78 | | |
79 | | /* must provide our own instrumentation support */ |
80 | 0 | if (areq->requestee->instrument) |
81 | 0 | InstrStopNode(areq->requestee->instrument, 0.0); |
82 | 0 | } |
83 | | |
84 | | /* |
85 | | * Call the asynchronous node back when a relevant event has occurred. |
86 | | */ |
87 | | void |
88 | | ExecAsyncNotify(AsyncRequest *areq) |
89 | 0 | { |
90 | | /* must provide our own instrumentation support */ |
91 | 0 | if (areq->requestee->instrument) |
92 | 0 | InstrStartNode(areq->requestee->instrument); |
93 | |
|
94 | 0 | switch (nodeTag(areq->requestee)) |
95 | 0 | { |
96 | 0 | case T_ForeignScanState: |
97 | 0 | ExecAsyncForeignScanNotify(areq); |
98 | 0 | break; |
99 | 0 | default: |
100 | | /* If the node doesn't support async, caller messed up. */ |
101 | 0 | elog(ERROR, "unrecognized node type: %d", |
102 | 0 | (int) nodeTag(areq->requestee)); |
103 | 0 | } |
104 | | |
105 | 0 | ExecAsyncResponse(areq); |
106 | | |
107 | | /* must provide our own instrumentation support */ |
108 | 0 | if (areq->requestee->instrument) |
109 | 0 | InstrStopNode(areq->requestee->instrument, |
110 | 0 | TupIsNull(areq->result) ? 0.0 : 1.0); |
111 | 0 | } |
112 | | |
113 | | /* |
114 | | * Call the requestor back when an asynchronous node has produced a result. |
115 | | */ |
116 | | void |
117 | | ExecAsyncResponse(AsyncRequest *areq) |
118 | 0 | { |
119 | 0 | switch (nodeTag(areq->requestor)) |
120 | 0 | { |
121 | 0 | case T_AppendState: |
122 | 0 | ExecAsyncAppendResponse(areq); |
123 | 0 | break; |
124 | 0 | default: |
125 | | /* If the node doesn't support async, caller messed up. */ |
126 | 0 | elog(ERROR, "unrecognized node type: %d", |
127 | 0 | (int) nodeTag(areq->requestor)); |
128 | 0 | } |
129 | 0 | } |
130 | | |
131 | | /* |
132 | | * A requestee node should call this function to deliver the tuple to its |
133 | | * requestor node. The requestee node can call this from its ExecAsyncRequest |
134 | | * or ExecAsyncNotify callback. |
135 | | */ |
136 | | void |
137 | | ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result) |
138 | 0 | { |
139 | 0 | areq->request_complete = true; |
140 | 0 | areq->result = result; |
141 | 0 | } |
142 | | |
143 | | /* |
144 | | * A requestee node should call this function to indicate that it is pending |
145 | | * for a callback. The requestee node can call this from its ExecAsyncRequest |
146 | | * or ExecAsyncNotify callback. |
147 | | */ |
148 | | void |
149 | | ExecAsyncRequestPending(AsyncRequest *areq) |
150 | 0 | { |
151 | 0 | areq->callback_pending = true; |
152 | 0 | areq->request_complete = false; |
153 | 0 | areq->result = NULL; |
154 | 0 | } |