Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/common/buffer/watermark_buffer.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/common/buffer/watermark_buffer.h"
2
#include "watermark_buffer.h"
3
4
#include <cstdint>
5
#include <memory>
6
7
#include "envoy/buffer/buffer.h"
8
9
#include "source/common/common/assert.h"
10
#include "source/common/common/logger.h"
11
#include "source/common/runtime/runtime_features.h"
12
13
namespace Envoy {
14
namespace Buffer {
15
namespace {

// Shift amount used when account tracking is turned off: shifting any
// reasonable account balance right by this many bits yields zero, so no
// account ever lands in a tracked size class.
constexpr uint32_t kEffectivelyDisableTrackingBitshift = 63;

// Upper bound on streams reset by a single invocation. The value 50 is
// arbitrary; it caps how many streams Envoy resets at once and keeps the
// work short enough to avoid triggering the Watchdog system.
constexpr uint32_t kMaxNumberOfStreamsToResetPerInvocation = 50;

} // end namespace
23
24
679k
void WatermarkBuffer::add(const void* data, uint64_t size) {
25
679k
  OwnedImpl::add(data, size);
26
679k
  checkHighAndOverflowWatermarks();
27
679k
}
28
29
37.4k
// Appends the contents of `data` via OwnedImpl::add, then re-checks the high
// and overflow watermarks.
void WatermarkBuffer::add(absl::string_view data) {
  OwnedImpl::add(data);
  // The buffer may have grown past a threshold.
  checkHighAndOverflowWatermarks();
}
33
34
0
void WatermarkBuffer::add(const Instance& data) {
35
0
  OwnedImpl::add(data);
36
0
  checkHighAndOverflowWatermarks();
37
0
}
38
39
0
// Prepends `data` via OwnedImpl::prepend, then re-checks the high and
// overflow watermarks.
void WatermarkBuffer::prepend(absl::string_view data) {
  OwnedImpl::prepend(data);
  // Prepending grows the buffer just like appending does.
  checkHighAndOverflowWatermarks();
}
43
44
0
// Prepends another buffer instance via OwnedImpl::prepend, then re-checks the
// high and overflow watermarks.
void WatermarkBuffer::prepend(Instance& data) {
  OwnedImpl::prepend(data);
  // Prepending grows the buffer just like appending does.
  checkHighAndOverflowWatermarks();
}
48
49
void WatermarkBuffer::commit(uint64_t length, absl::Span<RawSlice> slices,
50
47.9k
                             ReservationSlicesOwnerPtr slices_owner) {
51
47.9k
  OwnedImpl::commit(length, slices, std::move(slices_owner));
52
47.9k
  checkHighAndOverflowWatermarks();
53
47.9k
}
54
55
435k
void WatermarkBuffer::drain(uint64_t size) {
56
435k
  OwnedImpl::drain(size);
57
435k
  checkLowWatermark();
58
435k
}
59
60
95.9k
// Moves data from `rhs` into this buffer via OwnedImpl::move, then re-checks
// the high and overflow watermarks.
void WatermarkBuffer::move(Instance& rhs) {
  OwnedImpl::move(rhs);
  // This buffer is the destination, so it may have grown past a threshold.
  checkHighAndOverflowWatermarks();
}
64
65
0
// Forwards `rhs` and `length` to OwnedImpl::move, then re-checks the high and
// overflow watermarks.
void WatermarkBuffer::move(Instance& rhs, uint64_t length) {
  OwnedImpl::move(rhs, length);
  // This buffer is the destination, so it may have grown past a threshold.
  checkHighAndOverflowWatermarks();
}
69
70
void WatermarkBuffer::move(Instance& rhs, uint64_t length,
71
0
                           bool reset_drain_trackers_and_accounting) {
72
0
  OwnedImpl::move(rhs, length, reset_drain_trackers_and_accounting);
73
0
  checkHighAndOverflowWatermarks();
74
0
}
75
76
0
// Returns the result of OwnedImpl::extractMutableFrontSlice, running the low
// watermark check after the extraction and before returning.
SliceDataPtr WatermarkBuffer::extractMutableFrontSlice() {
  SliceDataPtr front_slice = OwnedImpl::extractMutableFrontSlice();
  // Re-evaluate the low watermark now that the extraction has happened.
  checkLowWatermark();
  return front_slice;
}
81
82
// Adjust the reservation size based on space available before hitting
83
// the high watermark to avoid overshooting by a lot and thus violating the limits
84
// the watermark is imposing.
85
47.9k
Reservation WatermarkBuffer::reserveForRead() {
86
47.9k
  constexpr auto preferred_length = default_read_reservation_size_;
87
47.9k
  uint64_t adjusted_length = preferred_length;
88
89
47.9k
  if (high_watermark_ > 0 && preferred_length > 0) {
90
25.0k
    const uint64_t current_length = OwnedImpl::length();
91
25.0k
    if (current_length >= high_watermark_) {
92
      // Always allow a read of at least some data. The API doesn't allow returning
93
      // a zero-length reservation.
94
0
      adjusted_length = Slice::default_slice_size_;
95
25.0k
    } else {
96
25.0k
      const uint64_t available_length = high_watermark_ - current_length;
97
25.0k
      adjusted_length = IntUtil::roundUpToMultiple(available_length, Slice::default_slice_size_);
98
25.0k
      adjusted_length = std::min(adjusted_length, preferred_length);
99
25.0k
    }
100
25.0k
  }
101
102
47.9k
  return OwnedImpl::reserveWithMaxLength(adjusted_length);
103
47.9k
}
104
105
0
void WatermarkBuffer::appendSliceForTest(const void* data, uint64_t size) {
106
0
  OwnedImpl::appendSliceForTest(data, size);
107
0
  checkHighAndOverflowWatermarks();
108
0
}
109
110
0
// Test helper: string_view convenience overload that forwards to the
// pointer/size overload (which performs the watermark check).
void WatermarkBuffer::appendSliceForTest(absl::string_view data) {
  appendSliceForTest(data.data(), data.size());
}
113
114
126k
// Passes `fragments` to OwnedImpl::addFragments, re-checks the high and
// overflow watermarks, and returns the size OwnedImpl::addFragments reported.
size_t WatermarkBuffer::addFragments(absl::Span<const absl::string_view> fragments) {
  const size_t bytes_written = OwnedImpl::addFragments(fragments);
  // The buffer may have grown past a threshold.
  checkHighAndOverflowWatermarks();
  return bytes_written;
}
119
120
void WatermarkBuffer::setWatermarks(uint32_t high_watermark,
121
405k
                                    uint32_t overflow_watermark_multiplier) {
122
405k
  if (overflow_watermark_multiplier > 0 &&
123
405k
      (static_cast<uint64_t>(overflow_watermark_multiplier) * high_watermark) >
124
0
          std::numeric_limits<uint32_t>::max()) {
125
0
    ENVOY_LOG_MISC(debug, "Error setting overflow threshold: overflow_watermark_multiplier * "
126
0
                          "high_watermark is overflowing. Disabling overflow watermark.");
127
0
    overflow_watermark_multiplier = 0;
128
0
  }
129
405k
  low_watermark_ = high_watermark / 2;
130
405k
  high_watermark_ = high_watermark;
131
405k
  overflow_watermark_ = overflow_watermark_multiplier * high_watermark;
132
405k
  checkHighAndOverflowWatermarks();
133
405k
  checkLowWatermark();
134
405k
}
135
136
1.22M
void WatermarkBuffer::checkLowWatermark() {
137
1.22M
  if (!above_high_watermark_called_ ||
138
1.22M
      (high_watermark_ != 0 && OwnedImpl::length() > low_watermark_)) {
139
1.22M
    return;
140
1.22M
  }
141
142
2.27k
  above_high_watermark_called_ = false;
143
2.27k
  below_low_watermark_();
144
2.27k
}
145
146
1.39M
void WatermarkBuffer::checkHighAndOverflowWatermarks() {
147
1.39M
  if (high_watermark_ == 0 || OwnedImpl::length() <= high_watermark_) {
148
1.38M
    return;
149
1.38M
  }
150
151
7.94k
  if (!above_high_watermark_called_) {
152
2.92k
    above_high_watermark_called_ = true;
153
2.92k
    above_high_watermark_();
154
2.92k
  }
155
156
  // Check if overflow watermark is enabled, wasn't previously triggered,
157
  // and the buffer size is above the threshold
158
7.94k
  if (overflow_watermark_ != 0 && !above_overflow_watermark_called_ &&
159
7.94k
      OwnedImpl::length() > overflow_watermark_) {
160
0
    above_overflow_watermark_called_ = true;
161
0
    above_overflow_watermark_();
162
0
  }
163
7.94k
}
164
165
// Creates a memory account bound to this factory and `reset_handler`, or
// returns nullptr when the factory's bitshift equals the "effectively
// disabled" sentinel.
BufferMemoryAccountSharedPtr
WatermarkBufferFactory::createAccount(Http::StreamResetHandler& reset_handler) {
  if (bitshift_ != kEffectivelyDisableTrackingBitshift) {
    return BufferMemoryAccountImpl::createAccount(this, reset_handler);
  }
  return nullptr; // No tracking
}
172
173
void WatermarkBufferFactory::updateAccountClass(const BufferMemoryAccountSharedPtr& account,
174
                                                absl::optional<uint32_t> current_class,
175
0
                                                absl::optional<uint32_t> new_class) {
176
0
  ASSERT(current_class != new_class, "Expected the current_class and new_class to be different");
177
178
0
  if (!current_class.has_value()) {
179
    // Start tracking
180
0
    ASSERT(new_class.has_value());
181
0
    ASSERT(!size_class_account_sets_[new_class.value()].contains(account));
182
0
    size_class_account_sets_[new_class.value()].insert(account);
183
0
  } else if (!new_class.has_value()) {
184
    // No longer track
185
0
    ASSERT(current_class.has_value());
186
0
    ASSERT(size_class_account_sets_[current_class.value()].contains(account));
187
0
    size_class_account_sets_[current_class.value()].erase(account);
188
0
  } else {
189
    // Moving between buckets
190
0
    ASSERT(size_class_account_sets_[current_class.value()].contains(account));
191
0
    ASSERT(!size_class_account_sets_[new_class.value()].contains(account));
192
0
    size_class_account_sets_[new_class.value()].insert(
193
0
        std::move(size_class_account_sets_[current_class.value()].extract(account).value()));
194
0
  }
195
0
}
196
197
void WatermarkBufferFactory::unregisterAccount(const BufferMemoryAccountSharedPtr& account,
198
0
                                               absl::optional<uint32_t> current_class) {
199
0
  if (current_class.has_value()) {
200
0
    ASSERT(size_class_account_sets_[current_class.value()].contains(account));
201
0
    size_class_account_sets_[current_class.value()].erase(account);
202
0
  }
203
0
}
204
205
0
uint64_t WatermarkBufferFactory::resetAccountsGivenPressure(float pressure) {
206
0
  ASSERT(pressure >= 0.0 && pressure <= 1.0, "Provided pressure is out of range [0, 1].");
207
208
  // Compute buckets to clear
209
0
  const uint32_t buckets_to_clear = std::min<uint32_t>(
210
0
      std::floor(pressure * BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_) + 1, 8);
211
212
0
  uint32_t last_bucket_to_clear = BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - buckets_to_clear;
213
0
  ENVOY_LOG_MISC(warn, "resetting streams in buckets >= {}", last_bucket_to_clear);
214
215
  // Clear buckets, prioritizing the buckets with larger streams.
216
0
  uint32_t num_streams_reset = 0;
217
0
  for (uint32_t buckets_cleared = 0; buckets_cleared < buckets_to_clear; ++buckets_cleared) {
218
0
    const uint32_t bucket_to_clear =
219
0
        BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - buckets_cleared - 1;
220
0
    ENVOY_LOG_MISC(warn, "resetting {} streams in bucket {}.",
221
0
                   size_class_account_sets_[bucket_to_clear].size(), bucket_to_clear);
222
223
0
    auto it = size_class_account_sets_[bucket_to_clear].begin();
224
0
    while (it != size_class_account_sets_[bucket_to_clear].end()) {
225
0
      if (num_streams_reset >= kMaxNumberOfStreamsToResetPerInvocation) {
226
0
        return num_streams_reset;
227
0
      }
228
0
      auto next = std::next(it);
229
      // This will trigger an erase, which avoids rehashing and invalidates the
230
      // iterator *it*. *next* is still valid.
231
0
      (*it)->resetDownstream();
232
0
      it = next;
233
0
      ++num_streams_reset;
234
0
    }
235
0
  }
236
237
0
  return num_streams_reset;
238
0
}
239
240
// Derives the tracking bitshift from the configured minimum account size
// (expressed as a power of two). A configured value of zero selects the
// sentinel bitshift that effectively disables tracking; otherwise the
// bitshift is the configured power minus one.
WatermarkBufferFactory::WatermarkBufferFactory(
    const envoy::config::overload::v3::BufferFactoryConfig& config)
    : bitshift_(config.minimum_account_to_track_power_of_two() == 0
                    ? kEffectivelyDisableTrackingBitshift
                    : config.minimum_account_to_track_power_of_two() - 1) {}
245
246
30.4k
// Asserts that every size-class set is empty at destruction, i.e. that all
// accounts unregistered themselves before the factory went away.
WatermarkBufferFactory::~WatermarkBufferFactory() {
  for (auto& tracked_accounts : size_class_account_sets_) {
    ASSERT(tracked_accounts.empty(),
           "Expected all Accounts to have unregistered from the Watermark Factory.");
  }
}
252
253
// Static factory for accounts. Builds a BufferMemoryAccountImpl for `factory`
// and `reset_handler`, and stores the resulting shared pointer in the
// account's own shared_this_ member before returning it.
BufferMemoryAccountSharedPtr
BufferMemoryAccountImpl::createAccount(WatermarkBufferFactory* factory,
                                       Http::StreamResetHandler& reset_handler) {
  // The constructor is private so that callers go through this method, which
  // rules out std::make_shared; construct with new and hand the pointer to
  // shared_ptr directly.
  auto* impl = new BufferMemoryAccountImpl(factory, reset_handler);
  std::shared_ptr<BufferMemoryAccount> account(impl);
  // Set shared_this_ in the account.
  impl->shared_this_ = account;
  return account;
}
265
266
0
// Maps the current balance to a size-class index. The balance is shifted
// right by the factory's bitshift; a result of zero means the balance is
// below the configured minimum and yields no class. Otherwise the index is
// the position of the highest set bit, capped at the last class.
absl::optional<uint32_t> BufferMemoryAccountImpl::balanceToClassIndex() {
  const uint64_t scaled_balance = buffer_memory_allocated_ >> factory_->bitshift();

  if (scaled_balance == 0) {
    return absl::nullopt; // Not worth tracking anything < configured minimum threshold
  }

  const int highest_bit_index = absl::bit_width(scaled_balance) - 1;
  return std::min<uint32_t>(highest_bit_index, NUM_MEMORY_CLASSES_ - 1);
}
276
277
0
void BufferMemoryAccountImpl::updateAccountClass() {
278
0
  auto new_class = balanceToClassIndex();
279
0
  if (shared_this_ && new_class != current_bucket_idx_) {
280
0
    factory_->updateAccountClass(shared_this_, current_bucket_idx_, new_class);
281
0
    current_bucket_idx_ = new_class;
282
0
  }
283
0
}
284
285
0
void BufferMemoryAccountImpl::credit(uint64_t amount) {
286
0
  ASSERT(buffer_memory_allocated_ >= amount);
287
0
  buffer_memory_allocated_ -= amount;
288
0
  updateAccountClass();
289
0
}
290
291
0
void BufferMemoryAccountImpl::charge(uint64_t amount) {
292
  // Check overflow
293
0
  ASSERT(std::numeric_limits<uint64_t>::max() - buffer_memory_allocated_ >= amount);
294
0
  buffer_memory_allocated_ += amount;
295
0
  updateAccountClass();
296
0
}
297
298
0
void BufferMemoryAccountImpl::clearDownstream() {
299
0
  if (reset_handler_.has_value()) {
300
0
    reset_handler_.reset();
301
0
    factory_->unregisterAccount(shared_this_, current_bucket_idx_);
302
0
    current_bucket_idx_.reset();
303
0
    shared_this_ = nullptr;
304
0
  }
305
0
}
306
307
} // namespace Buffer
308
} // namespace Envoy