Coverage Report

Created: 2024-07-27 06:53

/src/rocksdb/db/write_controller.h
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#pragma once
7
8
#include <stdint.h>
9
10
#include <atomic>
11
#include <memory>
12
13
#include "rocksdb/rate_limiter.h"
14
15
namespace ROCKSDB_NAMESPACE {
16
17
class SystemClock;
18
class WriteControllerToken;
19
20
// WriteController is controlling write stalls in our write code-path. Write
21
// stalls happen when compaction can't keep up with write rate.
22
// All of the methods here (including WriteControllerToken's destructors) need
23
// to be called while holding the DB mutex.
24
class WriteController {
25
 public:
26
  explicit WriteController(uint64_t _delayed_write_rate = 1024u * 1024u * 32u,
27
                           int64_t low_pri_rate_bytes_per_sec = 1024 * 1024)
28
      : total_stopped_(0),
29
        total_delayed_(0),
30
        total_compaction_pressure_(0),
31
        credit_in_bytes_(0),
32
        next_refill_time_(0),
33
        low_pri_rate_limiter_(
34
13.9k
            NewGenericRateLimiter(low_pri_rate_bytes_per_sec)) {
35
13.9k
    set_max_delayed_write_rate(_delayed_write_rate);
36
13.9k
  }
37
13.9k
  ~WriteController() = default;
38
39
  // When an actor (column family) requests a stop token, all writes will be
40
  // stopped until the stop token is released (deleted)
41
  std::unique_ptr<WriteControllerToken> GetStopToken();
42
  // When an actor (column family) requests a delay token, total delay for all
43
  // writes to the DB will be controlled under the delayed write rate. Every
44
  // write needs to call GetDelay() with number of bytes writing to the DB,
45
  // which returns the number of microseconds to sleep.
46
  std::unique_ptr<WriteControllerToken> GetDelayToken(
47
      uint64_t delayed_write_rate);
48
  // When an actor (column family) requests a moderate token, compaction
49
  // threads will be increased
50
  std::unique_ptr<WriteControllerToken> GetCompactionPressureToken();
51
52
  // These three methods query the state of the WriteController
53
  bool IsStopped() const;
54
93.7k
  bool NeedsDelay() const { return total_delayed_.load() > 0; }
55
34.9k
  bool NeedSpeedupCompaction() const {
56
34.9k
    return IsStopped() || NeedsDelay() || total_compaction_pressure_.load() > 0;
57
34.9k
  }
58
  // return how many microseconds the caller needs to sleep after the call
59
  // num_bytes: the number of bytes to put into the DB.
60
  // Prerequisite: DB mutex held.
61
  uint64_t GetDelay(SystemClock* clock, uint64_t num_bytes);
62
7
  void set_delayed_write_rate(uint64_t write_rate) {
63
    // avoid division by zero
64
7
    if (write_rate == 0) {
65
0
      write_rate = 1u;
66
7
    } else if (write_rate > max_delayed_write_rate()) {
67
0
      write_rate = max_delayed_write_rate();
68
0
    }
69
7
    delayed_write_rate_ = write_rate;
70
7
  }
71
72
13.9k
  void set_max_delayed_write_rate(uint64_t write_rate) {
73
    // avoid division by zero
74
13.9k
    if (write_rate == 0) {
75
0
      write_rate = 1u;
76
0
    }
77
13.9k
    max_delayed_write_rate_ = write_rate;
78
    // update delayed_write_rate_ as well
79
13.9k
    delayed_write_rate_ = write_rate;
80
13.9k
  }
81
82
14
  uint64_t delayed_write_rate() const { return delayed_write_rate_; }
83
84
14
  uint64_t max_delayed_write_rate() const { return max_delayed_write_rate_; }
85
86
0
  RateLimiter* low_pri_rate_limiter() { return low_pri_rate_limiter_.get(); }
87
88
 private:
89
  uint64_t NowMicrosMonotonic(SystemClock* clock);
90
91
  friend class WriteControllerToken;
92
  friend class StopWriteToken;
93
  friend class DelayWriteToken;
94
  friend class CompactionPressureToken;
95
96
  std::atomic<int> total_stopped_;
97
  std::atomic<int> total_delayed_;
98
  std::atomic<int> total_compaction_pressure_;
99
100
  // Number of bytes allowed to write without delay
101
  uint64_t credit_in_bytes_;
102
  // Next time that we can add more credit of bytes
103
  uint64_t next_refill_time_;
104
  // Write rate set at initialization or by `DBImpl::SetDBOptions`
105
  uint64_t max_delayed_write_rate_;
106
  // Current write rate (bytes / second)
107
  uint64_t delayed_write_rate_;
108
109
  std::unique_ptr<RateLimiter> low_pri_rate_limiter_;
110
};
111
112
class WriteControllerToken {
113
 public:
114
  explicit WriteControllerToken(WriteController* controller)
115
611
      : controller_(controller) {}
116
611
  virtual ~WriteControllerToken() {}
117
118
 protected:
119
  WriteController* controller_;
120
121
 private:
122
  // no copying allowed
123
  WriteControllerToken(const WriteControllerToken&) = delete;
124
  void operator=(const WriteControllerToken&) = delete;
125
};
126
127
class StopWriteToken : public WriteControllerToken {
128
 public:
129
  explicit StopWriteToken(WriteController* controller)
130
0
      : WriteControllerToken(controller) {}
131
  virtual ~StopWriteToken();
132
};
133
134
class DelayWriteToken : public WriteControllerToken {
135
 public:
136
  explicit DelayWriteToken(WriteController* controller)
137
7
      : WriteControllerToken(controller) {}
138
  virtual ~DelayWriteToken();
139
};
140
141
class CompactionPressureToken : public WriteControllerToken {
142
 public:
143
  explicit CompactionPressureToken(WriteController* controller)
144
604
      : WriteControllerToken(controller) {}
145
  virtual ~CompactionPressureToken();
146
};
147
148
}  // namespace ROCKSDB_NAMESPACE