Coverage Report

Created: 2025-06-13 06:06

/src/postgres/src/backend/executor/execAsync.c
Line
Count
Source (jump to first uncovered line)
1
/*-------------------------------------------------------------------------
2
 *
3
 * execAsync.c
4
 *    Support routines for asynchronous execution
5
 *
6
 * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7
 * Portions Copyright (c) 1994, Regents of the University of California
8
 *
9
 * IDENTIFICATION
10
 *    src/backend/executor/execAsync.c
11
 *
12
 *-------------------------------------------------------------------------
13
 */
14
15
#include "postgres.h"
16
17
#include "executor/execAsync.h"
18
#include "executor/executor.h"
19
#include "executor/nodeAppend.h"
20
#include "executor/nodeForeignscan.h"
21
22
/*
23
 * Asynchronously request a tuple from a designed async-capable node.
24
 */
25
void
26
ExecAsyncRequest(AsyncRequest *areq)
27
0
{
28
0
  if (areq->requestee->chgParam != NULL) /* something changed? */
29
0
    ExecReScan(areq->requestee); /* let ReScan handle this */
30
31
  /* must provide our own instrumentation support */
32
0
  if (areq->requestee->instrument)
33
0
    InstrStartNode(areq->requestee->instrument);
34
35
0
  switch (nodeTag(areq->requestee))
36
0
  {
37
0
    case T_ForeignScanState:
38
0
      ExecAsyncForeignScanRequest(areq);
39
0
      break;
40
0
    default:
41
      /* If the node doesn't support async, caller messed up. */
42
0
      elog(ERROR, "unrecognized node type: %d",
43
0
         (int) nodeTag(areq->requestee));
44
0
  }
45
46
0
  ExecAsyncResponse(areq);
47
48
  /* must provide our own instrumentation support */
49
0
  if (areq->requestee->instrument)
50
0
    InstrStopNode(areq->requestee->instrument,
51
0
            TupIsNull(areq->result) ? 0.0 : 1.0);
52
0
}
53
54
/*
55
 * Give the asynchronous node a chance to configure the file descriptor event
56
 * for which it wishes to wait.  We expect the node-type specific callback to
57
 * make a single call of the following form:
58
 *
59
 * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
60
 */
61
void
62
ExecAsyncConfigureWait(AsyncRequest *areq)
63
0
{
64
  /* must provide our own instrumentation support */
65
0
  if (areq->requestee->instrument)
66
0
    InstrStartNode(areq->requestee->instrument);
67
68
0
  switch (nodeTag(areq->requestee))
69
0
  {
70
0
    case T_ForeignScanState:
71
0
      ExecAsyncForeignScanConfigureWait(areq);
72
0
      break;
73
0
    default:
74
      /* If the node doesn't support async, caller messed up. */
75
0
      elog(ERROR, "unrecognized node type: %d",
76
0
         (int) nodeTag(areq->requestee));
77
0
  }
78
79
  /* must provide our own instrumentation support */
80
0
  if (areq->requestee->instrument)
81
0
    InstrStopNode(areq->requestee->instrument, 0.0);
82
0
}
83
84
/*
85
 * Call the asynchronous node back when a relevant event has occurred.
86
 */
87
void
88
ExecAsyncNotify(AsyncRequest *areq)
89
0
{
90
  /* must provide our own instrumentation support */
91
0
  if (areq->requestee->instrument)
92
0
    InstrStartNode(areq->requestee->instrument);
93
94
0
  switch (nodeTag(areq->requestee))
95
0
  {
96
0
    case T_ForeignScanState:
97
0
      ExecAsyncForeignScanNotify(areq);
98
0
      break;
99
0
    default:
100
      /* If the node doesn't support async, caller messed up. */
101
0
      elog(ERROR, "unrecognized node type: %d",
102
0
         (int) nodeTag(areq->requestee));
103
0
  }
104
105
0
  ExecAsyncResponse(areq);
106
107
  /* must provide our own instrumentation support */
108
0
  if (areq->requestee->instrument)
109
0
    InstrStopNode(areq->requestee->instrument,
110
0
            TupIsNull(areq->result) ? 0.0 : 1.0);
111
0
}
112
113
/*
114
 * Call the requestor back when an asynchronous node has produced a result.
115
 */
116
void
117
ExecAsyncResponse(AsyncRequest *areq)
118
0
{
119
0
  switch (nodeTag(areq->requestor))
120
0
  {
121
0
    case T_AppendState:
122
0
      ExecAsyncAppendResponse(areq);
123
0
      break;
124
0
    default:
125
      /* If the node doesn't support async, caller messed up. */
126
0
      elog(ERROR, "unrecognized node type: %d",
127
0
         (int) nodeTag(areq->requestor));
128
0
  }
129
0
}
130
131
/*
132
 * A requestee node should call this function to deliver the tuple to its
133
 * requestor node.  The requestee node can call this from its ExecAsyncRequest
134
 * or ExecAsyncNotify callback.
135
 */
136
void
137
ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result)
138
0
{
139
0
  areq->request_complete = true;
140
0
  areq->result = result;
141
0
}
142
143
/*
144
 * A requestee node should call this function to indicate that it is pending
145
 * for a callback.  The requestee node can call this from its ExecAsyncRequest
146
 * or ExecAsyncNotify callback.
147
 */
148
void
149
ExecAsyncRequestPending(AsyncRequest *areq)
150
0
{
151
0
  areq->callback_pending = true;
152
0
  areq->request_complete = false;
153
0
  areq->result = NULL;
154
0
}