Coverage Report

Created: 2025-11-15 07:36

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/duckdb/src/parallel/pipeline_executor.cpp
Line
Count
Source
1
#include "duckdb/parallel/pipeline_executor.hpp"
2
3
#include "duckdb/common/limits.hpp"
4
#include "duckdb/main/client_context.hpp"
5
6
#ifdef DUCKDB_DEBUG_ASYNC_SINK_SOURCE
7
#include <chrono>
8
#include <thread>
9
#endif
10
11
namespace duckdb {
12
13
PipelineExecutor::PipelineExecutor(ClientContext &context_p, Pipeline &pipeline_p)
14
404k
    : pipeline(pipeline_p), thread(context_p), context(context_p, thread, &pipeline_p) {
15
404k
  D_ASSERT(pipeline.source_state);
16
404k
  if (pipeline.sink) {
17
328k
    local_sink_state = pipeline.sink->GetLocalSinkState(context);
18
328k
    required_partition_info = pipeline.sink->RequiredPartitionInfo();
19
328k
    if (required_partition_info.AnyRequired()) {
20
10.2k
      D_ASSERT(pipeline.source->SupportsPartitioning(OperatorPartitionInfo::BatchIndex()));
21
10.2k
      auto &partition_info = local_sink_state->partition_info;
22
10.2k
      D_ASSERT(!partition_info.batch_index.IsValid());
23
      // batch index is not set yet - initialize before fetching anything
24
10.2k
      partition_info.batch_index = pipeline.RegisterNewBatchIndex();
25
10.2k
      partition_info.min_batch_index = partition_info.batch_index;
26
10.2k
    }
27
328k
  }
28
404k
  local_source_state = pipeline.source->GetLocalSourceState(context, *pipeline.source_state);
29
30
404k
  intermediate_chunks.reserve(pipeline.operators.size());
31
404k
  intermediate_states.reserve(pipeline.operators.size());
32
887k
  for (idx_t i = 0; i < pipeline.operators.size(); i++) {
33
482k
    auto &prev_operator = i == 0 ? *pipeline.source : pipeline.operators[i - 1].get();
34
482k
    auto &current_operator = pipeline.operators[i].get();
35
36
482k
    auto chunk = make_uniq<DataChunk>();
37
482k
    chunk->Initialize(BufferAllocator::Get(context.client), prev_operator.GetTypes());
38
482k
    intermediate_chunks.push_back(std::move(chunk));
39
40
482k
    auto op_state = current_operator.GetOperatorState(context);
41
482k
    intermediate_states.push_back(std::move(op_state));
42
43
482k
    if (current_operator.IsSink() && current_operator.sink_state->state == SinkFinalizeType::NO_OUTPUT_POSSIBLE) {
44
      // one of the operators has already figured out no output is possible
45
      // we can skip executing the pipeline
46
22.1k
      FinishProcessing();
47
22.1k
    }
48
482k
  }
49
404k
  InitializeChunk(final_chunk);
50
404k
}
51
52
324k
bool PipelineExecutor::TryFlushCachingOperators(ExecutionBudget &chunk_budget) {
53
324k
  if (!started_flushing) {
54
    // Remainder of this method assumes any in process operators are from flushing
55
324k
    D_ASSERT(in_process_operators.empty());
56
324k
    started_flushing = true;
57
324k
    flushing_idx = IsFinished() ? idx_t(finished_processing_idx) : 0;
58
324k
  }
59
60
  // For each operator that supports FinalExecute,
61
  // extract every chunk from it and push it through the rest of the pipeline
62
  // before moving onto the next operators' FinalExecute
63
745k
  while (flushing_idx < pipeline.operators.size()) {
64
420k
    if (!pipeline.operators[flushing_idx].get().RequiresFinalExecute()) {
65
310k
      flushing_idx++;
66
310k
      continue;
67
310k
    }
68
69
    // This slightly awkward way of increasing the flushing idx is to make the code re-entrant: We need to call this
70
    // method again in the case of a Sink returning BLOCKED.
71
109k
    if (!should_flush_current_idx && in_process_operators.empty()) {
72
54.6k
      should_flush_current_idx = true;
73
54.6k
      flushing_idx++;
74
54.6k
      continue;
75
54.6k
    }
76
77
55.1k
    auto &curr_chunk =
78
55.1k
        flushing_idx + 1 >= intermediate_chunks.size() ? final_chunk : *intermediate_chunks[flushing_idx + 1];
79
55.1k
    auto &current_operator = pipeline.operators[flushing_idx].get();
80
81
55.1k
    OperatorFinalizeResultType finalize_result;
82
83
55.1k
    if (in_process_operators.empty()) {
84
54.6k
      curr_chunk.Reset();
85
54.6k
      StartOperator(current_operator);
86
54.6k
      finalize_result = current_operator.FinalExecute(context, curr_chunk, *current_operator.op_state,
87
54.6k
                                                      *intermediate_states[flushing_idx]);
88
54.6k
      EndOperator(current_operator, &curr_chunk);
89
54.6k
    } else {
90
      // Reset flag and reflush the last chunk we were flushing.
91
454
      finalize_result = OperatorFinalizeResultType::HAVE_MORE_OUTPUT;
92
454
    }
93
94
55.1k
    auto push_result = ExecutePushInternal(curr_chunk, chunk_budget, flushing_idx + 1);
95
96
55.1k
    if (finalize_result == OperatorFinalizeResultType::HAVE_MORE_OUTPUT) {
97
0
      should_flush_current_idx = true;
98
55.1k
    } else {
99
55.1k
      should_flush_current_idx = false;
100
55.1k
    }
101
102
55.1k
    switch (push_result) {
103
0
    case OperatorResultType::BLOCKED: {
104
0
      remaining_sink_chunk = true;
105
0
      return false;
106
0
    }
107
0
    case OperatorResultType::HAVE_MORE_OUTPUT: {
108
0
      D_ASSERT(chunk_budget.IsDepleted());
109
      // The chunk budget was used up, pushing the chunk through the pipeline created more chunks
110
      // we need to continue this the next time Execute is called.
111
0
      return false;
112
0
    }
113
54.7k
    case OperatorResultType::NEED_MORE_INPUT:
114
54.7k
      continue;
115
0
    case OperatorResultType::FINISHED:
116
0
      break;
117
0
    default:
118
0
      throw InternalException("Unexpected OperatorResultType (%s) in TryFlushCachingOperators",
119
0
                              EnumUtil::ToString(push_result));
120
55.1k
    }
121
0
    break;
122
55.1k
  }
123
324k
  return true;
124
324k
}
125
126
11.0k
SinkNextBatchType PipelineExecutor::NextBatch(DataChunk &source_chunk, const bool have_more_output) {
127
11.0k
  D_ASSERT(required_partition_info.AnyRequired());
128
11.0k
  auto max_batch_index = pipeline.base_batch_index + PipelineBuildState::BATCH_INCREMENT - 1;
129
  // by default set it to the maximum valid batch index value for the current pipeline
130
11.0k
  auto &partition_info = local_sink_state->partition_info;
131
11.0k
  OperatorPartitionData next_data(max_batch_index);
132
11.0k
  if ((source_chunk.size() > 0)) {
133
799
    D_ASSERT(local_source_state);
134
799
    D_ASSERT(pipeline.source_state);
135
    // if we retrieved data - initialize the next batch index
136
799
    auto partition_data = pipeline.source->GetPartitionData(context, source_chunk, *pipeline.source_state,
137
799
                                                            *local_source_state, required_partition_info);
138
799
    auto batch_index = partition_data.batch_index;
139
    // we start with the base_batch_index as a valid starting value. Make sure that next batch is called below
140
799
    next_data = std::move(partition_data);
141
799
    next_data.batch_index = pipeline.base_batch_index + batch_index + 1;
142
799
    if (next_data.batch_index >= max_batch_index) {
143
0
      throw InternalException("Pipeline batch index - invalid batch index %llu returned by source operator",
144
0
                              batch_index);
145
0
    }
146
10.2k
  } else if (have_more_output) {
147
0
    next_data.batch_index = partition_info.batch_index.GetIndex();
148
0
  }
149
11.0k
  if (next_data.batch_index == partition_info.batch_index.GetIndex()) {
150
    // no changes, return
151
0
    return SinkNextBatchType::READY;
152
0
  }
153
  // batch index has changed - update it
154
11.0k
  if (partition_info.batch_index.GetIndex() > next_data.batch_index) {
155
0
    throw InternalException(
156
0
        "Pipeline batch index - gotten lower batch index %llu (down from previous batch index of %llu)",
157
0
        next_data.batch_index, partition_info.batch_index.GetIndex());
158
0
  }
159
#ifdef DUCKDB_DEBUG_ASYNC_SINK_SOURCE
160
  if (debug_blocked_next_batch_count < debug_blocked_target_count) {
161
    debug_blocked_next_batch_count++;
162
163
    auto &callback_state = interrupt_state;
164
    std::thread rewake_thread([callback_state] {
165
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
166
      callback_state.Callback();
167
    });
168
    rewake_thread.detach();
169
170
    return SinkNextBatchType::BLOCKED;
171
  }
172
#endif
173
11.0k
  auto current_batch = partition_info.batch_index.GetIndex();
174
11.0k
  partition_info.batch_index = next_data.batch_index;
175
11.0k
  partition_info.partition_data = std::move(next_data.partition_data);
176
11.0k
  OperatorSinkNextBatchInput next_batch_input {*pipeline.sink->sink_state, *local_sink_state, interrupt_state};
177
  // call NextBatch before updating min_batch_index to provide the opportunity to flush the previous batch
178
11.0k
  auto next_batch_result = pipeline.sink->NextBatch(context, next_batch_input);
179
180
11.0k
  if (next_batch_result == SinkNextBatchType::BLOCKED) {
181
0
    partition_info.batch_index = current_batch; // set batch_index back to what it was before
182
0
    return SinkNextBatchType::BLOCKED;
183
0
  }
184
185
11.0k
  partition_info.min_batch_index = pipeline.UpdateBatchIndex(current_batch, next_data.batch_index);
186
187
11.0k
  return SinkNextBatchType::READY;
188
11.0k
}
189
190
328k
PipelineExecuteResult PipelineExecutor::Execute(idx_t max_chunks) {
191
328k
  D_ASSERT(pipeline.sink);
192
328k
  auto &source_chunk = pipeline.operators.empty() ? final_chunk : *intermediate_chunks[0];
193
328k
  ExecutionBudget chunk_budget(max_chunks);
194
739k
  do {
195
739k
    if (context.client.interrupted) {
196
137
      throw InterruptException();
197
137
    }
198
199
738k
    OperatorResultType result;
200
738k
    if (exhausted_source && done_flushing && !remaining_sink_chunk && !next_batch_blocked &&
201
0
        in_process_operators.empty()) {
202
0
      break;
203
738k
    } else if (remaining_sink_chunk) {
204
      // The pipeline was interrupted by the Sink. We should retry sinking the final chunk.
205
0
      result = ExecutePushInternal(final_chunk, chunk_budget);
206
0
      D_ASSERT(result != OperatorResultType::HAVE_MORE_OUTPUT);
207
0
      remaining_sink_chunk = false;
208
738k
    } else if (!in_process_operators.empty() && !started_flushing) {
209
      // Operator(s) in the pipeline have returned `HAVE_MORE_OUTPUT` in the last Execute call
210
      // the operators have to be called with the same input chunk to produce the rest of the output
211
0
      D_ASSERT(source_chunk.size() > 0);
212
0
      result = ExecutePushInternal(source_chunk, chunk_budget);
213
738k
    } else if (exhausted_source && !next_batch_blocked && !done_flushing) {
214
      // The source was exhausted, try flushing all operators
215
324k
      auto flush_completed = TryFlushCachingOperators(chunk_budget);
216
325k
      if (flush_completed) {
217
325k
        done_flushing = true;
218
325k
        break;
219
18.4E
      } else {
220
18.4E
        if (remaining_sink_chunk) {
221
0
          return PipelineExecuteResult::INTERRUPTED;
222
18.4E
        } else {
223
18.4E
          D_ASSERT(chunk_budget.IsDepleted());
224
18.4E
          return PipelineExecuteResult::NOT_FINISHED;
225
18.4E
        }
226
18.4E
      }
227
414k
    } else if (!exhausted_source || next_batch_blocked) {
228
413k
      SourceResultType source_result = SourceResultType::BLOCKED;
229
413k
      if (!next_batch_blocked) {
230
        // "Regular" path: fetch a chunk from the source and push it through the pipeline
231
413k
        source_chunk.Reset();
232
413k
        source_result = FetchFromSource(source_chunk);
233
413k
        if (source_result == SourceResultType::BLOCKED) {
234
0
          return PipelineExecuteResult::INTERRUPTED;
235
0
        }
236
413k
        if (source_result == SourceResultType::FINISHED) {
237
327k
          exhausted_source = true;
238
327k
        }
239
413k
      }
240
241
413k
      if (required_partition_info.AnyRequired()) {
242
11.0k
        auto next_batch_result = NextBatch(source_chunk, source_result == SourceResultType::HAVE_MORE_OUTPUT);
243
11.0k
        next_batch_blocked = next_batch_result == SinkNextBatchType::BLOCKED;
244
11.0k
        if (next_batch_blocked) {
245
0
          return PipelineExecuteResult::INTERRUPTED;
246
0
        }
247
11.0k
      }
248
249
413k
      if (exhausted_source && source_chunk.size() == 0) {
250
270k
        continue;
251
270k
      }
252
253
143k
      result = ExecutePushInternal(source_chunk, chunk_budget);
254
143k
    } else {
255
589
      throw InternalException("Unexpected state reached in pipeline executor");
256
589
    }
257
258
    // SINK INTERRUPT
259
143k
    if (result == OperatorResultType::BLOCKED) {
260
0
      remaining_sink_chunk = true;
261
0
      return PipelineExecuteResult::INTERRUPTED;
262
0
    }
263
264
143k
    if (result == OperatorResultType::FINISHED) {
265
13
      break;
266
13
    }
267
413k
  } while (chunk_budget.Next());
268
269
328k
  if ((!exhausted_source || !done_flushing) && !IsFinished()) {
270
0
    return PipelineExecuteResult::NOT_FINISHED;
271
0
  }
272
273
328k
  return PushFinalize();
274
328k
}
275
276
0
bool PipelineExecutor::RemainingSinkChunk() const {
277
0
  return remaining_sink_chunk;
278
0
}
279
280
216k
PipelineExecuteResult PipelineExecutor::Execute() {
281
216k
  return Execute(NumericLimits<idx_t>::Maximum());
282
216k
}
283
284
22.1k
void PipelineExecutor::FinishProcessing(int32_t operator_idx) {
285
22.1k
  finished_processing_idx = operator_idx < 0 ? NumericLimits<int32_t>::Maximum() : operator_idx;
286
22.1k
  in_process_operators = stack<idx_t>();
287
288
22.1k
  if (pipeline.GetSource()) {
289
22.1k
    auto guard = pipeline.source_state->Lock();
290
22.1k
    pipeline.source_state->PreventBlocking(guard);
291
22.1k
    pipeline.source_state->UnblockTasks(guard);
292
22.1k
  }
293
22.1k
  if (pipeline.GetSink()) {
294
22.1k
    auto guard = pipeline.GetSink()->sink_state->Lock();
295
22.1k
    pipeline.GetSink()->sink_state->PreventBlocking(guard);
296
22.1k
    pipeline.GetSink()->sink_state->UnblockTasks(guard);
297
22.1k
  }
298
22.1k
}
299
300
324k
bool PipelineExecutor::IsFinished() {
301
324k
  return finished_processing_idx >= 0;
302
324k
}
303
304
OperatorResultType PipelineExecutor::ExecutePushInternal(DataChunk &input, ExecutionBudget &chunk_budget,
305
197k
                                                         idx_t initial_idx) {
306
197k
  D_ASSERT(pipeline.sink);
307
197k
  if (input.size() == 0) { // LCOV_EXCL_START
308
26.9k
    return OperatorResultType::NEED_MORE_INPUT;
309
26.9k
  } // LCOV_EXCL_STOP
310
311
  // this loop will continuously push the input chunk through the pipeline as long as:
312
  // - the OperatorResultType for the Execute is HAVE_MORE_OUTPUT
313
  // - the Sink doesn't block
314
  // - the ExecutionBudget has not been depleted
315
170k
  OperatorResultType result = OperatorResultType::HAVE_MORE_OUTPUT;
316
172k
  do {
317
    // Note: if input is the final_chunk, we don't do any executing, the chunk just needs to be sinked
318
172k
    if (&input != &final_chunk) {
319
151k
      final_chunk.Reset();
320
      // Execute and put the result into 'final_chunk'
321
151k
      result = Execute(input, final_chunk, initial_idx);
322
151k
      if (result == OperatorResultType::FINISHED) {
323
13
        return OperatorResultType::FINISHED;
324
13
      }
325
151k
    } else {
326
20.6k
      result = OperatorResultType::NEED_MORE_INPUT;
327
20.6k
    }
328
172k
    auto &sink_chunk = final_chunk;
329
172k
    if (sink_chunk.size() > 0) {
330
105k
      StartOperator(*pipeline.sink);
331
105k
      D_ASSERT(pipeline.sink);
332
105k
      D_ASSERT(pipeline.sink->sink_state);
333
105k
      OperatorSinkInput sink_input {*pipeline.sink->sink_state, *local_sink_state, interrupt_state};
334
335
105k
      auto sink_result = Sink(sink_chunk, sink_input);
336
337
105k
      EndOperator(*pipeline.sink, nullptr);
338
339
105k
      if (sink_result == SinkResultType::BLOCKED) {
340
0
        return OperatorResultType::BLOCKED;
341
105k
      } else if (sink_result == SinkResultType::FINISHED) {
342
0
        FinishProcessing();
343
0
        return OperatorResultType::FINISHED;
344
0
      }
345
105k
    }
346
172k
    if (result == OperatorResultType::NEED_MORE_INPUT) {
347
168k
      return OperatorResultType::NEED_MORE_INPUT;
348
168k
    }
349
172k
  } while (chunk_budget.Next());
350
2.40k
  return result;
351
170k
}
352
353
324k
PipelineExecuteResult PipelineExecutor::PushFinalize() {
354
324k
  if (finalized) {
355
0
    throw InternalException("Calling PushFinalize on a pipeline that has been finalized already");
356
0
  }
357
358
324k
  D_ASSERT(local_sink_state);
359
360
  // Run the combine for the sink
361
324k
  OperatorSinkCombineInput combine_input {*pipeline.sink->sink_state, *local_sink_state, interrupt_state};
362
363
#ifdef DUCKDB_DEBUG_ASYNC_SINK_SOURCE
364
  if (debug_blocked_combine_count < debug_blocked_target_count) {
365
    debug_blocked_combine_count++;
366
367
    auto &callback_state = combine_input.interrupt_state;
368
    std::thread rewake_thread([callback_state] {
369
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
370
      callback_state.Callback();
371
    });
372
    rewake_thread.detach();
373
374
    return PipelineExecuteResult::INTERRUPTED;
375
  }
376
#endif
377
324k
  auto result = pipeline.sink->Combine(context, combine_input);
378
379
324k
  if (result == SinkCombineResultType::BLOCKED) {
380
0
    return PipelineExecuteResult::INTERRUPTED;
381
0
  }
382
383
324k
  finalized = true;
384
  // flush all query profiler info
385
802k
  for (idx_t i = 0; i < intermediate_states.size(); i++) {
386
477k
    intermediate_states[i]->Finalize(pipeline.operators[i].get(), context);
387
477k
  }
388
324k
  pipeline.executor.Flush(thread);
389
324k
  local_sink_state.reset();
390
391
324k
  return PipelineExecuteResult::FINISHED;
392
324k
}
393
394
243k
void PipelineExecutor::GoToSource(idx_t &current_idx, idx_t initial_idx) {
395
  // we go back to the first operator (the source)
396
243k
  current_idx = initial_idx;
397
243k
  if (!in_process_operators.empty()) {
398
    // ... UNLESS there is an in process operator
399
    // if there is an in-process operator, we start executing at the latest one
400
    // for example, if we have a join operator that has tuples left, we first need to emit those tuples
401
29.2k
    current_idx = in_process_operators.top();
402
29.2k
    in_process_operators.pop();
403
29.2k
  }
404
243k
  D_ASSERT(current_idx >= initial_idx);
405
243k
}
406
407
151k
OperatorResultType PipelineExecutor::Execute(DataChunk &input, DataChunk &result, idx_t initial_idx) {
408
151k
  if (input.size() == 0) { // LCOV_EXCL_START
409
0
    return OperatorResultType::NEED_MORE_INPUT;
410
0
  } // LCOV_EXCL_STOP
411
151k
  D_ASSERT(!pipeline.operators.empty());
412
413
151k
  idx_t current_idx;
414
151k
  GoToSource(current_idx, initial_idx);
415
151k
  if (current_idx == initial_idx) {
416
150k
    current_idx++;
417
150k
  }
418
151k
  if (current_idx > pipeline.operators.size()) {
419
0
    result.Reference(input);
420
0
    return OperatorResultType::NEED_MORE_INPUT;
421
0
  }
422
318k
  while (true) {
423
315k
    if (context.client.interrupted) {
424
53
      throw InterruptException();
425
53
    }
426
    // now figure out where to put the chunk
427
    // if current_idx is the last possible index (>= operators.size()) we write to the result
428
    // otherwise we write to an intermediate chunk
429
315k
    auto current_intermediate = current_idx;
430
315k
    auto &current_chunk =
431
315k
        current_intermediate >= intermediate_chunks.size() ? result : *intermediate_chunks[current_intermediate];
432
315k
    current_chunk.Reset();
433
315k
    if (current_idx == initial_idx) {
434
      // we went back to the source: we need more input
435
63.8k
      return OperatorResultType::NEED_MORE_INPUT;
436
252k
    } else {
437
252k
      auto &prev_chunk =
438
252k
          current_intermediate == initial_idx + 1 ? input : *intermediate_chunks[current_intermediate - 1];
439
252k
      auto operator_idx = current_idx - 1;
440
252k
      auto &current_operator = pipeline.operators[operator_idx].get();
441
442
      // if current_idx > source_idx, we pass the previous operators' output through the Execute of the current
443
      // operator
444
252k
      StartOperator(current_operator);
445
252k
      auto result = current_operator.Execute(context, prev_chunk, current_chunk, *current_operator.op_state,
446
252k
                                             *intermediate_states[current_intermediate - 1]);
447
252k
      EndOperator(current_operator, &current_chunk);
448
252k
      if (result == OperatorResultType::HAVE_MORE_OUTPUT) {
449
        // more data remains in this operator
450
        // push in-process marker
451
29.2k
        in_process_operators.push(current_idx);
452
222k
      } else if (result == OperatorResultType::FINISHED) {
453
13
        D_ASSERT(current_chunk.size() == 0);
454
13
        FinishProcessing(NumericCast<int32_t>(current_idx));
455
13
        return OperatorResultType::FINISHED;
456
13
      }
457
252k
      current_chunk.Verify();
458
252k
    }
459
460
252k
    if (current_chunk.size() == 0) {
461
      // no output from this operator!
462
91.6k
      if (current_idx == initial_idx) {
463
        // if we got no output from the scan, we are done
464
0
        break;
465
91.6k
      } else {
466
        // if we got no output from an intermediate op
467
        // we go back and try to pull data from the source again
468
91.6k
        GoToSource(current_idx, initial_idx);
469
91.6k
        continue;
470
91.6k
      }
471
160k
    } else {
472
      // we got output! continue to the next operator
473
160k
      current_idx++;
474
160k
      if (current_idx > pipeline.operators.size()) {
475
        // if we got output and are at the last operator, we are finished executing for this output chunk
476
        // return the data and push it into the chunk
477
84.8k
        break;
478
84.8k
      }
479
160k
    }
480
252k
  }
481
87.5k
  return in_process_operators.empty() ? OperatorResultType::NEED_MORE_INPUT : OperatorResultType::HAVE_MORE_OUTPUT;
482
151k
}
483
484
329k
void PipelineExecutor::SetTaskForInterrupts(weak_ptr<Task> current_task) {
485
329k
  interrupt_state = InterruptState(std::move(current_task));
486
329k
}
487
488
413k
SourceResultType PipelineExecutor::GetData(DataChunk &chunk, OperatorSourceInput &input) {
489
  //! Testing feature to enable async source on every operator
490
#ifdef DUCKDB_DEBUG_ASYNC_SINK_SOURCE
491
  if (debug_blocked_source_count < debug_blocked_target_count) {
492
    debug_blocked_source_count++;
493
494
    auto &callback_state = input.interrupt_state;
495
    std::thread rewake_thread([callback_state] {
496
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
497
      callback_state.Callback();
498
    });
499
    rewake_thread.detach();
500
501
    return SourceResultType::BLOCKED;
502
  }
503
#endif
504
505
413k
  return pipeline.source->GetData(context, chunk, input);
506
413k
}
507
508
105k
SinkResultType PipelineExecutor::Sink(DataChunk &chunk, OperatorSinkInput &input) {
509
  //! Testing feature to enable async sink on every operator
510
#ifdef DUCKDB_DEBUG_ASYNC_SINK_SOURCE
511
  if (debug_blocked_sink_count < debug_blocked_target_count) {
512
    debug_blocked_sink_count++;
513
514
    auto &callback_state = input.interrupt_state;
515
    std::thread rewake_thread([callback_state] {
516
      std::this_thread::sleep_for(std::chrono::milliseconds(1));
517
      callback_state.Callback();
518
    });
519
    rewake_thread.detach();
520
521
    return SinkResultType::BLOCKED;
522
  }
523
#endif
524
105k
  return pipeline.sink->Sink(context, chunk, input);
525
105k
}
526
527
413k
SourceResultType PipelineExecutor::FetchFromSource(DataChunk &result) {
528
413k
  StartOperator(*pipeline.source);
529
530
413k
  OperatorSourceInput source_input = {*pipeline.source_state, *local_source_state, interrupt_state};
531
413k
  auto res = GetData(result, source_input);
532
533
  // Ensures sources only return empty results when Blocking or Finished
534
413k
  D_ASSERT(res != SourceResultType::BLOCKED || result.size() == 0);
535
413k
  if (res == SourceResultType::FINISHED) {
536
    // final call into the source - finish source execution
537
327k
    context.thread.profiler.FinishSource(*pipeline.source_state, *local_source_state);
538
327k
  }
539
413k
  EndOperator(*pipeline.source, &result);
540
541
413k
  return res;
542
413k
}
543
544
404k
void PipelineExecutor::InitializeChunk(DataChunk &chunk) {
545
404k
  auto &last_op = pipeline.operators.empty() ? *pipeline.source : pipeline.operators.back().get();
546
404k
  chunk.Initialize(BufferAllocator::Get(context.client), last_op.GetTypes());
547
404k
}
548
549
824k
void PipelineExecutor::StartOperator(PhysicalOperator &op) {
550
824k
  if (context.client.interrupted) {
551
4
    throw InterruptException();
552
4
  }
553
824k
  context.thread.profiler.StartOperator(&op);
554
824k
}
555
556
822k
void PipelineExecutor::EndOperator(PhysicalOperator &op, optional_ptr<DataChunk> chunk) {
557
822k
  context.thread.profiler.EndOperator(chunk);
558
559
822k
  if (chunk) {
560
717k
    chunk->Verify();
561
717k
  }
562
822k
}
563
564
} // namespace duckdb