Line data Source code
1 : #pragma once 2 : 3 : #include <functional> 4 : #include <string> 5 : 6 : #include "envoy/buffer/buffer.h" 7 : #include "envoy/common/optref.h" 8 : #include "envoy/config/overload/v3/overload.pb.h" 9 : 10 : #include "source/common/buffer/buffer_impl.h" 11 : 12 : namespace Envoy { 13 : namespace Buffer { 14 : 15 : // A subclass of OwnedImpl which does watermark validation. 16 : // Each time the buffer is resized (written to or drained), the watermarks are checked. As the 17 : // buffer size transitions from under the low watermark to above the high watermark, the 18 : // above_high_watermark function is called one time. It will not be called again until the buffer 19 : // is drained below the low watermark, at which point the below_low_watermark function is called. 20 : // If the buffer size is above the overflow watermark, above_overflow_watermark is called. 21 : // It is only called on the first time the buffer overflows. 22 : class WatermarkBuffer : public OwnedImpl { 23 : public: 24 : WatermarkBuffer(std::function<void()> below_low_watermark, 25 : std::function<void()> above_high_watermark, 26 : std::function<void()> above_overflow_watermark) 27 : : below_low_watermark_(below_low_watermark), above_high_watermark_(above_high_watermark), 28 12015 : above_overflow_watermark_(above_overflow_watermark) {} 29 : 30 : // Override all functions from Instance which can result in changing the size 31 : // of the underlying buffer. 32 : void add(const void* data, uint64_t size) override; 33 : void add(absl::string_view data) override; 34 : void add(const Instance& data) override; 35 : void prepend(absl::string_view data) override; 36 : void prepend(Instance& data) override; 37 : size_t addFragments(absl::Span<const absl::string_view> fragments) override; 38 : void drain(uint64_t size) override; 39 : void move(Instance& rhs) override; 40 : void move(Instance& rhs, uint64_t length) override; 41 : void move(Instance& rhs, uint64_t length, bool reset_drain_trackers_and_accounting) override; 42 : SliceDataPtr extractMutableFrontSlice() override; 43 : Reservation reserveForRead() override; 44 23658 : void postProcess() override { checkLowWatermark(); } 45 : void appendSliceForTest(const void* data, uint64_t size) override; 46 : void appendSliceForTest(absl::string_view data) override; 47 : 48 : void setWatermarks(uint32_t high_watermark, uint32_t overflow_watermark = 0) override; 49 212 : uint32_t highWatermark() const override { return high_watermark_; } 50 : // Returns true if the high watermark callbacks have been called more recently 51 : // than the low watermark callbacks. 52 38892 : bool highWatermarkTriggered() const override { return above_high_watermark_called_; } 53 : 54 : protected: 55 : virtual void checkHighAndOverflowWatermarks(); 56 : virtual void checkLowWatermark(); 57 : 58 : private: 59 : void commit(uint64_t length, absl::Span<RawSlice> slices, 60 : ReservationSlicesOwnerPtr slices_owner) override; 61 : 62 : std::function<void()> below_low_watermark_; 63 : std::function<void()> above_high_watermark_; 64 : std::function<void()> above_overflow_watermark_; 65 : 66 : // Used for enforcing buffer limits (off by default). If these are set to non-zero by a call to 67 : // setWatermarks() the watermark callbacks will be called as described above. 68 : uint32_t high_watermark_{0}; 69 : uint32_t low_watermark_{0}; 70 : uint32_t overflow_watermark_{0}; 71 : // Tracks the latest state of watermark callbacks. 72 : // True between the time above_high_watermark_ has been called until below_low_watermark_ has 73 : // been called. 74 : bool above_high_watermark_called_{false}; 75 : // Set to true when above_overflow_watermark_ is called (and isn't cleared). 76 : bool above_overflow_watermark_called_{false}; 77 : }; 78 : 79 : using WatermarkBufferPtr = std::unique_ptr<WatermarkBuffer>; 80 : 81 : class WatermarkBufferFactory; 82 : 83 : /** 84 : * A BufferMemoryAccountImpl tracks allocated bytes across associated buffers and 85 : * slices that originate from those buffers, or are untagged and pass through an 86 : * associated buffer. 87 : * 88 : * This BufferMemoryAccount is produced by the *WatermarkBufferFactory*. 89 : */ 90 : class BufferMemoryAccountImpl : public BufferMemoryAccount { 91 : public: 92 : // Used to create the account, and complete wiring with the factory 93 : // and shared_this_. 94 : static BufferMemoryAccountSharedPtr createAccount(WatermarkBufferFactory* factory, 95 : Http::StreamResetHandler& reset_handler); 96 0 : ~BufferMemoryAccountImpl() override { 97 : // The buffer_memory_allocated_ should always be zero on destruction, even 98 : // if we triggered a reset of the downstream. This is because the destructor 99 : // will only trigger when no entities have a pointer to the account, meaning 100 : // any slices which charge and credit the account should have credited the 101 : // account when they were deleted, maintaining this invariant. 102 0 : ASSERT(buffer_memory_allocated_ == 0); 103 0 : ASSERT(!reset_handler_.has_value()); 104 0 : } 105 : 106 : // Make not copyable 107 : BufferMemoryAccountImpl(const BufferMemoryAccountImpl&) = delete; 108 : BufferMemoryAccountImpl& operator=(const BufferMemoryAccountImpl&) = delete; 109 : 110 : // Make not movable. 111 : BufferMemoryAccountImpl(BufferMemoryAccountImpl&&) = delete; 112 : BufferMemoryAccountImpl& operator=(BufferMemoryAccountImpl&&) = delete; 113 : 114 0 : uint64_t balance() const { return buffer_memory_allocated_; } 115 : void charge(uint64_t amount) override; 116 : void credit(uint64_t amount) override; 117 : 118 : // Clear the associated downstream, preparing the account to be destroyed. 119 : // This is idempotent. 120 : void clearDownstream() override; 121 : 122 0 : void resetDownstream() override { 123 0 : if (reset_handler_.has_value()) { 124 0 : reset_handler_->resetStream(Http::StreamResetReason::OverloadManager); 125 0 : } 126 0 : } 127 : 128 : // The number of memory classes the Account expects to exists. See 129 : // *WatermarkBufferFactory* for details on the memory classes. 130 : static constexpr uint32_t NUM_MEMORY_CLASSES_ = 8; 131 : 132 : private: 133 : BufferMemoryAccountImpl(WatermarkBufferFactory* factory, Http::StreamResetHandler& reset_handler) 134 0 : : factory_(factory), reset_handler_(reset_handler) {} 135 : 136 : // Returns the class index based off of the buffer_memory_allocated_ 137 : // This can differ with current_bucket_idx_ if buffer_memory_allocated_ was 138 : // just modified. 139 : // Returned class index, if present, is in the range [0, NUM_MEMORY_CLASSES_). 140 : absl::optional<uint32_t> balanceToClassIndex(); 141 : void updateAccountClass(); 142 : 143 : uint64_t buffer_memory_allocated_ = 0; 144 : // Current bucket index where the account is being tracked in. 145 : absl::optional<uint32_t> current_bucket_idx_{}; 146 : 147 : WatermarkBufferFactory* factory_ = nullptr; 148 : 149 : OptRef<Http::StreamResetHandler> reset_handler_; 150 : // Keep a copy of the shared_ptr pointing to this account. We opted to go this 151 : // route rather than enable_shared_from_this to avoid wasteful atomic 152 : // operations e.g. when updating the tracking of the account. 153 : // This is set through the createAccount static method which is the only way to 154 : // instantiate an instance of this class. This should is cleared when 155 : // unregistering from the factory. 156 : BufferMemoryAccountSharedPtr shared_this_ = nullptr; 157 : }; 158 : 159 : /** 160 : * The WatermarkBufferFactory creates *WatermarkBuffer*s and 161 : * *BufferMemoryAccountImpl* that can be used to bind to the created buffers 162 : * from a given downstream (and corresponding upstream, if one exists). The 163 : * accounts can then be used to reset the underlying stream. 164 : * 165 : * Any account produced by this factory might be tracked by the factory using the 166 : * following scheme: 167 : * 168 : * 1) Is the account balance >= 1MB? If not don't track. 169 : * 2) For all accounts above the minimum threshold for tracking, put the account 170 : * into one of the *BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_* buckets. 171 : * 172 : * We keep buckets containing accounts within a "memory class", which are 173 : * power of two buckets. For example, with a minimum threshold of 1MB, our 174 : * first bucket contains [1MB, 2MB) accounts, the second bucket contains 175 : * [2MB, 4MB), and so forth for 176 : * *BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_* buckets. These buckets 177 : * allow us to coarsely track accounts, and if overloaded we can easily 178 : * target more expensive streams. 179 : * 180 : * As the account balance changes, the account informs the Watermark Factory 181 : * if the bucket for that account has changed. See 182 : * *BufferMemoryAccountImpl::balanceToClassIndex()* for details on the memory 183 : * class for a given account balance. 184 : * 185 : * TODO(kbaichoo): Update this documentation when we make the minimum account 186 : * threshold configurable. 187 : * 188 : */ 189 : class WatermarkBufferFactory : public WatermarkFactory { 190 : public: 191 : WatermarkBufferFactory(const envoy::config::overload::v3::BufferFactoryConfig& config); 192 : 193 : // Buffer::WatermarkFactory 194 : ~WatermarkBufferFactory() override; 195 : InstancePtr createBuffer(std::function<void()> below_low_watermark, 196 : std::function<void()> above_high_watermark, 197 4793 : std::function<void()> above_overflow_watermark) override { 198 4793 : return std::make_unique<WatermarkBuffer>(below_low_watermark, above_high_watermark, 199 4793 : above_overflow_watermark); 200 4793 : } 201 : 202 : BufferMemoryAccountSharedPtr createAccount(Http::StreamResetHandler& reset_handler) override; 203 : uint64_t resetAccountsGivenPressure(float pressure) override; 204 : 205 : // Called by BufferMemoryAccountImpls created by the factory on account class 206 : // updated. 207 : void updateAccountClass(const BufferMemoryAccountSharedPtr& account, 208 : absl::optional<uint32_t> current_class, 209 : absl::optional<uint32_t> new_class); 210 : 211 0 : uint32_t bitshift() const { return bitshift_; } 212 : 213 : // Unregister a buffer memory account. 214 : virtual void unregisterAccount(const BufferMemoryAccountSharedPtr& account, 215 : absl::optional<uint32_t> current_class); 216 : 217 : protected: 218 : // Enable subclasses to inspect the mapping. 219 : using MemoryClassesToAccountsSet = std::array<absl::flat_hash_set<BufferMemoryAccountSharedPtr>, 220 : BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_>; 221 : MemoryClassesToAccountsSet size_class_account_sets_; 222 : // How much to bit shift right balances to test whether the account should be 223 : // tracked in *size_class_account_sets_*. 224 : const uint32_t bitshift_; 225 : }; 226 : 227 : } // namespace Buffer 228 : } // namespace Envoy