/proc/self/cwd/source/common/buffer/watermark_buffer.h
Line | Count | Source (jump to first uncovered line) |
1 | | #pragma once |
2 | | |
3 | | #include <functional> |
4 | | #include <string> |
5 | | |
6 | | #include "envoy/buffer/buffer.h" |
7 | | #include "envoy/common/optref.h" |
8 | | #include "envoy/config/overload/v3/overload.pb.h" |
9 | | |
10 | | #include "source/common/buffer/buffer_impl.h" |
11 | | |
12 | | namespace Envoy { |
13 | | namespace Buffer { |
14 | | |
15 | | // A subclass of OwnedImpl which does watermark validation. |
16 | | // Each time the buffer is resized (written to or drained), the watermarks are checked. As the |
17 | | // buffer size transitions from under the low watermark to above the high watermark, the |
18 | | // above_high_watermark function is called one time. It will not be called again until the buffer |
19 | | // is drained below the low watermark, at which point the below_low_watermark function is called. |
20 | | // If the buffer size is above the overflow watermark, above_overflow_watermark is called. |
21 | | // It is only called on the first time the buffer overflows. |
22 | | class WatermarkBuffer : public OwnedImpl { |
23 | | public: |
24 | | WatermarkBuffer(std::function<void()> below_low_watermark, |
25 | | std::function<void()> above_high_watermark, |
26 | | std::function<void()> above_overflow_watermark) |
27 | | : below_low_watermark_(below_low_watermark), above_high_watermark_(above_high_watermark), |
28 | 424k | above_overflow_watermark_(above_overflow_watermark) {} |
29 | | |
30 | | // Override all functions from Instance which can result in changing the size |
31 | | // of the underlying buffer. |
32 | | void add(const void* data, uint64_t size) override; |
33 | | void add(absl::string_view data) override; |
34 | | void add(const Instance& data) override; |
35 | | void prepend(absl::string_view data) override; |
36 | | void prepend(Instance& data) override; |
37 | | size_t addFragments(absl::Span<const absl::string_view> fragments) override; |
38 | | void drain(uint64_t size) override; |
39 | | void move(Instance& rhs) override; |
40 | | void move(Instance& rhs, uint64_t length) override; |
41 | | void move(Instance& rhs, uint64_t length, bool reset_drain_trackers_and_accounting) override; |
42 | | SliceDataPtr extractMutableFrontSlice() override; |
43 | | Reservation reserveForRead() override; |
44 | 385k | void postProcess() override { checkLowWatermark(); } |
45 | | void appendSliceForTest(const void* data, uint64_t size) override; |
46 | | void appendSliceForTest(absl::string_view data) override; |
47 | | |
48 | | void setWatermarks(uint32_t high_watermark, uint32_t overflow_watermark = 0) override; |
49 | 1.81k | uint32_t highWatermark() const override { return high_watermark_; } |
50 | | // Returns true if the high watermark callbacks have been called more recently |
51 | | // than the low watermark callbacks. |
52 | 638k | bool highWatermarkTriggered() const override { return above_high_watermark_called_; } |
53 | | |
54 | | protected: |
55 | | virtual void checkHighAndOverflowWatermarks(); |
56 | | virtual void checkLowWatermark(); |
57 | | |
58 | | private: |
59 | | void commit(uint64_t length, absl::Span<RawSlice> slices, |
60 | | ReservationSlicesOwnerPtr slices_owner) override; |
61 | | |
62 | | std::function<void()> below_low_watermark_; |
63 | | std::function<void()> above_high_watermark_; |
64 | | std::function<void()> above_overflow_watermark_; |
65 | | |
66 | | // Used for enforcing buffer limits (off by default). If these are set to non-zero by a call to |
67 | | // setWatermarks() the watermark callbacks will be called as described above. |
68 | | uint32_t high_watermark_{0}; |
69 | | uint32_t low_watermark_{0}; |
70 | | uint32_t overflow_watermark_{0}; |
71 | | // Tracks the latest state of watermark callbacks. |
72 | | // True between the time above_high_watermark_ has been called until below_low_watermark_ has |
73 | | // been called. |
74 | | bool above_high_watermark_called_{false}; |
75 | | // Set to true when above_overflow_watermark_ is called (and isn't cleared). |
76 | | bool above_overflow_watermark_called_{false}; |
77 | | }; |
78 | | |
79 | | using WatermarkBufferPtr = std::unique_ptr<WatermarkBuffer>; |
80 | | |
81 | | class WatermarkBufferFactory; |
82 | | |
83 | | /** |
84 | | * A BufferMemoryAccountImpl tracks allocated bytes across associated buffers and |
85 | | * slices that originate from those buffers, or are untagged and pass through an |
86 | | * associated buffer. |
87 | | * |
88 | | * This BufferMemoryAccount is produced by the *WatermarkBufferFactory*. |
89 | | */ |
90 | | class BufferMemoryAccountImpl : public BufferMemoryAccount { |
91 | | public: |
92 | | // Used to create the account, and complete wiring with the factory |
93 | | // and shared_this_. |
94 | | static BufferMemoryAccountSharedPtr createAccount(WatermarkBufferFactory* factory, |
95 | | Http::StreamResetHandler& reset_handler); |
96 | 0 | ~BufferMemoryAccountImpl() override { |
97 | | // The buffer_memory_allocated_ should always be zero on destruction, even |
98 | | // if we triggered a reset of the downstream. This is because the destructor |
99 | | // will only trigger when no entities have a pointer to the account, meaning |
100 | | // any slices which charge and credit the account should have credited the |
101 | | // account when they were deleted, maintaining this invariant. |
102 | 0 | ASSERT(buffer_memory_allocated_ == 0); |
103 | 0 | ASSERT(!reset_handler_.has_value()); |
104 | 0 | } |
105 | | |
106 | | // Make not copyable |
107 | | BufferMemoryAccountImpl(const BufferMemoryAccountImpl&) = delete; |
108 | | BufferMemoryAccountImpl& operator=(const BufferMemoryAccountImpl&) = delete; |
109 | | |
110 | | // Make not movable. |
111 | | BufferMemoryAccountImpl(BufferMemoryAccountImpl&&) = delete; |
112 | | BufferMemoryAccountImpl& operator=(BufferMemoryAccountImpl&&) = delete; |
113 | | |
114 | 0 | uint64_t balance() const { return buffer_memory_allocated_; } |
115 | | void charge(uint64_t amount) override; |
116 | | void credit(uint64_t amount) override; |
117 | | |
118 | | // Clear the associated downstream, preparing the account to be destroyed. |
119 | | // This is idempotent. |
120 | | void clearDownstream() override; |
121 | | |
122 | 0 | void resetDownstream() override { |
123 | 0 | if (reset_handler_.has_value()) { |
124 | 0 | reset_handler_->resetStream(Http::StreamResetReason::OverloadManager); |
125 | 0 | } |
126 | 0 | } |
127 | | |
128 | | // The number of memory classes the Account expects to exists. See |
129 | | // *WatermarkBufferFactory* for details on the memory classes. |
130 | | static constexpr uint32_t NUM_MEMORY_CLASSES_ = 8; |
131 | | |
132 | | private: |
133 | | BufferMemoryAccountImpl(WatermarkBufferFactory* factory, Http::StreamResetHandler& reset_handler) |
134 | 0 | : factory_(factory), reset_handler_(reset_handler) {} |
135 | | |
136 | | // Returns the class index based off of the buffer_memory_allocated_ |
137 | | // This can differ with current_bucket_idx_ if buffer_memory_allocated_ was |
138 | | // just modified. |
139 | | // Returned class index, if present, is in the range [0, NUM_MEMORY_CLASSES_). |
140 | | absl::optional<uint32_t> balanceToClassIndex(); |
141 | | void updateAccountClass(); |
142 | | |
143 | | uint64_t buffer_memory_allocated_ = 0; |
144 | | // Current bucket index where the account is being tracked in. |
145 | | absl::optional<uint32_t> current_bucket_idx_{}; |
146 | | |
147 | | WatermarkBufferFactory* factory_ = nullptr; |
148 | | |
149 | | OptRef<Http::StreamResetHandler> reset_handler_; |
150 | | // Keep a copy of the shared_ptr pointing to this account. We opted to go this |
151 | | // route rather than enable_shared_from_this to avoid wasteful atomic |
152 | | // operations e.g. when updating the tracking of the account. |
153 | | // This is set through the createAccount static method which is the only way to |
154 | | // instantiate an instance of this class. This should is cleared when |
155 | | // unregistering from the factory. |
156 | | BufferMemoryAccountSharedPtr shared_this_ = nullptr; |
157 | | }; |
158 | | |
159 | | /** |
160 | | * The WatermarkBufferFactory creates *WatermarkBuffer*s and |
161 | | * *BufferMemoryAccountImpl* that can be used to bind to the created buffers |
162 | | * from a given downstream (and corresponding upstream, if one exists). The |
163 | | * accounts can then be used to reset the underlying stream. |
164 | | * |
165 | | * Any account produced by this factory might be tracked by the factory using the |
166 | | * following scheme: |
167 | | * |
168 | | * 1) Is the account balance >= 1MB? If not don't track. |
169 | | * 2) For all accounts above the minimum threshold for tracking, put the account |
170 | | * into one of the *BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_* buckets. |
171 | | * |
172 | | * We keep buckets containing accounts within a "memory class", which are |
173 | | * power of two buckets. For example, with a minimum threshold of 1MB, our |
174 | | * first bucket contains [1MB, 2MB) accounts, the second bucket contains |
175 | | * [2MB, 4MB), and so forth for |
176 | | * *BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_* buckets. These buckets |
177 | | * allow us to coarsely track accounts, and if overloaded we can easily |
178 | | * target more expensive streams. |
179 | | * |
180 | | * As the account balance changes, the account informs the Watermark Factory |
181 | | * if the bucket for that account has changed. See |
182 | | * *BufferMemoryAccountImpl::balanceToClassIndex()* for details on the memory |
183 | | * class for a given account balance. |
184 | | * |
185 | | * TODO(kbaichoo): Update this documentation when we make the minimum account |
186 | | * threshold configurable. |
187 | | * |
188 | | */ |
189 | | class WatermarkBufferFactory : public WatermarkFactory { |
190 | | public: |
191 | | WatermarkBufferFactory(const envoy::config::overload::v3::BufferFactoryConfig& config); |
192 | | |
193 | | // Buffer::WatermarkFactory |
194 | | ~WatermarkBufferFactory() override; |
195 | | InstancePtr createBuffer(std::function<void()> below_low_watermark, |
196 | | std::function<void()> above_high_watermark, |
197 | 39.3k | std::function<void()> above_overflow_watermark) override { |
198 | 39.3k | return std::make_unique<WatermarkBuffer>(below_low_watermark, above_high_watermark, |
199 | 39.3k | above_overflow_watermark); |
200 | 39.3k | } |
201 | | |
202 | | BufferMemoryAccountSharedPtr createAccount(Http::StreamResetHandler& reset_handler) override; |
203 | | uint64_t resetAccountsGivenPressure(float pressure) override; |
204 | | |
205 | | // Called by BufferMemoryAccountImpls created by the factory on account class |
206 | | // updated. |
207 | | void updateAccountClass(const BufferMemoryAccountSharedPtr& account, |
208 | | absl::optional<uint32_t> current_class, |
209 | | absl::optional<uint32_t> new_class); |
210 | | |
211 | 0 | uint32_t bitshift() const { return bitshift_; } |
212 | | |
213 | | // Unregister a buffer memory account. |
214 | | virtual void unregisterAccount(const BufferMemoryAccountSharedPtr& account, |
215 | | absl::optional<uint32_t> current_class); |
216 | | |
217 | | protected: |
218 | | // Enable subclasses to inspect the mapping. |
219 | | using MemoryClassesToAccountsSet = std::array<absl::flat_hash_set<BufferMemoryAccountSharedPtr>, |
220 | | BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_>; |
221 | | MemoryClassesToAccountsSet size_class_account_sets_; |
222 | | // How much to bit shift right balances to test whether the account should be |
223 | | // tracked in *size_class_account_sets_*. |
224 | | const uint32_t bitshift_; |
225 | | }; |
226 | | |
227 | | } // namespace Buffer |
228 | | } // namespace Envoy |