/src/duckdb/src/parallel/pipeline_executor.cpp
Line | Count | Source |
1 | | #include "duckdb/parallel/pipeline_executor.hpp" |
2 | | |
3 | | #include "duckdb/common/limits.hpp" |
4 | | #include "duckdb/main/client_context.hpp" |
5 | | |
6 | | #ifdef DUCKDB_DEBUG_ASYNC_SINK_SOURCE |
7 | | #include <chrono> |
8 | | #include <thread> |
9 | | #endif |
10 | | |
11 | | namespace duckdb { |
12 | | |
//! Builds the executor state for one pipeline.
//! - If the pipeline has a sink: creates the local sink state and, when the sink requires partition info,
//!   registers a fresh batch index and uses it as both batch_index and min_batch_index.
//! - Creates the local source state.
//! - For every operator: allocates one intermediate chunk (typed after the *previous* operator, or the source
//!   for the first operator) and one operator state.
//! - Initializes final_chunk through InitializeChunk().
//! If an operator that is a sink already reports NO_OUTPUT_POSSIBLE, FinishProcessing() is called with its
//! default argument (declared outside this file) so that the pipeline can be skipped.
13 | | PipelineExecutor::PipelineExecutor(ClientContext &context_p, Pipeline &pipeline_p)
14 | 404k | : pipeline(pipeline_p), thread(context_p), context(context_p, thread, &pipeline_p) { |
15 | 404k | D_ASSERT(pipeline.source_state); |
16 | 404k | if (pipeline.sink) { |
17 | 328k | local_sink_state = pipeline.sink->GetLocalSinkState(context); |
18 | 328k | required_partition_info = pipeline.sink->RequiredPartitionInfo(); |
19 | 328k | if (required_partition_info.AnyRequired()) { |
20 | 10.2k | D_ASSERT(pipeline.source->SupportsPartitioning(OperatorPartitionInfo::BatchIndex())); |
21 | 10.2k | auto &partition_info = local_sink_state->partition_info; |
22 | 10.2k | D_ASSERT(!partition_info.batch_index.IsValid()); |
23 | | // batch index is not set yet - initialize before fetching anything |
24 | 10.2k | partition_info.batch_index = pipeline.RegisterNewBatchIndex(); |
25 | 10.2k | partition_info.min_batch_index = partition_info.batch_index; |
26 | 10.2k | } |
27 | 328k | } |
28 | 404k | local_source_state = pipeline.source->GetLocalSourceState(context, *pipeline.source_state); |
29 | | |
30 | 404k | intermediate_chunks.reserve(pipeline.operators.size()); |
31 | 404k | intermediate_states.reserve(pipeline.operators.size()); |
32 | 887k | for (idx_t i = 0; i < pipeline.operators.size(); i++) { |
// intermediate_chunks[i] holds the INPUT of operator i, hence it is typed after the operator before it
33 | 482k | auto &prev_operator = i == 0 ? *pipeline.source : pipeline.operators[i - 1].get(); |
34 | 482k | auto &current_operator = pipeline.operators[i].get(); |
35 | | |
36 | 482k | auto chunk = make_uniq<DataChunk>(); |
37 | 482k | chunk->Initialize(BufferAllocator::Get(context.client), prev_operator.GetTypes()); |
38 | 482k | intermediate_chunks.push_back(std::move(chunk)); |
39 | | |
40 | 482k | auto op_state = current_operator.GetOperatorState(context); |
41 | 482k | intermediate_states.push_back(std::move(op_state)); |
42 | | |
43 | 482k | if (current_operator.IsSink() && current_operator.sink_state->state == SinkFinalizeType::NO_OUTPUT_POSSIBLE) { |
44 | | // one of the operators has already figured out no output is possible |
45 | | // we can skip executing the pipeline |
// note: the loop keeps going afterwards, so chunks/states are still created for the remaining operators
46 | 22.1k | FinishProcessing(); |
47 | 22.1k | } |
48 | 482k | } |
49 | 404k | InitializeChunk(final_chunk); |
50 | 404k | } |
51 | | |
//! Drains every operator that reports RequiresFinalExecute(): each chunk produced by FinalExecute is pushed
//! through the operators after it (and into the sink) via ExecutePushInternal before moving on.
//! The progress is kept in members (started_flushing, flushing_idx, should_flush_current_idx) so the method can
//! be called again after it returned false.
//! @param chunk_budget budget that is handed on to ExecutePushInternal
//! @return true when the flush loop ran to completion or the push reported FINISHED;
//!         false when the push reported BLOCKED (remaining_sink_chunk is set) or HAVE_MORE_OUTPUT
//!         (the budget is asserted to be depleted in that case)
//! @throws InternalException on any other OperatorResultType returned by the push
52 | 324k | bool PipelineExecutor::TryFlushCachingOperators(ExecutionBudget &chunk_budget) { |
53 | 324k | if (!started_flushing) { |
54 | | // Remainder of this method assumes any in process operators are from flushing |
55 | 324k | D_ASSERT(in_process_operators.empty()); |
56 | 324k | started_flushing = true; |
// if processing was cut short, start at the recorded index instead of at the first operator
57 | 324k | flushing_idx = IsFinished() ? idx_t(finished_processing_idx) : 0; |
58 | 324k | } |
59 | | |
60 | | // For each operator that supports FinalExecute, |
61 | | // extract every chunk from it and push it through the rest of the pipeline |
62 | | // before moving onto the next operators' FinalExecute |
63 | 745k | while (flushing_idx < pipeline.operators.size()) { |
64 | 420k | if (!pipeline.operators[flushing_idx].get().RequiresFinalExecute()) { |
65 | 310k | flushing_idx++; |
66 | 310k | continue; |
67 | 310k | } |
68 | | |
69 | | // This slightly awkward way of increasing the flushing idx is to make the code re-entrant: We need to call this |
70 | | // method again in the case of a Sink returning BLOCKED. |
// NOTE(review): the initial value of should_flush_current_idx is set outside this file - confirm in the header
71 | 109k | if (!should_flush_current_idx && in_process_operators.empty()) { |
72 | 54.6k | should_flush_current_idx = true; |
73 | 54.6k | flushing_idx++; |
74 | 54.6k | continue; |
75 | 54.6k | } |
76 | | |
// output of operator flushing_idx goes into the input chunk of the next operator, or final_chunk for the last one
77 | 55.1k | auto &curr_chunk =
78 | 55.1k | flushing_idx + 1 >= intermediate_chunks.size() ? final_chunk : *intermediate_chunks[flushing_idx + 1]; |
79 | 55.1k | auto &current_operator = pipeline.operators[flushing_idx].get(); |
80 | | |
81 | 55.1k | OperatorFinalizeResultType finalize_result; |
82 | | |
83 | 55.1k | if (in_process_operators.empty()) { |
84 | 54.6k | curr_chunk.Reset(); |
85 | 54.6k | StartOperator(current_operator); |
86 | 54.6k | finalize_result = current_operator.FinalExecute(context, curr_chunk, *current_operator.op_state,
87 | 54.6k | *intermediate_states[flushing_idx]); |
88 | 54.6k | EndOperator(current_operator, &curr_chunk); |
89 | 54.6k | } else { |
90 | | // Reset flag and reflush the last chunk we were flushing. |
91 | 454 | finalize_result = OperatorFinalizeResultType::HAVE_MORE_OUTPUT; |
92 | 454 | } |
93 | | |
94 | 55.1k | auto push_result = ExecutePushInternal(curr_chunk, chunk_budget, flushing_idx + 1); |
95 | | |
// remember whether this operator has to be asked for more output on the next round
96 | 55.1k | if (finalize_result == OperatorFinalizeResultType::HAVE_MORE_OUTPUT) { |
97 | 0 | should_flush_current_idx = true; |
98 | 55.1k | } else { |
99 | 55.1k | should_flush_current_idx = false; |
100 | 55.1k | } |
101 | | |
102 | 55.1k | switch (push_result) { |
103 | 0 | case OperatorResultType::BLOCKED: { |
104 | 0 | remaining_sink_chunk = true; |
105 | 0 | return false; |
106 | 0 | } |
107 | 0 | case OperatorResultType::HAVE_MORE_OUTPUT: { |
108 | 0 | D_ASSERT(chunk_budget.IsDepleted()); |
109 | | // The chunk budget was used up, pushing the chunk through the pipeline created more chunks |
110 | | // we need to continue this the next time Execute is called. |
111 | 0 | return false; |
112 | 0 | } |
113 | 54.7k | case OperatorResultType::NEED_MORE_INPUT: |
114 | 54.7k | continue; |
115 | 0 | case OperatorResultType::FINISHED: |
116 | 0 | break; |
117 | 0 | default: |
118 | 0 | throw InternalException("Unexpected OperatorResultType (%s) in TryFlushCachingOperators",
119 | 0 | EnumUtil::ToString(push_result)); |
120 | 55.1k | } |
// only reached for FINISHED: leave the flush loop and report completion
121 | 0 | break; |
122 | 55.1k | } |
123 | 324k | return true; |
124 | 324k | } |
125 | | |
//! Determines the batch index that belongs to the chunk that was just fetched and, if it differs from the
//! current one, informs the sink through its NextBatch callback.
//! - Non-empty chunk: the source's partition data is used; the index becomes
//!   base_batch_index + <source batch index> + 1.
//! - Empty chunk with have_more_output: the current index is kept.
//! - Otherwise: the maximum valid index of this pipeline (base_batch_index + BATCH_INCREMENT - 1) is used.
//! @param source_chunk chunk returned by the last source fetch
//! @param have_more_output whether the source reported HAVE_MORE_OUTPUT for that fetch
//! @return READY when nothing changed or the sink accepted the new batch; BLOCKED when the sink (or the debug
//!         hook under DUCKDB_DEBUG_ASYNC_SINK_SOURCE) blocked
//! @throws InternalException if the source hands back an index >= the maximum, or an index lower than the current one
126 | 11.0k | SinkNextBatchType PipelineExecutor::NextBatch(DataChunk &source_chunk, const bool have_more_output) { |
127 | 11.0k | D_ASSERT(required_partition_info.AnyRequired()); |
128 | 11.0k | auto max_batch_index = pipeline.base_batch_index + PipelineBuildState::BATCH_INCREMENT - 1; |
129 | | // by default set it to the maximum valid batch index value for the current pipeline |
130 | 11.0k | auto &partition_info = local_sink_state->partition_info; |
131 | 11.0k | OperatorPartitionData next_data(max_batch_index); |
132 | 11.0k | if ((source_chunk.size() > 0)) { |
133 | 799 | D_ASSERT(local_source_state); |
134 | 799 | D_ASSERT(pipeline.source_state); |
135 | | // if we retrieved data - initialize the next batch index |
136 | 799 | auto partition_data = pipeline.source->GetPartitionData(context, source_chunk, *pipeline.source_state,
137 | 799 | *local_source_state, required_partition_info); |
138 | 799 | auto batch_index = partition_data.batch_index; |
139 | | // we start with the base_batch_index as a valid starting value. Make sure that next batch is called below |
140 | 799 | next_data = std::move(partition_data); |
141 | 799 | next_data.batch_index = pipeline.base_batch_index + batch_index + 1; |
142 | 799 | if (next_data.batch_index >= max_batch_index) { |
143 | 0 | throw InternalException("Pipeline batch index - invalid batch index %llu returned by source operator",
144 | 0 | batch_index); |
145 | 0 | } |
146 | 10.2k | } else if (have_more_output) { |
147 | 0 | next_data.batch_index = partition_info.batch_index.GetIndex(); |
148 | 0 | } |
149 | 11.0k | if (next_data.batch_index == partition_info.batch_index.GetIndex()) { |
150 | | // no changes, return |
151 | 0 | return SinkNextBatchType::READY; |
152 | 0 | } |
153 | | // batch index has changed - update it |
154 | 11.0k | if (partition_info.batch_index.GetIndex() > next_data.batch_index) { |
155 | 0 | throw InternalException(
156 | 0 | "Pipeline batch index - gotten lower batch index %llu (down from previous batch index of %llu)",
157 | 0 | next_data.batch_index, partition_info.batch_index.GetIndex()); |
158 | 0 | } |
159 | | #ifdef DUCKDB_DEBUG_ASYNC_SINK_SOURCE |
// debug build only: report BLOCKED until the counter reaches the target; a detached thread fires the
// interrupt callback after sleeping 1ms
160 | | if (debug_blocked_next_batch_count < debug_blocked_target_count) { |
161 | | debug_blocked_next_batch_count++; |
162 | | |
163 | | auto &callback_state = interrupt_state; |
164 | | std::thread rewake_thread([callback_state] { |
165 | | std::this_thread::sleep_for(std::chrono::milliseconds(1)); |
166 | | callback_state.Callback(); |
167 | | }); |
168 | | rewake_thread.detach(); |
169 | | |
170 | | return SinkNextBatchType::BLOCKED; |
171 | | } |
172 | | #endif |
173 | 11.0k | auto current_batch = partition_info.batch_index.GetIndex(); |
174 | 11.0k | partition_info.batch_index = next_data.batch_index; |
175 | 11.0k | partition_info.partition_data = std::move(next_data.partition_data); |
176 | 11.0k | OperatorSinkNextBatchInput next_batch_input {*pipeline.sink->sink_state, *local_sink_state, interrupt_state}; |
177 | | // call NextBatch before updating min_batch_index to provide the opportunity to flush the previous batch |
178 | 11.0k | auto next_batch_result = pipeline.sink->NextBatch(context, next_batch_input); |
179 | | |
180 | 11.0k | if (next_batch_result == SinkNextBatchType::BLOCKED) { |
// only batch_index is rolled back here; partition_data keeps the value that was moved in above
181 | 0 | partition_info.batch_index = current_batch; // set batch_index back to what it was before |
182 | 0 | return SinkNextBatchType::BLOCKED; |
183 | 0 | } |
184 | | |
185 | 11.0k | partition_info.min_batch_index = pipeline.UpdateBatchIndex(current_batch, next_data.batch_index); |
186 | | |
187 | 11.0k | return SinkNextBatchType::READY; |
188 | 11.0k | } |
189 | | |
//! Main driver: repeatedly performs one step of pipeline work until the budget created from max_chunks says
//! stop (chunk_budget.Next() returns false), the pipeline blocks, or everything is done.
//! Each iteration takes exactly one of these paths, checked in this order:
//!  1. source exhausted, flushing done, nothing pending  -> leave the loop
//!  2. remaining_sink_chunk                              -> retry sinking final_chunk
//!  3. operators in process (and not flushing)           -> push the same source chunk again
//!  4. source exhausted, flushing not done               -> TryFlushCachingOperators
//!  5. source not exhausted, or NextBatch was blocked    -> fetch (unless resuming NextBatch), NextBatch, push
//! @param max_chunks value used to construct the ExecutionBudget
//! @return INTERRUPTED when the source, the sink, NextBatch or the flush blocked; NOT_FINISHED when work remains;
//!         otherwise the result of PushFinalize()
//! @throws InterruptException when context.client.interrupted is set
//! @throws InternalException when none of the paths above applies
190 | 328k | PipelineExecuteResult PipelineExecutor::Execute(idx_t max_chunks) { |
191 | 328k | D_ASSERT(pipeline.sink); |
// without operators the source writes straight into final_chunk
192 | 328k | auto &source_chunk = pipeline.operators.empty() ? final_chunk : *intermediate_chunks[0]; |
193 | 328k | ExecutionBudget chunk_budget(max_chunks); |
194 | 739k | do { |
195 | 739k | if (context.client.interrupted) { |
196 | 137 | throw InterruptException(); |
197 | 137 | } |
198 | | |
199 | 738k | OperatorResultType result; |
200 | 738k | if (exhausted_source && done_flushing && !remaining_sink_chunk && !next_batch_blocked &&
201 | 0 | in_process_operators.empty()) { |
202 | 0 | break; |
203 | 738k | } else if (remaining_sink_chunk) { |
204 | | // The pipeline was interrupted by the Sink. We should retry sinking the final chunk. |
205 | 0 | result = ExecutePushInternal(final_chunk, chunk_budget); |
206 | 0 | D_ASSERT(result != OperatorResultType::HAVE_MORE_OUTPUT); |
207 | 0 | remaining_sink_chunk = false; |
208 | 738k | } else if (!in_process_operators.empty() && !started_flushing) { |
209 | | // Operator(s) in the pipeline have returned `HAVE_MORE_OUTPUT` in the last Execute call |
210 | | // the operators have to be called with the same input chunk to produce the rest of the output |
211 | 0 | D_ASSERT(source_chunk.size() > 0); |
212 | 0 | result = ExecutePushInternal(source_chunk, chunk_budget); |
213 | 738k | } else if (exhausted_source && !next_batch_blocked && !done_flushing) { |
214 | | // The source was exhausted, try flushing all operators |
215 | 324k | auto flush_completed = TryFlushCachingOperators(chunk_budget); |
216 | 325k | if (flush_completed) { |
217 | 325k | done_flushing = true; |
218 | 325k | break; |
219 | 18.4E | } else { |
// an incomplete flush is either a blocked sink (remaining_sink_chunk) or a depleted budget
220 | 18.4E | if (remaining_sink_chunk) { |
221 | 0 | return PipelineExecuteResult::INTERRUPTED; |
222 | 18.4E | } else { |
223 | 18.4E | D_ASSERT(chunk_budget.IsDepleted()); |
224 | 18.4E | return PipelineExecuteResult::NOT_FINISHED; |
225 | 18.4E | } |
226 | 18.4E | } |
227 | 414k | } else if (!exhausted_source || next_batch_blocked) { |
// BLOCKED is only a placeholder here: it is overwritten by the fetch unless a blocked NextBatch is being resumed
228 | 413k | SourceResultType source_result = SourceResultType::BLOCKED; |
229 | 413k | if (!next_batch_blocked) { |
230 | | // "Regular" path: fetch a chunk from the source and push it through the pipeline |
231 | 413k | source_chunk.Reset(); |
232 | 413k | source_result = FetchFromSource(source_chunk); |
233 | 413k | if (source_result == SourceResultType::BLOCKED) { |
234 | 0 | return PipelineExecuteResult::INTERRUPTED; |
235 | 0 | } |
236 | 413k | if (source_result == SourceResultType::FINISHED) { |
237 | 327k | exhausted_source = true; |
238 | 327k | } |
239 | 413k | } |
240 | | |
241 | 413k | if (required_partition_info.AnyRequired()) { |
242 | 11.0k | auto next_batch_result = NextBatch(source_chunk, source_result == SourceResultType::HAVE_MORE_OUTPUT); |
243 | 11.0k | next_batch_blocked = next_batch_result == SinkNextBatchType::BLOCKED; |
244 | 11.0k | if (next_batch_blocked) { |
245 | 0 | return PipelineExecuteResult::INTERRUPTED; |
246 | 0 | } |
247 | 11.0k | } |
248 | | |
// nothing to push; the next iteration takes the flush path
249 | 413k | if (exhausted_source && source_chunk.size() == 0) { |
250 | 270k | continue; |
251 | 270k | } |
252 | | |
253 | 143k | result = ExecutePushInternal(source_chunk, chunk_budget); |
254 | 143k | } else { |
255 | 589 | throw InternalException("Unexpected state reached in pipeline executor"); |
256 | 589 | } |
257 | | |
258 | | // SINK INTERRUPT |
259 | 143k | if (result == OperatorResultType::BLOCKED) { |
260 | 0 | remaining_sink_chunk = true; |
261 | 0 | return PipelineExecuteResult::INTERRUPTED; |
262 | 0 | } |
263 | | |
264 | 143k | if (result == OperatorResultType::FINISHED) { |
265 | 13 | break; |
266 | 13 | } |
267 | 413k | } while (chunk_budget.Next()); |
268 | | |
// still work left and nobody declared the pipeline finished: ask to be scheduled again
269 | 328k | if ((!exhausted_source || !done_flushing) && !IsFinished()) { |
270 | 0 | return PipelineExecuteResult::NOT_FINISHED; |
271 | 0 | } |
272 | | |
273 | 328k | return PushFinalize(); |
274 | 328k | } |
275 | | |
//! Returns the remaining_sink_chunk flag, which this file sets when a push into the sink came back BLOCKED
//! and clears once Execute() has retried sinking final_chunk.
276 | 0 | bool PipelineExecutor::RemainingSinkChunk() const { |
277 | 0 | return remaining_sink_chunk; |
278 | 0 | } |
279 | | |
//! Convenience overload: runs Execute(idx_t) with the largest idx_t value as max_chunks.
//! @return whatever Execute(idx_t) returns
280 | 216k | PipelineExecuteResult PipelineExecutor::Execute() { |
281 | 216k | return Execute(NumericLimits<idx_t>::Maximum()); |
282 | 216k | } |
283 | | |
//! Marks the pipeline as finished early and wakes up anything waiting on its source or sink.
//! @param operator_idx index to record in finished_processing_idx; a negative value is stored as the
//!        maximum int32_t instead
//! Side effects: discards all in-process operators; for the source state and the sink state (when the
//! pipeline has them) takes the state's lock, then calls PreventBlocking and UnblockTasks.
284 | 22.1k | void PipelineExecutor::FinishProcessing(int32_t operator_idx) { |
285 | 22.1k | finished_processing_idx = operator_idx < 0 ? NumericLimits<int32_t>::Maximum() : operator_idx; |
286 | 22.1k | in_process_operators = stack<idx_t>(); |
287 | | |
288 | 22.1k | if (pipeline.GetSource()) { |
289 | 22.1k | auto guard = pipeline.source_state->Lock(); |
290 | 22.1k | pipeline.source_state->PreventBlocking(guard); |
291 | 22.1k | pipeline.source_state->UnblockTasks(guard); |
292 | 22.1k | } |
293 | 22.1k | if (pipeline.GetSink()) { |
294 | 22.1k | auto guard = pipeline.GetSink()->sink_state->Lock(); |
295 | 22.1k | pipeline.GetSink()->sink_state->PreventBlocking(guard); |
296 | 22.1k | pipeline.GetSink()->sink_state->UnblockTasks(guard); |
297 | 22.1k | } |
298 | 22.1k | } |
299 | | |
//! @return true once finished_processing_idx is non-negative, i.e. after FinishProcessing() has stored a value.
//! NOTE(review): relies on the member starting out negative - its initializer is not in this file.
300 | 324k | bool PipelineExecutor::IsFinished() { |
301 | 324k | return finished_processing_idx >= 0; |
302 | 324k | } |
303 | | |
//! Runs `input` through the operators starting at initial_idx and sinks every chunk that comes out.
//! When `input` is final_chunk itself no operator is executed; the chunk is only handed to the sink.
//! @param input chunk to process
//! @param chunk_budget consulted after every round that produced HAVE_MORE_OUTPUT
//! @param initial_idx operator position to start from (forwarded to Execute(input, final_chunk, initial_idx))
//! @return NEED_MORE_INPUT when `input` is empty or has been fully consumed;
//!         BLOCKED when the sink blocked;
//!         FINISHED when an operator or the sink finished (the sink case also calls FinishProcessing());
//!         otherwise the last operator result, once chunk_budget.Next() returns false
304 | | OperatorResultType PipelineExecutor::ExecutePushInternal(DataChunk &input, ExecutionBudget &chunk_budget,
305 | 197k | idx_t initial_idx) { |
306 | 197k | D_ASSERT(pipeline.sink); |
307 | 197k | if (input.size() == 0) { // LCOV_EXCL_START |
308 | 26.9k | return OperatorResultType::NEED_MORE_INPUT; |
309 | 26.9k | } // LCOV_EXCL_STOP |
310 | | |
311 | | // this loop will continuously push the input chunk through the pipeline as long as: |
312 | | // - the OperatorResultType for the Execute is HAVE_MORE_OUTPUT |
313 | | // - the Sink doesn't block |
314 | | // - the ExecutionBudget has not been depleted |
315 | 170k | OperatorResultType result = OperatorResultType::HAVE_MORE_OUTPUT; |
316 | 172k | do { |
317 | | // Note: if input is the final_chunk, we don't do any executing, the chunk just needs to be sinked |
318 | 172k | if (&input != &final_chunk) { |
319 | 151k | final_chunk.Reset(); |
320 | | // Execute and put the result into 'final_chunk' |
321 | 151k | result = Execute(input, final_chunk, initial_idx); |
322 | 151k | if (result == OperatorResultType::FINISHED) { |
323 | 13 | return OperatorResultType::FINISHED; |
324 | 13 | } |
325 | 151k | } else { |
326 | 20.6k | result = OperatorResultType::NEED_MORE_INPUT; |
327 | 20.6k | } |
328 | 172k | auto &sink_chunk = final_chunk; |
// an empty result is not handed to the sink at all
329 | 172k | if (sink_chunk.size() > 0) { |
330 | 105k | StartOperator(*pipeline.sink); |
331 | 105k | D_ASSERT(pipeline.sink); |
332 | 105k | D_ASSERT(pipeline.sink->sink_state); |
333 | 105k | OperatorSinkInput sink_input {*pipeline.sink->sink_state, *local_sink_state, interrupt_state}; |
334 | | |
335 | 105k | auto sink_result = Sink(sink_chunk, sink_input); |
336 | | |
337 | 105k | EndOperator(*pipeline.sink, nullptr); |
338 | | |
339 | 105k | if (sink_result == SinkResultType::BLOCKED) { |
340 | 0 | return OperatorResultType::BLOCKED; |
341 | 105k | } else if (sink_result == SinkResultType::FINISHED) { |
342 | 0 | FinishProcessing(); |
343 | 0 | return OperatorResultType::FINISHED; |
344 | 0 | } |
345 | 105k | } |
346 | 172k | if (result == OperatorResultType::NEED_MORE_INPUT) { |
347 | 168k | return OperatorResultType::NEED_MORE_INPUT; |
348 | 168k | } |
349 | 172k | } while (chunk_budget.Next()); |
350 | 2.40k | return result; |
351 | 170k | } |
352 | | |
//! Finishes this executor's share of the pipeline: runs the sink's Combine with the local sink state, then
//! finalizes every operator state, flushes `thread` into pipeline.executor and releases the local sink state.
//! @return INTERRUPTED when Combine (or the debug hook under DUCKDB_DEBUG_ASYNC_SINK_SOURCE) blocked - in that
//!         case `finalized` stays false, so the call can be made again; FINISHED otherwise
//! @throws InternalException when called after a previous call already returned FINISHED
353 | 324k | PipelineExecuteResult PipelineExecutor::PushFinalize() { |
354 | 324k | if (finalized) { |
355 | 0 | throw InternalException("Calling PushFinalize on a pipeline that has been finalized already"); |
356 | 0 | } |
357 | | |
358 | 324k | D_ASSERT(local_sink_state); |
359 | | |
360 | | // Run the combine for the sink |
361 | 324k | OperatorSinkCombineInput combine_input {*pipeline.sink->sink_state, *local_sink_state, interrupt_state}; |
362 | | |
363 | | #ifdef DUCKDB_DEBUG_ASYNC_SINK_SOURCE |
// debug build only: report INTERRUPTED until the counter reaches the target; a detached thread fires the
// interrupt callback after sleeping 1ms
364 | | if (debug_blocked_combine_count < debug_blocked_target_count) { |
365 | | debug_blocked_combine_count++; |
366 | | |
367 | | auto &callback_state = combine_input.interrupt_state; |
368 | | std::thread rewake_thread([callback_state] { |
369 | | std::this_thread::sleep_for(std::chrono::milliseconds(1)); |
370 | | callback_state.Callback(); |
371 | | }); |
372 | | rewake_thread.detach(); |
373 | | |
374 | | return PipelineExecuteResult::INTERRUPTED; |
375 | | } |
376 | | #endif |
377 | 324k | auto result = pipeline.sink->Combine(context, combine_input); |
378 | | |
379 | 324k | if (result == SinkCombineResultType::BLOCKED) { |
380 | 0 | return PipelineExecuteResult::INTERRUPTED; |
381 | 0 | } |
382 | | |
383 | 324k | finalized = true; |
384 | | // flush all query profiler info |
385 | 802k | for (idx_t i = 0; i < intermediate_states.size(); i++) { |
386 | 477k | intermediate_states[i]->Finalize(pipeline.operators[i].get(), context); |
387 | 477k | } |
388 | 324k | pipeline.executor.Flush(thread); |
389 | 324k | local_sink_state.reset(); |
390 | | |
391 | 324k | return PipelineExecuteResult::FINISHED; |
392 | 324k | } |
393 | | |
//! Decides where execution continues after an operator produced no output.
//! @param current_idx (out) set to initial_idx, or - when operators are still in process - to the index on top
//!        of in_process_operators, which is popped
//! @param initial_idx position that counts as "the source" for this run
394 | 243k | void PipelineExecutor::GoToSource(idx_t &current_idx, idx_t initial_idx) { |
395 | | // we go back to the first operator (the source) |
396 | 243k | current_idx = initial_idx; |
397 | 243k | if (!in_process_operators.empty()) { |
398 | | // ... UNLESS there is an in process operator |
399 | | // if there is an in-process operator, we start executing at the latest one |
400 | | // for example, if we have a join operator that has tuples left, we first need to emit those tuples |
401 | 29.2k | current_idx = in_process_operators.top(); |
402 | 29.2k | in_process_operators.pop(); |
403 | 29.2k | } |
404 | 243k | D_ASSERT(current_idx >= initial_idx); |
405 | 243k | } |
406 | | |
//! Runs `input` through the operator chain that follows position initial_idx and leaves the output of the last
//! operator in `result`.
//! Index convention used below: position N (> initial_idx) means "execute operator N-1"; that operator reads
//! intermediate_chunks[N-1] (or `input` when N == initial_idx + 1) and writes intermediate_chunks[N], or
//! `result` when N is past the last intermediate chunk.
//! @param input chunk fed into the first operator of this run
//! @param result chunk that receives the final output
//! @param initial_idx position treated as the source for this run
//! @return NEED_MORE_INPUT when `input` is empty or no operator holds back output;
//!         HAVE_MORE_OUTPUT when at least one operator is recorded in in_process_operators;
//!         FINISHED when an operator returned FINISHED (FinishProcessing is called with its position)
//! @throws InterruptException when context.client.interrupted is set
407 | 151k | OperatorResultType PipelineExecutor::Execute(DataChunk &input, DataChunk &result, idx_t initial_idx) { |
408 | 151k | if (input.size() == 0) { // LCOV_EXCL_START |
409 | 0 | return OperatorResultType::NEED_MORE_INPUT; |
410 | 0 | } // LCOV_EXCL_STOP |
411 | 151k | D_ASSERT(!pipeline.operators.empty()); |
412 | | |
413 | 151k | idx_t current_idx; |
// resume at an in-process operator if there is one, otherwise begin with the operator right after initial_idx
414 | 151k | GoToSource(current_idx, initial_idx); |
415 | 151k | if (current_idx == initial_idx) { |
416 | 150k | current_idx++; |
417 | 150k | } |
418 | 151k | if (current_idx > pipeline.operators.size()) { |
419 | 0 | result.Reference(input); |
420 | 0 | return OperatorResultType::NEED_MORE_INPUT; |
421 | 0 | } |
422 | 318k | while (true) { |
423 | 315k | if (context.client.interrupted) { |
424 | 53 | throw InterruptException(); |
425 | 53 | } |
426 | | // now figure out where to put the chunk |
427 | | // if current_idx is the last possible index (>= operators.size()) we write to the result |
428 | | // otherwise we write to an intermediate chunk |
429 | 315k | auto current_intermediate = current_idx; |
430 | 315k | auto &current_chunk =
431 | 315k | current_intermediate >= intermediate_chunks.size() ? result : *intermediate_chunks[current_intermediate]; |
432 | 315k | current_chunk.Reset(); |
433 | 315k | if (current_idx == initial_idx) { |
434 | | // we went back to the source: we need more input |
435 | 63.8k | return OperatorResultType::NEED_MORE_INPUT; |
436 | 252k | } else { |
437 | 252k | auto &prev_chunk =
438 | 252k | current_intermediate == initial_idx + 1 ? input : *intermediate_chunks[current_intermediate - 1]; |
439 | 252k | auto operator_idx = current_idx - 1; |
440 | 252k | auto &current_operator = pipeline.operators[operator_idx].get(); |
441 | | |
442 | | // if current_idx > source_idx, we pass the previous operators' output through the Execute of the current |
443 | | // operator |
444 | 252k | StartOperator(current_operator); |
// NOTE(review): this local `result` shadows the DataChunk parameter of the same name inside this else-block
445 | 252k | auto result = current_operator.Execute(context, prev_chunk, current_chunk, *current_operator.op_state,
446 | 252k | *intermediate_states[current_intermediate - 1]); |
447 | 252k | EndOperator(current_operator, &current_chunk); |
448 | 252k | if (result == OperatorResultType::HAVE_MORE_OUTPUT) { |
449 | | // more data remains in this operator |
450 | | // push in-process marker |
451 | 29.2k | in_process_operators.push(current_idx); |
452 | 222k | } else if (result == OperatorResultType::FINISHED) { |
453 | 13 | D_ASSERT(current_chunk.size() == 0); |
454 | 13 | FinishProcessing(NumericCast<int32_t>(current_idx)); |
455 | 13 | return OperatorResultType::FINISHED; |
456 | 13 | } |
457 | 252k | current_chunk.Verify(); |
458 | 252k | } |
459 | | |
460 | 252k | if (current_chunk.size() == 0) { |
461 | | // no output from this operator! |
// NOTE(review): current_idx == initial_idx already returned above, so the first branch below cannot be taken
462 | 91.6k | if (current_idx == initial_idx) { |
463 | | // if we got no output from the scan, we are done |
464 | 0 | break; |
465 | 91.6k | } else { |
466 | | // if we got no output from an intermediate op |
467 | | // we go back and try to pull data from the source again |
468 | 91.6k | GoToSource(current_idx, initial_idx); |
469 | 91.6k | continue; |
470 | 91.6k | } |
471 | 160k | } else { |
472 | | // we got output! continue to the next operator |
473 | 160k | current_idx++; |
474 | 160k | if (current_idx > pipeline.operators.size()) { |
475 | | // if we got output and are at the last operator, we are finished executing for this output chunk |
476 | | // return the data and push it into the chunk |
477 | 84.8k | break; |
478 | 84.8k | } |
479 | 160k | } |
480 | 252k | } |
481 | 87.5k | return in_process_operators.empty() ? OperatorResultType::NEED_MORE_INPUT : OperatorResultType::HAVE_MORE_OUTPUT; |
482 | 151k | } |
483 | | |
//! Replaces interrupt_state with a new InterruptState constructed from the given task handle.
//! @param current_task weak handle to a task; it is moved into the new InterruptState
484 | 329k | void PipelineExecutor::SetTaskForInterrupts(weak_ptr<Task> current_task) { |
485 | 329k | interrupt_state = InterruptState(std::move(current_task)); |
486 | 329k | } |
487 | | |
//! Thin wrapper around pipeline.source->GetData(context, chunk, input).
//! With DUCKDB_DEBUG_ASYNC_SINK_SOURCE defined, calls return BLOCKED without touching the source until
//! debug_blocked_source_count reaches debug_blocked_target_count; each such call starts a detached thread that
//! sleeps 1ms and then invokes the interrupt callback.
//! @param chunk chunk handed to the source's GetData
//! @param input source input handed to the source's GetData; its interrupt_state is used by the debug hook
//! @return the source's result, or BLOCKED from the debug hook
488 | 413k | SourceResultType PipelineExecutor::GetData(DataChunk &chunk, OperatorSourceInput &input) { |
489 | | //! Testing feature to enable async source on every operator |
490 | | #ifdef DUCKDB_DEBUG_ASYNC_SINK_SOURCE |
491 | | if (debug_blocked_source_count < debug_blocked_target_count) { |
492 | | debug_blocked_source_count++; |
493 | | |
494 | | auto &callback_state = input.interrupt_state; |
// the lambda captures callback_state by value, so the detached thread works on its own copy
495 | | std::thread rewake_thread([callback_state] { |
496 | | std::this_thread::sleep_for(std::chrono::milliseconds(1)); |
497 | | callback_state.Callback(); |
498 | | }); |
499 | | rewake_thread.detach(); |
500 | | |
501 | | return SourceResultType::BLOCKED; |
502 | | } |
503 | | #endif |
504 | | |
505 | 413k | return pipeline.source->GetData(context, chunk, input); |
506 | 413k | } |
507 | | |
//! Thin wrapper around pipeline.sink->Sink(context, chunk, input).
//! With DUCKDB_DEBUG_ASYNC_SINK_SOURCE defined, calls return BLOCKED without touching the sink until
//! debug_blocked_sink_count reaches debug_blocked_target_count; each such call starts a detached thread that
//! sleeps 1ms and then invokes the interrupt callback.
//! @param chunk chunk handed to the sink
//! @param input sink input handed to the sink; its interrupt_state is used by the debug hook
//! @return the sink's result, or BLOCKED from the debug hook
508 | 105k | SinkResultType PipelineExecutor::Sink(DataChunk &chunk, OperatorSinkInput &input) { |
509 | | //! Testing feature to enable async sink on every operator |
510 | | #ifdef DUCKDB_DEBUG_ASYNC_SINK_SOURCE |
511 | | if (debug_blocked_sink_count < debug_blocked_target_count) { |
512 | | debug_blocked_sink_count++; |
513 | | |
514 | | auto &callback_state = input.interrupt_state; |
// the lambda captures callback_state by value, so the detached thread works on its own copy
515 | | std::thread rewake_thread([callback_state] { |
516 | | std::this_thread::sleep_for(std::chrono::milliseconds(1)); |
517 | | callback_state.Callback(); |
518 | | }); |
519 | | rewake_thread.detach(); |
520 | | |
521 | | return SinkResultType::BLOCKED; |
522 | | } |
523 | | #endif |
524 | 105k | return pipeline.sink->Sink(context, chunk, input); |
525 | 105k | } |
526 | | |
//! Fetches one chunk from the pipeline's source, bracketed by StartOperator/EndOperator for profiling.
//! When the source reports FINISHED, profiler.FinishSource is invoked before EndOperator.
//! @param result chunk the source writes into
//! @return the SourceResultType produced by GetData()
//! @throws InterruptException via StartOperator when the client was interrupted
527 | 413k | SourceResultType PipelineExecutor::FetchFromSource(DataChunk &result) { |
528 | 413k | StartOperator(*pipeline.source); |
529 | | |
530 | 413k | OperatorSourceInput source_input = {*pipeline.source_state, *local_source_state, interrupt_state}; |
531 | 413k | auto res = GetData(result, source_input); |
532 | | |
533 | | // A source that reports BLOCKED must not have produced any rows |
534 | 413k | D_ASSERT(res != SourceResultType::BLOCKED || result.size() == 0); |
535 | 413k | if (res == SourceResultType::FINISHED) { |
536 | | // final call into the source - finish source execution |
537 | 327k | context.thread.profiler.FinishSource(*pipeline.source_state, *local_source_state); |
538 | 327k | } |
539 | 413k | EndOperator(*pipeline.source, &result); |
540 | | |
541 | 413k | return res; |
542 | 413k | } |
543 | | |
//! Initializes `chunk` with the output types of the last operator in the pipeline, or of the source when the
//! pipeline has no operators, using the buffer allocator of the client context.
//! @param chunk chunk to initialize
544 | 404k | void PipelineExecutor::InitializeChunk(DataChunk &chunk) { |
545 | 404k | auto &last_op = pipeline.operators.empty() ? *pipeline.source : pipeline.operators.back().get(); |
546 | 404k | chunk.Initialize(BufferAllocator::Get(context.client), last_op.GetTypes()); |
547 | 404k | } |
548 | | |
//! Tells the thread's profiler that `op` is about to run.
//! @param op operator that is about to be executed
//! @throws InterruptException when context.client.interrupted is set; the profiler is not notified in that case
549 | 824k | void PipelineExecutor::StartOperator(PhysicalOperator &op) { |
550 | 824k | if (context.client.interrupted) { |
551 | 4 | throw InterruptException(); |
552 | 4 | } |
553 | 824k | context.thread.profiler.StartOperator(&op); |
554 | 824k | } |
555 | | |
//! Tells the thread's profiler that the current operator is done and, if a chunk is given, verifies it.
//! @param op not used by this implementation
//! @param chunk output chunk of the operator; may be null (the sink call sites pass nullptr)
556 | 822k | void PipelineExecutor::EndOperator(PhysicalOperator &op, optional_ptr<DataChunk> chunk) { |
557 | 822k | context.thread.profiler.EndOperator(chunk); |
558 | | |
559 | 822k | if (chunk) { |
560 | 717k | chunk->Verify(); |
561 | 717k | } |
562 | 822k | } |
563 | | |
564 | | } // namespace duckdb |