/src/rocksdb/util/rate_limiter.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | // |
6 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
7 | | // Use of this source code is governed by a BSD-style license that can be |
8 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
9 | | |
10 | | #include <algorithm> |
11 | | |
12 | | #include "monitoring/statistics_impl.h" |
13 | | #include "port/port.h" |
14 | | #include "rocksdb/system_clock.h" |
15 | | #include "test_util/sync_point.h" |
16 | | #include "util/aligned_buffer.h" |
17 | | #include "util/rate_limiter_impl.h" |
18 | | |
19 | | namespace ROCKSDB_NAMESPACE { |
20 | | size_t RateLimiter::RequestToken(size_t bytes, size_t alignment, |
21 | | Env::IOPriority io_priority, Statistics* stats, |
22 | 0 | RateLimiter::OpType op_type) { |
23 | 0 | if (io_priority < Env::IO_TOTAL && IsRateLimited(op_type)) { |
24 | 0 | bytes = std::min(bytes, static_cast<size_t>(GetSingleBurstBytes())); |
25 | |
26 | 0 | if (alignment > 0) { |
27 | | // Here we may actually require more than the burst and block,
28 | | // since with direct I/O we cannot read/write less than one page at
29 | | // a time; thus we do not want to be strictly constrained by burst.
30 | 0 | bytes = std::max(alignment, TruncateToPageBoundary(alignment, bytes)); |
31 | 0 | } |
32 | 0 | Request(bytes, io_priority, stats, op_type); |
33 | 0 | } |
34 | 0 | return bytes; |
35 | 0 | } |
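
To make the clamping above concrete, here is a worked example with illustrative values, assuming TruncateToPageBoundary(alignment, bytes) rounds bytes down to a multiple of the alignment:

// Suppose alignment = 4096 (direct I/O) and GetSingleBurstBytes() = 1 MiB:
//   RequestToken(3 MiB): min(3 MiB, 1 MiB) = 1 MiB; already page-aligned,
//                        so 1 MiB is handed to Request().
// Now suppose the configured burst is only 1000 bytes:
//   RequestToken(8192):  min(8192, 1000) = 1000;
//                        max(4096, TruncateToPageBoundary(4096, 1000))
//                          = max(4096, 0) = 4096,
//   i.e. a full page is requested even though it exceeds the burst --
//   exactly the case the comment above describes.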
36 | | |
37 | | // Pending request |
38 | | struct GenericRateLimiter::Req { |
39 | | explicit Req(int64_t _bytes, port::Mutex* _mu) |
40 | 0 | : request_bytes(_bytes), bytes(_bytes), cv(_mu) {} |
41 | | int64_t request_bytes; |
42 | | int64_t bytes; |
43 | | port::CondVar cv; |
44 | | }; |
45 | | |
46 | | GenericRateLimiter::GenericRateLimiter( |
47 | | int64_t rate_bytes_per_sec, int64_t refill_period_us, int32_t fairness, |
48 | | RateLimiter::Mode mode, const std::shared_ptr<SystemClock>& clock, |
49 | | bool auto_tuned, int64_t single_burst_bytes) |
50 | 48.4k | : RateLimiter(mode), |
51 | 48.4k | refill_period_us_(refill_period_us), |
52 | 48.4k | rate_bytes_per_sec_(auto_tuned ? rate_bytes_per_sec / 2 |
53 | 48.4k | : rate_bytes_per_sec), |
54 | 48.4k | refill_bytes_per_period_( |
55 | 48.4k | CalculateRefillBytesPerPeriodLocked(rate_bytes_per_sec_)), |
56 | 48.4k | raw_single_burst_bytes_(single_burst_bytes), |
57 | 48.4k | clock_(clock), |
58 | 48.4k | stop_(false), |
59 | 48.4k | exit_cv_(&request_mutex_), |
60 | 48.4k | requests_to_wait_(0), |
61 | 48.4k | available_bytes_(0), |
62 | 48.4k | next_refill_us_(NowMicrosMonotonicLocked()), |
63 | 48.4k | fairness_(fairness > 100 ? 100 : fairness), |
64 | 48.4k | rnd_((uint32_t)time(nullptr)), |
65 | 48.4k | wait_until_refill_pending_(false), |
66 | 48.4k | auto_tuned_(auto_tuned), |
67 | 48.4k | num_drains_(0), |
68 | 48.4k | max_bytes_per_sec_(rate_bytes_per_sec), |
69 | 48.4k | tuned_time_(NowMicrosMonotonicLocked()) { |
70 | 242k | for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) { |
71 | 193k | total_requests_[i] = 0; |
72 | 193k | total_bytes_through_[i] = 0; |
73 | 193k | } |
74 | 48.4k | } |
75 | | |
76 | 48.4k | GenericRateLimiter::~GenericRateLimiter() { |
77 | 48.4k | MutexLock g(&request_mutex_); |
78 | 48.4k | stop_ = true; |
79 | 48.4k | std::deque<Req*>::size_type queues_size_sum = 0; |
80 | 242k | for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) { |
81 | 193k | queues_size_sum += queue_[i].size(); |
82 | 193k | } |
83 | 48.4k | requests_to_wait_ = static_cast<int32_t>(queues_size_sum); |
84 | | |
85 | 242k | for (int i = Env::IO_TOTAL - 1; i >= Env::IO_LOW; --i) { |
86 | 193k | std::deque<Req*> queue = queue_[i]; |
87 | 193k | for (auto& r : queue) { |
88 | 0 | r->cv.Signal(); |
89 | 0 | } |
90 | 193k | } |
91 | | |
92 | 48.4k | while (requests_to_wait_ > 0) { |
93 | 0 | exit_cv_.Wait(); |
94 | 0 | } |
95 | 48.4k | } |
96 | | |
97 | | // This API allows the user to dynamically change the rate limiter's bytes per second.
98 | 1 | void GenericRateLimiter::SetBytesPerSecond(int64_t bytes_per_second) { |
99 | 1 | MutexLock g(&request_mutex_); |
100 | 1 | SetBytesPerSecondLocked(bytes_per_second); |
101 | 1 | } |
102 | | |
103 | 1 | void GenericRateLimiter::SetBytesPerSecondLocked(int64_t bytes_per_second) { |
104 | 1 | assert(bytes_per_second > 0); |
105 | 1 | rate_bytes_per_sec_.store(bytes_per_second, std::memory_order_relaxed); |
106 | 1 | refill_bytes_per_period_.store( |
107 | 1 | CalculateRefillBytesPerPeriodLocked(bytes_per_second), |
108 | 1 | std::memory_order_relaxed); |
109 | 1 | } |
110 | | |
111 | 0 | Status GenericRateLimiter::SetSingleBurstBytes(int64_t single_burst_bytes) { |
112 | 0 | if (single_burst_bytes < 0) { |
113 | 0 | return Status::InvalidArgument( |
114 | 0 | "`single_burst_bytes` must be greater than or equal to 0"); |
115 | 0 | } |
116 | | |
117 | 0 | MutexLock g(&request_mutex_); |
118 | 0 | raw_single_burst_bytes_.store(single_burst_bytes, std::memory_order_relaxed); |
119 | 0 | return Status::OK(); |
120 | 0 | } |
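
A minimal sketch of runtime reconfiguration via these setters (illustrative values; the ReconfigureLimiter helper is invented for this example):

#include <cassert>

#include "rocksdb/rate_limiter.h"

void ReconfigureLimiter(ROCKSDB_NAMESPACE::RateLimiter* limiter) {
  // Both setters acquire request_mutex_, so they are safe to call while
  // other threads are blocked inside Request().
  limiter->SetBytesPerSecond(20 << 20);  // raise the rate to 20 MiB/s
  ROCKSDB_NAMESPACE::Status s =
      limiter->SetSingleBurstBytes(1 << 20);  // cap each grant at 1 MiB
  assert(s.ok());  // only negative values are rejected (see above)
}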
121 | | |
122 | | void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri, |
123 | 0 | Statistics* stats) { |
124 | 0 | assert(bytes <= GetSingleBurstBytes()); |
125 | 0 | bytes = std::max(static_cast<int64_t>(0), bytes); |
126 | 0 | TEST_SYNC_POINT("GenericRateLimiter::Request"); |
127 | 0 | TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:1", |
128 | 0 | &rate_bytes_per_sec_); |
129 | 0 | MutexLock g(&request_mutex_); |
130 | |
131 | 0 | if (auto_tuned_) { |
132 | 0 | static const int kRefillsPerTune = 100; |
133 | 0 | std::chrono::microseconds now(NowMicrosMonotonicLocked()); |
134 | 0 | if (now - tuned_time_ >= |
135 | 0 | kRefillsPerTune * std::chrono::microseconds(refill_period_us_)) { |
136 | 0 | Status s = TuneLocked(); |
137 | 0 | s.PermitUncheckedError(); // TODO: What to do on error?
138 | 0 | } |
139 | 0 | } |
140 | |
141 | 0 | if (stop_) { |
142 | | // We are now in the clean-up of ~GenericRateLimiter().
143 | | // Therefore any new incoming request will exit from here
144 | | // and not get satisfied.
145 | 0 | return; |
146 | 0 | } |
147 | | |
148 | 0 | ++total_requests_[pri]; |
149 | |
150 | 0 | if (available_bytes_ > 0) { |
151 | 0 | int64_t bytes_through = std::min(available_bytes_, bytes); |
152 | 0 | total_bytes_through_[pri] += bytes_through; |
153 | 0 | available_bytes_ -= bytes_through; |
154 | 0 | bytes -= bytes_through; |
155 | 0 | } |
156 | |
157 | 0 | if (bytes == 0) { |
158 | 0 | return; |
159 | 0 | } |
160 | | |
161 | | // The request cannot be satisfied at this moment; enqueue it.
162 | 0 | Req r(bytes, &request_mutex_); |
163 | 0 | queue_[pri].push_back(&r); |
164 | 0 | TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:PostEnqueueRequest", |
165 | 0 | &request_mutex_); |
166 | | // A thread representing a queued request coordinates with other such threads. |
167 | | // There are two main duties. |
168 | | // |
169 | | // (1) Waiting for the next refill time. |
170 | | // (2) Refilling the bytes and granting requests. |
171 | 0 | do { |
172 | 0 | int64_t time_until_refill_us = next_refill_us_ - NowMicrosMonotonicLocked(); |
173 | 0 | if (time_until_refill_us > 0) { |
174 | 0 | if (wait_until_refill_pending_) { |
175 | | // Somebody is performing (1). Trust we'll be woken up when our request |
176 | | // is granted or we are needed for future duties. |
177 | 0 | r.cv.Wait(); |
178 | 0 | } else { |
179 | | // Whichever thread reaches here first performs duty (1) as described |
180 | | // above. |
181 | 0 | int64_t wait_until = clock_->NowMicros() + time_until_refill_us; |
182 | 0 | RecordTick(stats, NUMBER_RATE_LIMITER_DRAINS); |
183 | 0 | ++num_drains_; |
184 | 0 | wait_until_refill_pending_ = true; |
185 | 0 | clock_->TimedWait(&r.cv, std::chrono::microseconds(wait_until)); |
186 | 0 | TEST_SYNC_POINT_CALLBACK("GenericRateLimiter::Request:PostTimedWait", |
187 | 0 | &time_until_refill_us); |
188 | 0 | wait_until_refill_pending_ = false; |
189 | 0 | } |
190 | 0 | } else { |
191 | | // Whichever thread reaches here first performs duty (2) as described |
192 | | // above. |
193 | 0 | RefillBytesAndGrantRequestsLocked(); |
194 | 0 | } |
195 | 0 | if (r.request_bytes == 0) { |
196 | | // If any requests remain queued, make sure at least one candidate
197 | | // is awake for future duties by signaling the front request of a
198 | | // queue.
199 | 0 | for (int i = Env::IO_TOTAL - 1; i >= Env::IO_LOW; --i) { |
200 | 0 | auto& queue = queue_[i]; |
201 | 0 | if (!queue.empty()) { |
202 | 0 | queue.front()->cv.Signal(); |
203 | 0 | break; |
204 | 0 | } |
205 | 0 | } |
206 | 0 | } |
207 | | // Invariant: a non-granted request is always in exactly one queue,
208 | | // and a granted request is in none.
209 | | #ifndef NDEBUG |
210 | | int num_found = 0; |
211 | | for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) { |
212 | | if (std::find(queue_[i].begin(), queue_[i].end(), &r) != |
213 | | queue_[i].end()) { |
214 | | ++num_found; |
215 | | } |
216 | | } |
217 | | if (r.request_bytes == 0) { |
218 | | assert(num_found == 0); |
219 | | } else { |
220 | | assert(num_found == 1); |
221 | | } |
222 | | #endif // NDEBUG |
223 | 0 | } while (!stop_ && r.request_bytes > 0); |
224 | |
|
225 | 0 | if (stop_) { |
226 | | // It is now in the clean-up of ~GenericRateLimiter(). |
227 | | // Therefore any woken-up request will have come out of the loop and then |
228 | | // exit here. It might or might not have been satisfied. |
229 | 0 | --requests_to_wait_; |
230 | 0 | exit_cv_.Signal(); |
231 | 0 | } |
232 | 0 | } |
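
The duty-based coordination above is subtle. Below is a heavily simplified, self-contained sketch of the same pattern using standard-library primitives (TinyTokenBucket and all its members are invented for illustration; it drops the priority queues, fairness, and per-request condition variables of the real implementation, so grants go to whichever waiters wake first):

#include <algorithm>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>

class TinyTokenBucket {
 public:
  TinyTokenBucket(int64_t refill_bytes, std::chrono::milliseconds period)
      : refill_bytes_(refill_bytes),
        period_(period),
        next_refill_(std::chrono::steady_clock::now() + period) {}

  // Blocks until `bytes` have been granted against the bucket's rate.
  void Request(int64_t bytes) {
    std::unique_lock<std::mutex> lock(mu_);
    while (bytes > 0) {
      if (available_ > 0) {
        // Grant as much as currently possible (possibly a partial grant).
        int64_t granted = std::min(available_, bytes);
        available_ -= granted;
        bytes -= granted;
      } else if (std::chrono::steady_clock::now() < next_refill_) {
        if (leader_waiting_) {
          // Duty (1) is taken; wait to be woken by the next refill.
          cv_.wait(lock);
        } else {
          // Become the leader for duty (1): sleep until the refill time.
          leader_waiting_ = true;
          cv_.wait_until(lock, next_refill_);
          leader_waiting_ = false;
        }
      } else {
        // Duty (2): refill the bucket and wake every waiter.
        available_ = refill_bytes_;
        next_refill_ = std::chrono::steady_clock::now() + period_;
        cv_.notify_all();
      }
    }
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  const int64_t refill_bytes_;
  const std::chrono::milliseconds period_;
  std::chrono::steady_clock::time_point next_refill_;
  int64_t available_ = 0;
  bool leader_waiting_ = false;
};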
233 | | |
234 | | std::vector<Env::IOPriority> |
235 | 0 | GenericRateLimiter::GeneratePriorityIterationOrderLocked() { |
236 | 0 | std::vector<Env::IOPriority> pri_iteration_order(Env::IO_TOTAL /* 4 */); |
237 | | // We make Env::IO_USER a superior priority by always iterating its queue |
238 | | // first |
239 | 0 | pri_iteration_order[0] = Env::IO_USER; |
240 | |
|
241 | 0 | bool high_pri_iterated_after_mid_low_pri = rnd_.OneIn(fairness_); |
242 | 0 | TEST_SYNC_POINT_CALLBACK( |
243 | 0 | "GenericRateLimiter::GeneratePriorityIterationOrderLocked::" |
244 | 0 | "PostRandomOneInFairnessForHighPri", |
245 | 0 | &high_pri_iterated_after_mid_low_pri); |
246 | 0 | bool mid_pri_iterated_after_low_pri = rnd_.OneIn(fairness_);
247 | 0 | TEST_SYNC_POINT_CALLBACK( |
248 | 0 | "GenericRateLimiter::GeneratePriorityIterationOrderLocked::" |
249 | 0 | "PostRandomOneInFairnessForMidPri", |
250 | 0 | &mid_pri_iterated_after_low_pri);
251 | |
|
252 | 0 | if (high_pri_iterated_after_mid_low_pri) { |
253 | 0 | pri_iteration_order[3] = Env::IO_HIGH; |
254 | 0 | pri_iteration_order[2] = |
255 | 0 | mid_pri_iterated_after_low_pri ? Env::IO_MID : Env::IO_LOW;
256 | 0 | pri_iteration_order[1] = |
257 | 0 | (pri_iteration_order[2] == Env::IO_MID) ? Env::IO_LOW : Env::IO_MID; |
258 | 0 | } else { |
259 | 0 | pri_iteration_order[1] = Env::IO_HIGH; |
260 | 0 | pri_iteration_order[3] = |
261 | 0 | mid_pri_iterated_after_low_pri ? Env::IO_MID : Env::IO_LOW;
262 | 0 | pri_iteration_order[2] = |
263 | 0 | (pri_iteration_order[3] == Env::IO_MID) ? Env::IO_LOW : Env::IO_MID; |
264 | 0 | } |
265 | |
|
266 | 0 | TEST_SYNC_POINT_CALLBACK( |
267 | 0 | "GenericRateLimiter::GeneratePriorityIterationOrderLocked::" |
268 | 0 | "PreReturnPriIterationOrder", |
269 | 0 | &pri_iteration_order); |
270 | 0 | return pri_iteration_order; |
271 | 0 | } |
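
To make the fairness logic concrete, here is the distribution of iteration orders with the default fairness_ = 10, where each rnd_.OneIn(fairness_) draw is true with probability 1/10:

// Per-refill iteration order (IO_USER always first) and its probability:
//   [USER, HIGH, MID,  LOW ]   (9/10) * (9/10) = 81%
//   [USER, HIGH, LOW,  MID ]   (9/10) * (1/10) =  9%
//   [USER, MID,  LOW,  HIGH]   (1/10) * (9/10) =  9%
//   [USER, LOW,  MID,  HIGH]   (1/10) * (1/10) =  1%
// Higher priorities usually drain first, but lower priorities occasionally
// jump ahead, which bounds starvation.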
272 | | |
273 | 0 | void GenericRateLimiter::RefillBytesAndGrantRequestsLocked() { |
274 | 0 | TEST_SYNC_POINT_CALLBACK( |
275 | 0 | "GenericRateLimiter::RefillBytesAndGrantRequestsLocked", &request_mutex_); |
276 | 0 | next_refill_us_ = NowMicrosMonotonicLocked() + refill_period_us_; |
277 | | // Refill the quota for the new period; the previous period's quota must already be fully consumed (see the assert below)
278 | 0 | auto refill_bytes_per_period = |
279 | 0 | refill_bytes_per_period_.load(std::memory_order_relaxed); |
280 | 0 | assert(available_bytes_ == 0); |
281 | 0 | available_bytes_ = refill_bytes_per_period; |
282 | |
|
283 | 0 | std::vector<Env::IOPriority> pri_iteration_order = |
284 | 0 | GeneratePriorityIterationOrderLocked(); |
285 | |
|
286 | 0 | for (int i = Env::IO_LOW; i < Env::IO_TOTAL; ++i) { |
287 | 0 | assert(!pri_iteration_order.empty()); |
288 | 0 | Env::IOPriority current_pri = pri_iteration_order[i]; |
289 | 0 | auto* queue = &queue_[current_pri]; |
290 | 0 | while (!queue->empty()) { |
291 | 0 | auto* next_req = queue->front(); |
292 | 0 | if (available_bytes_ < next_req->request_bytes) { |
293 | | // Grant partial request_bytes even if request is for more than |
294 | | // `available_bytes_`, which can happen in a few situations: |
295 | | // |
296 | | // - The available bytes were partially consumed by other request(s) |
297 | | // - The rate was dynamically reduced while requests were already |
298 | | // enqueued |
299 | | // - The burst size was explicitly set to be larger than the refill size |
300 | 0 | next_req->request_bytes -= available_bytes_; |
301 | 0 | available_bytes_ = 0; |
302 | 0 | break; |
303 | 0 | } |
304 | 0 | available_bytes_ -= next_req->request_bytes; |
305 | 0 | next_req->request_bytes = 0; |
306 | 0 | total_bytes_through_[current_pri] += next_req->bytes; |
307 | 0 | queue->pop_front(); |
308 | | |
309 | | // Quota granted, signal the thread to exit |
310 | 0 | next_req->cv.Signal(); |
311 | 0 | } |
312 | 0 | } |
313 | 0 | } |
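
A short worked pass through the partial-grant branch above (illustrative numbers):

// Suppose a refill deposits 64 KiB and the front request of the first
// non-empty queue still needs 100 KiB:
//   available_bytes_ (64 KiB) < next_req->request_bytes (100 KiB)
//   -> request_bytes drops to 36 KiB, available_bytes_ drops to 0, and the
//      loop breaks; the request stays at the front of its queue.
// A later refill grants the remaining 36 KiB; only then is
// total_bytes_through_ charged with the full original size (next_req->bytes)
// and the waiting thread signaled to exit.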
314 | | |
315 | | int64_t GenericRateLimiter::CalculateRefillBytesPerPeriodLocked( |
316 | 48.4k | int64_t rate_bytes_per_sec) { |
317 | 48.4k | if (std::numeric_limits<int64_t>::max() / rate_bytes_per_sec < |
318 | 48.4k | refill_period_us_) { |
319 | | // Avoid an unexpected result in the overflow case. The result is
320 | | // still inaccurate, but it is large enough.
321 | 0 | return std::numeric_limits<int64_t>::max() / kMicrosecondsPerSecond; |
322 | 48.4k | } else { |
323 | 48.4k | return rate_bytes_per_sec * refill_period_us_ / kMicrosecondsPerSecond; |
324 | 48.4k | } |
325 | 48.4k | } |
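
For a sense of scale, a worked example of the refill computation (illustrative values):

// With rate_bytes_per_sec = 10 MiB/s (10,485,760) and the default
// refill_period_us_ = 100,000 (100 ms):
//   10,485,760 * 100,000 / 1,000,000 = 1,048,576 bytes (1 MiB) per period.
// The overflow branch triggers only for enormous rates, roughly
// rate > 2^63 / refill_period_us_ (about 9.2e13 bytes/s at the default
// period), in which case a large saturating value is returned instead.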
326 | | |
327 | 0 | Status GenericRateLimiter::TuneLocked() { |
328 | 0 | const int kLowWatermarkPct = 50; |
329 | 0 | const int kHighWatermarkPct = 90; |
330 | 0 | const int kAdjustFactorPct = 5; |
331 | | // computed rate limit will be in |
332 | | // `[max_bytes_per_sec_ / kAllowedRangeFactor, max_bytes_per_sec_]`. |
333 | 0 | const int kAllowedRangeFactor = 20; |
334 | |
|
335 | 0 | std::chrono::microseconds prev_tuned_time = tuned_time_; |
336 | 0 | tuned_time_ = std::chrono::microseconds(NowMicrosMonotonicLocked()); |
337 | |
|
338 | 0 | int64_t elapsed_intervals = (tuned_time_ - prev_tuned_time + |
339 | 0 | std::chrono::microseconds(refill_period_us_) - |
340 | 0 | std::chrono::microseconds(1)) / |
341 | 0 | std::chrono::microseconds(refill_period_us_); |
342 | | // We tune every kRefillsPerTune intervals, so the overflow and division-by- |
343 | | // zero conditions should never happen. |
344 | 0 | assert(num_drains_ <= std::numeric_limits<int64_t>::max() / 100); |
345 | 0 | assert(elapsed_intervals > 0); |
346 | 0 | int64_t drained_pct = num_drains_ * 100 / elapsed_intervals; |
347 | |
|
348 | 0 | int64_t prev_bytes_per_sec = GetBytesPerSecond(); |
349 | 0 | int64_t new_bytes_per_sec; |
350 | 0 | if (drained_pct == 0) { |
351 | 0 | new_bytes_per_sec = max_bytes_per_sec_ / kAllowedRangeFactor; |
352 | 0 | } else if (drained_pct < kLowWatermarkPct) { |
353 | | // sanitize to prevent overflow |
354 | 0 | int64_t sanitized_prev_bytes_per_sec = |
355 | 0 | std::min(prev_bytes_per_sec, std::numeric_limits<int64_t>::max() / 100); |
356 | 0 | new_bytes_per_sec = |
357 | 0 | std::max(max_bytes_per_sec_ / kAllowedRangeFactor, |
358 | 0 | sanitized_prev_bytes_per_sec * 100 / (100 + kAdjustFactorPct)); |
359 | 0 | } else if (drained_pct > kHighWatermarkPct) { |
360 | | // sanitize to prevent overflow |
361 | 0 | int64_t sanitized_prev_bytes_per_sec = |
362 | 0 | std::min(prev_bytes_per_sec, std::numeric_limits<int64_t>::max() / |
363 | 0 | (100 + kAdjustFactorPct)); |
364 | 0 | new_bytes_per_sec = |
365 | 0 | std::min(max_bytes_per_sec_, |
366 | 0 | sanitized_prev_bytes_per_sec * (100 + kAdjustFactorPct) / 100); |
367 | 0 | } else { |
368 | 0 | new_bytes_per_sec = prev_bytes_per_sec; |
369 | 0 | } |
370 | 0 | if (new_bytes_per_sec != prev_bytes_per_sec) { |
371 | 0 | SetBytesPerSecondLocked(new_bytes_per_sec); |
372 | 0 | } |
373 | 0 | num_drains_ = 0; |
374 | 0 | return Status::OK(); |
375 | 0 | } |
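
A worked pass through the auto-tuner (hypothetical numbers):

// Let max_bytes_per_sec_ = 100 MB/s, the current rate be 40 MB/s, and
// suppose 95 drains occurred over the last 100 refill intervals:
//   drained_pct = 95 * 100 / 100 = 95 > kHighWatermarkPct (90)
//   new rate    = min(100 MB/s, 40 MB/s * 105 / 100) = 42 MB/s
// Below kLowWatermarkPct (50) the rate instead shrinks by about 5%,
// floored at max_bytes_per_sec_ / kAllowedRangeFactor = 5 MB/s; with zero
// drains it snaps directly to that floor.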
376 | | |
377 | | RateLimiter* NewGenericRateLimiter( |
378 | | int64_t rate_bytes_per_sec, int64_t refill_period_us /* = 100 * 1000 */, |
379 | | int32_t fairness /* = 10 */, |
380 | | RateLimiter::Mode mode /* = RateLimiter::Mode::kWritesOnly */, |
381 | 48.4k | bool auto_tuned /* = false */, int64_t single_burst_bytes /* = 0 */) { |
382 | 48.4k | assert(rate_bytes_per_sec > 0); |
383 | 48.4k | assert(refill_period_us > 0); |
384 | | assert(fairness > 0); |
385 | 48.4k | std::unique_ptr<RateLimiter> limiter(new GenericRateLimiter( |
386 | 48.4k | rate_bytes_per_sec, refill_period_us, fairness, mode, |
387 | 48.4k | SystemClock::Default(), auto_tuned, single_burst_bytes)); |
388 | 48.4k | return limiter.release(); |
389 | 48.4k | } |
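
Finally, a minimal usage sketch wiring the factory into DBOptions (values are illustrative):

#include "rocksdb/options.h"
#include "rocksdb/rate_limiter.h"

ROCKSDB_NAMESPACE::Options MakeThrottledOptions() {
  ROCKSDB_NAMESPACE::Options options;
  // Throttle flush/compaction writes to at most 32 MiB/s, refilled every
  // 100 ms, default fairness, auto-tuned (so the effective rate floats in
  // [32 MiB/s / 20, 32 MiB/s] and starts at half the maximum, per the
  // constructor above).
  options.rate_limiter.reset(ROCKSDB_NAMESPACE::NewGenericRateLimiter(
      32 << 20 /* rate_bytes_per_sec */, 100 * 1000 /* refill_period_us */,
      10 /* fairness */, ROCKSDB_NAMESPACE::RateLimiter::Mode::kWritesOnly,
      true /* auto_tuned */));
  return options;
}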
390 | | |
391 | | } // namespace ROCKSDB_NAMESPACE |