Coverage Report

Created: 2025-10-26 07:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/util/rate_limiter.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10
#include <algorithm>
11
12
#include "monitoring/statistics_impl.h"
13
#include "port/port.h"
14
#include "rocksdb/system_clock.h"
15
#include "test_util/sync_point.h"
16
#include "util/aligned_buffer.h"
17
#include "util/rate_limiter_impl.h"
18
19
namespace ROCKSDB_NAMESPACE {
20
size_t RateLimiter::RequestToken(size_t bytes, size_t alignment,
21
                                 Env::IOPriority io_priority, Statistics* stats,
22
0
                                 RateLimiter::OpType op_type) {
23
0
  if (io_priority < Env::IO_TOTAL && IsRateLimited(op_type)) {
24
0
    bytes = std::min(bytes, static_cast<size_t>(GetSingleBurstBytes()));
25
26
0
    if (alignment > 0) {
27
      // Here we may actually require more than burst and block
28
      // as we can not write/read less than one page at a time on direct I/O
29
      // thus we do not want to be strictly constrained by burst
30
0
      bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes));
31
0
    }
32
0
    Request(bytes, io_priority, stats, op_type);
33
0
  }
34
0
  return bytes;
35
0
}
36
37
// Pending request
38
struct GenericRateLimiter::Req {
39
  explicit Req(int64_t _bytes, port::Mutex* _mu)
40
0
      : request_bytes(_bytes), bytes(_bytes), cv(_mu) {}
41
  int64_t request_bytes;
42
  int64_t bytes;
43
  port::CondVar cv;
44
};
45
46
GenericRateLimiter::GenericRateLimiter(
47
    int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness,
48
    RateLimiter::Mode mode, const std::shared_ptr<SystemClock>& clock,
49
    bool auto_tuned, int64_t single_burst_bytes)
50
48.4k
    : RateLimiter(mode),
51
48.4k
      refill_period_us_(refill_period_us),
52
48.4k
      rate_bytes_per_sec_(auto_tuned ? rate_bytes_per_sec / 2
53
48.4k
                                     : rate_bytes_per_sec),
54
48.4k
      refill_bytes_per_period_(
55
48.4k
          CalculateRefillBytesPerPeriodLocked(rate_bytes_per_sec_)),
56
48.4k
      raw_single_burst_bytes_(single_burst_bytes),
57
48.4k
      clock_(clock),
58
48.4k
      stop_(false),
59
48.4k
      exit_cv_(&request_mutex_),
60
48.4k
      requests_to_wait_(0),
61
48.4k
      available_bytes_(0),
62
48.4k
      next_refill_us_(NowMicrosMonotonicLocked()),
63
48.4k
      fairness_(fairness > 100 ? 100 : fairness),
64
48.4k
      rnd_((uint32_t)time(nullptr)),
65
48.4k
      wait_until_refill_pending_(false),
66
48.4k
      auto_tuned_(auto_tuned),
67
48.4k
      num_drains_(0),
68
48.4k
      max_bytes_per_sec_(rate_bytes_per_sec),
69
48.4k
      tuned_time_(NowMicrosMonotonicLocked()) {
70
242k
  for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
71
193k
    total_requests_[i] = 0;
72
193k
    total_bytes_through_[i] = 0;
73
193k
  }
74
48.4k
}
75
76
48.4k
GenericRateLimiter::~GenericRateLimiter() {
77
48.4k
  MutexLock g(&request_mutex_);
78
48.4k
  stop_ = true;
79
48.4k
  std::deque<Req*>::size_type queues_size_sum = 0;
80
242k
  for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
81
193k
    queues_size_sum += queue_[i].size();
82
193k
  }
83
48.4k
  requests_to_wait_ = static_cast<int32_t>(queues_size_sum);
84
85
242k
  for (int i = Env::IO_TOTAL - 1; i >= Env::IO_LOW; --i) {
86
193k
    std::deque<Req*> queue = queue_[i];
87
193k
    for (auto& r : queue) {
88
0
      r->cv.Signal();
89
0
    }
90
193k
  }
91
92
48.4k
  while (requests_to_wait_ > 0) {
93
0
    exit_cv_.Wait();
94
0
  }
