1
#include "source/common/buffer/watermark_buffer.h"
2

            
3
#include <cstdint>
4
#include <memory>
5

            
6
#include "envoy/buffer/buffer.h"
7

            
8
#include "source/common/common/assert.h"
9
#include "source/common/common/logger.h"
10
#include "source/common/runtime/runtime_features.h"
11

            
12
namespace Envoy {
13
namespace Buffer {
14
namespace {
// Shifting an account balance right by this many bits should zero out every
// reasonable balance, which effectively turns account tracking off.
constexpr uint32_t kEffectivelyDisableTrackingBitshift = 63;
// Cap on the number of streams reset by one invocation. 50 is an arbitrary
// value chosen to bound how many streams Envoy ends up resetting and to avoid
// triggering the Watchdog system.
constexpr uint32_t kMaxNumberOfStreamsToResetPerInvocation = 50;
} // end namespace
22

            
23
700140
void WatermarkBuffer::add(const void* data, uint64_t size) {
24
700140
  OwnedImpl::add(data, size);
25
700140
  checkHighAndOverflowWatermarks();
26
700140
}
27

            
28
242763
// Appends the contents of the string view through OwnedImpl::add, then
// re-evaluates the high and overflow watermarks.
void WatermarkBuffer::add(absl::string_view data) {
  OwnedImpl::add(data);
  checkHighAndOverflowWatermarks();
}
32

            
33
12
void WatermarkBuffer::add(const Instance& data) {
34
12
  OwnedImpl::add(data);
35
12
  checkHighAndOverflowWatermarks();
36
12
}
37

            
38
14
// Prepends the string view through OwnedImpl::prepend, then re-evaluates the
// high and overflow watermarks.
void WatermarkBuffer::prepend(absl::string_view data) {
  OwnedImpl::prepend(data);
  checkHighAndOverflowWatermarks();
}
42

            
43
5
// Prepends the given buffer instance through OwnedImpl::prepend, then
// re-evaluates the high and overflow watermarks.
void WatermarkBuffer::prepend(Instance& data) {
  OwnedImpl::prepend(data);
  checkHighAndOverflowWatermarks();
}
47

            
48
void WatermarkBuffer::commit(uint64_t length, absl::Span<RawSlice> slices,
49
1117276
                             ReservationSlicesOwnerPtr slices_owner) {
50
1117276
  OwnedImpl::commit(length, slices, std::move(slices_owner));
51
1117276
  checkHighAndOverflowWatermarks();
52
1117276
}
53

            
54
1507933
void WatermarkBuffer::drain(uint64_t size) {
55
1507933
  OwnedImpl::drain(size);
56
1507933
  checkLowWatermark();
57
1507933
}
58

            
59
2102968
// Moves `rhs` into this buffer through OwnedImpl::move, then re-evaluates the
// high and overflow watermarks.
void WatermarkBuffer::move(Instance& rhs) {
  OwnedImpl::move(rhs);
  checkHighAndOverflowWatermarks();
}
63

            
64
5
// Moves `length` bytes from `rhs` through OwnedImpl::move, then re-evaluates
// the high and overflow watermarks.
void WatermarkBuffer::move(Instance& rhs, uint64_t length) {
  OwnedImpl::move(rhs, length);
  checkHighAndOverflowWatermarks();
}
68

            
69
void WatermarkBuffer::move(Instance& rhs, uint64_t length,
70
114
                           bool reset_drain_trackers_and_accounting) {
71
114
  OwnedImpl::move(rhs, length, reset_drain_trackers_and_accounting);
72
114
  checkHighAndOverflowWatermarks();
73
114
}
74

            
75
3
// Extracts the front slice through OwnedImpl::extractMutableFrontSlice and
// returns it. The low watermark check runs after the extraction and before the
// slice is handed back to the caller.
SliceDataPtr WatermarkBuffer::extractMutableFrontSlice() {
  auto front_slice = OwnedImpl::extractMutableFrontSlice();
  checkLowWatermark();
  return front_slice;
}
80

            
81
// Adjust the reservation size based on space available before hitting
82
// the high watermark to avoid overshooting by a lot and thus violating the limits
83
// the watermark is imposing.
84
1117207
Reservation WatermarkBuffer::reserveForRead() {
85
1117207
  constexpr auto preferred_length = default_read_reservation_size_;
86
1117207
  uint64_t adjusted_length = preferred_length;
87

            
88
1117207
  if (high_watermark_ > 0 && preferred_length > 0) {
89
312693
    const uint64_t current_length = OwnedImpl::length();
90
312693
    if (current_length >= high_watermark_) {
91
      // Always allow a read of at least some data. The API doesn't allow returning
92
      // a zero-length reservation.
93
3
      adjusted_length = Slice::default_slice_size_;
94
312690
    } else {
95
312690
      const uint64_t available_length = high_watermark_ - current_length;
96
312690
      adjusted_length = IntUtil::roundUpToMultiple(available_length, Slice::default_slice_size_);
97
312690
      adjusted_length = std::min(adjusted_length, preferred_length);
98
312690
    }
99
312693
  }
100

            
101
1117207
  return OwnedImpl::reserveWithMaxLength(adjusted_length);
102
1117207
}
103

            
104
3
void WatermarkBuffer::appendSliceForTest(const void* data, uint64_t size) {
105
3
  OwnedImpl::appendSliceForTest(data, size);
106
3
  checkHighAndOverflowWatermarks();
107
3
}
108

            
109
// String-view convenience overload: delegates to the pointer/size overload of
// appendSliceForTest with the view's data pointer and length.
void WatermarkBuffer::appendSliceForTest(absl::string_view data) {
  const void* bytes = data.data();
  appendSliceForTest(bytes, data.size());
}
112

            
113
477125
// Adds all fragments through OwnedImpl::addFragments, re-evaluates the high and
// overflow watermarks, and returns the value OwnedImpl::addFragments reported.
size_t WatermarkBuffer::addFragments(absl::Span<const absl::string_view> fragments) {
  const size_t bytes_written = OwnedImpl::addFragments(fragments);
  checkHighAndOverflowWatermarks();
  return bytes_written;
}
118

            
119
void WatermarkBuffer::setWatermarks(uint64_t high_watermark,
120
487175
                                    uint32_t overflow_watermark_multiplier) {
121
487175
  if (overflow_watermark_multiplier > 0 &&
122
487175
      (high_watermark > std::numeric_limits<uint64_t>::max() / overflow_watermark_multiplier)) {
123
1
    ENVOY_LOG_MISC(debug, "Error setting overflow threshold: overflow_watermark_multiplier * "
124
1
                          "high_watermark is overflowing. Disabling overflow watermark.");
125
1
    overflow_watermark_multiplier = 0;
126
1
  }
127
487175
  low_watermark_ = high_watermark / 2;
128
487175
  high_watermark_ = high_watermark;
129
487175
  overflow_watermark_ = overflow_watermark_multiplier * high_watermark;
130
487175
  checkHighAndOverflowWatermarks();
131
487175
  checkLowWatermark();
132
487175
}
133

            
134
3305690
void WatermarkBuffer::checkLowWatermark() {
135
3305690
  if (!above_high_watermark_called_ ||
136
3305694
      (high_watermark_ != 0 && OwnedImpl::length() > low_watermark_)) {
137
3096288
    return;
138
3096288
  }
139

            
140
209406
  above_high_watermark_called_ = false;
141
209406
  below_low_watermark_();
142
209406
}
143

            
144
5127333
void WatermarkBuffer::checkHighAndOverflowWatermarks() {
145
5127333
  if (high_watermark_ == 0 || OwnedImpl::length() <= high_watermark_) {
146
4765370
    return;
147
4765370
  }
148

            
149
361982
  if (!above_high_watermark_called_) {
150
209724
    above_high_watermark_called_ = true;
151
209724
    above_high_watermark_();
152
209724
  }
153

            
154
  // Check if overflow watermark is enabled, wasn't previously triggered,
155
  // and the buffer size is above the threshold
156
361963
  if (overflow_watermark_ != 0 && !above_overflow_watermark_called_ &&
157
361963
      OwnedImpl::length() > overflow_watermark_) {
158
4
    above_overflow_watermark_called_ = true;
159
4
    above_overflow_watermark_();
160
4
  }
161
361963
}
162

            
163
// Creates a memory account bound to this factory and `reset_handler`, or
// returns nullptr when the configured bitshift is the "tracking disabled"
// sentinel.
BufferMemoryAccountSharedPtr
WatermarkBufferFactory::createAccount(Http::StreamResetHandler& reset_handler) {
  const bool tracking_disabled = (bitshift_ == kEffectivelyDisableTrackingBitshift);
  if (tracking_disabled) {
    return nullptr; // No tracking
  }
  return BufferMemoryAccountImpl::createAccount(this, reset_handler);
}
170

            
171
void WatermarkBufferFactory::updateAccountClass(const BufferMemoryAccountSharedPtr& account,
172
                                                absl::optional<uint32_t> current_class,
173
815
                                                absl::optional<uint32_t> new_class) {
174
815
  ASSERT(current_class != new_class, "Expected the current_class and new_class to be different");
175

            
176
815
  if (!current_class.has_value()) {
177
    // Start tracking
178
376
    ASSERT(new_class.has_value());
179
376
    ASSERT(!size_class_account_sets_[new_class.value()].contains(account));
180
376
    size_class_account_sets_[new_class.value()].insert(account);
181
441
  } else if (!new_class.has_value()) {
182
    // No longer track
183
315
    ASSERT(current_class.has_value());
184
315
    ASSERT(size_class_account_sets_[current_class.value()].contains(account));
185
315
    size_class_account_sets_[current_class.value()].erase(account);
186
327
  } else {
187
    // Moving between buckets
188
124
    ASSERT(size_class_account_sets_[current_class.value()].contains(account));
189
124
    ASSERT(!size_class_account_sets_[new_class.value()].contains(account));
190
124
    size_class_account_sets_[new_class.value()].insert(
191
124
        std::move(size_class_account_sets_[current_class.value()].extract(account).value()));
192
124
  }
193
815
}
194

            
195
void WatermarkBufferFactory::unregisterAccount(const BufferMemoryAccountSharedPtr& account,
196
332
                                               absl::optional<uint32_t> current_class) {
197
332
  if (current_class.has_value()) {
198
61
    ASSERT(size_class_account_sets_[current_class.value()].contains(account));
199
61
    size_class_account_sets_[current_class.value()].erase(account);
200
61
  }
201
332
}
202

            
203
34
uint64_t WatermarkBufferFactory::resetAccountsGivenPressure(float pressure) {
204
34
  ASSERT(pressure >= 0.0 && pressure <= 1.0, "Provided pressure is out of range [0, 1].");
205

            
206
  // Compute buckets to clear
207
34
  const uint32_t buckets_to_clear = std::min<uint32_t>(
208
34
      std::floor(pressure * BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_) + 1, 8);
209

            
210
  // Clear buckets, prioritizing the buckets with larger streams.
211
34
  uint32_t num_streams_reset = 0;
212
34
  uint32_t num_buckets_reset = 0;
213
200
  for (uint32_t buckets_cleared = 0; buckets_cleared < buckets_to_clear; ++buckets_cleared) {
214
166
    const uint32_t bucket_to_clear =
215
166
        BufferMemoryAccountImpl::NUM_MEMORY_CLASSES_ - buckets_cleared - 1;
216
166
    absl::flat_hash_set<BufferMemoryAccountSharedPtr>& bucket =
217
166
        size_class_account_sets_[bucket_to_clear];
218

            
219
166
    if (bucket.empty()) {
220
143
      continue;
221
143
    }
222
23
    ++num_buckets_reset;
223

            
224
23
    auto it = bucket.begin();
225
241
    while (it != bucket.end() && num_streams_reset < kMaxNumberOfStreamsToResetPerInvocation) {
226
218
      auto next = std::next(it);
227
      // This will trigger an erase, which avoids rehashing and invalidates the
228
      // iterator *it*. *next* is still valid.
229
218
      (*it)->resetDownstream();
230
218
      it = next;
231
218
      ++num_streams_reset;
232
218
    }
233
23
  }
234
34
  if (num_buckets_reset > 0) {
235
21
    ENVOY_LOG_MISC(warn, "resetting {} streams in {} buckets, {} empty buckets", num_streams_reset,
236
21
                   num_buckets_reset, buckets_to_clear - num_buckets_reset);
237
21
  }
238
34
  return num_streams_reset;
239
34
}
240

            
241
// Derives the tracking bitshift from the configuration. A zero
// `minimum_account_to_track_power_of_two` selects the "tracking disabled"
// sentinel; any other value N yields a bitshift of N - 1.
WatermarkBufferFactory::WatermarkBufferFactory(
    const envoy::config::overload::v3::BufferFactoryConfig& config)
    : bitshift_(config.minimum_account_to_track_power_of_two()
                    ? config.minimum_account_to_track_power_of_two() - 1
                    : kEffectivelyDisableTrackingBitshift) {}
246

            
247
66078
// Asserts that every size-class account set is empty by the time the factory
// is destroyed.
WatermarkBufferFactory::~WatermarkBufferFactory() {
  for (const auto& remaining_accounts : size_class_account_sets_) {
    ASSERT(remaining_accounts.empty(),
           "Expected all Accounts to have unregistered from the Watermark Factory.");
  }
}
253

            
254
// Static factory for accounts: builds a BufferMemoryAccountImpl, wraps it in a
// shared_ptr, and stores that shared_ptr in the account's own shared_this_
// before returning it.
BufferMemoryAccountSharedPtr
BufferMemoryAccountImpl::createAccount(WatermarkBufferFactory* factory,
                                       Http::StreamResetHandler& reset_handler) {
  // The shared_ptr constructor is used directly instead of make_shared because
  // the constructor invoked here is private; users are expected to go through
  // this static method to create accounts.
  auto* impl = new BufferMemoryAccountImpl(factory, reset_handler);
  std::shared_ptr<BufferMemoryAccount> account(impl);
  // Set shared_this_ in the account.
  impl->shared_this_ = account;
  return account;
}
266

            
267
1031
// Maps the current balance to a size-class index.
//
// The balance is shifted right by the factory's bitshift. A zero result yields
// an empty optional; otherwise the index is the position of the highest set
// bit, capped at NUM_MEMORY_CLASSES_ - 1.
absl::optional<uint32_t> BufferMemoryAccountImpl::balanceToClassIndex() {
  const uint64_t scaled_balance = buffer_memory_allocated_ >> factory_->bitshift();

  if (scaled_balance == 0) {
    // Not worth tracking anything < configured minimum threshold
    return absl::nullopt;
  }

  const int highest_bit_index = absl::bit_width(scaled_balance) - 1;
  return std::min<uint32_t>(highest_bit_index, NUM_MEMORY_CLASSES_ - 1);
}
277

            
278
1031
void BufferMemoryAccountImpl::updateAccountClass() {
279
1031
  auto new_class = balanceToClassIndex();
280
1031
  if (shared_this_ && new_class != current_bucket_idx_) {
281
815
    factory_->updateAccountClass(shared_this_, current_bucket_idx_, new_class);
282
815
    current_bucket_idx_ = new_class;
283
815
  }
284
1031
}
285

            
286
514
void BufferMemoryAccountImpl::credit(uint64_t amount) {
287
514
  ASSERT(buffer_memory_allocated_ >= amount);
288
514
  buffer_memory_allocated_ -= amount;
289
514
  updateAccountClass();
290
514
}
291

            
292
517
void BufferMemoryAccountImpl::charge(uint64_t amount) {
293
  // Check overflow
294
517
  ASSERT(std::numeric_limits<uint64_t>::max() - buffer_memory_allocated_ >= amount);
295
517
  buffer_memory_allocated_ += amount;
296
517
  updateAccountClass();
297
517
}
298

            
299
347
void BufferMemoryAccountImpl::clearDownstream() {
300
347
  if (reset_handler_.has_value()) {
301
332
    reset_handler_.reset();
302
332
    factory_->unregisterAccount(shared_this_, current_bucket_idx_);
303
332
    current_bucket_idx_.reset();
304
332
    shared_this_ = nullptr;
305
332
  }
306
347
}
307

            
308
} // namespace Buffer
309
} // namespace Envoy