Coverage Report

Created: 2025-10-26 07:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/write_controller.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#include "db/write_controller.h"
7
8
#include <algorithm>
9
#include <atomic>
10
#include <cassert>
11
#include <ratio>
12
13
#include "rocksdb/system_clock.h"
14
15
namespace ROCKSDB_NAMESPACE {
16
17
0
std::unique_ptr<WriteControllerToken> WriteController::GetStopToken() {
18
0
  ++total_stopped_;
19
0
  return std::unique_ptr<WriteControllerToken>(new StopWriteToken(this));
20
0
}
21
22
std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken(
23
31
    uint64_t write_rate) {
24
31
  if (0 == total_delayed_++) {
25
    // Starting delay, so reset counters.
26
31
    next_refill_time_ = 0;
27
31
    credit_in_bytes_ = 0;
28
31
  }
29
  // NOTE: for simplicity, any current credit_in_bytes_ or "debt" in
30
  // next_refill_time_ will be based on an old rate. This rate will apply
31
  // for subsequent additional debts and for the next refill.
32
31
  set_delayed_write_rate(write_rate);
33
31
  return std::unique_ptr<WriteControllerToken>(new DelayWriteToken(this));
34
31
}
35
36
std::unique_ptr<WriteControllerToken>
37
195
WriteController::GetCompactionPressureToken() {
38
195
  ++total_compaction_pressure_;
39
195
  return std::unique_ptr<WriteControllerToken>(
40
195
      new CompactionPressureToken(this));
41
195
}
42
43
1.20M
bool WriteController::IsStopped() const {
44
1.20M
  return total_stopped_.load(std::memory_order_relaxed) > 0;
45
1.20M
}
46
// This is inside DB mutex, so we can't sleep and need to minimize
47
// frequency to get time.
48
// If it turns out to be a performance issue, we can redesign the thread
49
// synchronization model here.
50
// The function trust caller will sleep micros returned.
51
31
uint64_t WriteController::GetDelay(SystemClock* clock, uint64_t num_bytes) {
52
31
  if (total_stopped_.load(std::memory_order_relaxed) > 0) {
53
0
    return 0;
54
0
  }
55
31
  if (total_delayed_.load(std::memory_order_relaxed) == 0) {
56
0
    return 0;
57
0
  }
58
59
31
  if (credit_in_bytes_ >= num_bytes) {
60
29
    credit_in_bytes_ -= num_bytes;
61
29
    return 0;
62
29
  }
63
  // The frequency to get time inside DB mutex is less than one per refill
64
  // interval.
65
2
  auto time_now = NowMicrosMonotonic(clock);
66
67
2
  const uint64_t kMicrosPerSecond = 1000000;
68
  // Refill every 1 ms
69
2
  const uint64_t kMicrosPerRefill = 1000;
70
71
2
  if (next_refill_time_ == 0) {
72
    // Start with an initial allotment of bytes for one interval
73
2
    next_refill_time_ = time_now;
74
2
  }
75
2
  if (next_refill_time_ <= time_now) {
76
    // Refill based on time interval plus any extra elapsed
77
2
    uint64_t elapsed = time_now - next_refill_time_ + kMicrosPerRefill;
78
2
    credit_in_bytes_ += static_cast<uint64_t>(
79
2
        1.0 * elapsed / kMicrosPerSecond * delayed_write_rate_ + 0.999999);
80
2
    next_refill_time_ = time_now + kMicrosPerRefill;
81
82
2
    if (credit_in_bytes_ >= num_bytes) {
83
      // Avoid delay if possible, to reduce DB mutex release & re-aquire.
84
2
      credit_in_bytes_ -= num_bytes;
85
2
      return 0;
86
2
    }
87
2
  }
88
89
  // We need to delay to avoid exceeding write rate.
90
2
  assert(num_bytes > credit_in_bytes_);
91
0
  uint64_t bytes_over_budget = num_bytes - credit_in_bytes_;
92
0
  uint64_t needed_delay = static_cast<uint64_t>(
93
0
      1.0 * bytes_over_budget / delayed_write_rate_ * kMicrosPerSecond);
94
95
0
  credit_in_bytes_ = 0;
96
0
  next_refill_time_ += needed_delay;
97
98
  // Minimum delay of refill interval, to reduce DB mutex contention.
99
0
  return std::max(next_refill_time_ - time_now, kMicrosPerRefill);
100
2
}
101
102
2
// Reads the clock's nanosecond counter and converts it to microseconds.
uint64_t WriteController::NowMicrosMonotonic(SystemClock* clock) {
  const auto now_nanos = clock->NowNanos();
  // std::milli is std::ratio<1, 1000>, so its denominator is the 1000
  // nanoseconds that make up one microsecond.
  return now_nanos / std::milli::den;
}
105
106
0
// Releases this token's stop request by decrementing the controller's
// total_stopped_ counter, which must be at least 1 beforehand.
StopWriteToken::~StopWriteToken() {
  auto& stop_requests = controller_->total_stopped_;
  assert(stop_requests >= 1);
  --stop_requests;
}
110
111
31
// Releases this token's delay request by decrementing the controller's
// total_delayed_ counter, then checks that it did not drop below zero.
DelayWriteToken::~DelayWriteToken() {
  auto& delay_requests = controller_->total_delayed_;
  --delay_requests;
  assert(delay_requests.load() >= 0);
}
115
116
195
// Releases this token's compaction-pressure request by decrementing the
// controller's total_compaction_pressure_ counter, then checks that it did
// not drop below zero.
CompactionPressureToken::~CompactionPressureToken() {
  auto& pressure_requests = controller_->total_compaction_pressure_;
  --pressure_requests;
  assert(pressure_requests >= 0);
}
120
121
}  // namespace ROCKSDB_NAMESPACE