Coverage Report

Created: 2026-03-31 07:51

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/util/rate_limiter_impl.h
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10
#pragma once
11
12
#include <algorithm>
13
#include <atomic>
14
#include <chrono>
15
#include <deque>
16
17
#include "port/port.h"
18
#include "rocksdb/env.h"
19
#include "rocksdb/rate_limiter.h"
20
#include "rocksdb/status.h"
21
#include "rocksdb/system_clock.h"
22
#include "util/mutexlock.h"
23
#include "util/random.h"
24
25
namespace ROCKSDB_NAMESPACE {
26
27
class GenericRateLimiter : public RateLimiter {
28
 public:
29
  GenericRateLimiter(int64_t refill_bytes, int64_t refill_period_us,
30
                     int32_t fairness, RateLimiter::Mode mode,
31
                     const std::shared_ptr<SystemClock>& clock, bool auto_tuned,
32
                     int64_t single_burst_bytes);
33
34
  virtual ~GenericRateLimiter();
35
36
  // This API allows user to dynamically change rate limiter's bytes per second.
37
  void SetBytesPerSecond(int64_t bytes_per_second) override;
38
39
  Status SetSingleBurstBytes(int64_t single_burst_bytes) override;
40
41
  // Request for token to write bytes. If this request can not be satisfied,
42
  // the call is blocked. Caller is responsible to make sure
43
  // bytes <= GetSingleBurstBytes() and bytes >= 0. Negative bytes
44
  // passed in will be rounded up to 0.
45
  using RateLimiter::Request;
46
  void Request(const int64_t bytes, const Env::IOPriority pri,
47
               Statistics* stats) override;
48
49
0
  int64_t GetSingleBurstBytes() const override {
50
0
    int64_t raw_single_burst_bytes =
51
0
        raw_single_burst_bytes_.load(std::memory_order_relaxed);
52
0
    if (raw_single_burst_bytes == 0) {
53
0
      return refill_bytes_per_period_.load(std::memory_order_relaxed);
54
0
    }
55
0
    return raw_single_burst_bytes;
56
0
  }
57
58
  int64_t GetTotalBytesThrough(
59
0
      const Env::IOPriority pri = Env::IO_TOTAL) const override {
60
0
    MutexLock g(&request_mutex_);
61
0
    if (pri == Env::IO_TOTAL) {
62
0
      int64_t total_bytes_through_sum = 0;
63
0
      for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
64
0
        total_bytes_through_sum += total_bytes_through_[i];
65
0
      }
66
0
      return total_bytes_through_sum;
67
0
    }
68
0
    return total_bytes_through_[pri];
69
0
  }
70
71
  int64_t GetTotalRequests(
72
0
      const Env::IOPriority pri = Env::IO_TOTAL) const override {
73
0
    MutexLock g(&request_mutex_);
74
0
    if (pri == Env::IO_TOTAL) {
75
0
      int64_t total_requests_sum = 0;
76
0
      for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
77
0
        total_requests_sum += total_requests_[i];
78
0
      }
79
0
      return total_requests_sum;
80
0
    }
81
0
    return total_requests_[pri];
82
0
  }
83
84
  Status GetTotalPendingRequests(
85
      int64_t* total_pending_requests,
86
0
      const Env::IOPriority pri = Env::IO_TOTAL) const override {
87
0
    assert(total_pending_requests != nullptr);
88
0
    MutexLock g(&request_mutex_);
89
0
    if (pri == Env::IO_TOTAL) {
90
0
      int64_t total_pending_requests_sum = 0;
91
0
      for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
92
0
        total_pending_requests_sum += static_cast<int64_t>(queue_[i].size());
93
0
      }
94
0
      *total_pending_requests = total_pending_requests_sum;
95
0
    } else {
96
0
      *total_pending_requests = static_cast<int64_t>(queue_[pri].size());
97
0
    }
98
0
    return Status::OK();
99
0
  }
100
101
0
  int64_t GetBytesPerSecond() const override {
102
0
    return rate_bytes_per_sec_.load(std::memory_order_relaxed);
103
0
  }
104
105
0
  virtual void TEST_SetClock(std::shared_ptr<SystemClock> clock) {
106
0
    MutexLock g(&request_mutex_);
107
0
    clock_ = std::move(clock);
108
0
    next_refill_us_ = NowMicrosMonotonicLocked();
109
0
  }
110
111
 private:
112
  static constexpr int kMicrosecondsPerSecond = 1000000;
113
  void RefillBytesAndGrantRequestsLocked();
114
  std::vector<Env::IOPriority> GeneratePriorityIterationOrderLocked();
115
  int64_t CalculateRefillBytesPerPeriodLocked(int64_t rate_bytes_per_sec);
116
  Status TuneLocked();
117
  void SetBytesPerSecondLocked(int64_t bytes_per_second);
118
119
80.4k
  uint64_t NowMicrosMonotonicLocked() {
120
80.4k
    return clock_->NowNanos() / std::milli::den;
121
80.4k
  }
122
123
  // This mutex guard all internal states
124
  mutable port::Mutex request_mutex_;
125
126
  const int64_t refill_period_us_;
127
128
  std::atomic<int64_t> rate_bytes_per_sec_;
129
  std::atomic<int64_t> refill_bytes_per_period_;
130
  // This value is validated but unsanitized (may be zero).
131
  std::atomic<int64_t> raw_single_burst_bytes_;
132
  std::shared_ptr<SystemClock> clock_;
133
134
  bool stop_;
135
  port::CondVar exit_cv_;
136
  int32_t requests_to_wait_;
137
138
  int64_t total_requests_[Env::IO_TOTAL];
139
  int64_t total_bytes_through_[Env::IO_TOTAL];
140
  int64_t available_bytes_;
141
  int64_t next_refill_us_;
142
143
  int32_t fairness_;
144
  Random rnd_;
145
146
  struct Req;
147
  std::deque<Req*> queue_[Env::IO_TOTAL];
148
  bool wait_until_refill_pending_;
149
150
  bool auto_tuned_;
151
  int64_t num_drains_;
152
  const int64_t max_bytes_per_sec_;
153
  std::chrono::microseconds tuned_time_;
154
};
155
156
}  // namespace ROCKSDB_NAMESPACE