95
48.4k
}
96
97
// This API allows user to dynamically change rate limiter's bytes per second.
98
1
void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) {
99
1
  MutexLock g(&request_mutex_);
100
1
  SetBytesPerSecondLocked(bytes_per_second);
101
1
}
102
103
1
void GenericRateLimiter::SetBytesPerSecondLocked(int64_t bytes_per_second) {
104
1
  assert(bytes_per_second > 0);
105
1
  rate_bytes_per_sec_.store(bytes_per_second, std::memory_order_relaxed);
106
1
  refill_bytes_per_period_.store(
107
1
      CalculateRefillBytesPerPeriodLocked(bytes_per_second),
108
1
      std::memory_order_relaxed);
109
1
}
110
111
0
Status GenericRateLimiter::SetSingleBurstBytes(int64_t single_burst_bytes) {
112
0
  if (single_burst_bytes < 0) {
113
0
    return Status::InvalidArgument(
114
0
        "`single_burst_bytes` must be greater than or equal to 0");
115
0
  }
116
117
0
  MutexLock g(&request_mutex_);
118
0
  raw_single_burst_bytes_.store(single_burst_bytes, std::memory_order_relaxed);
119
0
  return Status::OK();
120
0
}
121
122
void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
123
0
                                 Statistics* stats) {
124
0
  assert(bytes <= GetSingleBurstBytes());
125
0
  bytes = std::max(static_cast<int64_t>(0), bytes);
126
0
  TEST_SYNC_POINT("GenericRateLimiter::Request");
127
0
  TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:1",
128
0
                           &rate_bytes_per_sec_);
129
0
  MutexLock g(&request_mutex_);
130
131
0
  if (auto_tuned_) {
132
0
    static const int kRefillsPerTune = 100;
133
0
    std::chrono::microseconds now(NowMicrosMonotonicLocked());
134
0
    if (now - tuned_time_ >=
135
0
        kRefillsPerTune * std::chrono::microseconds(refill_period_us_)) {
136
0
      Status s = TuneLocked();
137
0
      s.PermitUncheckedError();  //**TODO: What to do on error?
138
0
    }
139
0
  }
140
141
0
  if (stop_) {
142
    // It is now in the clean-up of ~GenericRateLimiter().
143
    // Therefore any new incoming request will exit from here
144
    // and not get satisfied.
145
0
    return;
146
0
  }
147
148
0
  ++total_requests_[pri];
149
150
0
  if (available_bytes_ > 0) {
151
0
    int64_t bytes_through = std::min(available_bytes_, bytes);
152
0
    total_bytes_through_[pri] += bytes_through;
153
0
    available_bytes_ -= bytes_through;
154
0
    bytes -= bytes_through;
155
0
  }
156
157
0
  if (bytes == 0) {
158
0
    return;
159
0
  }
160
161
  // Request cannot be satisfied at this moment, enqueue
162
0
  Req r(bytes, &request_mutex_);
163
0
  queue_[pri].push_back(&r);
164
0
  TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:PostEnqueueRequest",
165
0
                           &request_mutex_);
166
  // A thread representing a queued request coordinates with other such threads.
167
  // There are two main duties.
168
  //
169
  // (1) Waiting for the next refill time.
170
  // (2) Refilling the bytes and granting requests.
171
0
  do {
172
0
    int64_t time_until_refill_us = next_refill_us_ - NowMicrosMonotonicLocked();
173
0
    if (time_until_refill_us > 0) {
174
0
      if (wait_until_refill_pending_) {
175
        // Somebody is performing (1). Trust we'll be woken up when our request
176
        // is granted or we are needed for future duties.
177
0
        r.cv.Wait();
178
0
      } else {
179
        // Whichever thread reaches here first performs duty (1) as described
180
        // above.
181
0
        int64_t wait_until = clock_->NowMicros() + time_until_refill_us;
182
0
        RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS);
183
0
        ++num_drains_;
184
0
        wait_until_refill_pending_ = true;
185
0
        clock_->TimedWait(&r.cv, std::chrono::microseconds(wait_until));
186
0
        TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:PostTimedWait",
187
0
                                 &time_until_refill_us);
188
0
        wait_until_refill_pending_ = false;
189
0
      }
190
0
    } else {
191
      // Whichever thread reaches here first performs duty (2) as described
192
      // above.
193
0
      RefillBytesAndGrantRequestsLocked();
194
0
    }
195
0
    if (r.request_bytes == 0) {
196
      // If there are any remaining requests, make sure at least
197
      // one candidate is awake for future duties by signaling a front request
198
      // of a queue.
199
0
      for (int i = Env::IO_TOTAL - 1; i >= Env::IO_LOW; --i) {
200
0
        auto& queue = queue_[i];
201
0
        if (!queue.empty()) {
202
0
          queue.front()->cv.Signal();
203
0
          break;
204
0
        }
205
0
      }
206
0
    }
207
    // Invariant: non-granted request is always in one queue, and granted
208
    // request is always in zero queues.
209
#ifndef NDEBUG
210
    int num_found = 0;
211
    for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
212
      if (std::find(queue_[i].begin(), queue_[i].end(), &r) !=
213
          queue_[i].end()) {
214
        ++num_found;
215
      }
216
    }
