/src/rocksdb/util/timer.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | // |
6 | | |
7 | | #pragma once |
8 | | |
9 | | #include <functional> |
10 | | #include <memory> |
11 | | #include <queue> |
12 | | #include <unordered_map> |
13 | | #include <utility> |
14 | | #include <vector> |
15 | | |
16 | | #include "monitoring/instrumented_mutex.h" |
17 | | #include "rocksdb/system_clock.h" |
18 | | #include "test_util/sync_point.h" |
19 | | #include "util/mutexlock.h" |
20 | | |
21 | | namespace ROCKSDB_NAMESPACE { |
22 | | |
23 | | // A Timer class to handle repeated work. |
24 | | // |
25 | | // `Start()` and `Shutdown()` are currently not thread-safe. The client must |
26 | | // serialize calls to these two member functions. |
27 | | // |
28 | | // A single timer instance can handle multiple functions via a single thread. |
29 | | // It is better to leave long running work to a dedicated thread pool. |
30 | | // |
31 | | // Timer can be started by calling `Start()`, and ended by calling `Shutdown()`. |
32 | | // Work (in terms of a `void function`) can be scheduled by calling `Add` with |
33 | | // a unique function name and de-scheduled by calling `Cancel`. |
34 | | // Many functions can be added. |
35 | | // |
36 | | // Impl Details: |
37 | | // A heap is used to keep track of when the next timer goes off. |
38 | | // A map from a function name to the function keeps track of all the functions. |
39 | | class Timer { |
40 | | public: |
41 | | explicit Timer(SystemClock* clock) |
42 | | : clock_(clock), |
43 | | mutex_(clock), |
44 | | cond_var_(&mutex_), |
45 | | running_(false), |
46 | 1 | executing_task_(false) {} |
47 | | |
48 | 0 | ~Timer() { Shutdown(); } |
49 | | |
50 | | // Add a new function to run. |
51 | | // fn_name has to be identical, otherwise it will fail to add and return false |
52 | | // start_after_us is the initial delay. |
53 | | // repeat_every_us is the interval between ending time of the last call and |
54 | | // starting time of the next call. For example, repeat_every_us = 2000 and |
55 | | // the function takes 1000us to run. If it starts at time [now]us, then it |
56 | | // finishes at [now]+1000us, 2nd run starting time will be at [now]+3000us. |
57 | | // repeat_every_us == 0 means do not repeat. |
58 | | bool Add(std::function<void()> fn, const std::string& fn_name, |
59 | 41.9k | uint64_t start_after_us, uint64_t repeat_every_us) { |
60 | 41.9k | auto fn_info = std::make_unique<FunctionInfo>(std::move(fn), fn_name, 0, |
61 | 41.9k | repeat_every_us); |
62 | 41.9k | InstrumentedMutexLock l(&mutex_); |
63 | | // Assign time within mutex to make sure the next_run_time is larger than |
64 | | // the current running one |
65 | 41.9k | fn_info->next_run_time_us = clock_->NowMicros() + start_after_us; |
66 | | // the new task start time should never before the current task executing |
67 | | // time, as the executing task can only be running if it's next_run_time_us |
68 | | // is due (<= clock_->NowMicros()). |
69 | 41.9k | if (executing_task_ && |
70 | 41.9k | fn_info->next_run_time_us < heap_.top()->next_run_time_us) { |
71 | 0 | return false; |
72 | 0 | } |
73 | 41.9k | auto it = map_.find(fn_name); |
74 | 41.9k | if (it == map_.end()) { |
75 | 41.9k | heap_.push(fn_info.get()); |
76 | 41.9k | map_.try_emplace(fn_name, std::move(fn_info)); |
77 | 41.9k | } else { |
78 | | // timer doesn't support duplicated function name |
79 | 0 | return false; |
80 | 0 | } |
81 | 41.9k | cond_var_.SignalAll(); |
82 | 41.9k | return true; |
83 | 41.9k | } |
84 | | |
85 | 41.9k | void Cancel(const std::string& fn_name) { |
86 | 41.9k | InstrumentedMutexLock l(&mutex_); |
87 | | |
88 | | // Mark the function with fn_name as invalid so that it will not be |
89 | | // requeued. |
90 | 41.9k | auto it = map_.find(fn_name); |
91 | 41.9k | if (it != map_.end() && it->second) { |
92 | 41.9k | it->second->Cancel(); |
93 | 41.9k | } |
94 | | |
95 | | // If the currently running function is fn_name, then we need to wait |
96 | | // until it finishes before returning to caller. |
97 | 42.0k | while (!heap_.empty() && executing_task_) { |
98 | 39 | FunctionInfo* func_info = heap_.top(); |
99 | 39 | assert(func_info); |
100 | 39 | if (func_info->name == fn_name) { |
101 | 10 | WaitForTaskCompleteIfNecessary(); |
102 | 29 | } else { |
103 | 29 | break; |
104 | 29 | } |
105 | 39 | } |
106 | 41.9k | } |
107 | | |
108 | 0 | void CancelAll() { |
109 | 0 | InstrumentedMutexLock l(&mutex_); |
110 | 0 | CancelAllWithLock(); |
111 | 0 | } |
112 | | |
113 | | // Start the Timer |
114 | 41.9k | bool Start() { |
115 | 41.9k | InstrumentedMutexLock l(&mutex_); |
116 | 41.9k | if (running_) { |
117 | 27.9k | return false; |
118 | 27.9k | } |
119 | | |
120 | 13.9k | running_ = true; |
121 | 13.9k | thread_ = std::make_unique<port::Thread>(&Timer::Run, this); |
122 | 13.9k | return true; |
123 | 41.9k | } |
124 | | |
125 | | // Shutdown the Timer |
126 | 27.9k | bool Shutdown() { |
127 | 27.9k | { |
128 | 27.9k | InstrumentedMutexLock l(&mutex_); |
129 | 27.9k | if (!running_) { |
130 | 13.9k | return false; |
131 | 13.9k | } |
132 | 13.9k | running_ = false; |
133 | 13.9k | CancelAllWithLock(); |
134 | 13.9k | cond_var_.SignalAll(); |
135 | 13.9k | } |
136 | | |
137 | 13.9k | if (thread_) { |
138 | 13.9k | thread_->join(); |
139 | 13.9k | } |
140 | 13.9k | return true; |
141 | 27.9k | } |
142 | | |
143 | 69.9k | bool HasPendingTask() const { |
144 | 69.9k | InstrumentedMutexLock l(&mutex_); |
145 | 86.0k | for (const auto& fn_info : map_) { |
146 | 86.0k | if (fn_info.second->IsValid()) { |
147 | 41.9k | return true; |
148 | 41.9k | } |
149 | 86.0k | } |
150 | 27.9k | return false; |
151 | 69.9k | } |
152 | | |
153 | | #ifndef NDEBUG |
154 | | // Wait until Timer starting waiting, call the optional callback, then wait |
155 | | // for Timer waiting again. |
156 | | // Tests can provide a custom Clock object to mock time, and use the callback |
157 | | // here to bump current time and trigger Timer. See timer_test for example. |
158 | | // |
159 | | // Note: only support one caller of this method. |
160 | | void TEST_WaitForRun(const std::function<void()>& callback = nullptr) { |
161 | | InstrumentedMutexLock l(&mutex_); |
162 | | // It act as a spin lock |
163 | | while (executing_task_ || |
164 | | (!heap_.empty() && |
165 | | heap_.top()->next_run_time_us <= clock_->NowMicros())) { |
166 | | cond_var_.TimedWait(clock_->NowMicros() + 1000); |
167 | | } |
168 | | if (callback != nullptr) { |
169 | | callback(); |
170 | | } |
171 | | cond_var_.SignalAll(); |
172 | | do { |
173 | | cond_var_.TimedWait(clock_->NowMicros() + 1000); |
174 | | } while (executing_task_ || |
175 | | (!heap_.empty() && |
176 | | heap_.top()->next_run_time_us <= clock_->NowMicros())); |
177 | | } |
178 | | |
179 | | size_t TEST_GetPendingTaskNum() const { |
180 | | InstrumentedMutexLock l(&mutex_); |
181 | | size_t ret = 0; |
182 | | for (const auto& fn_info : map_) { |
183 | | if (fn_info.second->IsValid()) { |
184 | | ret++; |
185 | | } |
186 | | } |
187 | | return ret; |
188 | | } |
189 | | |
190 | | void TEST_OverrideTimer(SystemClock* clock) { |
191 | | InstrumentedMutexLock l(&mutex_); |
192 | | clock_ = clock; |
193 | | } |
194 | | #endif // NDEBUG |
195 | | |
196 | | private: |
197 | 13.9k | void Run() { |
198 | 13.9k | InstrumentedMutexLock l(&mutex_); |
199 | | |
200 | 28.8k | while (running_) { |
201 | 14.8k | if (heap_.empty()) { |
202 | | // wait |
203 | 75 | TEST_SYNC_POINT("Timer::Run::Waiting"); |
204 | 75 | cond_var_.Wait(); |
205 | 75 | continue; |
206 | 75 | } |
207 | | |
208 | 14.7k | FunctionInfo* current_fn = heap_.top(); |
209 | 14.7k | assert(current_fn); |
210 | | |
211 | 14.7k | if (!current_fn->IsValid()) { |
212 | 219 | heap_.pop(); |
213 | 219 | map_.erase(current_fn->name); |
214 | 219 | continue; |
215 | 219 | } |
216 | | |
217 | 14.5k | if (current_fn->next_run_time_us <= clock_->NowMicros()) { |
218 | | // make a copy of the function so it won't be changed after |
219 | | // mutex_.unlock. |
220 | 1.38k | std::function<void()> fn = current_fn->fn; |
221 | 1.38k | executing_task_ = true; |
222 | 1.38k | mutex_.Unlock(); |
223 | | // Execute the work |
224 | 1.38k | fn(); |
225 | 1.38k | mutex_.Lock(); |
226 | 1.38k | executing_task_ = false; |
227 | 1.38k | cond_var_.SignalAll(); |
228 | | |
229 | | // Remove the work from the heap once it is done executing, make sure |
230 | | // it's the same function after executing the work while mutex is |
231 | | // released. |
232 | | // Note that we are just removing the pointer from the heap. Its |
233 | | // memory is still managed in the map (as it holds a unique ptr). |
234 | | // So current_fn is still a valid ptr. |
235 | 1.38k | assert(heap_.top() == current_fn); |
236 | 1.38k | heap_.pop(); |
237 | | |
238 | | // current_fn may be cancelled already. |
239 | 1.38k | if (current_fn->IsValid() && current_fn->repeat_every_us > 0) { |
240 | 1.37k | assert(running_); |
241 | 1.37k | current_fn->next_run_time_us = |
242 | 1.37k | clock_->NowMicros() + current_fn->repeat_every_us; |
243 | | |
244 | | // Schedule new work into the heap with new time. |
245 | 1.37k | heap_.push(current_fn); |
246 | 1.37k | } else { |
247 | | // if current_fn is cancelled or no need to repeat, remove it from the |
248 | | // map to avoid leak. |
249 | 10 | map_.erase(current_fn->name); |
250 | 10 | } |
251 | 13.1k | } else { |
252 | 13.1k | cond_var_.TimedWait(current_fn->next_run_time_us); |
253 | 13.1k | } |
254 | 14.5k | } |
255 | 13.9k | } |
256 | | |
257 | 13.9k | void CancelAllWithLock() { |
258 | 13.9k | mutex_.AssertHeld(); |
259 | 13.9k | if (map_.empty() && heap_.empty()) { |
260 | 70 | return; |
261 | 70 | } |
262 | | |
263 | | // With mutex_ held, set all tasks to invalid so that they will not be |
264 | | // re-queued. |
265 | 41.7k | for (auto& elem : map_) { |
266 | 41.7k | auto& func_info = elem.second; |
267 | 41.7k | assert(func_info); |
268 | 41.7k | func_info->Cancel(); |
269 | 41.7k | } |
270 | | |
271 | | // WaitForTaskCompleteIfNecessary() may release mutex_ |
272 | 13.9k | WaitForTaskCompleteIfNecessary(); |
273 | | |
274 | 55.6k | while (!heap_.empty()) { |
275 | 41.7k | heap_.pop(); |
276 | 41.7k | } |
277 | 13.9k | map_.clear(); |
278 | 13.9k | } |
279 | | |
280 | | // A wrapper around std::function to keep track when it should run next |
281 | | // and at what frequency. |
282 | | struct FunctionInfo { |
283 | | // the actual work |
284 | | std::function<void()> fn; |
285 | | // name of the function |
286 | | std::string name; |
287 | | // when the function should run next |
288 | | uint64_t next_run_time_us; |
289 | | // repeat interval |
290 | | uint64_t repeat_every_us; |
291 | | // controls whether this function is valid. |
292 | | // A function is valid upon construction and until someone explicitly |
293 | | // calls `Cancel()`. |
294 | | bool valid; |
295 | | |
296 | | FunctionInfo(std::function<void()>&& _fn, std::string _name, |
297 | | const uint64_t _next_run_time_us, uint64_t _repeat_every_us) |
298 | | : fn(std::move(_fn)), |
299 | | name(std::move(_name)), |
300 | | next_run_time_us(_next_run_time_us), |
301 | | repeat_every_us(_repeat_every_us), |
302 | 41.9k | valid(true) {} |
303 | | |
304 | 83.7k | void Cancel() { valid = false; } |
305 | | |
306 | 102k | bool IsValid() const { return valid; } |
307 | | }; |
308 | | |
309 | 13.9k | void WaitForTaskCompleteIfNecessary() { |
310 | 13.9k | mutex_.AssertHeld(); |
311 | 13.9k | while (executing_task_) { |
312 | 10 | TEST_SYNC_POINT("Timer::WaitForTaskCompleteIfNecessary:TaskExecuting"); |
313 | 10 | cond_var_.Wait(); |
314 | 10 | } |
315 | 13.9k | } |
316 | | |
317 | | struct RunTimeOrder { |
318 | 45.0k | bool operator()(const FunctionInfo* f1, const FunctionInfo* f2) { |
319 | 45.0k | return f1->next_run_time_us > f2->next_run_time_us; |
320 | 45.0k | } |
321 | | }; |
322 | | |
323 | | SystemClock* clock_; |
324 | | // This mutex controls both the heap_ and the map_. It needs to be held for |
325 | | // making any changes in them. |
326 | | mutable InstrumentedMutex mutex_; |
327 | | InstrumentedCondVar cond_var_; |
328 | | std::unique_ptr<port::Thread> thread_; |
329 | | bool running_; |
330 | | bool executing_task_; |
331 | | |
332 | | std::priority_queue<FunctionInfo*, std::vector<FunctionInfo*>, RunTimeOrder> |
333 | | heap_; |
334 | | |
335 | | // In addition to providing a mapping from a function name to a function, |
336 | | // it is also responsible for memory management. |
337 | | std::unordered_map<std::string, std::unique_ptr<FunctionInfo>> map_; |
338 | | }; |
339 | | |
340 | | } // namespace ROCKSDB_NAMESPACE |