1
#pragma once
2

            
3
#include <array>
#include <cstdint>
#include <functional>
#include <memory>
#include <string>
#include <utility>

#include "envoy/buffer/buffer.h"
#include "envoy/common/optref.h"
#include "envoy/config/overload/v3/overload.pb.h"

#include "source/common/buffer/buffer_impl.h"
11

            
12
namespace Envoy {
13
namespace Buffer {
14

            
15
// A subclass of OwnedImpl which does watermark validation.
16
// Each time the buffer is resized (written to or drained), the watermarks are checked. As the
17
// buffer size transitions from under the low watermark to above the high watermark, the
18
// above_high_watermark function is called one time. It will not be called again until the buffer
19
// is drained below the low watermark, at which point the below_low_watermark function is called.
20
// If the buffer size is above the overflow watermark, above_overflow_watermark is called.
21
// It is only called on the first time the buffer overflows.
22
class WatermarkBuffer : public OwnedImpl {
23
public:
24
  WatermarkBuffer(std::function<void()> below_low_watermark,
25
                  std::function<void()> above_high_watermark,
26
                  std::function<void()> above_overflow_watermark)
27
597890
      : below_low_watermark_(below_low_watermark), above_high_watermark_(above_high_watermark),
28
597890
        above_overflow_watermark_(above_overflow_watermark) {}
29

            
30
  // Override all functions from Instance which can result in changing the size
31
  // of the underlying buffer.
32
  void add(const void* data, uint64_t size) override;
33
  void add(absl::string_view data) override;
34
  void add(const Instance& data) override;
35
  void prepend(absl::string_view data) override;
36
  void prepend(Instance& data) override;
37
  size_t addFragments(absl::Span<const absl::string_view> fragments) override;
38
  void drain(uint64_t size) override;
39
  void move(Instance& rhs) override;
40
  void move(Instance& rhs, uint64_t length) override;
41
  void move(Instance& rhs, uint64_t length, bool reset_drain_trackers_and_accounting) override;
42
  SliceDataPtr extractMutableFrontSlice() override;
43
  Reservation reserveForRead() override;
44
1311273
  void postProcess() override { checkLowWatermark(); }
45
  void appendSliceForTest(const void* data, uint64_t size) override;
46
  void appendSliceForTest(absl::string_view data) override;
47

            
48
  void setWatermarks(uint64_t high_watermark, uint32_t overflow_watermark = 0) override;
49
75063
  uint64_t highWatermark() const override { return high_watermark_; }
50
  // Returns true if the high watermark callbacks have been called more recently
51
  // than the low watermark callbacks.
52
658903
  bool highWatermarkTriggered() const override { return above_high_watermark_called_; }
53

            
54
protected:
55
  virtual void checkHighAndOverflowWatermarks();
56
  virtual void checkLowWatermark();
57

            
58
2
  uint64_t overflowWatermarkForTestOnly() const { return overflow_watermark_; }
59

            
60
private:
61
  void commit(uint64_t length, absl::Span<RawSlice> slices,
62
              ReservationSlicesOwnerPtr slices_owner) override;
63

            
64
  std::function<void()> below_low_watermark_;
65
  std::function<void()> above_high_watermark_;
66
  std::function<void()> above_overflow_watermark_;
67

            
68
  // Used for enforcing buffer limits (off by default). If these are set to non-zero by a call to
69
  // setWatermarks() the watermark callbacks will be called as described above.
70
  uint64_t high_watermark_{0};
71
  uint64_t low_watermark_{0};
72
  uint64_t overflow_watermark_{0};
73
  // Tracks the latest state of watermark callbacks.
74
  // True between the time above_high_watermark_ has been called until below_low_watermark_ has
75
  // been called.
76
  bool above_high_watermark_called_{false};
77
  // Set to true when above_overflow_watermark_ is called (and isn't cleared).
78
  bool above_overflow_watermark_called_{false};
79
};
80

            
81
// Uniquely-owning pointer to a WatermarkBuffer.
using WatermarkBufferPtr = std::unique_ptr<WatermarkBuffer>;
82

            
83
// Forward declaration: BufferMemoryAccountImpl below holds a WatermarkBufferFactory* before the
// factory class itself is defined later in this header.
class WatermarkBufferFactory;
84

            
85
/**
86
 * A BufferMemoryAccountImpl tracks allocated bytes across associated buffers and
87
 * slices that originate from those buffers, or are untagged and pass through an
88
 * associated buffer.
89
 *
90
 * This BufferMemoryAccount is produced by the *WatermarkBufferFactory*.
91
 */
92
class BufferMemoryAccountImpl : public BufferMemoryAccount {
93
public:
94
  // Used to create the account, and complete wiring with the factory
95
  // and shared_this_.
96
  static BufferMemoryAccountSharedPtr createAccount(WatermarkBufferFactory* factory,
97
                                                    Http::StreamResetHandler& reset_handler);
98
332
  ~BufferMemoryAccountImpl() override {
99
    // The buffer_memory_allocated_ should always be zero on destruction, even
100
    // if we triggered a reset of the downstream. This is because the destructor
101
    // will only trigger when no entities have a pointer to the account, meaning
102
    // any slices which charge and credit the account should have credited the
103
    // account when they were deleted, maintaining this invariant.
104
332
    ASSERT(buffer_memory_allocated_ == 0);
105
332
    ASSERT(!reset_handler_.has_value());
106
332
  }
107

            
108
  // Make not copyable
109
  BufferMemoryAccountImpl(const BufferMemoryAccountImpl&) = delete;
110
  BufferMemoryAccountImpl& operator=(const BufferMemoryAccountImpl&) = delete;
111

            
112
  // Make not movable.
113
  BufferMemoryAccountImpl(BufferMemoryAccountImpl&&) = delete;
114
  BufferMemoryAccountImpl& operator=(BufferMemoryAccountImpl&&) = delete;
115

            
116
776
  uint64_t balance() const { return buffer_memory_allocated_; }
117
  void charge(uint64_t amount) override;
118
  void credit(uint64_t amount) override;
119

            
120
  // Clear the associated downstream, preparing the account to be destroyed.
121
  // This is idempotent.
122
  void clearDownstream() override;
123

            
124
224
  void resetDownstream() override {
125
224
    if (reset_handler_.has_value()) {
126
224
      reset_handler_->resetStream(Http::StreamResetReason::OverloadManager);
127
224
    }
128
224
  }
129

            
130
  // The number of memory classes the Account expects to exists. See
131
  // *WatermarkBufferFactory* for details on the memory classes.
132
  static constexpr uint32_t NUM_MEMORY_CLASSES_ = 8;
133

            
134
private:
135
  BufferMemoryAccountImpl(WatermarkBufferFactory* factory, Http::StreamResetHandler& reset_handler)
136
332
      : factory_(factory), reset_handler_(reset_handler) {}
137

            
138
  // Returns the class index based off of the buffer_memory_allocated_
139
  // This can differ with current_bucket_idx_ if buffer_memory_allocated_ was
140
  // just modified.
141
  // Returned class index, if present, is in the range [0, NUM_MEMORY_CLASSES_).
142
  absl::optional<uint32_t> balanceToClassIndex();
143
  void updateAccountClass();
144

            
145
  uint64_t buffer_memory_allocated_ = 0;
146
  // Current bucket index where the account is being tracked in.
147
  absl::optional<uint32_t> current_bucket_idx_{};
148

            
149
  WatermarkBufferFactory* factory_ = nullptr;
150

            
151
  OptRef<Http::StreamResetHandler> reset_handler_;
152
  // Keep a copy of the shared_ptr pointing to this account. We opted to go this
153
  // route rather than enable_shared_from_this to avoid wasteful atomic
154
  // operations e.g. when updating the tracking of the account.
155
  // This is set through the createAccount static method which is the only way to
156
  // instantiate an instance of this class. This should is cleared when
157
  // unregistering from the factory.
158
  BufferMemoryAccountSharedPtr shared_this_ = nullptr;
159
};
160

            
161
/**
162
 * The WatermarkBufferFactory creates *WatermarkBuffer*s and
163
 * *BufferMemoryAccountImpl* that can be used to bind to the created buffers
164
 * from a given downstream (and corresponding upstream, if one exists). The
165
 * accounts can then be used to reset the underlying stream.
166
 *
167
 * Any account produced by this factory might be tracked by the factory using the
168
 * following scheme:
169
 *
170
 * 1) Is the account balance >= 1MB? If not don't track.
171
 * 2) For all accounts above the minimum threshold for tracking, put the account
172
 *    into one of the *BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_* buckets.
173
 *
174
 *    We keep buckets containing accounts within a "memory class", which are
175
 *    power of two buckets. For example, with a minimum threshold of 1MB, our
176
 *    first bucket contains [1MB, 2MB) accounts, the second bucket contains
177
 *    [2MB, 4MB), and so forth for
178
 *    *BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_* buckets. These buckets
179
 *    allow us to coarsely track accounts, and if overloaded we can easily
180
 *    target more expensive streams.
181
 *
182
 *    As the account balance changes, the account informs the Watermark Factory
183
 *    if the bucket for that account has changed. See
184
 *    *BufferMemoryAccountImpl::balanceToClassIndex()* for details on the memory
185
 *    class for a given account balance.
186
 *
187
 * TODO(kbaichoo): Update this documentation when we make the minimum account
188
 * threshold configurable.
189
 *
190
 */
191
class WatermarkBufferFactory : public WatermarkFactory {
192
public:
193
  WatermarkBufferFactory(const envoy::config::overload::v3::BufferFactoryConfig& config);
194

            
195
  // Buffer::WatermarkFactory
196
  ~WatermarkBufferFactory() override;
197
  InstancePtr createBuffer(std::function<void()> below_low_watermark,
198
                           std::function<void()> above_high_watermark,
199
479707
                           std::function<void()> above_overflow_watermark) override {
200
479707
    return std::make_unique<WatermarkBuffer>(below_low_watermark, above_high_watermark,
201
479707
                                             above_overflow_watermark);
202
479707
  }
203

            
204
  BufferMemoryAccountSharedPtr createAccount(Http::StreamResetHandler& reset_handler) override;
205
  uint64_t resetAccountsGivenPressure(float pressure) override;
206

            
207
  // Called by BufferMemoryAccountImpls created by the factory on account class
208
  // updated.
209
  void updateAccountClass(const BufferMemoryAccountSharedPtr& account,
210
                          absl::optional<uint32_t> current_class,
211
                          absl::optional<uint32_t> new_class);
212

            
213
1033
  uint32_t bitshift() const { return bitshift_; }
214

            
215
  // Unregister a buffer memory account.
216
  virtual void unregisterAccount(const BufferMemoryAccountSharedPtr& account,
217
                                 absl::optional<uint32_t> current_class);
218

            
219
protected:
220
  // Enable subclasses to inspect the mapping.
221
  using MemoryClassesToAccountsSet = std::array<absl::flat_hash_set<BufferMemoryAccountSharedPtr>,
222
                                                BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_>;
223
  MemoryClassesToAccountsSet size_class_account_sets_;
224
  // How much to bit shift right balances to test whether the account should be
225
  // tracked in *size_class_account_sets_*.
226
  const uint32_t bitshift_;
227
};
228

            
229
} // namespace Buffer
230
} // namespace Envoy