/src/duckdb/src/parallel/event.cpp
Line | Count | Source |
1 | | #include "duckdb/parallel/event.hpp" |
2 | | #include "duckdb/common/assert.hpp" |
3 | | #include "duckdb/common/exception.hpp" |
4 | | #include "duckdb/parallel/task_scheduler.hpp" |
5 | | #include "duckdb/execution/executor.hpp" |
6 | | |
7 | | namespace duckdb { |
8 | | |
9 | | Event::Event(Executor &executor_p) |
10 | 524k | : executor(executor_p), finished_tasks(0), total_tasks(0), finished_dependencies(0), total_dependencies(0), |
11 | 524k | finished(false) { |
12 | 524k | } |
13 | | |
14 | 859k | void Event::CompleteDependency() { |
15 | 859k | idx_t current_finished = ++finished_dependencies; |
16 | 859k | D_ASSERT(current_finished <= total_dependencies); |
17 | 859k | if (current_finished == total_dependencies) { |
18 | | // all dependencies have been completed: schedule the event |
19 | 416k | D_ASSERT(total_tasks == 0); |
20 | 416k | Schedule(); |
21 | 416k | if (total_tasks == 0) { |
22 | 98.1k | Finish(); |
23 | 98.1k | } |
24 | 416k | } |
25 | 859k | } |
26 | | |
27 | 513k | void Event::Finish() { |
28 | 513k | D_ASSERT(!finished); |
29 | 513k | FinishEvent(); |
30 | 513k | finished = true; |
31 | | // finished processing the pipeline, now we can schedule pipelines that depend on this pipeline |
32 | 878k | for (auto &parent_entry : parents) { |
33 | 878k | auto parent = parent_entry.lock(); |
34 | 878k | if (!parent) { // LCOV_EXCL_START |
35 | 75 | continue; |
36 | 75 | } // LCOV_EXCL_STOP |
37 | | // mark a dependency as completed for each of the parents |
38 | 878k | parent->CompleteDependency(); |
39 | 878k | } |
40 | 513k | FinalizeFinish(); |
41 | 513k | } |
42 | | |
43 | 936k | void Event::AddDependency(Event &event) { |
44 | 936k | total_dependencies++; |
45 | 936k | event.parents.push_back(weak_ptr<Event>(shared_from_this())); |
46 | | #ifdef DEBUG |
47 | | event.parents_raw.push_back(*this); |
48 | | #endif |
49 | 936k | } |
50 | | |
51 | 0 | const vector<reference<Event>> &Event::GetParentsVerification() const { |
52 | 0 | D_ASSERT(parents.size() == parents_raw.size()); |
53 | 0 | return parents_raw; |
54 | 0 | } |
55 | | |
56 | 487k | void Event::FinishTask() { |
57 | 487k | D_ASSERT(finished_tasks.load() < total_tasks.load()); |
58 | 487k | idx_t current_tasks = total_tasks; |
59 | 487k | idx_t current_finished = ++finished_tasks; |
60 | 487k | D_ASSERT(current_finished <= current_tasks); |
61 | 487k | if (current_finished == current_tasks) { |
62 | 415k | Finish(); |
63 | 415k | } |
64 | 487k | } |
65 | | |
66 | 0 | ClientContext &Event::GetClientContext() { |
67 | 0 | return executor.context; |
68 | 0 | } |
69 | | |
70 | 9.18k | void Event::InsertEvent(shared_ptr<Event> replacement_event) { |
71 | 9.18k | replacement_event->parents = std::move(parents); |
72 | | #ifdef DEBUG |
73 | | replacement_event->parents_raw = std::move(parents_raw); |
74 | | #endif |
75 | 9.18k | replacement_event->AddDependency(*this); |
76 | 9.18k | executor.AddEvent(std::move(replacement_event)); |
77 | 9.18k | } |
78 | | |
79 | 418k | void Event::SetTasks(vector<shared_ptr<Task>> tasks) { |
80 | 418k | auto &ts = TaskScheduler::GetScheduler(executor.context); |
81 | 418k | D_ASSERT(total_tasks == 0); |
82 | 418k | D_ASSERT(!tasks.empty()); |
83 | 418k | this->total_tasks = tasks.size(); |
84 | 418k | ts.ScheduleTasks(executor.GetToken(), tasks); |
85 | 418k | } |
86 | | |
87 | | } // namespace duckdb |