Coverage Report

Created: 2026-05-16 07:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/periodic_task_scheduler.cc
Line
Count
Source
1
//  Copyright (c) Meta Platforms, Inc. and affiliates.
2
//
3
//  This source code is licensed under both the GPLv2 (found in the
4
//  COPYING file in the root directory) and Apache 2.0 License
5
//  (found in the LICENSE.Apache file in the root directory).
6
7
#include "db/periodic_task_scheduler.h"
8
9
#include "rocksdb/system_clock.h"
10
#include "test_util/sync_point.h"
11
12
namespace ROCKSDB_NAMESPACE {
13
14
// `timer_mutex` is a global mutex that currently serves 3 purposes:
15
// (1) to ensure calls to `Start()` and `Shutdown()` are serialized, as
16
//     they are currently not implemented in a thread-safe way;
17
// (2) to ensure the `Timer::Add()`s and `Timer::Start()` run atomically, and
18
//     the `Timer::Cancel()`s and `Timer::Shutdown()` run atomically; and
19
// (3) to protect tasks_map_ in PeriodicTaskScheduler.
20
// Note: A static global mutex is not efficient, but for
21
// PeriodicTaskScheduler it should be okay, as the operations are called
22
// infrequently.
23
static port::Mutex timer_mutex;
24
25
// Default repeat period, in seconds, for each task type. It is looked up by
// the Register() overload that takes no explicit period. An entry of
// kInvalidPeriodSec means the type has no usable default: the explicit-period
// Register() overload rejects that value with Status::InvalidArgument, so such
// task types must be registered with an explicit period.
static const std::map<PeriodicTaskType, uint64_t> kDefaultPeriodSeconds = {
26
    {PeriodicTaskType::kDumpStats, kInvalidPeriodSec},
27
    {PeriodicTaskType::kPersistStats, kInvalidPeriodSec},
28
    {PeriodicTaskType::kFlushInfoLog, 10},
29
    {PeriodicTaskType::kRecordSeqnoTime, kInvalidPeriodSec},
30
    {PeriodicTaskType::kTriggerCompaction, kInvalidPeriodSec},
31
};
32
33
// Short name for each task type. Register() uses it as the prefix of the
// unique id under which the task is added to the timer (prefix followed by
// the scheduler's running `id_` counter), which makes timer entries easier to
// identify while debugging.
static const std::map<PeriodicTaskType, std::string> kPeriodicTaskTypeNames = {
34
    {PeriodicTaskType::kDumpStats, "dump_st"},
35
    {PeriodicTaskType::kPersistStats, "pst_st"},
36
    {PeriodicTaskType::kFlushInfoLog, "flush_info_log"},
37
    {PeriodicTaskType::kRecordSeqnoTime, "record_seq_time"},
38
    {PeriodicTaskType::kTriggerCompaction, "trigger_compaction"},
39
};
40
41
// Registers `fn` as the periodic task for `task_type`, using the default
// repeat period listed for that type in kDefaultPeriodSeconds.
//
// This is a thin forwarder to the overload that takes an explicit period; the
// returned Status is whatever that overload produces. Note that the lookup
// uses std::map::at(), which throws std::out_of_range for a task type that has
// no entry in kDefaultPeriodSeconds.
Status PeriodicTaskScheduler::Register(PeriodicTaskType task_type,
                                       const PeriodicTaskFunc& fn,
                                       bool run_immediately) {
  const uint64_t default_period_seconds = kDefaultPeriodSeconds.at(task_type);
  return Register(task_type, fn, default_period_seconds, run_immediately);
}
47
48
// Registers `fn` as the periodic task for `task_type`, repeating every
// `repeat_period_seconds` seconds. The whole call runs under `timer_mutex`.
//
// Behavior:
//  - Returns Status::InvalidArgument if `repeat_period_seconds` equals
//    kInvalidPeriodSec.
//  - If a task of this type is already registered with the same period,
//    nothing changes and Status::OK is returned.
//  - If a task of this type is registered with a different period, it is
//    cancelled on the timer and dropped from `tasks_map_` before the new one
//    is added.
//  - The first run is delayed by (counter % period) seconds, where the counter
//    is shared by all calls in the process and incremented on every
//    registration (presumably to spread out the first runs of different
//    tasks). When `run_immediately` is false, one additional full period is
//    added to that delay.
//  - Returns Status::Aborted if the timer refuses the task or if the task
//    cannot be recorded in `tasks_map_`.
Status PeriodicTaskScheduler::Register(PeriodicTaskType task_type,
                                       const PeriodicTaskFunc& fn,
                                       uint64_t repeat_period_seconds,
                                       bool run_immediately) {
  MutexLock l(&timer_mutex);
  // Shared across every scheduler instance; only ever advanced, never reset.
  static std::atomic<uint64_t> initial_delay(0);

  if (repeat_period_seconds == kInvalidPeriodSec) {
    return Status::InvalidArgument("Invalid task repeat period");
  }

  const auto existing = tasks_map_.find(task_type);
  if (existing != tasks_map_.end()) {
    if (existing->second.repeat_every_sec == repeat_period_seconds) {
      // Already registered with exactly this period: nothing to update.
      return Status::OK();
    }
    // Period changed: drop the old registration before adding the new one.
    timer_->Cancel(existing->second.name);
    tasks_map_.erase(existing);
  }

  timer_->Start();

  // The task type name is used as a prefix so timer entries are easy to
  // recognize while debugging; the running counter makes the id unique.
  std::string task_name = kPeriodicTaskTypeNames.at(task_type);
  task_name += std::to_string(id_++);

  const auto period_micros = repeat_period_seconds * kMicrosInSecond;
  // NOTE(review): the modulo relies on the period being non-zero; this holds
  // only if kInvalidPeriodSec (rejected above) is 0 - confirm in the header.
  uint64_t first_run_delay_micros =
      (initial_delay.fetch_add(1) % repeat_period_seconds) * kMicrosInSecond;
  if (!run_immediately) {
    first_run_delay_micros += period_micros;
  }

  if (!timer_->Add(fn, task_name, first_run_delay_micros, period_micros)) {
    return Status::Aborted("Failed to register periodic task");
  }

  const bool inserted =
      tasks_map_
          .try_emplace(task_type, TaskInfo{task_name, repeat_period_seconds})
          .second;
  if (!inserted) {
    return Status::Aborted("Failed to add periodic task");
  }

#ifndef NDEBUG
  {
    // Lets tests observe which task type was registered and with what period.
    std::pair<PeriodicTaskType, uint64_t> task_info{task_type,
                                                    repeat_period_seconds};
    TEST_SYNC_POINT_CALLBACK("PeriodicTaskScheduler::Register:TaskRegistered",
                             &task_info);
  }
#endif  // NDEBUG
  return Status::OK();
}
99
100
298k
// Removes the periodic task registered for `task_type`, if there is one, while
// holding `timer_mutex`: the task is cancelled on the timer and dropped from
// `tasks_map_`. Afterwards, if the timer reports no pending task at all, the
// timer is shut down. Always returns Status::OK, including when no task of
// this type was registered.
Status PeriodicTaskScheduler::Unregister(PeriodicTaskType task_type) {
  MutexLock l(&timer_mutex);

  const auto registered = tasks_map_.find(task_type);
  if (registered != tasks_map_.end()) {
    timer_->Cancel(registered->second.name);
    tasks_map_.erase(registered);
  }

  if (timer_->HasPendingTask()) {
    // Other tasks still depend on the timer; leave it running.
    return Status::OK();
  }

  timer_->Shutdown();
  return Status::OK();
}
112
113
49.7k
// Returns a pointer to the Timer object named `timer` that is declared through
// STATIC_AVOID_DESTRUCTION and constructed with the default SystemClock.
// NOTE(review): judging by the macro name, the object is a static that is
// never destroyed, so every call presumably yields the same instance - confirm
// against the macro definition.
Timer* PeriodicTaskScheduler::Default() {
  STATIC_AVOID_DESTRUCTION(Timer, timer)(SystemClock::Default().get());
  Timer* const default_timer = &timer;
  return default_timer;
}
117
118
#ifndef NDEBUG
119
// Test-only hook: points this scheduler at a function-local static test Timer
// instead of its current one.
//
// The static Timer is constructed with the `clock` passed to the first call;
// every call then also hands `clock` to the timer's own TEST_OverrideTimer().
// Only the assignment to `timer_` is performed under `timer_mutex`.
void PeriodicTaskScheduler::TEST_OverrideTimer(SystemClock* clock) {
  static Timer test_timer(clock);
  Timer* const replacement = &test_timer;
  replacement->TEST_OverrideTimer(clock);

  MutexLock l(&timer_mutex);
  timer_ = replacement;
}
125
#endif  // NDEBUG
126
127
}  // namespace ROCKSDB_NAMESPACE