Line data Source code
1 : #include "source/common/buffer/watermark_buffer.h" 2 : #include "watermark_buffer.h" 3 : 4 : #include <cstdint> 5 : #include <memory> 6 : 7 : #include "envoy/buffer/buffer.h" 8 : 9 : #include "source/common/common/assert.h" 10 : #include "source/common/common/logger.h" 11 : #include "source/common/runtime/runtime_features.h" 12 : 13 : namespace Envoy { 14 : namespace Buffer { 15 : namespace { 16 : // Effectively disables tracking as this should zero out all reasonable account 17 : // balances when shifted by this amount. 18 : constexpr uint32_t kEffectivelyDisableTrackingBitshift = 63; 19 : // 50 is an arbitrary limit, and is meant to both limit the number of streams 20 : // Envoy ends up resetting and avoid triggering the Watchdog system. 21 : constexpr uint32_t kMaxNumberOfStreamsToResetPerInvocation = 50; 22 : } // end namespace 23 : 24 39943 : void WatermarkBuffer::add(const void* data, uint64_t size) { 25 39943 : OwnedImpl::add(data, size); 26 39943 : checkHighAndOverflowWatermarks(); 27 39943 : } 28 : 29 1069 : void WatermarkBuffer::add(absl::string_view data) { 30 1069 : OwnedImpl::add(data); 31 1069 : checkHighAndOverflowWatermarks(); 32 1069 : } 33 : 34 0 : void WatermarkBuffer::add(const Instance& data) { 35 0 : OwnedImpl::add(data); 36 0 : checkHighAndOverflowWatermarks(); 37 0 : } 38 : 39 0 : void WatermarkBuffer::prepend(absl::string_view data) { 40 0 : OwnedImpl::prepend(data); 41 0 : checkHighAndOverflowWatermarks(); 42 0 : } 43 : 44 0 : void WatermarkBuffer::prepend(Instance& data) { 45 0 : OwnedImpl::prepend(data); 46 0 : checkHighAndOverflowWatermarks(); 47 0 : } 48 : 49 : void WatermarkBuffer::commit(uint64_t length, absl::Span<RawSlice> slices, 50 4368 : ReservationSlicesOwnerPtr slices_owner) { 51 4368 : OwnedImpl::commit(length, slices, std::move(slices_owner)); 52 4368 : checkHighAndOverflowWatermarks(); 53 4368 : } 54 : 55 30992 : void WatermarkBuffer::drain(uint64_t size) { 56 30992 : OwnedImpl::drain(size); 57 30992 : checkLowWatermark(); 58 30992 : } 59 : 60 9528 : void WatermarkBuffer::move(Instance& rhs) { 61 9528 : OwnedImpl::move(rhs); 62 9528 : checkHighAndOverflowWatermarks(); 63 9528 : } 64 : 65 0 : void WatermarkBuffer::move(Instance& rhs, uint64_t length) { 66 0 : OwnedImpl::move(rhs, length); 67 0 : checkHighAndOverflowWatermarks(); 68 0 : } 69 : 70 : void WatermarkBuffer::move(Instance& rhs, uint64_t length, 71 0 : bool reset_drain_trackers_and_accounting) { 72 0 : OwnedImpl::move(rhs, length, reset_drain_trackers_and_accounting); 73 0 : checkHighAndOverflowWatermarks(); 74 0 : } 75 : 76 0 : SliceDataPtr WatermarkBuffer::extractMutableFrontSlice() { 77 0 : auto result = OwnedImpl::extractMutableFrontSlice(); 78 0 : checkLowWatermark(); 79 0 : return result; 80 0 : } 81 : 82 : // Adjust the reservation size based on space available before hitting 83 : // the high watermark to avoid overshooting by a lot and thus violating the limits 84 : // the watermark is imposing. 85 4371 : Reservation WatermarkBuffer::reserveForRead() { 86 4371 : constexpr auto preferred_length = default_read_reservation_size_; 87 4371 : uint64_t adjusted_length = preferred_length; 88 : 89 4371 : if (high_watermark_ > 0 && preferred_length > 0) { 90 2915 : const uint64_t current_length = OwnedImpl::length(); 91 2915 : if (current_length >= high_watermark_) { 92 : // Always allow a read of at least some data. The API doesn't allow returning 93 : // a zero-length reservation. 94 0 : adjusted_length = Slice::default_slice_size_; 95 2915 : } else { 96 2915 : const uint64_t available_length = high_watermark_ - current_length; 97 2915 : adjusted_length = IntUtil::roundUpToMultiple(available_length, Slice::default_slice_size_); 98 2915 : adjusted_length = std::min(adjusted_length, preferred_length); 99 2915 : } 100 2915 : } 101 : 102 4371 : return OwnedImpl::reserveWithMaxLength(adjusted_length); 103 4371 : } 104 : 105 0 : void WatermarkBuffer::appendSliceForTest(const void* data, uint64_t size) { 106 0 : OwnedImpl::appendSliceForTest(data, size); 107 0 : checkHighAndOverflowWatermarks(); 108 0 : } 109 : 110 0 : void WatermarkBuffer::appendSliceForTest(absl::string_view data) { 111 0 : appendSliceForTest(data.data(), data.size()); 112 0 : } 113 : 114 3430 : size_t WatermarkBuffer::addFragments(absl::Span<const absl::string_view> fragments) { 115 3430 : size_t total_size_to_write = OwnedImpl::addFragments(fragments); 116 3430 : checkHighAndOverflowWatermarks(); 117 3430 : return total_size_to_write; 118 3430 : } 119 : 120 : void WatermarkBuffer::setWatermarks(uint32_t high_watermark, 121 9189 : uint32_t overflow_watermark_multiplier) { 122 9189 : if (overflow_watermark_multiplier > 0 && 123 9189 : (static_cast<uint64_t>(overflow_watermark_multiplier) * high_watermark) > 124 0 : std::numeric_limits<uint32_t>::max()) { 125 0 : ENVOY_LOG_MISC(debug, "Error setting overflow threshold: overflow_watermark_multiplier * " 126 0 : "high_watermark is overflowing. Disabling overflow watermark."); 127 0 : overflow_watermark_multiplier = 0; 128 0 : } 129 9189 : low_watermark_ = high_watermark / 2; 130 9189 : high_watermark_ = high_watermark; 131 9189 : overflow_watermark_ = overflow_watermark_multiplier * high_watermark; 132 9189 : checkHighAndOverflowWatermarks(); 133 9189 : checkLowWatermark(); 134 9189 : } 135 : 136 63839 : void WatermarkBuffer::checkLowWatermark() { 137 63839 : if (!above_high_watermark_called_ || 138 63839 : (high_watermark_ != 0 && OwnedImpl::length() > low_watermark_)) { 139 63768 : return; 140 63768 : } 141 : 142 71 : above_high_watermark_called_ = false; 143 71 : below_low_watermark_(); 144 71 : } 145 : 146 67528 : void WatermarkBuffer::checkHighAndOverflowWatermarks() { 147 67532 : if (high_watermark_ == 0 || OwnedImpl::length() <= high_watermark_) { 148 67142 : return; 149 67142 : } 150 : 151 390 : if (!above_high_watermark_called_) { 152 74 : above_high_watermark_called_ = true; 153 74 : above_high_watermark_(); 154 74 : } 155 : 156 : // Check if overflow watermark is enabled, wasn't previously triggered, 157 : // and the buffer size is above the threshold 158 390 : if (overflow_watermark_ != 0 && !above_overflow_watermark_called_ && 159 390 : OwnedImpl::length() > overflow_watermark_) { 160 0 : above_overflow_watermark_called_ = true; 161 0 : above_overflow_watermark_(); 162 0 : } 163 390 : } 164 : 165 : BufferMemoryAccountSharedPtr 166 612 : WatermarkBufferFactory::createAccount(Http::StreamResetHandler& reset_handler) { 167 612 : if (bitshift_ == kEffectivelyDisableTrackingBitshift) { 168 612 : return nullptr; // No tracking 169 612 : } 170 0 : return BufferMemoryAccountImpl::createAccount(this, reset_handler); 171 612 : } 172 : 173 : void WatermarkBufferFactory::updateAccountClass(const BufferMemoryAccountSharedPtr& account, 174 : absl::optional<uint32_t> current_class, 175 0 : absl::optional<uint32_t> new_class) { 176 0 : ASSERT(current_class != new_class, "Expected the current_class and new_class to be different"); 177 : 178 0 : if (!current_class.has_value()) { 179 : // Start tracking 180 0 : ASSERT(new_class.has_value()); 181 0 : ASSERT(!size_class_account_sets_[new_class.value()].contains(account)); 182 0 : size_class_account_sets_[new_class.value()].insert(account); 183 0 : } else if (!new_class.has_value()) { 184 : // No longer track 185 0 : ASSERT(current_class.has_value()); 186 0 : ASSERT(size_class_account_sets_[current_class.value()].contains(account)); 187 0 : size_class_account_sets_[current_class.value()].erase(account); 188 0 : } else { 189 : // Moving between buckets 190 0 : ASSERT(size_class_account_sets_[current_class.value()].contains(account)); 191 0 : ASSERT(!size_class_account_sets_[new_class.value()].contains(account)); 192 0 : size_class_account_sets_[new_class.value()].insert( 193 0 : std::move(size_class_account_sets_[current_class.value()].extract(account).value())); 194 0 : } 195 0 : } 196 : 197 : void WatermarkBufferFactory::unregisterAccount(const BufferMemoryAccountSharedPtr& account, 198 0 : absl::optional<uint32_t> current_class) { 199 0 : if (current_class.has_value()) { 200 0 : ASSERT(size_class_account_sets_[current_class.value()].contains(account)); 201 0 : size_class_account_sets_[current_class.value()].erase(account); 202 0 : } 203 0 : } 204 : 205 0 : uint64_t WatermarkBufferFactory::resetAccountsGivenPressure(float pressure) { 206 0 : ASSERT(pressure >= 0.0 && pressure <= 1.0, "Provided pressure is out of range [0, 1]."); 207 : 208 : // Compute buckets to clear 209 0 : const uint32_t buckets_to_clear = std::min<uint32_t>( 210 0 : std::floor(pressure * BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_) + 1, 8); 211 : 212 : // Clear buckets, prioritizing the buckets with larger streams. 213 0 : uint32_t num_streams_reset = 0; 214 0 : uint32_t num_buckets_reset = 0; 215 0 : for (uint32_t buckets_cleared = 0; buckets_cleared < buckets_to_clear; ++buckets_cleared) { 216 0 : const uint32_t bucket_to_clear = 217 0 : BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - buckets_cleared - 1; 218 0 : absl::flat_hash_set<BufferMemoryAccountSharedPtr>& bucket = 219 0 : size_class_account_sets_[bucket_to_clear]; 220 : 221 0 : if (bucket.empty()) { 222 0 : continue; 223 0 : } 224 0 : ++num_buckets_reset; 225 : 226 0 : auto it = bucket.begin(); 227 0 : while (it != bucket.end() && num_streams_reset < kMaxNumberOfStreamsToResetPerInvocation) { 228 0 : auto next = std::next(it); 229 : // This will trigger an erase, which avoids rehashing and invalidates the 230 : // iterator *it*. *next* is still valid. 231 0 : (*it)->resetDownstream(); 232 0 : it = next; 233 0 : ++num_streams_reset; 234 0 : } 235 0 : } 236 0 : if (num_buckets_reset > 0) { 237 0 : ENVOY_LOG_MISC(warn, "resetting {} streams in {} buckets, {} empty buckets", num_streams_reset, 238 0 : num_buckets_reset, buckets_to_clear - num_buckets_reset); 239 0 : } 240 0 : return num_streams_reset; 241 0 : } 242 : 243 : WatermarkBufferFactory::WatermarkBufferFactory( 244 : const envoy::config::overload::v3::BufferFactoryConfig& config) 245 : : bitshift_(config.minimum_account_to_track_power_of_two() 246 : ? config.minimum_account_to_track_power_of_two() - 1 247 1400 : : kEffectivelyDisableTrackingBitshift) {} 248 : 249 1400 : WatermarkBufferFactory::~WatermarkBufferFactory() { 250 11200 : for (auto& account_set : size_class_account_sets_) { 251 11200 : ASSERT(account_set.empty(), 252 11200 : "Expected all Accounts to have unregistered from the Watermark Factory."); 253 11200 : } 254 1400 : } 255 : 256 : BufferMemoryAccountSharedPtr 257 : BufferMemoryAccountImpl::createAccount(WatermarkBufferFactory* factory, 258 0 : Http::StreamResetHandler& reset_handler) { 259 : // We use shared_ptr ctor directly rather than make shared since the 260 : // constructor being invoked is private as we want users to use this static 261 : // method to createAccounts. 262 0 : auto account = 263 0 : std::shared_ptr<BufferMemoryAccount>(new BufferMemoryAccountImpl(factory, reset_handler)); 264 : // Set shared_this_ in the account. 265 0 : static_cast<BufferMemoryAccountImpl*>(account.get())->shared_this_ = account; 266 0 : return account; 267 0 : } 268 : 269 0 : absl::optional<uint32_t> BufferMemoryAccountImpl::balanceToClassIndex() { 270 0 : const uint64_t shifted_balance = buffer_memory_allocated_ >> factory_->bitshift(); 271 : 272 0 : if (shifted_balance == 0) { 273 0 : return {}; // Not worth tracking anything < configured minimum threshold 274 0 : } 275 : 276 0 : const int class_idx = absl::bit_width(shifted_balance) - 1; 277 0 : return std::min<uint32_t>(class_idx, NUM_MEMORY_CLASSES_ - 1); 278 0 : } 279 : 280 0 : void BufferMemoryAccountImpl::updateAccountClass() { 281 0 : auto new_class = balanceToClassIndex(); 282 0 : if (shared_this_ && new_class != current_bucket_idx_) { 283 0 : factory_->updateAccountClass(shared_this_, current_bucket_idx_, new_class); 284 0 : current_bucket_idx_ = new_class; 285 0 : } 286 0 : } 287 : 288 0 : void BufferMemoryAccountImpl::credit(uint64_t amount) { 289 0 : ASSERT(buffer_memory_allocated_ >= amount); 290 0 : buffer_memory_allocated_ -= amount; 291 0 : updateAccountClass(); 292 0 : } 293 : 294 0 : void BufferMemoryAccountImpl::charge(uint64_t amount) { 295 : // Check overflow 296 0 : ASSERT(std::numeric_limits<uint64_t>::max() - buffer_memory_allocated_ >= amount); 297 0 : buffer_memory_allocated_ += amount; 298 0 : updateAccountClass(); 299 0 : } 300 : 301 0 : void BufferMemoryAccountImpl::clearDownstream() { 302 0 : if (reset_handler_.has_value()) { 303 0 : reset_handler_.reset(); 304 0 : factory_->unregisterAccount(shared_this_, current_bucket_idx_); 305 0 : current_bucket_idx_.reset(); 306 0 : shared_this_ = nullptr; 307 0 : } 308 0 : } 309 : 310 : } // namespace Buffer 311 : } // namespace Envoy