/src/duckdb/src/main/stream_query_result.cpp
Line | Count | Source |
1 | | #include "duckdb/main/stream_query_result.hpp" |
2 | | |
3 | | #include "duckdb/main/client_context.hpp" |
4 | | #include "duckdb/main/materialized_query_result.hpp" |
5 | | #include "duckdb/common/box_renderer.hpp" |
6 | | #include "duckdb/main/database.hpp" |
7 | | |
8 | | namespace duckdb { |
9 | | |
10 | | StreamQueryResult::StreamQueryResult(StatementType statement_type, StatementProperties properties, |
11 | | vector<LogicalType> types, vector<string> names, |
12 | | ClientProperties client_properties, shared_ptr<BufferedData> data) |
13 | 0 | : QueryResult(QueryResultType::STREAM_RESULT, statement_type, std::move(properties), std::move(types), |
14 | 0 | std::move(names), std::move(client_properties)), |
15 | 0 | buffered_data(std::move(data)) { |
16 | 0 | context = buffered_data->GetContext(); |
17 | 0 | } |
18 | | |
19 | 0 | StreamQueryResult::StreamQueryResult(ErrorData error) : QueryResult(QueryResultType::STREAM_RESULT, std::move(error)) { |
20 | 0 | } |
21 | | |
22 | 0 | StreamQueryResult::~StreamQueryResult() { |
23 | 0 | } |
24 | | |
25 | 0 | string StreamQueryResult::ToString() { |
26 | 0 | string result; |
27 | 0 | if (success) { |
28 | 0 | result = HeaderToString(); |
29 | 0 | result += "[[STREAM RESULT]]"; |
30 | 0 | } else { |
31 | 0 | result = GetError() + "\n"; |
32 | 0 | } |
33 | 0 | return result; |
34 | 0 | } |
35 | | |
36 | 0 | unique_ptr<ClientContextLock> StreamQueryResult::LockContext() { |
37 | 0 | if (!context) { |
38 | 0 | string error_str = "Attempting to execute an unsuccessful or closed pending query result"; |
39 | 0 | if (HasError()) { |
40 | 0 | error_str += StringUtil::Format("\nError: %s", GetError()); |
41 | 0 | } |
42 | 0 | throw InvalidInputException(error_str); |
43 | 0 | } |
44 | 0 | return context->LockContext(); |
45 | 0 | } |
46 | | |
47 | 0 | StreamExecutionResult StreamQueryResult::ExecuteTaskInternal(ClientContextLock &lock) { |
48 | 0 | return buffered_data->ExecuteTaskInternal(*this, lock); |
49 | 0 | } |
50 | | |
51 | 0 | StreamExecutionResult StreamQueryResult::ExecuteTask() { |
52 | 0 | auto lock = LockContext(); |
53 | 0 | return ExecuteTaskInternal(*lock); |
54 | 0 | } |
55 | | |
56 | 0 | void StreamQueryResult::WaitForTask() { |
57 | 0 | auto lock = LockContext(); |
58 | 0 | buffered_data->UnblockSinks(); |
59 | 0 | context->WaitForTask(*lock, *this); |
60 | 0 | } |
61 | | |
62 | 0 | static bool ExecutionErrorOccurred(StreamExecutionResult result) { |
63 | 0 | if (result == StreamExecutionResult::EXECUTION_CANCELLED) { |
64 | 0 | return true; |
65 | 0 | } |
66 | 0 | if (result == StreamExecutionResult::EXECUTION_ERROR) { |
67 | 0 | return true; |
68 | 0 | } |
69 | 0 | return false; |
70 | 0 | } |
71 | | |
72 | 0 | unique_ptr<DataChunk> StreamQueryResult::FetchNextInternal(ClientContextLock &lock) { |
73 | 0 | bool invalidate_query = true; |
74 | 0 | unique_ptr<DataChunk> chunk; |
75 | 0 | try { |
76 | | // fetch the chunk and return it |
77 | 0 | auto stream_execution_result = buffered_data->ReplenishBuffer(*this, lock); |
78 | 0 | if (ExecutionErrorOccurred(stream_execution_result)) { |
79 | 0 | return chunk; |
80 | 0 | } |
81 | 0 | chunk = buffered_data->Scan(); |
82 | 0 | if (!chunk || chunk->ColumnCount() == 0 || chunk->size() == 0) { |
83 | 0 | context->CleanupInternal(lock, this); |
84 | 0 | chunk = nullptr; |
85 | 0 | } |
86 | 0 | return chunk; |
87 | 0 | } catch (std::exception &ex) { |
88 | 0 | ErrorData error(ex); |
89 | 0 | if (!Exception::InvalidatesTransaction(error.Type())) { |
90 | | // standard exceptions do not invalidate the current transaction |
91 | 0 | invalidate_query = false; |
92 | 0 | } else if (Exception::InvalidatesDatabase(error.Type())) { |
93 | | // fatal exceptions invalidate the entire database |
94 | 0 | auto &db_instance = DatabaseInstance::GetDatabase(*context); |
95 | 0 | ValidChecker::Invalidate(db_instance, error.RawMessage()); |
96 | 0 | } |
97 | 0 | context->ProcessError(error, context->GetCurrentQuery()); |
98 | 0 | SetError(std::move(error)); |
99 | 0 | } catch (...) { // LCOV_EXCL_START |
100 | 0 | SetError(ErrorData("Unhandled exception in FetchInternal")); |
101 | 0 | } // LCOV_EXCL_STOP |
102 | 0 | context->CleanupInternal(lock, this, invalidate_query); |
103 | 0 | return nullptr; |
104 | 0 | } |
105 | | |
106 | 0 | unique_ptr<DataChunk> StreamQueryResult::FetchInternal() { |
107 | 0 | unique_ptr<DataChunk> chunk; |
108 | 0 | { |
109 | 0 | auto lock = LockContext(); |
110 | 0 | CheckExecutableInternal(*lock); |
111 | 0 | chunk = FetchNextInternal(*lock); |
112 | 0 | } |
113 | 0 | if (!chunk || chunk->ColumnCount() == 0 || chunk->size() == 0) { |
114 | 0 | Close(); |
115 | 0 | return nullptr; |
116 | 0 | } |
117 | 0 | return chunk; |
118 | 0 | } |
119 | | |
#ifdef DUCKDB_ALTERNATIVE_VERIFY
// Verification-only fetch path: drives the stream through ExecuteTask until
// IsChunkReady reports true, waiting whenever execution is blocked, then fetches.
// Throws when the execution was cancelled or ended in an error.
static unique_ptr<DataChunk> AlternativeFetch(StreamQueryResult &stream_result) {
	auto status = stream_result.ExecuteTask();
	while (!StreamQueryResult::IsChunkReady(status)) {
		if (status == StreamExecutionResult::BLOCKED) {
			stream_result.WaitForTask();
		}
		status = stream_result.ExecuteTask();
	}
	switch (status) {
	case StreamExecutionResult::EXECUTION_CANCELLED:
		throw InvalidInputException("The execution of the query was cancelled before it could finish, likely "
		                            "caused by executing a different query");
	case StreamExecutionResult::EXECUTION_ERROR:
		stream_result.ThrowError();
		break;
	default:
		break;
	}
	return stream_result.Fetch();
}
#endif
140 | | |
141 | 0 | unique_ptr<MaterializedQueryResult> StreamQueryResult::Materialize() { |
142 | 0 | if (HasError() || !context) { |
143 | 0 | return make_uniq<MaterializedQueryResult>(GetErrorObject()); |
144 | 0 | } |
145 | 0 | auto collection = make_uniq<ColumnDataCollection>(Allocator::DefaultAllocator(), types); |
146 | |
|
147 | 0 | ColumnDataAppendState append_state; |
148 | 0 | collection->InitializeAppend(append_state); |
149 | 0 | while (true) { |
150 | | #ifdef DUCKDB_ALTERNATIVE_VERIFY |
151 | | auto chunk = AlternativeFetch(*this); |
152 | | #else |
153 | 0 | auto chunk = Fetch(); |
154 | 0 | #endif |
155 | 0 | if (!chunk || chunk->size() == 0) { |
156 | 0 | break; |
157 | 0 | } |
158 | 0 | collection->Append(append_state, *chunk); |
159 | 0 | } |
160 | 0 | auto result = |
161 | 0 | make_uniq<MaterializedQueryResult>(statement_type, properties, names, std::move(collection), client_properties); |
162 | 0 | if (HasError()) { |
163 | 0 | return make_uniq<MaterializedQueryResult>(GetErrorObject()); |
164 | 0 | } |
165 | 0 | return result; |
166 | 0 | } |
167 | | |
168 | 0 | bool StreamQueryResult::IsOpenInternal(ClientContextLock &lock) { |
169 | 0 | bool invalidated = !success || !context; |
170 | 0 | if (!invalidated) { |
171 | 0 | invalidated = !context->IsActiveResult(lock, *this); |
172 | 0 | } |
173 | 0 | return !invalidated; |
174 | 0 | } |
175 | | |
176 | 0 | void StreamQueryResult::CheckExecutableInternal(ClientContextLock &lock) { |
177 | 0 | if (!IsOpenInternal(lock)) { |
178 | 0 | string error_str = "Attempting to execute an unsuccessful or closed pending query result"; |
179 | 0 | if (HasError()) { |
180 | 0 | error_str += StringUtil::Format("\nError: %s", GetError()); |
181 | 0 | } |
182 | 0 | throw InvalidInputException(error_str); |
183 | 0 | } |
184 | 0 | } |
185 | | |
186 | 0 | bool StreamQueryResult::IsOpen() { |
187 | 0 | if (!success || !context) { |
188 | 0 | return false; |
189 | 0 | } |
190 | 0 | auto lock = LockContext(); |
191 | 0 | return IsOpenInternal(*lock); |
192 | 0 | } |
193 | | |
194 | 0 | void StreamQueryResult::Close() { |
195 | 0 | buffered_data->Close(); |
196 | 0 | if (context) { |
197 | 0 | auto lock = LockContext(); |
198 | 0 | if (context->IsActiveResult(*lock, *this)) { |
199 | | // Abandoned before the stream was fully drained: release the active-query state now |
200 | | // (matching InitialCleanup) instead of leaking it until the next query or context teardown. |
201 | 0 | context->CleanupInternal(*lock, this, false); |
202 | 0 | } |
203 | 0 | } |
204 | 0 | context.reset(); |
205 | 0 | } |
206 | | |
207 | 0 | bool StreamQueryResult::IsChunkReady(StreamExecutionResult result) { |
208 | 0 | if (result == StreamExecutionResult::CHUNK_READY) { |
209 | | // A chunk is ready to be fetched with Fetch() |
210 | 0 | return true; |
211 | 0 | } |
212 | 0 | if (result == StreamExecutionResult::EXECUTION_CANCELLED) { |
213 | | // Another query execution was started that cancelled this one |
214 | 0 | return true; |
215 | 0 | } |
216 | 0 | if (result == StreamExecutionResult::EXECUTION_ERROR) { |
217 | | // An error was encountered while executing the final pipeline |
218 | 0 | return true; |
219 | 0 | } |
220 | 0 | if (result == StreamExecutionResult::EXECUTION_FINISHED) { |
221 | | // The final pipeline completed successfully |
222 | 0 | return true; |
223 | 0 | } |
224 | 0 | return false; |
225 | 0 | } |
226 | | |
227 | | } // namespace duckdb |