Coverage Report

Created: 2024-07-27 06:53

/src/rocksdb/util/timer.h
 Line | Count | Source
------+-------+------------------------------------------------------------------------
    1 |       | //  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
    2 |       | //  This source code is licensed under both the GPLv2 (found in the
    3 |       | //  COPYING file in the root directory) and Apache 2.0 License
    4 |       | //  (found in the LICENSE.Apache file in the root directory).
    5 |       | //
    6 |       |
    7 |       | #pragma once
    8 |       |
    9 |       | #include <functional>
   10 |       | #include <memory>
   11 |       | #include <queue>
   12 |       | #include <unordered_map>
   13 |       | #include <utility>
   14 |       | #include <vector>
   15 |       |
   16 |       | #include "monitoring/instrumented_mutex.h"
   17 |       | #include "rocksdb/system_clock.h"
   18 |       | #include "test_util/sync_point.h"
   19 |       | #include "util/mutexlock.h"
   20 |       |
   21 |       | namespace ROCKSDB_NAMESPACE {
   22 |       |
   23 |       | // A Timer class to handle repeated work.
   24 |       | //
   25 |       | // `Start()` and `Shutdown()` are currently not thread-safe. The client must
   26 |       | // serialize calls to these two member functions.
   27 |       | //
   28 |       | // A single timer instance can handle multiple functions via a single thread.
   29 |       | // It is better to leave long running work to a dedicated thread pool.
   30 |       | //
   31 |       | // Timer can be started by calling `Start()`, and ended by calling `Shutdown()`.
   32 |       | // Work (in terms of a `void function`) can be scheduled by calling `Add` with
   33 |       | // a unique function name and de-scheduled by calling `Cancel`.
   34 |       | // Many functions can be added.
   35 |       | //
   36 |       | // Impl Details:
   37 |       | // A heap is used to keep track of when the next timer goes off.
   38 |       | // A map from a function name to the function keeps track of all the functions.
   39 |       | class Timer {
   40 |       |  public:
   41 |       |   explicit Timer(SystemClock* clock)
   42 |       |       : clock_(clock),
   43 |       |         mutex_(clock),
   44 |       |         cond_var_(&mutex_),
   45 |       |         running_(false),
   46 |     1 |         executing_task_(false) {}
   47 |       |
   48 |     0 |   ~Timer() { Shutdown(); }
   49 |       |
   50 |       |   // Add a new function to run.
   51 |       |   // fn_name has to be unique, otherwise Add will fail and return false.
   52 |       |   // start_after_us is the initial delay.
   53 |       |   // repeat_every_us is the interval between ending time of the last call and
   54 |       |   // starting time of the next call. For example, repeat_every_us = 2000 and
   55 |       |   // the function takes 1000us to run. If it starts at time [now]us, then it
   56 |       |   // finishes at [now]+1000us, 2nd run starting time will be at [now]+3000us.
   57 |       |   // repeat_every_us == 0 means do not repeat.
   58 |       |   bool Add(std::function<void()> fn, const std::string& fn_name,
   59 | 41.9k |            uint64_t start_after_us, uint64_t repeat_every_us) {
   60 | 41.9k |     auto fn_info = std::make_unique<FunctionInfo>(std::move(fn), fn_name, 0,
   61 | 41.9k |                                                   repeat_every_us);
   62 | 41.9k |     InstrumentedMutexLock l(&mutex_);
   63 |       |     // Assign time within mutex to make sure the next_run_time is larger than
   64 |       |     // the current running one
   65 | 41.9k |     fn_info->next_run_time_us = clock_->NowMicros() + start_after_us;
   66 |       |     // The new task's start time should never be before the currently
   67 |       |     // executing task's time, as the executing task can only be running if its
   68 |       |     // next_run_time_us is due (<= clock_->NowMicros()).
   69 | 41.9k |     if (executing_task_ &&
   70 | 41.9k |         fn_info->next_run_time_us < heap_.top()->next_run_time_us) {
   71 |     0 |       return false;
   72 |     0 |     }
   73 | 41.9k |     auto it = map_.find(fn_name);
   74 | 41.9k |     if (it == map_.end()) {
   75 | 41.9k |       heap_.push(fn_info.get());
   76 | 41.9k |       map_.try_emplace(fn_name, std::move(fn_info));
   77 | 41.9k |     } else {
   78 |       |       // timer doesn't support duplicated function name
   79 |     0 |       return false;
   80 |     0 |     }
   81 | 41.9k |     cond_var_.SignalAll();
   82 | 41.9k |     return true;
   83 | 41.9k |   }
   84 |       |
   85 | 41.9k |   void Cancel(const std::string& fn_name) {
   86 | 41.9k |     InstrumentedMutexLock l(&mutex_);
   87 |       |
   88 |       |     // Mark the function with fn_name as invalid so that it will not be
   89 |       |     // requeued.
   90 | 41.9k |     auto it = map_.find(fn_name);
   91 | 41.9k |     if (it != map_.end() && it->second) {
   92 | 41.9k |       it->second->Cancel();
   93 | 41.9k |     }
   94 |       |
   95 |       |     // If the currently running function is fn_name, then we need to wait
   96 |       |     // until it finishes before returning to caller.
   97 | 42.0k |     while (!heap_.empty() && executing_task_) {
   98 |    39 |       FunctionInfo* func_info = heap_.top();
   99 |    39 |       assert(func_info);
  100 |    39 |       if (func_info->name == fn_name) {
  101 |    10 |         WaitForTaskCompleteIfNecessary();
  102 |    29 |       } else {
  103 |    29 |         break;
  104 |    29 |       }
  105 |    39 |     }
  106 | 41.9k |   }
  107 |       |
  108 |     0 |   void CancelAll() {
  109 |     0 |     InstrumentedMutexLock l(&mutex_);
  110 |     0 |     CancelAllWithLock();
  111 |     0 |   }
  112 |       |
  113 |       |   // Start the Timer
  114 | 41.9k |   bool Start() {
  115 | 41.9k |     InstrumentedMutexLock l(&mutex_);
  116 | 41.9k |     if (running_) {
  117 | 27.9k |       return false;
  118 | 27.9k |     }
  119 |       |
  120 | 13.9k |     running_ = true;
  121 | 13.9k |     thread_ = std::make_unique<port::Thread>(&Timer::Run, this);
  122 | 13.9k |     return true;
  123 | 41.9k |   }
  124 |       |
  125 |       |   // Shutdown the Timer
  126 | 27.9k |   bool Shutdown() {
  127 | 27.9k |     {
  128 | 27.9k |       InstrumentedMutexLock l(&mutex_);
  129 | 27.9k |       if (!running_) {
  130 | 13.9k |         return false;
  131 | 13.9k |       }
  132 | 13.9k |       running_ = false;
  133 | 13.9k |       CancelAllWithLock();
  134 | 13.9k |       cond_var_.SignalAll();
  135 | 13.9k |     }
  136 |       |
  137 | 13.9k |     if (thread_) {
  138 | 13.9k |       thread_->join();
  139 | 13.9k |     }
  140 | 13.9k |     return true;
  141 | 27.9k |   }
  142 |       |
  143 | 69.9k |   bool HasPendingTask() const {
  144 | 69.9k |     InstrumentedMutexLock l(&mutex_);
  145 | 86.0k |     for (const auto& fn_info : map_) {
  146 | 86.0k |       if (fn_info.second->IsValid()) {
  147 | 41.9k |         return true;
  148 | 41.9k |       }
  149 | 86.0k |     }
  150 | 27.9k |     return false;
  151 | 69.9k |   }
  152 |       |
  153 |       | #ifndef NDEBUG
  154 |       |   // Wait until the Timer starts waiting, call the optional callback, then
  155 |       |   // wait until the Timer is waiting again.
  156 |       |   // Tests can provide a custom Clock object to mock time, and use the callback
  157 |       |   // here to bump current time and trigger Timer. See timer_test for example.
  158 |       |   //
  159 |       |   // Note: only one caller of this method is supported.
  160 |       |   void TEST_WaitForRun(const std::function<void()>& callback = nullptr) {
  161 |       |     InstrumentedMutexLock l(&mutex_);
  162 |       |     // It acts as a spin lock
  163 |       |     while (executing_task_ ||
  164 |       |            (!heap_.empty() &&
  165 |       |             heap_.top()->next_run_time_us <= clock_->NowMicros())) {
  166 |       |       cond_var_.TimedWait(clock_->NowMicros() + 1000);
  167 |       |     }
  168 |       |     if (callback != nullptr) {
  169 |       |       callback();
  170 |       |     }
  171 |       |     cond_var_.SignalAll();
  172 |       |     do {
  173 |       |       cond_var_.TimedWait(clock_->NowMicros() + 1000);
  174 |       |     } while (executing_task_ ||
  175 |       |              (!heap_.empty() &&
  176 |       |               heap_.top()->next_run_time_us <= clock_->NowMicros()));
  177 |       |   }
  178 |       |
  179 |       |   size_t TEST_GetPendingTaskNum() const {
  180 |       |     InstrumentedMutexLock l(&mutex_);
  181 |       |     size_t ret = 0;
  182 |       |     for (const auto& fn_info : map_) {
  183 |       |       if (fn_info.second->IsValid()) {
  184 |       |         ret++;
  185 |       |       }
  186 |       |     }
  187 |       |     return ret;
  188 |       |   }
  189 |       |
  190 |       |   void TEST_OverrideTimer(SystemClock* clock) {
  191 |       |     InstrumentedMutexLock l(&mutex_);
  192 |       |     clock_ = clock;
  193 |       |   }
  194 |       | #endif  // NDEBUG
  195 |       |
  196 |       |  private:
  197 | 13.9k |   void Run() {
  198 | 13.9k |     InstrumentedMutexLock l(&mutex_);
  199 |       |
  200 | 28.8k |     while (running_) {
  201 | 14.8k |       if (heap_.empty()) {
  202 |       |         // wait
  203 |    75 |         TEST_SYNC_POINT("Timer::Run::Waiting");
  204 |    75 |         cond_var_.Wait();
  205 |    75 |         continue;
  206 |    75 |       }
  207 |       |
  208 | 14.7k |       FunctionInfo* current_fn = heap_.top();
  209 | 14.7k |       assert(current_fn);
  210 |       |
  211 | 14.7k |       if (!current_fn->IsValid()) {
  212 |   219 |         heap_.pop();
  213 |   219 |         map_.erase(current_fn->name);
  214 |   219 |         continue;
  215 |   219 |       }
  216 |       |
  217 | 14.5k |       if (current_fn->next_run_time_us <= clock_->NowMicros()) {
  218 |       |         // make a copy of the function so it won't be changed after
  219 |       |         // mutex_.unlock.
  220 | 1.38k |         std::function<void()> fn = current_fn->fn;
  221 | 1.38k |         executing_task_ = true;
  222 | 1.38k |         mutex_.Unlock();
  223 |       |         // Execute the work
  224 | 1.38k |         fn();
  225 | 1.38k |         mutex_.Lock();
  226 | 1.38k |         executing_task_ = false;
  227 | 1.38k |         cond_var_.SignalAll();
  228 |       |
  229 |       |         // Remove the work from the heap once it is done executing, make sure
  230 |       |         // it's the same function after executing the work while mutex is
  231 |       |         // released.
  232 |       |         // Note that we are just removing the pointer from the heap. Its
  233 |       |         // memory is still managed in the map (as it holds a unique ptr).
  234 |       |         // So current_fn is still a valid ptr.
  235 | 1.38k |         assert(heap_.top() == current_fn);
  236 | 1.38k |         heap_.pop();
  237 |       |
  238 |       |         // current_fn may be cancelled already.
  239 | 1.38k |         if (current_fn->IsValid() && current_fn->repeat_every_us > 0) {
  240 | 1.37k |           assert(running_);
  241 | 1.37k |           current_fn->next_run_time_us =
  242 | 1.37k |               clock_->NowMicros() + current_fn->repeat_every_us;
  243 |       |
  244 |       |           // Schedule new work into the heap with new time.
  245 | 1.37k |           heap_.push(current_fn);
  246 | 1.37k |         } else {
  247 |       |           // If current_fn is cancelled or does not need to repeat, remove it
  248 |       |           // from the map to avoid a leak.
  249 |    10 |           map_.erase(current_fn->name);
  250 |    10 |         }
  251 | 13.1k |       } else {
  252 | 13.1k |         cond_var_.TimedWait(current_fn->next_run_time_us);
  253 | 13.1k |       }
  254 | 14.5k |     }
  255 | 13.9k |   }
  256 |       |
  257 | 13.9k |   void CancelAllWithLock() {
  258 | 13.9k |     mutex_.AssertHeld();
  259 | 13.9k |     if (map_.empty() && heap_.empty()) {
  260 |    70 |       return;
  261 |    70 |     }
  262 |       |
  263 |       |     // With mutex_ held, set all tasks to invalid so that they will not be
  264 |       |     // re-queued.
  265 | 41.7k |     for (auto& elem : map_) {
  266 | 41.7k |       auto& func_info = elem.second;
  267 | 41.7k |       assert(func_info);
  268 | 41.7k |       func_info->Cancel();
  269 | 41.7k |     }
  270 |       |
  271 |       |     // WaitForTaskCompleteIfNecessary() may release mutex_
  272 | 13.9k |     WaitForTaskCompleteIfNecessary();
  273 |       |
  274 | 55.6k |     while (!heap_.empty()) {
  275 | 41.7k |       heap_.pop();
  276 | 41.7k |     }
  277 | 13.9k |     map_.clear();
  278 | 13.9k |   }
  279 |       |
  280 |       |   // A wrapper around std::function to keep track of when it should run next
  281 |       |   // and at what frequency.
  282 |       |   struct FunctionInfo {
  283 |       |     // the actual work
  284 |       |     std::function<void()> fn;
  285 |       |     // name of the function
  286 |       |     std::string name;
  287 |       |     // when the function should run next
  288 |       |     uint64_t next_run_time_us;
  289 |       |     // repeat interval
  290 |       |     uint64_t repeat_every_us;
  291 |       |     // controls whether this function is valid.
  292 |       |     // A function is valid upon construction and until someone explicitly
  293 |       |     // calls `Cancel()`.
  294 |       |     bool valid;
  295 |       |
  296 |       |     FunctionInfo(std::function<void()>&& _fn, std::string _name,
  297 |       |                  const uint64_t _next_run_time_us, uint64_t _repeat_every_us)
  298 |       |         : fn(std::move(_fn)),
  299 |       |           name(std::move(_name)),
  300 |       |           next_run_time_us(_next_run_time_us),
  301 |       |           repeat_every_us(_repeat_every_us),
  302 | 41.9k |           valid(true) {}
  303 |       |
  304 | 83.7k |     void Cancel() { valid = false; }
  305 |       |
  306 |  102k |     bool IsValid() const { return valid; }
  307 |       |   };
  308 |       |
  309 | 13.9k |   void WaitForTaskCompleteIfNecessary() {
  310 | 13.9k |     mutex_.AssertHeld();
  311 | 13.9k |     while (executing_task_) {
  312 |    10 |       TEST_SYNC_POINT("Timer::WaitForTaskCompleteIfNecessary:TaskExecuting");
  313 |    10 |       cond_var_.Wait();
  314 |    10 |     }
  315 | 13.9k |   }
  316 |       |
  317 |       |   struct RunTimeOrder {
  318 | 45.0k |     bool operator()(const FunctionInfo* f1, const FunctionInfo* f2) {
  319 | 45.0k |       return f1->next_run_time_us > f2->next_run_time_us;
  320 | 45.0k |     }
  321 |       |   };
  322 |       |
  323 |       |   SystemClock* clock_;
  324 |       |   // This mutex controls both the heap_ and the map_. It needs to be held for
  325 |       |   // making any changes in them.
  326 |       |   mutable InstrumentedMutex mutex_;
  327 |       |   InstrumentedCondVar cond_var_;
  328 |       |   std::unique_ptr<port::Thread> thread_;
  329 |       |   bool running_;
  330 |       |   bool executing_task_;
  331 |       |
  332 |       |   std::priority_queue<FunctionInfo*, std::vector<FunctionInfo*>, RunTimeOrder>
  333 |       |       heap_;
  334 |       |
  335 |       |   // In addition to providing a mapping from a function name to a function,
  336 |       |   // it is also responsible for memory management.
  337 |       |   std::unordered_map<std::string, std::unique_ptr<FunctionInfo>> map_;
  338 |       | };
  339 |       |
  340 |       | }  // namespace ROCKSDB_NAMESPACE
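
The class-level comment above (source lines 23-38) describes the intended usage: construct a Timer with a SystemClock, Start() it, Add() named callbacks, Cancel() them by name, and Shutdown() when done. The snippet below is a minimal usage sketch, not part of the covered file; it assumes a build inside the RocksDB source tree where the internal header util/timer.h is reachable, and uses SystemClock::Default() from rocksdb/system_clock.h.

// Usage sketch (assumption: compiled in-tree against util/timer.h).
#include <atomic>
#include <cstdio>

#include "rocksdb/system_clock.h"
#include "util/timer.h"

using ROCKSDB_NAMESPACE::SystemClock;
using ROCKSDB_NAMESPACE::Timer;

int main() {
  std::atomic<int> runs{0};

  // Timer stores the raw SystemClock*; the default clock outlives the timer.
  Timer timer(SystemClock::Default().get());
  timer.Start();

  // Repeating task: first run after 1000us, then 500us after each completion
  // (see the Add() doc comment above for the repeat semantics). Names must be
  // unique across Add() calls.
  timer.Add([&runs] { runs.fetch_add(1); }, "counter",
            /*start_after_us=*/1000, /*repeat_every_us=*/500);

  // One-shot task: repeat_every_us == 0 means do not repeat.
  timer.Add([] { std::printf("one-shot task ran\n"); }, "one_shot",
            /*start_after_us=*/2000, /*repeat_every_us=*/0);

  SystemClock::Default()->SleepForMicroseconds(5000);

  timer.Cancel("counter");  // de-schedule by name; waits if it is mid-run
  timer.Shutdown();         // cancels remaining work and joins the thread
  std::printf("counter ran %d times\n", runs.load());
  return 0;
}

Note that Start() and Shutdown() are not thread-safe with respect to each other, so the caller above serializes them on a single thread, matching the constraint stated in the header comment.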