/src/duckdb/src/parallel/meta_pipeline.cpp
Line | Count | Source |
1 | | #include "duckdb/parallel/meta_pipeline.hpp" |
2 | | |
3 | | #include "duckdb/execution/executor.hpp" |
4 | | |
5 | | namespace duckdb { |
6 | | |
7 | | MetaPipeline::MetaPipeline(Executor &executor_p, PipelineBuildState &state_p, optional_ptr<PhysicalOperator> sink_p, |
8 | | MetaPipelineType type_p) |
9 | 288k | : executor(executor_p), state(state_p), sink(sink_p), type(type_p), recursive_cte(false), next_batch_index(0) { |
10 | 288k | CreatePipeline(); |
11 | 288k | } |
12 | | |
13 | 0 | Executor &MetaPipeline::GetExecutor() const { |
14 | 0 | return executor; |
15 | 0 | } |
16 | | |
17 | 678k | PipelineBuildState &MetaPipeline::GetState() const { |
18 | 678k | return state; |
19 | 678k | } |
20 | | |
21 | 15.1k | optional_ptr<PhysicalOperator> MetaPipeline::GetSink() const { |
22 | 15.1k | return sink; |
23 | 15.1k | } |
24 | | |
25 | 537k | optional_ptr<Pipeline> MetaPipeline::GetParent() const { |
26 | 537k | return parent; |
27 | 537k | } |
28 | | |
29 | 771k | shared_ptr<Pipeline> &MetaPipeline::GetBasePipeline() { |
30 | 771k | return pipelines[0]; |
31 | 771k | } |
32 | | |
33 | 639k | void MetaPipeline::GetPipelines(vector<shared_ptr<Pipeline>> &result, bool recursive) { |
34 | 639k | result.insert(result.end(), pipelines.begin(), pipelines.end()); |
35 | 639k | if (recursive) { |
36 | 288k | for (auto &child : children) { |
37 | 206k | child->GetPipelines(result, true); |
38 | 206k | } |
39 | 288k | } |
40 | 639k | } |
41 | | |
42 | 495k | void MetaPipeline::GetMetaPipelines(vector<shared_ptr<MetaPipeline>> &result, bool recursive, bool skip) { |
43 | 495k | if (!skip) { |
44 | 0 | result.push_back(shared_from_this()); |
45 | 0 | } |
46 | 495k | for (auto &child : children) { |
47 | 331k | result.push_back(child); |
48 | 331k | if (recursive) { |
49 | 206k | child->GetMetaPipelines(result, true, true); |
50 | 206k | } |
51 | 331k | } |
52 | 495k | } |
53 | | |
54 | 0 | MetaPipeline &MetaPipeline::GetLastChild() { |
55 | 0 | if (children.empty()) { |
56 | 0 | return *this; |
57 | 0 | } |
58 | 0 | reference<const vector<shared_ptr<MetaPipeline>>> current_children = children; |
59 | 0 | while (!current_children.get().back()->children.empty()) { |
60 | 0 | current_children = current_children.get().back()->children; |
61 | 0 | } |
62 | 0 | return *current_children.get().back(); |
63 | 0 | } |
64 | | |
65 | 206k | const reference_map_t<Pipeline, vector<reference<Pipeline>>> &MetaPipeline::GetDependencies() const { |
66 | 206k | return pipeline_dependencies; |
67 | 206k | } |
68 | | |
69 | 478k | MetaPipelineType MetaPipeline::Type() const { |
70 | 478k | return type; |
71 | 478k | } |
72 | | |
73 | 0 | bool MetaPipeline::HasRecursiveCTE() const { |
74 | 0 | return recursive_cte; |
75 | 0 | } |
76 | | |
77 | 0 | void MetaPipeline::SetRecursiveCTE() { |
78 | 0 | recursive_cte = true; |
79 | 0 | } |
80 | | |
81 | 29.4k | void MetaPipeline::AssignNextBatchIndex(Pipeline &pipeline) { |
82 | 29.4k | pipeline.base_batch_index = next_batch_index++ * PipelineBuildState::BATCH_INCREMENT; |
83 | 29.4k | } |
84 | | |
85 | 293k | void MetaPipeline::Build(PhysicalOperator &op) { |
86 | 293k | D_ASSERT(pipelines.size() == 1); |
87 | 293k | D_ASSERT(children.empty()); |
88 | 293k | op.BuildPipelines(*pipelines.back(), *this); |
89 | 293k | } |
90 | | |
91 | 288k | void MetaPipeline::Ready() const { |
92 | 348k | for (auto &pipeline : pipelines) { |
93 | 348k | pipeline->Ready(); |
94 | 348k | } |
95 | 288k | for (auto &child : children) { |
96 | 206k | child->Ready(); |
97 | 206k | } |
98 | 288k | } |
99 | | |
100 | 206k | MetaPipeline &MetaPipeline::CreateChildMetaPipeline(Pipeline ¤t, PhysicalOperator &op, MetaPipelineType type) { |
101 | 206k | children.push_back(make_shared_ptr<MetaPipeline>(executor, state, &op, type)); |
102 | 206k | auto &child_meta_pipeline = *children.back().get(); |
103 | | // store the parent |
104 | 206k | child_meta_pipeline.parent = ¤t; |
105 | | // child MetaPipeline must finish completely before this MetaPipeline can start |
106 | 206k | current.AddDependency(child_meta_pipeline.GetBasePipeline()); |
107 | | // child meta pipeline is part of the recursive CTE too |
108 | 206k | child_meta_pipeline.recursive_cte = recursive_cte; |
109 | 206k | return child_meta_pipeline; |
110 | 206k | } |
111 | | |
112 | 317k | Pipeline &MetaPipeline::CreatePipeline() { |
113 | 317k | pipelines.emplace_back(make_shared_ptr<Pipeline>(executor)); |
114 | 317k | state.SetPipelineSink(*pipelines.back(), sink, next_batch_index++); |
115 | 317k | return *pipelines.back(); |
116 | 317k | } |
117 | | |
118 | | vector<shared_ptr<Pipeline>> MetaPipeline::AddDependenciesFrom(Pipeline &dependant, const Pipeline &start, |
119 | 32.3k | const bool including) { |
120 | | // find 'start' |
121 | 32.3k | auto it = pipelines.begin(); |
122 | 38.3k | for (; !RefersToSameObject(**it, start); it++) { |
123 | 6.02k | } |
124 | | |
125 | 32.3k | if (!including) { |
126 | 32.3k | it++; |
127 | 32.3k | } |
128 | | |
129 | | // collect pipelines that were created from then |
130 | 32.3k | vector<shared_ptr<Pipeline>> created_pipelines; |
131 | 67.3k | for (; it != pipelines.end(); it++) { |
132 | 34.9k | if (RefersToSameObject(**it, dependant)) { |
133 | | // cannot depend on itself |
134 | 30.6k | continue; |
135 | 30.6k | } |
136 | 4.30k | created_pipelines.push_back(*it); |
137 | 4.30k | } |
138 | | |
139 | | // add them to the dependencies |
140 | 32.3k | auto &explicit_deps = pipeline_dependencies[dependant]; |
141 | 32.3k | for (auto &created_pipeline : created_pipelines) { |
142 | 4.30k | explicit_deps.push_back(*created_pipeline); |
143 | 4.30k | } |
144 | | |
145 | 32.3k | return created_pipelines; |
146 | 32.3k | } |
147 | | |
148 | 0 | static bool PipelineExceedsThreadCount(Pipeline &pipeline, const idx_t thread_count) { |
149 | | #ifdef DEBUG |
150 | | // we always add the dependency in debug mode so that this is well-tested |
151 | | return true; |
152 | | #else |
153 | 0 | return pipeline.GetSource()->EstimatedThreadCount() > thread_count; |
154 | 0 | #endif |
155 | 0 | } |
156 | | |
157 | | void MetaPipeline::AddRecursiveDependencies(const vector<shared_ptr<Pipeline>> &new_dependencies, |
158 | 0 | const MetaPipeline &last_child) { |
159 | 0 | if (recursive_cte) { |
160 | 0 | return; // let's not burn our fingers on this for now |
161 | 0 | } |
162 | | |
163 | 0 | vector<shared_ptr<MetaPipeline>> child_meta_pipelines; |
164 | 0 | this->GetMetaPipelines(child_meta_pipelines, true, false); |
165 | | |
166 | | // find the meta pipeline that has the same sink as 'pipeline' |
167 | 0 | auto it = child_meta_pipelines.begin(); |
168 | 0 | for (; !RefersToSameObject(last_child, **it); it++) { |
169 | 0 | } |
170 | 0 | D_ASSERT(it != child_meta_pipelines.end()); |
171 | | |
172 | | // skip over it |
173 | 0 | it++; |
174 | | |
175 | | // we try to limit the performance impact of these dependencies on smaller workloads, |
176 | | // by only adding the dependencies if the source operator can likely keep all threads busy |
177 | 0 | const auto thread_count = NumericCast<idx_t>(TaskScheduler::GetScheduler(executor.context).NumberOfThreads()); |
178 | 0 | for (; it != child_meta_pipelines.end(); it++) { |
179 | 0 | for (auto &pipeline : it->get()->pipelines) { |
180 | 0 | if (!PipelineExceedsThreadCount(*pipeline, thread_count)) { |
181 | 0 | continue; |
182 | 0 | } |
183 | 0 | auto &pipeline_deps = pipeline_dependencies[*pipeline]; |
184 | 0 | for (auto &new_dependency : new_dependencies) { |
185 | 0 | if (!PipelineExceedsThreadCount(*new_dependency, thread_count)) { |
186 | 0 | continue; |
187 | 0 | } |
188 | 0 | pipeline_deps.push_back(*new_dependency); |
189 | 0 | } |
190 | 0 | } |
191 | 0 | } |
192 | 0 | } |
193 | | |
194 | 0 | void MetaPipeline::AddFinishEvent(Pipeline &pipeline) { |
195 | 0 | D_ASSERT(finish_pipelines.find(pipeline) == finish_pipelines.end()); |
196 | 0 | finish_pipelines.insert(pipeline); |
197 | | |
198 | | // add all pipelines that were added since 'pipeline' was added (including 'pipeline') to the finish group |
199 | 0 | auto it = pipelines.begin(); |
200 | 0 | for (; !RefersToSameObject(**it, pipeline); it++) { |
201 | 0 | } |
202 | 0 | it++; |
203 | 0 | for (; it != pipelines.end(); it++) { |
204 | 0 | finish_map.emplace(**it, pipeline); |
205 | 0 | } |
206 | 0 | } |
207 | | |
208 | 60.0k | bool MetaPipeline::HasFinishEvent(Pipeline &pipeline) const { |
209 | 60.0k | return finish_pipelines.find(pipeline) != finish_pipelines.end(); |
210 | 60.0k | } |
211 | | |
212 | 60.0k | optional_ptr<Pipeline> MetaPipeline::GetFinishGroup(Pipeline &pipeline) const { |
213 | 60.0k | auto it = finish_map.find(pipeline); |
214 | 60.0k | return it == finish_map.end() ? nullptr : &it->second; |
215 | 60.0k | } |
216 | | |
217 | 29.4k | Pipeline &MetaPipeline::CreateUnionPipeline(Pipeline ¤t, bool order_matters) { |
218 | | // create the union pipeline (batch index 0, should be set correctly afterwards) |
219 | 29.4k | auto &union_pipeline = CreatePipeline(); |
220 | 29.4k | state.SetPipelineOperators(union_pipeline, state.GetPipelineOperators(current)); |
221 | 29.4k | state.SetPipelineSink(union_pipeline, sink, 0); |
222 | | |
223 | | // 'union_pipeline' inherits ALL dependencies of 'current' (within this MetaPipeline, and across MetaPipelines) |
224 | 29.4k | union_pipeline.dependencies = current.dependencies; |
225 | 29.4k | auto it = pipeline_dependencies.find(current); |
226 | 29.4k | if (it != pipeline_dependencies.end()) { |
227 | 0 | pipeline_dependencies[union_pipeline] = it->second; |
228 | 0 | } |
229 | | |
230 | 29.4k | if (order_matters) { |
231 | | // if we need to preserve order, or if the sink is not parallel, we set a dependency |
232 | 1.72k | pipeline_dependencies[union_pipeline].push_back(current); |
233 | 1.72k | } |
234 | | |
235 | 29.4k | return union_pipeline; |
236 | 29.4k | } |
237 | | |
238 | 30.6k | void MetaPipeline::CreateChildPipeline(Pipeline ¤t, PhysicalOperator &op, Pipeline &last_pipeline) { |
239 | | // rule 2: 'current' must be fully built (down to the source) before creating the child pipeline |
240 | 30.6k | D_ASSERT(current.source); |
241 | | |
242 | | // create the child pipeline (same batch index) |
243 | 30.6k | pipelines.emplace_back(state.CreateChildPipeline(executor, current, op)); |
244 | 30.6k | auto &child_pipeline = *pipelines.back(); |
245 | 30.6k | child_pipeline.base_batch_index = current.base_batch_index; |
246 | | |
247 | | // child pipeline has a dependency (within this MetaPipeline on all pipelines that were scheduled |
248 | | // between 'current' and now (including 'current') - set them up |
249 | 30.6k | pipeline_dependencies[child_pipeline].push_back(current); |
250 | 30.6k | AddDependenciesFrom(child_pipeline, last_pipeline, false); |
251 | 30.6k | D_ASSERT(pipeline_dependencies.find(child_pipeline) != pipeline_dependencies.end()); |
252 | 30.6k | } |
253 | | |
254 | | } // namespace duckdb |