LCOV - code coverage report
Current view: top level - source/common/buffer - watermark_buffer.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 79 212 37.3 %
Date: 2024-01-05 06:35:25 Functions: 13 30 43.3 %

          Line data    Source code
       1             : #include "source/common/buffer/watermark_buffer.h"
       2             : #include "watermark_buffer.h"
       3             : 
       4             : #include <cstdint>
       5             : #include <memory>
       6             : 
       7             : #include "envoy/buffer/buffer.h"
       8             : 
       9             : #include "source/common/common/assert.h"
      10             : #include "source/common/common/logger.h"
      11             : #include "source/common/runtime/runtime_features.h"
      12             : 
      13             : namespace Envoy {
      14             : namespace Buffer {
      15             : namespace {
      16             : // Effectively disables tracking as this should zero out all reasonable account
      17             : // balances when shifted by this amount.
      18             : constexpr uint32_t kEffectivelyDisableTrackingBitshift = 63;
      19             : // 50 is an arbitrary limit, and is meant to both limit the number of streams
      20             : // Envoy ends up resetting and avoid triggering the Watchdog system.
      21             : constexpr uint32_t kMaxNumberOfStreamsToResetPerInvocation = 50;
      22             : } // end namespace
      23             : 
      24       39943 : void WatermarkBuffer::add(const void* data, uint64_t size) {
      25       39943 :   OwnedImpl::add(data, size);
      26       39943 :   checkHighAndOverflowWatermarks();
      27       39943 : }
      28             : 
      29        1069 : void WatermarkBuffer::add(absl::string_view data) {
      30        1069 :   OwnedImpl::add(data);
      31        1069 :   checkHighAndOverflowWatermarks();
      32        1069 : }
      33             : 
      34           0 : void WatermarkBuffer::add(const Instance& data) {
      35           0 :   OwnedImpl::add(data);
      36           0 :   checkHighAndOverflowWatermarks();
      37           0 : }
      38             : 
      39           0 : void WatermarkBuffer::prepend(absl::string_view data) {
      40           0 :   OwnedImpl::prepend(data);
      41           0 :   checkHighAndOverflowWatermarks();
      42           0 : }
      43             : 
      44           0 : void WatermarkBuffer::prepend(Instance& data) {
      45           0 :   OwnedImpl::prepend(data);
      46           0 :   checkHighAndOverflowWatermarks();
      47           0 : }
      48             : 
      49             : void WatermarkBuffer::commit(uint64_t length, absl::Span<RawSlice> slices,
      50        4368 :                              ReservationSlicesOwnerPtr slices_owner) {
      51        4368 :   OwnedImpl::commit(length, slices, std::move(slices_owner));
      52        4368 :   checkHighAndOverflowWatermarks();
      53        4368 : }
      54             : 
      55       30992 : void WatermarkBuffer::drain(uint64_t size) {
      56       30992 :   OwnedImpl::drain(size);
      57       30992 :   checkLowWatermark();
      58       30992 : }
      59             : 
      60        9528 : void WatermarkBuffer::move(Instance& rhs) {
      61        9528 :   OwnedImpl::move(rhs);
      62        9528 :   checkHighAndOverflowWatermarks();
      63        9528 : }
      64             : 
      65           0 : void WatermarkBuffer::move(Instance& rhs, uint64_t length) {
      66           0 :   OwnedImpl::move(rhs, length);
      67           0 :   checkHighAndOverflowWatermarks();
      68           0 : }
      69             : 
      70             : void WatermarkBuffer::move(Instance& rhs, uint64_t length,
      71           0 :                            bool reset_drain_trackers_and_accounting) {
      72           0 :   OwnedImpl::move(rhs, length, reset_drain_trackers_and_accounting);
      73           0 :   checkHighAndOverflowWatermarks();
      74           0 : }
      75             : 
      76           0 : SliceDataPtr WatermarkBuffer::extractMutableFrontSlice() {
      77           0 :   auto result = OwnedImpl::extractMutableFrontSlice();
      78           0 :   checkLowWatermark();
      79           0 :   return result;
      80           0 : }
      81             : 
      82             : // Adjust the reservation size based on space available before hitting
      83             : // the high watermark to avoid overshooting by a lot and thus violating the limits
      84             : // the watermark is imposing.
      85        4371 : Reservation WatermarkBuffer::reserveForRead() {
      86        4371 :   constexpr auto preferred_length = default_read_reservation_size_;
      87        4371 :   uint64_t adjusted_length = preferred_length;
      88             : 
      89        4371 :   if (high_watermark_ > 0 && preferred_length > 0) {
      90        2915 :     const uint64_t current_length = OwnedImpl::length();
      91        2915 :     if (current_length >= high_watermark_) {
      92             :       // Always allow a read of at least some data. The API doesn't allow returning
      93             :       // a zero-length reservation.
      94           0 :       adjusted_length = Slice::default_slice_size_;
      95        2915 :     } else {
      96        2915 :       const uint64_t available_length = high_watermark_ - current_length;
      97        2915 :       adjusted_length = IntUtil::roundUpToMultiple(available_length, Slice::default_slice_size_);
      98        2915 :       adjusted_length = std::min(adjusted_length, preferred_length);
      99        2915 :     }
     100        2915 :   }
     101             : 
     102        4371 :   return OwnedImpl::reserveWithMaxLength(adjusted_length);
     103        4371 : }
     104             : 
     105           0 : void WatermarkBuffer::appendSliceForTest(const void* data, uint64_t size) {
     106           0 :   OwnedImpl::appendSliceForTest(data, size);
     107           0 :   checkHighAndOverflowWatermarks();
     108           0 : }
     109             : 
     110           0 : void WatermarkBuffer::appendSliceForTest(absl::string_view data) {
     111           0 :   appendSliceForTest(data.data(), data.size());
     112           0 : }
     113             : 
     114        3430 : size_t WatermarkBuffer::addFragments(absl::Span<const absl::string_view> fragments) {
     115        3430 :   size_t total_size_to_write = OwnedImpl::addFragments(fragments);
     116        3430 :   checkHighAndOverflowWatermarks();
     117        3430 :   return total_size_to_write;
     118        3430 : }
     119             : 
     120             : void WatermarkBuffer::setWatermarks(uint32_t high_watermark,
     121        9189 :                                     uint32_t overflow_watermark_multiplier) {
     122        9189 :   if (overflow_watermark_multiplier > 0 &&
     123        9189 :       (static_cast<uint64_t>(overflow_watermark_multiplier) * high_watermark) >
     124           0 :           std::numeric_limits<uint32_t>::max()) {
     125           0 :     ENVOY_LOG_MISC(debug, "Error setting overflow threshold: overflow_watermark_multiplier * "
     126           0 :                           "high_watermark is overflowing. Disabling overflow watermark.");
     127           0 :     overflow_watermark_multiplier = 0;
     128           0 :   }
     129        9189 :   low_watermark_ = high_watermark / 2;
     130        9189 :   high_watermark_ = high_watermark;
     131        9189 :   overflow_watermark_ = overflow_watermark_multiplier * high_watermark;
     132        9189 :   checkHighAndOverflowWatermarks();
     133        9189 :   checkLowWatermark();
     134        9189 : }
     135             : 
     136       63839 : void WatermarkBuffer::checkLowWatermark() {
     137       63839 :   if (!above_high_watermark_called_ ||
     138       63839 :       (high_watermark_ != 0 && OwnedImpl::length() > low_watermark_)) {
     139       63768 :     return;
     140       63768 :   }
     141             : 
     142          71 :   above_high_watermark_called_ = false;
     143          71 :   below_low_watermark_();
     144          71 : }
     145             : 
     146       67528 : void WatermarkBuffer::checkHighAndOverflowWatermarks() {
     147       67532 :   if (high_watermark_ == 0 || OwnedImpl::length() <= high_watermark_) {
     148       67142 :     return;
     149       67142 :   }
     150             : 
     151         390 :   if (!above_high_watermark_called_) {
     152          74 :     above_high_watermark_called_ = true;
     153          74 :     above_high_watermark_();
     154          74 :   }
     155             : 
     156             :   // Check if overflow watermark is enabled, wasn't previously triggered,
     157             :   // and the buffer size is above the threshold
     158         390 :   if (overflow_watermark_ != 0 && !above_overflow_watermark_called_ &&
     159         390 :       OwnedImpl::length() > overflow_watermark_) {
     160           0 :     above_overflow_watermark_called_ = true;
     161           0 :     above_overflow_watermark_();
     162           0 :   }
     163         390 : }
     164             : 
     165             : BufferMemoryAccountSharedPtr
     166         612 : WatermarkBufferFactory::createAccount(Http::StreamResetHandler& reset_handler) {
     167         612 :   if (bitshift_ == kEffectivelyDisableTrackingBitshift) {
     168         612 :     return nullptr; // No tracking
     169         612 :   }
     170           0 :   return BufferMemoryAccountImpl::createAccount(this, reset_handler);
     171         612 : }
     172             : 
     173             : void WatermarkBufferFactory::updateAccountClass(const BufferMemoryAccountSharedPtr& account,
     174             :                                                 absl::optional<uint32_t> current_class,
     175           0 :                                                 absl::optional<uint32_t> new_class) {
     176           0 :   ASSERT(current_class != new_class, "Expected the current_class and new_class to be different");
     177             : 
     178           0 :   if (!current_class.has_value()) {
     179             :     // Start tracking
     180           0 :     ASSERT(new_class.has_value());
     181           0 :     ASSERT(!size_class_account_sets_[new_class.value()].contains(account));
     182           0 :     size_class_account_sets_[new_class.value()].insert(account);
     183           0 :   } else if (!new_class.has_value()) {
     184             :     // No longer track
     185           0 :     ASSERT(current_class.has_value());
     186           0 :     ASSERT(size_class_account_sets_[current_class.value()].contains(account));
     187           0 :     size_class_account_sets_[current_class.value()].erase(account);
     188           0 :   } else {
     189             :     // Moving between buckets
     190           0 :     ASSERT(size_class_account_sets_[current_class.value()].contains(account));
     191           0 :     ASSERT(!size_class_account_sets_[new_class.value()].contains(account));
     192           0 :     size_class_account_sets_[new_class.value()].insert(
     193           0 :         std::move(size_class_account_sets_[current_class.value()].extract(account).value()));
     194           0 :   }
     195           0 : }
     196             : 
     197             : void WatermarkBufferFactory::unregisterAccount(const BufferMemoryAccountSharedPtr& account,
     198           0 :                                                absl::optional<uint32_t> current_class) {
     199           0 :   if (current_class.has_value()) {
     200           0 :     ASSERT(size_class_account_sets_[current_class.value()].contains(account));
     201           0 :     size_class_account_sets_[current_class.value()].erase(account);
     202           0 :   }
     203           0 : }
     204             : 
     205           0 : uint64_t WatermarkBufferFactory::resetAccountsGivenPressure(float pressure) {
     206           0 :   ASSERT(pressure >= 0.0 && pressure <= 1.0, "Provided pressure is out of range [0, 1].");
     207             : 
     208             :   // Compute buckets to clear
     209           0 :   const uint32_t buckets_to_clear = std::min<uint32_t>(
     210           0 :       std::floor(pressure * BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_) + 1, 8);
     211             : 
     212             :   // Clear buckets, prioritizing the buckets with larger streams.
     213           0 :   uint32_t num_streams_reset = 0;
     214           0 :   uint32_t num_buckets_reset = 0;
     215           0 :   for (uint32_t buckets_cleared = 0; buckets_cleared < buckets_to_clear; ++buckets_cleared) {
     216           0 :     const uint32_t bucket_to_clear =
     217           0 :         BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - buckets_cleared - 1;
     218           0 :     absl::flat_hash_set<BufferMemoryAccountSharedPtr>& bucket =
     219           0 :         size_class_account_sets_[bucket_to_clear];
     220             : 
     221           0 :     if (bucket.empty()) {
     222           0 :       continue;
     223           0 :     }
     224           0 :     ++num_buckets_reset;
     225             : 
     226           0 :     auto it = bucket.begin();
     227           0 :     while (it != bucket.end() && num_streams_reset < kMaxNumberOfStreamsToResetPerInvocation) {
     228           0 :       auto next = std::next(it);
     229             :       // This will trigger an erase, which avoids rehashing and invalidates the
     230             :       // iterator *it*. *next* is still valid.
     231           0 :       (*it)->resetDownstream();
     232           0 :       it = next;
     233           0 :       ++num_streams_reset;
     234           0 :     }
     235           0 :   }
     236           0 :   if (num_buckets_reset > 0) {
     237           0 :     ENVOY_LOG_MISC(warn, "resetting {} streams in {} buckets, {} empty buckets", num_streams_reset,
     238           0 :                    num_buckets_reset, buckets_to_clear - num_buckets_reset);
     239           0 :   }
     240           0 :   return num_streams_reset;
     241           0 : }
     242             : 
     243             : WatermarkBufferFactory::WatermarkBufferFactory(
     244             :     const envoy::config::overload::v3::BufferFactoryConfig& config)
     245             :     : bitshift_(config.minimum_account_to_track_power_of_two()
     246             :                     ? config.minimum_account_to_track_power_of_two() - 1
     247        1400 :                     : kEffectivelyDisableTrackingBitshift) {}
     248             : 
     249        1400 : WatermarkBufferFactory::~WatermarkBufferFactory() {
     250       11200 :   for (auto& account_set : size_class_account_sets_) {
     251       11200 :     ASSERT(account_set.empty(),
     252       11200 :            "Expected all Accounts to have unregistered from the Watermark Factory.");
     253       11200 :   }
     254        1400 : }
     255             : 
     256             : BufferMemoryAccountSharedPtr
     257             : BufferMemoryAccountImpl::createAccount(WatermarkBufferFactory* factory,
     258           0 :                                        Http::StreamResetHandler& reset_handler) {
     259             :   // We use shared_ptr ctor directly rather than make shared since the
     260             :   // constructor being invoked is private as we want users to use this static
     261             :   // method to createAccounts.
     262           0 :   auto account =
     263           0 :       std::shared_ptr<BufferMemoryAccount>(new BufferMemoryAccountImpl(factory, reset_handler));
     264             :   // Set shared_this_ in the account.
     265           0 :   static_cast<BufferMemoryAccountImpl*>(account.get())->shared_this_ = account;
     266           0 :   return account;
     267           0 : }
     268             : 
     269           0 : absl::optional<uint32_t> BufferMemoryAccountImpl::balanceToClassIndex() {
     270           0 :   const uint64_t shifted_balance = buffer_memory_allocated_ >> factory_->bitshift();
     271             : 
     272           0 :   if (shifted_balance == 0) {
     273           0 :     return {}; // Not worth tracking anything < configured minimum threshold
     274           0 :   }
     275             : 
     276           0 :   const int class_idx = absl::bit_width(shifted_balance) - 1;
     277           0 :   return std::min<uint32_t>(class_idx, NUM_MEMORY_CLASSES_ - 1);
     278           0 : }
     279             : 
     280           0 : void BufferMemoryAccountImpl::updateAccountClass() {
     281           0 :   auto new_class = balanceToClassIndex();
     282           0 :   if (shared_this_ && new_class != current_bucket_idx_) {
     283           0 :     factory_->updateAccountClass(shared_this_, current_bucket_idx_, new_class);
     284           0 :     current_bucket_idx_ = new_class;
     285           0 :   }
     286           0 : }
     287             : 
     288           0 : void BufferMemoryAccountImpl::credit(uint64_t amount) {
     289           0 :   ASSERT(buffer_memory_allocated_ >= amount);
     290           0 :   buffer_memory_allocated_ -= amount;
     291           0 :   updateAccountClass();
     292           0 : }
     293             : 
     294           0 : void BufferMemoryAccountImpl::charge(uint64_t amount) {
     295             :   // Check overflow
     296           0 :   ASSERT(std::numeric_limits<uint64_t>::max() - buffer_memory_allocated_ >= amount);
     297           0 :   buffer_memory_allocated_ += amount;
     298           0 :   updateAccountClass();
     299           0 : }
     300             : 
     301           0 : void BufferMemoryAccountImpl::clearDownstream() {
     302           0 :   if (reset_handler_.has_value()) {
     303           0 :     reset_handler_.reset();
     304           0 :     factory_->unregisterAccount(shared_this_, current_bucket_idx_);
     305           0 :     current_bucket_idx_.reset();
     306           0 :     shared_this_ = nullptr;
     307           0 :   }
     308           0 : }
     309             : 
     310             : } // namespace Buffer
     311             : } // namespace Envoy

Generated by: LCOV version 1.15