/src/rocksdb/db/periodic_task_scheduler.cc
Line | Count | Source |
1 | | // Copyright (c) Meta Platforms, Inc. and affiliates. |
2 | | // |
3 | | // This source code is licensed under both the GPLv2 (found in the |
4 | | // COPYING file in the root directory) and Apache 2.0 License |
5 | | // (found in the LICENSE.Apache file in the root directory). |
6 | | |
7 | | #include "db/periodic_task_scheduler.h" |
8 | | |
9 | | #include "rocksdb/system_clock.h" |
10 | | #include "test_util/sync_point.h" |
11 | | |
12 | | namespace ROCKSDB_NAMESPACE { |
13 | | |
// `timer_mutex` is a file-local global mutex that currently serves 3 purposes:
// (1) to ensure calls to `Start()` and `Shutdown()` are serialized, as
//     they are currently not implemented in a thread-safe way;
// (2) to ensure the `Timer::Add()`s and `Timer::Start()` run atomically, and
//     the `Timer::Cancel()`s and `Timer::Shutdown()` run atomically; and
// (3) to protect `tasks_map_` in PeriodicTaskScheduler.
// Note: A static global mutex is not efficient, but for
// PeriodicTaskScheduler it should be okay, as the operations are called
// infrequently.
static port::Mutex timer_mutex;
24 | | |
// Default repeat period, in seconds, for each periodic task type. It is looked
// up by the `Register()` overload that takes no explicit period.
// Entries set to `kInvalidPeriodSec` have no usable default: the period is
// forwarded to the `Register()` overload that takes an explicit period, which
// rejects `kInvalidPeriodSec` with `Status::InvalidArgument`.
static const std::map<PeriodicTaskType, uint64_t> kDefaultPeriodSeconds = {
    {PeriodicTaskType::kDumpStats, kInvalidPeriodSec},
    {PeriodicTaskType::kPersistStats, kInvalidPeriodSec},
    {PeriodicTaskType::kFlushInfoLog, 10},
    {PeriodicTaskType::kRecordSeqnoTime, kInvalidPeriodSec},
    {PeriodicTaskType::kTriggerCompaction, kInvalidPeriodSec},
};
32 | | |
// Name prefix for each periodic task type. `Register()` concatenates this
// prefix with a per-scheduler counter (`id_`) to build the unique task id that
// is handed to the timer, which makes the task type visible when debugging.
static const std::map<PeriodicTaskType, std::string> kPeriodicTaskTypeNames = {
    {PeriodicTaskType::kDumpStats, "dump_st"},
    {PeriodicTaskType::kPersistStats, "pst_st"},
    {PeriodicTaskType::kFlushInfoLog, "flush_info_log"},
    {PeriodicTaskType::kRecordSeqnoTime, "record_seq_time"},
    {PeriodicTaskType::kTriggerCompaction, "trigger_compaction"},
};
40 | | |
41 | | Status PeriodicTaskScheduler::Register(PeriodicTaskType task_type, |
42 | | const PeriodicTaskFunc& fn, |
43 | 49.7k | bool run_immediately) { |
44 | 49.7k | return Register(task_type, fn, kDefaultPeriodSeconds.at(task_type), |
45 | 49.7k | run_immediately); |
46 | 49.7k | } |
47 | | |
48 | | Status PeriodicTaskScheduler::Register(PeriodicTaskType task_type, |
49 | | const PeriodicTaskFunc& fn, |
50 | | uint64_t repeat_period_seconds, |
51 | 198k | bool run_immediately) { |
52 | 198k | MutexLock l(&timer_mutex); |
53 | 198k | static std::atomic<uint64_t> initial_delay(0); |
54 | | |
55 | 198k | if (repeat_period_seconds == kInvalidPeriodSec) { |
56 | 0 | return Status::InvalidArgument("Invalid task repeat period"); |
57 | 0 | } |
58 | 198k | auto it = tasks_map_.find(task_type); |
59 | 198k | if (it != tasks_map_.end()) { |
60 | | // the task already exists and it's the same, no update needed |
61 | 0 | if (it->second.repeat_every_sec == repeat_period_seconds) { |
62 | 0 | return Status::OK(); |
63 | 0 | } |
64 | | // cancel the existing one before register new one |
65 | 0 | timer_->Cancel(it->second.name); |
66 | 0 | tasks_map_.erase(it); |
67 | 0 | } |
68 | | |
69 | 198k | timer_->Start(); |
70 | | // put task type name as prefix, for easy debug |
71 | 198k | std::string unique_id = |
72 | 198k | kPeriodicTaskTypeNames.at(task_type) + std::to_string(id_++); |
73 | | |
74 | 198k | uint64_t initial_delay_micros = |
75 | 198k | (initial_delay.fetch_add(1) % repeat_period_seconds) * kMicrosInSecond; |
76 | 198k | if (!run_immediately) { |
77 | 49.7k | initial_delay_micros += repeat_period_seconds * kMicrosInSecond; |
78 | 49.7k | } |
79 | 198k | bool succeeded = timer_->Add(fn, unique_id, initial_delay_micros, |
80 | 198k | repeat_period_seconds * kMicrosInSecond); |
81 | 198k | if (!succeeded) { |
82 | 0 | return Status::Aborted("Failed to register periodic task"); |
83 | 0 | } |
84 | 198k | auto result = tasks_map_.try_emplace( |
85 | 198k | task_type, TaskInfo{unique_id, repeat_period_seconds}); |
86 | 198k | if (!result.second) { |
87 | 0 | return Status::Aborted("Failed to add periodic task"); |
88 | 0 | } |
89 | | #ifndef NDEBUG |
90 | | { |
91 | | std::pair<PeriodicTaskType, uint64_t> task_info{task_type, |
92 | | repeat_period_seconds}; |
93 | | TEST_SYNC_POINT_CALLBACK("PeriodicTaskScheduler::Register:TaskRegistered", |
94 | | &task_info); |
95 | | } |
96 | | #endif // NDEBUG |
97 | 198k | return Status::OK(); |
98 | 198k | } |
99 | | |
100 | 298k | Status PeriodicTaskScheduler::Unregister(PeriodicTaskType task_type) { |
101 | 298k | MutexLock l(&timer_mutex); |
102 | 298k | auto it = tasks_map_.find(task_type); |
103 | 298k | if (it != tasks_map_.end()) { |
104 | 198k | timer_->Cancel(it->second.name); |
105 | 198k | tasks_map_.erase(it); |
106 | 198k | } |
107 | 298k | if (!timer_->HasPendingTask()) { |
108 | 49.7k | timer_->Shutdown(); |
109 | 49.7k | } |
110 | 298k | return Status::OK(); |
111 | 298k | } |
112 | | |
// Returns a pointer to the `Timer` object named `timer` that is declared here
// via STATIC_AVOID_DESTRUCTION and constructed from the default SystemClock.
// NOTE(review): judging by the macro name, this is presumably a function-local
// static that is intentionally never destroyed, so every caller would get the
// same process-wide timer -- confirm against the macro definition.
Timer* PeriodicTaskScheduler::Default() {
  STATIC_AVOID_DESTRUCTION(Timer, timer)(SystemClock::Default().get());
  return &timer;
}
117 | | |
#ifndef NDEBUG
// Debug-build-only test hook: makes this scheduler use a dedicated test timer
// driven by `clock` instead of its current `timer_`.
void PeriodicTaskScheduler::TEST_OverrideTimer(SystemClock* clock) {
  // Function-local static: constructed with `clock` on the first call only and
  // reused by every later call.
  static Timer test_timer(clock);
  // Invoked on every call, so later calls also hand their `clock` to the
  // already-constructed test timer.
  test_timer.TEST_OverrideTimer(clock);
  // `timer_` is swapped under `timer_mutex`; the call above happens before the
  // lock is taken.
  MutexLock l(&timer_mutex);
  timer_ = &test_timer;
}
#endif  // NDEBUG
126 | | |
127 | | } // namespace ROCKSDB_NAMESPACE |