217
    if (r.request_bytes == 0) {
218
      assert(num_found == 0);
219
    } else {
220
      assert(num_found == 1);
221
    }
222
#endif  // NDEBUG
223
0
  } while (!stop_ && r.request_bytes > 0);
224
225
0
  if (stop_) {
226
    // It is now in the clean-up of ~GenericRateLimiter().
227
    // Therefore any woken-up request will have come out of the loop and then
228
    // exit here. It might or might not have been satisfied.
229
0
    --requests_to_wait_;
230
0
    exit_cv_.Signal();
231
0
  }
232
0
}
233
234
std::vector<Env::IOPriority>
235
0
GenericRateLimiter::GeneratePriorityIterationOrderLocked() {
236
0
  std::vector<Env::IOPriority> pri_iteration_order(Env::IO_TOTAL /* 4 */);
237
  // We make Env::IO_USER a superior priority by always iterating its queue
238
  // first
239
0
  pri_iteration_order[0] = Env::IO_USER;
240
241
0
  bool high_pri_iterated_after_mid_low_pri = rnd_.OneIn(fairness_);
242
0
  TEST_SYNC_POINT_CALLBACK(
243
0
      "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
244
0
      "PostRandomOneInFairnessForHighPri",
245
0
      &high_pri_iterated_after_mid_low_pri);
246
0
  bool mid_pri_itereated_after_low_pri = rnd_.OneIn(fairness_);
247
0
  TEST_SYNC_POINT_CALLBACK(
248
0
      "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
249
0
      "PostRandomOneInFairnessForMidPri",
250
0
      &mid_pri_itereated_after_low_pri);
251
252
0
  if (high_pri_iterated_after_mid_low_pri) {
253
0
    pri_iteration_order[3] = Env::IO_HIGH;
254
0
    pri_iteration_order[2] =
255
0
        mid_pri_itereated_after_low_pri ? Env::IO_MID : Env::IO_LOW;
256
0
    pri_iteration_order[1] =
257
0
        (pri_iteration_order[2] == Env::IO_MID) ? Env::IO_LOW : Env::IO_MID;
258
0
  } else {
259
0
    pri_iteration_order[1] = Env::IO_HIGH;
260
0
    pri_iteration_order[3] =
261
0
        mid_pri_itereated_after_low_pri ? Env::IO_MID : Env::IO_LOW;
262
0
    pri_iteration_order[2] =
263
0
        (pri_iteration_order[3] == Env::IO_MID) ? Env::IO_LOW : Env::IO_MID;
264
0
  }
265
266
0
  TEST_SYNC_POINT_CALLBACK(
267
0
      "GenericRateLimiter::GeneratePriorityIterationOrderLocked::"
268
0
      "PreReturnPriIterationOrder",
269
0
      &pri_iteration_order);
270
0
  return pri_iteration_order;
271
0
}
272
273
0
void GenericRateLimiter::RefillBytesAndGrantRequestsLocked() {
274
0
  TEST_SYNC_POINT_CALLBACK(
275
0
      "GenericRateLimiter::RefillBytesAndGrantRequestsLocked", &request_mutex_);
276
0
  next_refill_us_ = NowMicrosMonotonicLocked() + refill_period_us_;
277
  // Carry over the left over quota from the last period
278
0
  auto refill_bytes_per_period =
279
0
      refill_bytes_per_period_.load(std::memory_order_relaxed);
280
0
  assert(available_bytes_ == 0);
281
0
  available_bytes_ = refill_bytes_per_period;
282
283
0
  std::vector<Env::IOPriority> pri_iteration_order =
284
0
      GeneratePriorityIterationOrderLocked();
285
286
0
  for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) {
287
0
    assert(!pri_iteration_order.empty());
288
0
    Env::IOPriority current_pri = pri_iteration_order[i];
289
0
    auto* queue = &queue_[current_pri];
290
0
    while (!queue->empty()) {
291
0
      auto* next_req = queue->front();
292
0
      if (available_bytes_ < next_req->request_bytes) {
293
        // Grant partial request_bytes even if request is for more than
294
        // `available_bytes_`, which can happen in a few situations:
295
        //
296
        // - The available bytes were partially consumed by other request(s)
297
        // - The rate was dynamically reduced while requests were already
298
        //   enqueued
299
        // - The burst size was explicitly set to be larger than the refill size
300
0
        next_req->request_bytes -= available_bytes_;
301
0
        available_bytes_ = 0;
302
0
        break;
303
0
      }
304
0
      available_bytes_ -= next_req->request_bytes;
305
0
      next_req->request_bytes = 0;
306
0
      total_bytes_through_[current_pri] += next_req->bytes;
307
0
      queue->pop_front();
308
309
      // Quota granted, signal the thread to exit
310
0
      next_req->cv.Signal();
311
0
    }
312
0
  }
