Coverage Report

Created: 2024-09-19 09:45

/proc/self/cwd/source/common/buffer/watermark_buffer.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/common/buffer/watermark_buffer.h"
2
3
#include <cstdint>
4
#include <memory>
5
6
#include "envoy/buffer/buffer.h"
7
8
#include "source/common/common/assert.h"
9
#include "source/common/common/logger.h"
10
#include "source/common/runtime/runtime_features.h"
11
12
namespace Envoy {
13
namespace Buffer {
14
namespace {
15
// Effectively disables tracking as this should zero out all reasonable account
16
// balances when shifted by this amount.
17
constexpr uint32_t kEffectivelyDisableTrackingBitshift = 63;
18
// 50 is an arbitrary limit, and is meant to both limit the number of streams
19
// Envoy ends up resetting and avoid triggering the Watchdog system.
20
constexpr uint32_t kMaxNumberOfStreamsToResetPerInvocation = 50;
21
} // end namespace
22
23
396k
void WatermarkBuffer::add(const void* data, uint64_t size) {
24
396k
  OwnedImpl::add(data, size);
25
396k
  checkHighAndOverflowWatermarks();
26
396k
}
27
28
26.9k
void WatermarkBuffer::add(absl::string_view data) {
29
26.9k
  OwnedImpl::add(data);
30
26.9k
  checkHighAndOverflowWatermarks();
31
26.9k
}
32
33
0
void WatermarkBuffer::add(const Instance& data) {
34
0
  OwnedImpl::add(data);
35
0
  checkHighAndOverflowWatermarks();
36
0
}
37
38
0
void WatermarkBuffer::prepend(absl::string_view data) {
39
0
  OwnedImpl::prepend(data);
40
0
  checkHighAndOverflowWatermarks();
41
0
}
42
43
0
void WatermarkBuffer::prepend(Instance& data) {
44
0
  OwnedImpl::prepend(data);
45
0
  checkHighAndOverflowWatermarks();
46
0
}
47
48
void WatermarkBuffer::commit(uint64_t length, absl::Span<RawSlice> slices,
49
26.4k
                             ReservationSlicesOwnerPtr slices_owner) {
50
26.4k
  OwnedImpl::commit(length, slices, std::move(slices_owner));
51
26.4k
  checkHighAndOverflowWatermarks();
52
26.4k
}
53
54
251k
void WatermarkBuffer::drain(uint64_t size) {
55
251k
  OwnedImpl::drain(size);
56
251k
  checkLowWatermark();
57
251k
}
58
59
69.0k
void WatermarkBuffer::move(Instance& rhs) {
60
69.0k
  OwnedImpl::move(rhs);
61
69.0k
  checkHighAndOverflowWatermarks();
62
69.0k
}
63
64
0
void WatermarkBuffer::move(Instance& rhs, uint64_t length) {
65
0
  OwnedImpl::move(rhs, length);
66
0
  checkHighAndOverflowWatermarks();
67
0
}
68
69
void WatermarkBuffer::move(Instance& rhs, uint64_t length,
70
0
                           bool reset_drain_trackers_and_accounting) {
71
0
  OwnedImpl::move(rhs, length, reset_drain_trackers_and_accounting);
72
0
  checkHighAndOverflowWatermarks();
73
0
}
74
75
0
SliceDataPtr WatermarkBuffer::extractMutableFrontSlice() {
76
0
  auto result = OwnedImpl::extractMutableFrontSlice();
77
0
  checkLowWatermark();
78
0
  return result;
79
0
}
80
81
// Adjust the reservation size based on space available before hitting
82
// the high watermark to avoid overshooting by a lot and thus violating the limits
83
// the watermark is imposing.
84
26.4k
Reservation WatermarkBuffer::reserveForRead() {
85
26.4k
  constexpr auto preferred_length = default_read_reservation_size_;
86
26.4k
  uint64_t adjusted_length = preferred_length;
87
88
26.4k
  if (high_watermark_ > 0 && preferred_length > 0) {
89
19.1k
    const uint64_t current_length = OwnedImpl::length();
90
19.1k
    if (current_length >= high_watermark_) {
91
      // Always allow a read of at least some data. The API doesn't allow returning
92
      // a zero-length reservation.
93
0
      adjusted_length = Slice::default_slice_size_;
94
19.1k
    } else {
95
19.1k
      const uint64_t available_length = high_watermark_ - current_length;
96
19.1k
      adjusted_length = IntUtil::roundUpToMultiple(available_length, Slice::default_slice_size_);
97
19.1k
      adjusted_length = std::min(adjusted_length, preferred_length);
98
19.1k
    }
99
19.1k
  }
100
101
26.4k
  return OwnedImpl::reserveWithMaxLength(adjusted_length);
102
26.4k
}
103
104
0
void WatermarkBuffer::appendSliceForTest(const void* data, uint64_t size) {
105
0
  OwnedImpl::appendSliceForTest(data, size);
106
0
  checkHighAndOverflowWatermarks();
107
0
}
108
109
0
void WatermarkBuffer::appendSliceForTest(absl::string_view data) {
110
0
  appendSliceForTest(data.data(), data.size());
111
0
}
112
113
93.9k
size_t WatermarkBuffer::addFragments(absl::Span<const absl::string_view> fragments) {
114
93.9k
  size_t total_size_to_write = OwnedImpl::addFragments(fragments);
115
93.9k
  checkHighAndOverflowWatermarks();
116
93.9k
  return total_size_to_write;
117
93.9k
}
118
119
void WatermarkBuffer::setWatermarks(uint32_t high_watermark,
120
395k
                                    uint32_t overflow_watermark_multiplier) {
121
395k
  if (overflow_watermark_multiplier > 0 &&
122
395k
      (static_cast<uint64_t>(overflow_watermark_multiplier) * high_watermark) >
123
0
          std::numeric_limits<uint32_t>::max()) {
124
0
    ENVOY_LOG_MISC(debug, "Error setting overflow threshold: overflow_watermark_multiplier * "
125
0
                          "high_watermark is overflowing. Disabling overflow watermark.");
126
0
    overflow_watermark_multiplier = 0;
127
0
  }
128
395k
  low_watermark_ = high_watermark / 2;
129
395k
  high_watermark_ = high_watermark;
130
395k
  overflow_watermark_ = overflow_watermark_multiplier * high_watermark;
131
395k
  checkHighAndOverflowWatermarks();
132
395k
  checkLowWatermark();
133
395k
}
134
135
874k
void WatermarkBuffer::checkLowWatermark() {
136
874k
  if (!above_high_watermark_called_ ||
137
874k
      (high_watermark_ != 0 && OwnedImpl::length() > low_watermark_)) {
138
872k
    return;
139
872k
  }
140
141
2.08k
  above_high_watermark_called_ = false;
142
2.08k
  below_low_watermark_();
143
2.08k
}
144
145
1.00M
void WatermarkBuffer::checkHighAndOverflowWatermarks() {
146
1.00M
  if (high_watermark_ == 0 || OwnedImpl::length() <= high_watermark_) {
147
1.00M
    return;
148
1.00M
  }
149
150
4.80k
  if (!above_high_watermark_called_) {
151
2.64k
    above_high_watermark_called_ = true;
152
2.64k
    above_high_watermark_();
153
2.64k
  }
154
155
  // Check if overflow watermark is enabled, wasn't previously triggered,
156
  // and the buffer size is above the threshold
157
4.80k
  if (overflow_watermark_ != 0 && !above_overflow_watermark_called_ &&
158
4.80k
      OwnedImpl::length() > overflow_watermark_) {
159
0
    above_overflow_watermark_called_ = true;
160
0
    above_overflow_watermark_();
161
0
  }
162
4.80k
}
163
164
BufferMemoryAccountSharedPtr
165
1.51k
WatermarkBufferFactory::createAccount(Http::StreamResetHandler& reset_handler) {
166
1.51k
  if (bitshift_ == kEffectivelyDisableTrackingBitshift) {
167
1.51k
    return nullptr; // No tracking
168
1.51k
  }
169
0
  return BufferMemoryAccountImpl::createAccount(this, reset_handler);
170
1.51k
}
171
172
void WatermarkBufferFactory::updateAccountClass(const BufferMemoryAccountSharedPtr& account,
173
                                                absl::optional<uint32_t> current_class,
174
0
                                                absl::optional<uint32_t> new_class) {
175
0
  ASSERT(current_class != new_class, "Expected the current_class and new_class to be different");
176
177
0
  if (!current_class.has_value()) {
178
    // Start tracking
179
0
    ASSERT(new_class.has_value());
180
0
    ASSERT(!size_class_account_sets_[new_class.value()].contains(account));
181
0
    size_class_account_sets_[new_class.value()].insert(account);
182
0
  } else if (!new_class.has_value()) {
183
    // No longer track
184
0
    ASSERT(current_class.has_value());
185
0
    ASSERT(size_class_account_sets_[current_class.value()].contains(account));
186
0
    size_class_account_sets_[current_class.value()].erase(account);
187
0
  } else {
188
    // Moving between buckets
189
0
    ASSERT(size_class_account_sets_[current_class.value()].contains(account));
190
0
    ASSERT(!size_class_account_sets_[new_class.value()].contains(account));
191
0
    size_class_account_sets_[new_class.value()].insert(
192
0
        std::move(size_class_account_sets_[current_class.value()].extract(account).value()));
193
0
  }
194
0
}
195
196
void WatermarkBufferFactory::unregisterAccount(const BufferMemoryAccountSharedPtr& account,
197
0
                                               absl::optional<uint32_t> current_class) {
198
0
  if (current_class.has_value()) {
199
0
    ASSERT(size_class_account_sets_[current_class.value()].contains(account));
200
0
    size_class_account_sets_[current_class.value()].erase(account);
201
0
  }
202
0
}
203
204
0
uint64_t WatermarkBufferFactory::resetAccountsGivenPressure(float pressure) {
205
0
  ASSERT(pressure >= 0.0 && pressure <= 1.0, "Provided pressure is out of range [0, 1].");
206
207
  // Compute buckets to clear
208
0
  const uint32_t buckets_to_clear = std::min<uint32_t>(
209
0
      std::floor(pressure * BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_) + 1, 8);
210
211
  // Clear buckets, prioritizing the buckets with larger streams.
212
0
  uint32_t num_streams_reset = 0;
213
0
  uint32_t num_buckets_reset = 0;
214
0
  for (uint32_t buckets_cleared = 0; buckets_cleared < buckets_to_clear; ++buckets_cleared) {
215
0
    const uint32_t bucket_to_clear =
216
0
        BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - buckets_cleared - 1;
217
0
    absl::flat_hash_set<BufferMemoryAccountSharedPtr>& bucket =
218
0
        size_class_account_sets_[bucket_to_clear];
219
220
0
    if (bucket.empty()) {
221
0
      continue;
222
0
    }
223
0
    ++num_buckets_reset;
224
225
0
    auto it = bucket.begin();
226
0
    while (it != bucket.end() && num_streams_reset < kMaxNumberOfStreamsToResetPerInvocation) {
227
0
      auto next = std::next(it);
228
      // This will trigger an erase, which avoids rehashing and invalidates the
229
      // iterator *it*. *next* is still valid.
230
0
      (*it)->resetDownstream();
231
0
      it = next;
232
0
      ++num_streams_reset;
233
0
    }
234
0
  }
235
0
  if (num_buckets_reset > 0) {
236
0
    ENVOY_LOG_MISC(warn, "resetting {} streams in {} buckets, {} empty buckets", num_streams_reset,
237
0
                   num_buckets_reset, buckets_to_clear - num_buckets_reset);
238
0
  }
239
0
  return num_streams_reset;
240
0
}
241
242
WatermarkBufferFactory::WatermarkBufferFactory(
243
    const envoy::config::overload::v3::BufferFactoryConfig& config)
244
    : bitshift_(config.minimum_account_to_track_power_of_two()
245
                    ? config.minimum_account_to_track_power_of_two() - 1
246
23.0k
                    : kEffectivelyDisableTrackingBitshift) {}
247
248
23.0k
WatermarkBufferFactory::~WatermarkBufferFactory() {
249
184k
  for (auto& account_set : size_class_account_sets_) {
250
184k
    ASSERT(account_set.empty(),
251
184k
           "Expected all Accounts to have unregistered from the Watermark Factory.");
252
184k
  }
253
23.0k
}
254
255
BufferMemoryAccountSharedPtr
256
BufferMemoryAccountImpl::createAccount(WatermarkBufferFactory* factory,
257
0
                                       Http::StreamResetHandler& reset_handler) {
258
  // We use shared_ptr ctor directly rather than make shared since the
259
  // constructor being invoked is private as we want users to use this static
260
  // method to createAccounts.
261
0
  auto account =
262
0
      std::shared_ptr<BufferMemoryAccount>(new BufferMemoryAccountImpl(factory, reset_handler));
263
  // Set shared_this_ in the account.
264
0
  static_cast<BufferMemoryAccountImpl*>(account.get())->shared_this_ = account;
265
0
  return account;
266
0
}
267
268
0
absl::optional<uint32_t> BufferMemoryAccountImpl::balanceToClassIndex() {
269
0
  const uint64_t shifted_balance = buffer_memory_allocated_ >> factory_->bitshift();
270
271
0
  if (shifted_balance == 0) {
272
0
    return {}; // Not worth tracking anything < configured minimum threshold
273
0
  }
274
275
0
  const int class_idx = absl::bit_width(shifted_balance) - 1;
276
0
  return std::min<uint32_t>(class_idx, NUM_MEMORY_CLASSES_ - 1);
277
0
}
278
279
0
void BufferMemoryAccountImpl::updateAccountClass() {
280
0
  auto new_class = balanceToClassIndex();
281
0
  if (shared_this_ && new_class != current_bucket_idx_) {
282
0
    factory_->updateAccountClass(shared_this_, current_bucket_idx_, new_class);
283
0
    current_bucket_idx_ = new_class;
284
0
  }
285
0
}
286
287
0
void BufferMemoryAccountImpl::credit(uint64_t amount) {
288
0
  ASSERT(buffer_memory_allocated_ >= amount);
289
0
  buffer_memory_allocated_ -= amount;
290
0
  updateAccountClass();
291
0
}
292
293
0
void BufferMemoryAccountImpl::charge(uint64_t amount) {
294
  // Check overflow
295
0
  ASSERT(std::numeric_limits<uint64_t>::max() - buffer_memory_allocated_ >= amount);
296
0
  buffer_memory_allocated_ += amount;
297
0
  updateAccountClass();
298
0
}
299
300
0
void BufferMemoryAccountImpl::clearDownstream() {
301
0
  if (reset_handler_.has_value()) {
302
0
    reset_handler_.reset();
303
0
    factory_->unregisterAccount(shared_this_, current_bucket_idx_);
304
0
    current_bucket_idx_.reset();
305
0
    shared_this_ = nullptr;
306
0
  }
307
0
}
308
309
} // namespace Buffer
310
} // namespace Envoy