Coverage Report

Created: 2025-11-01 07:40

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/duckdb/src/parallel/meta_pipeline.cpp
Line
Count
Source
1
#include "duckdb/parallel/meta_pipeline.hpp"
2
3
#include "duckdb/execution/executor.hpp"
4
5
namespace duckdb {
6
7
//! Constructs a MetaPipeline whose pipelines all sink into 'sink_p'.
//! The base pipeline (pipelines[0]) is created immediately, so it exists for the whole lifetime of the object.
MetaPipeline::MetaPipeline(Executor &executor_p, PipelineBuildState &state_p, optional_ptr<PhysicalOperator> sink_p,
                           MetaPipelineType type_p)
    : executor(executor_p), state(state_p), sink(sink_p), type(type_p), recursive_cte(false),
      next_batch_index(0) {
	// creating the base pipeline consumes the first batch index (next_batch_index is 0 at this point)
	this->CreatePipeline();
}
12
13
0
//! Returns the executor this MetaPipeline was created for.
Executor &MetaPipeline::GetExecutor() const {
	return this->executor;
}
16
17
678k
//! Returns the pipeline build state shared with this MetaPipeline.
PipelineBuildState &MetaPipeline::GetState() const {
	return this->state;
}
20
21
15.1k
//! Returns the sink operator shared by all pipelines of this MetaPipeline (may be empty).
optional_ptr<PhysicalOperator> MetaPipeline::GetSink() const {
	return this->sink;
}
24
25
537k
//! Returns the pipeline of the parent MetaPipeline that created this one (set by CreateChildMetaPipeline).
optional_ptr<Pipeline> MetaPipeline::GetParent() const {
	return this->parent;
}
28
29
771k
//! Returns the base pipeline, i.e. the first pipeline, which the constructor always creates.
shared_ptr<Pipeline> &MetaPipeline::GetBasePipeline() {
	return this->pipelines[0];
}
32
33
639k
//! Appends the pipelines of this MetaPipeline (in creation order) to 'result'.
//! If 'recursive' is set, the pipelines of all descendant MetaPipelines follow, depth-first.
void MetaPipeline::GetPipelines(vector<shared_ptr<Pipeline>> &result, bool recursive) {
	result.insert(result.end(), pipelines.begin(), pipelines.end());
	if (!recursive) {
		return;
	}
	for (auto &child_meta : children) {
		child_meta->GetPipelines(result, true);
	}
}
41
42
495k
//! Appends MetaPipelines to 'result': this one first (unless 'skip' is set), then every child.
//! With 'recursive', each child is immediately followed by its own descendants. The recursive calls pass
//! skip = true because the child itself was already appended here.
void MetaPipeline::GetMetaPipelines(vector<shared_ptr<MetaPipeline>> &result, bool recursive, bool skip) {
	if (!skip) {
		result.push_back(shared_from_this());
	}
	for (auto &child_meta : children) {
		result.push_back(child_meta);
		if (!recursive) {
			continue;
		}
		child_meta->GetMetaPipelines(result, true, true);
	}
}
53
54
0
//! Returns the MetaPipeline reached by repeatedly following the last child until one without children is found.
//! Returns *this if this MetaPipeline has no children.
MetaPipeline &MetaPipeline::GetLastChild() {
	if (children.empty()) {
		return *this;
	}
	// non-owning cursor; the shared_ptrs in 'children' keep every visited MetaPipeline alive
	MetaPipeline *deepest = children.back().get();
	while (!deepest->children.empty()) {
		deepest = deepest->children.back().get();
	}
	return *deepest;
}
64
65
206k
//! Returns the explicit pipeline -> dependencies map collected while building this MetaPipeline.
const reference_map_t<Pipeline, vector<reference<Pipeline>>> &MetaPipeline::GetDependencies() const {
	return this->pipeline_dependencies;
}
68
69
478k
//! Returns the type this MetaPipeline was constructed with.
MetaPipelineType MetaPipeline::Type() const {
	return this->type;
}
72
73
0
bool MetaPipeline::HasRecursiveCTE() const {
74
0
  return recursive_cte;
75
0
}
76
77
0
void MetaPipeline::SetRecursiveCTE() {
78
0
  recursive_cte = true;
79
0
}
80
81
29.4k
//! Assigns 'pipeline' the base batch index of the next free batch slot (slot * BATCH_INCREMENT) and advances the slot.
void MetaPipeline::AssignNextBatchIndex(Pipeline &pipeline) {
	const auto batch_slot = next_batch_index++;
	pipeline.base_batch_index = batch_slot * PipelineBuildState::BATCH_INCREMENT;
}
84
85
293k
//! Builds the pipelines of 'op', starting from the base pipeline.
//! Must be called on a fresh MetaPipeline: exactly one pipeline and no children (asserted in debug builds).
void MetaPipeline::Build(PhysicalOperator &op) {
	D_ASSERT(pipelines.size() == 1);
	D_ASSERT(children.empty());
	auto &root_pipeline = *pipelines.back();
	op.BuildPipelines(root_pipeline, *this);
}
90
91
288k
void MetaPipeline::Ready() const {
92
348k
  for (auto &pipeline : pipelines) {
93
348k
    pipeline->Ready();
94
348k
  }
95
288k
  for (auto &child : children) {
96
206k
    child->Ready();
97
206k
  }
98
288k
}
99
100
206k
//! Creates a child MetaPipeline with 'op' as its sink and type 'type'.
//! 'current' becomes the child's parent and depends on the child's base pipeline, so the child must finish completely
//! before this MetaPipeline can start. The recursive CTE flag is propagated to the child.
MetaPipeline &MetaPipeline::CreateChildMetaPipeline(Pipeline &current, PhysicalOperator &op, MetaPipelineType type) {
	children.push_back(make_shared_ptr<MetaPipeline>(executor, state, &op, type));
	auto &child = *children.back();

	child.parent = &current;
	current.AddDependency(child.GetBasePipeline());
	child.recursive_cte = recursive_cte;

	return child;
}
111
112
317k
//! Appends a new pipeline that sinks into this MetaPipeline's sink, gives it the next batch index, and returns it.
Pipeline &MetaPipeline::CreatePipeline() {
	pipelines.emplace_back(make_shared_ptr<Pipeline>(executor));
	auto &new_pipeline = *pipelines.back();
	state.SetPipelineSink(new_pipeline, sink, next_batch_index++);
	return new_pipeline;
}
117
118
//! Makes 'dependant' explicitly depend on every pipeline of this MetaPipeline that was created after 'start'
//! (and on 'start' itself if 'including' is set). 'dependant' is never made to depend on itself.
//! An entry for 'dependant' is created in pipeline_dependencies even if no pipeline qualifies.
//! Returns the pipelines that were added as dependencies, in creation order.
vector<shared_ptr<Pipeline>> MetaPipeline::AddDependenciesFrom(Pipeline &dependant, const Pipeline &start,
                                                               const bool including) {
	vector<shared_ptr<Pipeline>> created_pipelines;

	// find 'start'; the search is bounded so a 'start' that is not one of our pipelines cannot walk past end()
	auto it = pipelines.begin();
	while (it != pipelines.end() && !RefersToSameObject(**it, start)) {
		++it;
	}
	D_ASSERT(it != pipelines.end());
	if (it == pipelines.end()) {
		return created_pipelines;
	}

	if (!including) {
		++it; // skip 'start' itself
	}

	// collect the pipelines that were created from then on
	for (; it != pipelines.end(); ++it) {
		if (RefersToSameObject(**it, dependant)) {
			continue; // cannot depend on itself
		}
		created_pipelines.push_back(*it);
	}

	// add them to the explicit dependencies of 'dependant'
	auto &explicit_deps = pipeline_dependencies[dependant];
	for (auto &created_pipeline : created_pipelines) {
		explicit_deps.push_back(*created_pipeline);
	}

	return created_pipelines;
}
147
148
0
//! Whether the source of 'pipeline' is estimated to use more than 'thread_count' threads.
//! Debug builds always return true so that the dependency logic relying on this is well-tested.
static bool PipelineExceedsThreadCount(Pipeline &pipeline, const idx_t thread_count) {
#ifndef DEBUG
	const auto estimated_threads = pipeline.GetSource()->EstimatedThreadCount();
	return estimated_threads > thread_count;
#else
	// we always add the dependency in debug mode so that this is well-tested
	(void)pipeline;
	(void)thread_count;
	return true;
#endif
}
156
157
void MetaPipeline::AddRecursiveDependencies(const vector<shared_ptr<Pipeline>> &new_dependencies,
158
0
                                            const MetaPipeline &last_child) {
159
0
  if (recursive_cte) {
160
0
    return; // let's not burn our fingers on this for now
161
0
  }
162
163
0
  vector<shared_ptr<MetaPipeline>> child_meta_pipelines;
164
0
  this->GetMetaPipelines(child_meta_pipelines, true, false);
165
166
  // find the meta pipeline that has the same sink as 'pipeline'
167
0
  auto it = child_meta_pipelines.begin();
168
0
  for (; !RefersToSameObject(last_child, **it); it++) {
169
0
  }
170
0
  D_ASSERT(it != child_meta_pipelines.end());
171
172
  // skip over it
173
0
  it++;
174
175
  // we try to limit the performance impact of these dependencies on smaller workloads,
176
  // by only adding the dependencies if the source operator can likely keep all threads busy
177
0
  const auto thread_count = NumericCast<idx_t>(TaskScheduler::GetScheduler(executor.context).NumberOfThreads());
178
0
  for (; it != child_meta_pipelines.end(); it++) {
179
0
    for (auto &pipeline : it->get()->pipelines) {
180
0
      if (!PipelineExceedsThreadCount(*pipeline, thread_count)) {
181
0
        continue;
182
0
      }
183
0
      auto &pipeline_deps = pipeline_dependencies[*pipeline];
184
0
      for (auto &new_dependency : new_dependencies) {
185
0
        if (!PipelineExceedsThreadCount(*new_dependency, thread_count)) {
186
0
          continue;
187
0
        }
188
0
        pipeline_deps.push_back(*new_dependency);
189
0
      }
190
0
    }
191
0
  }
192
0
}
193
194
0
//! Registers 'pipeline' as having a finish event and puts every pipeline created after it into its finish group.
//! 'pipeline' itself is recorded in finish_pipelines, not in finish_map. Pipelines that already belong to a finish
//! group keep their existing mapping, because emplace does not overwrite.
void MetaPipeline::AddFinishEvent(Pipeline &pipeline) {
	D_ASSERT(finish_pipelines.find(pipeline) == finish_pipelines.end());
	finish_pipelines.insert(pipeline);

	// find 'pipeline'; the search is bounded so a pipeline that is not ours cannot walk past end()
	auto it = pipelines.begin();
	while (it != pipelines.end() && !RefersToSameObject(**it, pipeline)) {
		++it;
	}
	D_ASSERT(it != pipelines.end());
	if (it == pipelines.end()) {
		return;
	}

	// add all pipelines that were added after 'pipeline' (excluding 'pipeline' itself) to its finish group
	for (++it; it != pipelines.end(); ++it) {
		finish_map.emplace(**it, pipeline);
	}
}
207
208
60.0k
//! Whether AddFinishEvent was called for 'pipeline'.
bool MetaPipeline::HasFinishEvent(Pipeline &pipeline) const {
	const auto entry = finish_pipelines.find(pipeline);
	return entry != finish_pipelines.end();
}
211
212
60.0k
//! Returns the pipeline whose finish group 'pipeline' belongs to, or nullptr if it belongs to none.
optional_ptr<Pipeline> MetaPipeline::GetFinishGroup(Pipeline &pipeline) const {
	const auto entry = finish_map.find(pipeline);
	if (entry == finish_map.end()) {
		return nullptr;
	}
	return &entry->second;
}
216
217
29.4k
//! Creates a pipeline that runs alongside 'current' into the same sink (e.g. for the other side of a UNION).
//! It copies the operators of 'current' and inherits all of its dependencies. If 'order_matters' is set, it also
//! depends on 'current' itself.
Pipeline &MetaPipeline::CreateUnionPipeline(Pipeline &current, bool order_matters) {
	// create the union pipeline; CreatePipeline() consumed a batch index, but it is reset to 0 here
	// (it should be set correctly afterwards)
	auto &union_pipeline = CreatePipeline();
	state.SetPipelineOperators(union_pipeline, state.GetPipelineOperators(current));
	state.SetPipelineSink(union_pipeline, sink, 0);

	// 'union_pipeline' inherits ALL dependencies of 'current' (within this MetaPipeline, and across MetaPipelines)
	union_pipeline.dependencies = current.dependencies;
	auto it = pipeline_dependencies.find(current);
	if (it != pipeline_dependencies.end()) {
		// Copy before calling operator[]. It may insert (and, for a hash map, rehash), which invalidates 'it'.
		// Before C++17 the two sides of an assignment are unsequenced, so 'it->second' could be read afterwards.
		auto inherited_deps = it->second;
		pipeline_dependencies[union_pipeline] = inherited_deps;
	}

	if (order_matters) {
		// if we need to preserve order, or if the sink is not parallel, we set a dependency
		pipeline_dependencies[union_pipeline].push_back(current);
	}

	return union_pipeline;
}
237
238
30.6k
//! Creates a child pipeline of 'current' for operator 'op'; it shares the base batch index of 'current'.
//! Within this MetaPipeline, the child depends on 'current' and on every pipeline created after 'last_pipeline'.
void MetaPipeline::CreateChildPipeline(Pipeline &current, PhysicalOperator &op, Pipeline &last_pipeline) {
	// rule 2: 'current' must be fully built (down to the source) before creating the child pipeline
	D_ASSERT(current.source);

	// create the child pipeline (same batch index as 'current')
	pipelines.emplace_back(state.CreateChildPipeline(executor, current, op));
	auto &child = *pipelines.back();
	child.base_batch_index = current.base_batch_index;

	// dependencies within this MetaPipeline: 'current' first, then every pipeline scheduled after 'last_pipeline'
	pipeline_dependencies[child].push_back(current);
	AddDependenciesFrom(child, last_pipeline, false);
	D_ASSERT(pipeline_dependencies.find(child) != pipeline_dependencies.end());
}
253
254
} // namespace duckdb