/src/rocksdb/db/write_controller.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | #include "db/write_controller.h" |
7 | | |
8 | | #include <algorithm> |
9 | | #include <atomic> |
10 | | #include <cassert> |
11 | | #include <ratio> |
12 | | |
13 | | #include "rocksdb/system_clock.h" |
14 | | |
15 | | namespace ROCKSDB_NAMESPACE { |
16 | | |
17 | 0 | std::unique_ptr<WriteControllerToken> WriteController::GetStopToken() { |
18 | 0 | ++total_stopped_; |
19 | 0 | return std::unique_ptr<WriteControllerToken>(new StopWriteToken(this)); |
20 | 0 | } |
21 | | |
22 | | std::unique_ptr<WriteControllerToken> WriteController::GetDelayToken( |
23 | 31 | uint64_t write_rate) { |
24 | 31 | if (0 == total_delayed_++) { |
25 | | // Starting delay, so reset counters. |
26 | 31 | next_refill_time_ = 0; |
27 | 31 | credit_in_bytes_ = 0; |
28 | 31 | } |
29 | | // NOTE: for simplicity, any current credit_in_bytes_ or "debt" in |
30 | | // next_refill_time_ will be based on an old rate. This rate will apply |
31 | | // for subsequent additional debts and for the next refill. |
32 | 31 | set_delayed_write_rate(write_rate); |
33 | 31 | return std::unique_ptr<WriteControllerToken>(new DelayWriteToken(this)); |
34 | 31 | } |
35 | | |
36 | | std::unique_ptr<WriteControllerToken> |
37 | 195 | WriteController::GetCompactionPressureToken() { |
38 | 195 | ++total_compaction_pressure_; |
39 | 195 | return std::unique_ptr<WriteControllerToken>( |
40 | 195 | new CompactionPressureToken(this)); |
41 | 195 | } |
42 | | |
43 | 1.20M | bool WriteController::IsStopped() const { |
44 | 1.20M | return total_stopped_.load(std::memory_order_relaxed) > 0; |
45 | 1.20M | } |
// This is inside DB mutex, so we can't sleep and need to minimize
// frequency to get time.
// If it turns out to be a performance issue, we can redesign the thread
// synchronization model here.
// The caller is trusted to actually sleep for the returned number of
// micros; this function only does the token-bucket accounting.
//
// Returns 0 when the write may proceed immediately (stopped state is
// handled elsewhere, no delay token is active, or enough credit is
// available); otherwise returns the number of microseconds to sleep so
// that the effective write rate stays at delayed_write_rate_.
uint64_t WriteController::GetDelay(SystemClock* clock, uint64_t num_bytes) {
  // When writes are fully stopped, delay accounting is moot.
  if (total_stopped_.load(std::memory_order_relaxed) > 0) {
    return 0;
  }
  // No delay token outstanding: no throttling.
  if (total_delayed_.load(std::memory_order_relaxed) == 0) {
    return 0;
  }

  // Fast path: spend accumulated credit without consulting the clock.
  if (credit_in_bytes_ >= num_bytes) {
    credit_in_bytes_ -= num_bytes;
    return 0;
  }
  // The frequency to get time inside DB mutex is less than one per refill
  // interval.
  auto time_now = NowMicrosMonotonic(clock);

  const uint64_t kMicrosPerSecond = 1000000;
  // Refill every 1 ms
  const uint64_t kMicrosPerRefill = 1000;

  if (next_refill_time_ == 0) {
    // Start with an initial allotment of bytes for one interval
    // (next_refill_time_ == 0 is the "just reset" sentinel set by
    // GetDelayToken).
    next_refill_time_ = time_now;
  }
  if (next_refill_time_ <= time_now) {
    // Refill based on time interval plus any extra elapsed.
    // The + 0.999999 rounds the credit up so tiny intervals still grant
    // at least one byte of progress.
    uint64_t elapsed = time_now - next_refill_time_ + kMicrosPerRefill;
    credit_in_bytes_ += static_cast<uint64_t>(
        1.0 * elapsed / kMicrosPerSecond * delayed_write_rate_ + 0.999999);
    next_refill_time_ = time_now + kMicrosPerRefill;

    if (credit_in_bytes_ >= num_bytes) {
      // Avoid delay if possible, to reduce DB mutex release & re-acquire.
      credit_in_bytes_ -= num_bytes;
      return 0;
    }
  }

  // We need to delay to avoid exceeding write rate.
  assert(num_bytes > credit_in_bytes_);
  uint64_t bytes_over_budget = num_bytes - credit_in_bytes_;
  uint64_t needed_delay = static_cast<uint64_t>(
      1.0 * bytes_over_budget / delayed_write_rate_ * kMicrosPerSecond);

  // The whole request is admitted now and paid for by pushing the next
  // refill further into the future ("debt").
  credit_in_bytes_ = 0;
  next_refill_time_ += needed_delay;

  // Minimum delay of refill interval, to reduce DB mutex contention.
  return std::max(next_refill_time_ - time_now, kMicrosPerRefill);
}
101 | | |
102 | 2 | uint64_t WriteController::NowMicrosMonotonic(SystemClock* clock) { |
103 | 2 | return clock->NowNanos() / std::milli::den; |
104 | 2 | } |
105 | | |
106 | 0 | StopWriteToken::~StopWriteToken() { |
107 | 0 | assert(controller_->total_stopped_ >= 1); |
108 | 0 | --controller_->total_stopped_; |
109 | 0 | } |
110 | | |
111 | 31 | DelayWriteToken::~DelayWriteToken() { |
112 | 31 | controller_->total_delayed_--; |
113 | 31 | assert(controller_->total_delayed_.load() >= 0); |
114 | 31 | } |
115 | | |
116 | 195 | CompactionPressureToken::~CompactionPressureToken() { |
117 | 195 | controller_->total_compaction_pressure_--; |
118 | | assert(controller_->total_compaction_pressure_ >= 0); |
119 | 195 | } |
120 | | |
121 | | } // namespace ROCKSDB_NAMESPACE |