313
0
}
314
315
int64_t GenericRateLimiter::CalculateRefillBytesPerPeriodLocked(
316
48.4k
    int64_t rate_bytes_per_sec) {
317
48.4k
  if (std::numeric_limits<int64_t>::max() / rate_bytes_per_sec <
318
48.4k
      refill_period_us_) {
319
    // Avoid unexpected result in the overflow case. The result now is still
320
    // inaccurate but is a number that is large enough.
321
0
    return std::numeric_limits<int64_t>::max() / kMicrosecondsPerSecond;
322
48.4k
  } else {
323
48.4k
    return rate_bytes_per_sec * refill_period_us_ / kMicrosecondsPerSecond;
324
48.4k
  }
325
48.4k
}
326
327
0
Status GenericRateLimiter::TuneLocked() {
328
0
  const int kLowWatermarkPct = 50;
329
0
  const int kHighWatermarkPct = 90;
330
0
  const int kAdjustFactorPct = 5;
331
  // computed rate limit will be in
332
  // `[max_bytes_per_sec_ / kAllowedRangeFactor, max_bytes_per_sec_]`.
333
0
  const int kAllowedRangeFactor = 20;
334
335
0
  std::chrono::microseconds prev_tuned_time = tuned_time_;
336
0
  tuned_time_ = std::chrono::microseconds(NowMicrosMonotonicLocked());
337
338
0
  int64_t elapsed_intervals = (tuned_time_ - prev_tuned_time +
339
0
                               std::chrono::microseconds(refill_period_us_) -
340
0
                               std::chrono::microseconds(1)) /
341
0
                              std::chrono::microseconds(refill_period_us_);
342
  // We tune every kRefillsPerTune intervals, so the overflow and division-by-
343
  // zero conditions should never happen.
344
0
  assert(num_drains_ <= std::numeric_limits<int64_t>::max() / 100);
345
0
  assert(elapsed_intervals > 0);
346
0
  int64_t drained_pct = num_drains_ * 100 / elapsed_intervals;
347
348
0
  int64_t prev_bytes_per_sec = GetBytesPerSecond();
349
0
  int64_t new_bytes_per_sec;
350
0
  if (drained_pct == 0) {
351
0
    new_bytes_per_sec = max_bytes_per_sec_ / kAllowedRangeFactor;
352
0
  } else if (drained_pct < kLowWatermarkPct) {
353
    // sanitize to prevent overflow
354
0
    int64_t sanitized_prev_bytes_per_sec =
355
0
        std::min(prev_bytes_per_sec, std::numeric_limits<int64_t>::max() / 100);
356
0
    new_bytes_per_sec =
357
0
        std::max(max_bytes_per_sec_ / kAllowedRangeFactor,
358
0
                 sanitized_prev_bytes_per_sec * 100 / (100 + kAdjustFactorPct));
359
0
  } else if (drained_pct > kHighWatermarkPct) {
360
    // sanitize to prevent overflow
361
0
    int64_t sanitized_prev_bytes_per_sec =
362
0
        std::min(prev_bytes_per_sec, std::numeric_limits<int64_t>::max() /
363
0
                                         (100 + kAdjustFactorPct));
364
0
    new_bytes_per_sec =
365
0
        std::min(max_bytes_per_sec_,
366
0
                 sanitized_prev_bytes_per_sec * (100 + kAdjustFactorPct) / 100);
367
0
  } else {
368
0
    new_bytes_per_sec = prev_bytes_per_sec;
369
0
  }
370
0
  if (new_bytes_per_sec != prev_bytes_per_sec) {
371
0
    SetBytesPerSecondLocked(new_bytes_per_sec);
372
0
  }
373
0
  num_drains_ = 0;
374
0
  return Status::OK();
375
0
}
376
377
RateLimiter* NewGenericRateLimiter(
378
    int64_t rate_bytes_per_sec, int64_t refill_period_us /* = 100 * 1000 */,
379
    int32_t fairness /* = 10 */,
380
    RateLimiter::Mode mode /* = RateLimiter::Mode::kWritesOnly */,
381
48.4k
    bool auto_tuned /* = false */, int64_t single_burst_bytes /* = 0 */) {
382
48.4k
  assert(rate_bytes_per_sec > 0);
383
48.4k
  assert(refill_period_us > 0);
384
  assert(fairness > 0);
385
48.4k
  std::unique_ptr<RateLimiter> limiter(new GenericRateLimiter(
386
48.4k
      rate_bytes_per_sec, refill_period_us, fairness, mode,
387
48.4k
      SystemClock::Default(), auto_tuned, single_burst_bytes));
388
48.4k
  return limiter.release();
389
48.4k
}
390
391
}  // namespace ROCKSDB_NAMESPACE