Coverage Report

Created: 2025-12-16 09:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/duckdb/src/parallel/event.cpp
Line
Count
Source
1
#include "duckdb/parallel/event.hpp"
2
#include "duckdb/common/assert.hpp"
3
#include "duckdb/common/exception.hpp"
4
#include "duckdb/parallel/task_scheduler.hpp"
5
#include "duckdb/execution/executor.hpp"
6
7
namespace duckdb {
8
9
// Creates an event bound to the given executor. All task and dependency
// counters start at zero and the event starts out as not finished.
Event::Event(Executor &executor_p)
    : executor(executor_p),
      finished_tasks(0),
      total_tasks(0),
      finished_dependencies(0),
      total_dependencies(0),
      finished(false) {
}
13
14
859k
void Event::CompleteDependency() {
15
859k
  idx_t current_finished = ++finished_dependencies;
16
859k
  D_ASSERT(current_finished <= total_dependencies);
17
859k
  if (current_finished == total_dependencies) {
18
    // all dependencies have been completed: schedule the event
19
416k
    D_ASSERT(total_tasks == 0);
20
416k
    Schedule();
21
416k
    if (total_tasks == 0) {
22
98.1k
      Finish();
23
98.1k
    }
24
416k
  }
25
859k
}
26
27
513k
void Event::Finish() {
28
513k
  D_ASSERT(!finished);
29
513k
  FinishEvent();
30
513k
  finished = true;
31
  // finished processing the pipeline, now we can schedule pipelines that depend on this pipeline
32
878k
  for (auto &parent_entry : parents) {
33
878k
    auto parent = parent_entry.lock();
34
878k
    if (!parent) { // LCOV_EXCL_START
35
75
      continue;
36
75
    } // LCOV_EXCL_STOP
37
    // mark a dependency as completed for each of the parents
38
878k
    parent->CompleteDependency();
39
878k
  }
40
513k
  FinalizeFinish();
41
513k
}
42
43
936k
// Makes this event depend on `event`: bumps this event's dependency count and
// registers this event (as a weak reference) in `event`'s parent list.
void Event::AddDependency(Event &event) {
  ++total_dependencies;
  event.parents.emplace_back(shared_from_this());
#ifdef DEBUG
  // debug builds additionally track the parent by plain reference
  event.parents_raw.push_back(*this);
#endif
}
50
51
0
// Returns the reference-based parent list (parents_raw).
// In the code visible here, parents_raw is only filled inside #ifdef DEBUG
// sections; the assertion checks that it mirrors the weak-pointer list.
const vector<reference<Event>> &Event::GetParentsVerification() const {
  D_ASSERT(parents_raw.size() == parents.size());
  const vector<reference<Event>> &raw_parents = parents_raw;
  return raw_parents;
}
55
56
487k
void Event::FinishTask() {
57
487k
  D_ASSERT(finished_tasks.load() < total_tasks.load());
58
487k
  idx_t current_tasks = total_tasks;
59
487k
  idx_t current_finished = ++finished_tasks;
60
487k
  D_ASSERT(current_finished <= current_tasks);
61
487k
  if (current_finished == current_tasks) {
62
415k
    Finish();
63
415k
  }
64
487k
}
65
66
0
// Returns the client context held by this event's executor.
ClientContext &Event::GetClientContext() {
  ClientContext &client_context = executor.context;
  return client_context;
}
69
70
9.18k
// Inserts `replacement_event` between this event and its current parents:
// the replacement takes over this event's parent list, becomes dependent on
// this event, and is then handed to the executor.
void Event::InsertEvent(shared_ptr<Event> replacement_event) {
  Event &inserted = *replacement_event;
  // hand our parents over to the inserted event
  inserted.parents = std::move(parents);
#ifdef DEBUG
  inserted.parents_raw = std::move(parents_raw);
#endif
  // the inserted event now waits on this event (AddDependency registers it as our parent)
  inserted.AddDependency(*this);
  executor.AddEvent(std::move(replacement_event));
}
78
79
418k
// Registers the tasks belonging to this event and submits them to the task
// scheduler. Must be called with a non-empty task list and only while no tasks
// have been registered yet (both checked by assertions).
void Event::SetTasks(vector<shared_ptr<Task>> tasks) {
  auto &scheduler = TaskScheduler::GetScheduler(executor.context);
  D_ASSERT(total_tasks == 0);
  D_ASSERT(!tasks.empty());
  // publish the task count before submitting: FinishTask() compares against total_tasks
  total_tasks = tasks.size();
  scheduler.ScheduleTasks(executor.GetToken(), tasks);
}
86
87
} // namespace duckdb