Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/common/buffer/watermark_buffer.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
3
#include <functional>
4
#include <string>
5
6
#include "envoy/buffer/buffer.h"
7
#include "envoy/common/optref.h"
8
#include "envoy/config/overload/v3/overload.pb.h"
9
10
#include "source/common/buffer/buffer_impl.h"
11
12
namespace Envoy {
13
namespace Buffer {
14
15
// A subclass of OwnedImpl which does watermark validation.
16
// Each time the buffer is resized (written to or drained), the watermarks are checked. As the
17
// buffer size transitions from under the low watermark to above the high watermark, the
18
// above_high_watermark function is called one time. It will not be called again until the buffer
19
// is drained below the low watermark, at which point the below_low_watermark function is called.
20
// If the buffer size is above the overflow watermark, above_overflow_watermark is called.
21
// It is only called on the first time the buffer overflows.
22
class WatermarkBuffer : public OwnedImpl {
23
public:
24
  WatermarkBuffer(std::function<void()> below_low_watermark,
25
                  std::function<void()> above_high_watermark,
26
                  std::function<void()> above_overflow_watermark)
27
      : below_low_watermark_(below_low_watermark), above_high_watermark_(above_high_watermark),
28
424k
        above_overflow_watermark_(above_overflow_watermark) {}
29
30
  // Override all functions from Instance which can result in changing the size
31
  // of the underlying buffer.
32
  void add(const void* data, uint64_t size) override;
33
  void add(absl::string_view data) override;
34
  void add(const Instance& data) override;
35
  void prepend(absl::string_view data) override;
36
  void prepend(Instance& data) override;
37
  size_t addFragments(absl::Span<const absl::string_view> fragments) override;
38
  void drain(uint64_t size) override;
39
  void move(Instance& rhs) override;
40
  void move(Instance& rhs, uint64_t length) override;
41
  void move(Instance& rhs, uint64_t length, bool reset_drain_trackers_and_accounting) override;
42
  SliceDataPtr extractMutableFrontSlice() override;
43
  Reservation reserveForRead() override;
44
385k
  void postProcess() override { checkLowWatermark(); }
45
  void appendSliceForTest(const void* data, uint64_t size) override;
46
  void appendSliceForTest(absl::string_view data) override;
47
48
  void setWatermarks(uint32_t high_watermark, uint32_t overflow_watermark = 0) override;
49
1.81k
  uint32_t highWatermark() const override { return high_watermark_; }
50
  // Returns true if the high watermark callbacks have been called more recently
51
  // than the low watermark callbacks.
52
638k
  bool highWatermarkTriggered() const override { return above_high_watermark_called_; }
53
54
protected:
55
  virtual void checkHighAndOverflowWatermarks();
56
  virtual void checkLowWatermark();
57
58
private:
59
  void commit(uint64_t length, absl::Span<RawSlice> slices,
60
              ReservationSlicesOwnerPtr slices_owner) override;
61
62
  std::function<void()> below_low_watermark_;
63
  std::function<void()> above_high_watermark_;
64
  std::function<void()> above_overflow_watermark_;
65
66
  // Used for enforcing buffer limits (off by default). If these are set to non-zero by a call to
67
  // setWatermarks() the watermark callbacks will be called as described above.
68
  uint32_t high_watermark_{0};
69
  uint32_t low_watermark_{0};
70
  uint32_t overflow_watermark_{0};
71
  // Tracks the latest state of watermark callbacks.
72
  // True between the time above_high_watermark_ has been called until below_low_watermark_ has
73
  // been called.
74
  bool above_high_watermark_called_{false};
75
  // Set to true when above_overflow_watermark_ is called (and isn't cleared).
76
  bool above_overflow_watermark_called_{false};
77
};
78
79
using WatermarkBufferPtr = std::unique_ptr<WatermarkBuffer>;
80
81
class WatermarkBufferFactory;
82
83
/**
84
 * A BufferMemoryAccountImpl tracks allocated bytes across associated buffers and
85
 * slices that originate from those buffers, or are untagged and pass through an
86
 * associated buffer.
87
 *
88
 * This BufferMemoryAccount is produced by the *WatermarkBufferFactory*.
89
 */
90
class BufferMemoryAccountImpl : public BufferMemoryAccount {
91
public:
92
  // Used to create the account, and complete wiring with the factory
93
  // and shared_this_.
94
  static BufferMemoryAccountSharedPtr createAccount(WatermarkBufferFactory* factory,
95
                                                    Http::StreamResetHandler& reset_handler);
96
0
  ~BufferMemoryAccountImpl() override {
97
    // The buffer_memory_allocated_ should always be zero on destruction, even
98
    // if we triggered a reset of the downstream. This is because the destructor
99
    // will only trigger when no entities have a pointer to the account, meaning
100
    // any slices which charge and credit the account should have credited the
101
    // account when they were deleted, maintaining this invariant.
102
0
    ASSERT(buffer_memory_allocated_ == 0);
103
0
    ASSERT(!reset_handler_.has_value());
104
0
  }
105
106
  // Make not copyable
107
  BufferMemoryAccountImpl(const BufferMemoryAccountImpl&) = delete;
108
  BufferMemoryAccountImpl& operator=(const BufferMemoryAccountImpl&) = delete;
109
110
  // Make not movable.
111
  BufferMemoryAccountImpl(BufferMemoryAccountImpl&&) = delete;
112
  BufferMemoryAccountImpl& operator=(BufferMemoryAccountImpl&&) = delete;
113
114
0
  uint64_t balance() const { return buffer_memory_allocated_; }
115
  void charge(uint64_t amount) override;
116
  void credit(uint64_t amount) override;
117
118
  // Clear the associated downstream, preparing the account to be destroyed.
119
  // This is idempotent.
120
  void clearDownstream() override;
121
122
0
  void resetDownstream() override {
123
0
    if (reset_handler_.has_value()) {
124
0
      reset_handler_->resetStream(Http::StreamResetReason::OverloadManager);
125
0
    }
126
0
  }
127
128
  // The number of memory classes the Account expects to exists. See
129
  // *WatermarkBufferFactory* for details on the memory classes.
130
  static constexpr uint32_t NUM_MEMORY_CLASSES_ = 8;
131
132
private:
133
  BufferMemoryAccountImpl(WatermarkBufferFactory* factory, Http::StreamResetHandler& reset_handler)
134
0
      : factory_(factory), reset_handler_(reset_handler) {}
135
136
  // Returns the class index based off of the buffer_memory_allocated_
137
  // This can differ with current_bucket_idx_ if buffer_memory_allocated_ was
138
  // just modified.
139
  // Returned class index, if present, is in the range [0, NUM_MEMORY_CLASSES_).
140
  absl::optional<uint32_t> balanceToClassIndex();
141
  void updateAccountClass();
142
143
  uint64_t buffer_memory_allocated_ = 0;
144
  // Current bucket index where the account is being tracked in.
145
  absl::optional<uint32_t> current_bucket_idx_{};
146
147
  WatermarkBufferFactory* factory_ = nullptr;
148
149
  OptRef<Http::StreamResetHandler> reset_handler_;
150
  // Keep a copy of the shared_ptr pointing to this account. We opted to go this
151
  // route rather than enable_shared_from_this to avoid wasteful atomic
152
  // operations e.g. when updating the tracking of the account.
153
  // This is set through the createAccount static method which is the only way to
154
  // instantiate an instance of this class. This should is cleared when
155
  // unregistering from the factory.
156
  BufferMemoryAccountSharedPtr shared_this_ = nullptr;
157
};
158
159
/**
160
 * The WatermarkBufferFactory creates *WatermarkBuffer*s and
161
 * *BufferMemoryAccountImpl* that can be used to bind to the created buffers
162
 * from a given downstream (and corresponding upstream, if one exists). The
163
 * accounts can then be used to reset the underlying stream.
164
 *
165
 * Any account produced by this factory might be tracked by the factory using the
166
 * following scheme:
167
 *
168
 * 1) Is the account balance >= 1MB? If not don't track.
169
 * 2) For all accounts above the minimum threshold for tracking, put the account
170
 *    into one of the *BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_* buckets.
171
 *
172
 *    We keep buckets containing accounts within a "memory class", which are
173
 *    power of two buckets. For example, with a minimum threshold of 1MB, our
174
 *    first bucket contains [1MB, 2MB) accounts, the second bucket contains
175
 *    [2MB, 4MB), and so forth for
176
 *    *BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_* buckets. These buckets
177
 *    allow us to coarsely track accounts, and if overloaded we can easily
178
 *    target more expensive streams.
179
 *
180
 *    As the account balance changes, the account informs the Watermark Factory
181
 *    if the bucket for that account has changed. See
182
 *    *BufferMemoryAccountImpl::balanceToClassIndex()* for details on the memory
183
 *    class for a given account balance.
184
 *
185
 * TODO(kbaichoo): Update this documentation when we make the minimum account
186
 * threshold configurable.
187
 *
188
 */
189
class WatermarkBufferFactory : public WatermarkFactory {
190
public:
191
  WatermarkBufferFactory(const envoy::config::overload::v3::BufferFactoryConfig& config);
192
193
  // Buffer::WatermarkFactory
194
  ~WatermarkBufferFactory() override;
195
  InstancePtr createBuffer(std::function<void()> below_low_watermark,
196
                           std::function<void()> above_high_watermark,
197
39.3k
                           std::function<void()> above_overflow_watermark) override {
198
39.3k
    return std::make_unique<WatermarkBuffer>(below_low_watermark, above_high_watermark,
199
39.3k
                                             above_overflow_watermark);
200
39.3k
  }
201
202
  BufferMemoryAccountSharedPtr createAccount(Http::StreamResetHandler& reset_handler) override;
203
  uint64_t resetAccountsGivenPressure(float pressure) override;
204
205
  // Called by BufferMemoryAccountImpls created by the factory on account class
206
  // updated.
207
  void updateAccountClass(const BufferMemoryAccountSharedPtr& account,
208
                          absl::optional<uint32_t> current_class,
209
                          absl::optional<uint32_t> new_class);
210
211
0
  uint32_t bitshift() const { return bitshift_; }
212
213
  // Unregister a buffer memory account.
214
  virtual void unregisterAccount(const BufferMemoryAccountSharedPtr& account,
215
                                 absl::optional<uint32_t> current_class);
216
217
protected:
218
  // Enable subclasses to inspect the mapping.
219
  using MemoryClassesToAccountsSet = std::array<absl::flat_hash_set<BufferMemoryAccountSharedPtr>,
220
                                                BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_>;
221
  MemoryClassesToAccountsSet size_class_account_sets_;
222
  // How much to bit shift right balances to test whether the account should be
223
  // tracked in *size_class_account_sets_*.
224
  const uint32_t bitshift_;
225
};
226
227
} // namespace Buffer
228
} // namespace Envoy