1
#pragma once

#include <algorithm>
#include <cstdint>
#include <cstring>
#include <deque>
#include <functional>
#include <list>
#include <memory>
#include <string>

#include "envoy/buffer/buffer.h"
#include "envoy/http/stream_reset_handler.h"

#include "source/common/common/assert.h"
#include "source/common/common/non_copyable.h"
#include "source/common/common/utility.h"
#include "source/common/event/libevent.h"

#include "absl/functional/any_invocable.h"
18

            
19
namespace Envoy {
20
namespace Buffer {
21

            
22
/**
23
 * A Slice manages a contiguous block of bytes.
24
 * The block is arranged like this:
25
 *                   |<- dataSize() ->|<- reservableSize() ->|
26
 * +-----------------+----------------+----------------------+
27
 * | Drained         | Data           | Reservable           |
28
 * | Unused space    | Usable content | New content can be   |
29
 * | that formerly   |                | added here with      |
30
 * | was in the Data |                | reserve()/commit()   |
31
 * | section         |                | or append()          |
32
 * +-----------------+----------------+----------------------+
33
 * ^                 ^                ^                      ^
34
 * |                 |                |                      |
35
 * base_             base_ + data_    base_ + reservable_    base_ + capacity_
36
 */
37
class Slice {
38
public:
39
  using Reservation = RawSlice;
40
  using StoragePtr = std::unique_ptr<uint8_t[]>;
41

            
42
  struct SizedStorage {
43
    StoragePtr mem_{};
44
    size_t len_{};
45
  };
46

            
47
  /**
48
   * Create an empty Slice with 0 capacity.
49
   */
50
100572684
  Slice() = default;
51

            
52
  /**
53
   * Create an empty mutable Slice that owns its storage, which it charges to the provided account,
54
   * if any.
55
   * @param min_capacity number of bytes of space the slice should have. Actual capacity is rounded
56
   * up to the next multiple of 4kb.
57
   * @param account the account to charge.
58
   */
59
  Slice(uint64_t min_capacity, const BufferMemoryAccountSharedPtr& account)
60
4198173
      : capacity_(sliceSize(min_capacity)), storage_(new uint8_t[capacity_]),
61
4198173
        base_(storage_.get()) {
62
4198173
    if (account) {
63
243
      account->charge(capacity_);
64
243
      account_ = account;
65
243
    }
66
4198173
  }
67

            
68
  /**
69
   * Create an empty mutable Slice that owns its storage, which it charges to the provided account,
70
   * if any.
71
   * @param storage backend storage for the slice.
72
   * @param used_size the size already used in storage.
73
   * @param account the account to charge.
74
   */
75
  Slice(SizedStorage storage, uint64_t used_size, const BufferMemoryAccountSharedPtr& account)
76
1757886
      : capacity_(storage.len_), storage_(std::move(storage.mem_)), base_(storage_.get()),
77
1757886
        reservable_(used_size) {
78
1757886
    ASSERT(sliceSize(capacity_) == capacity_);
79
1757886
    ASSERT(reservable_ <= capacity_);
80

            
81
1757886
    if (account) {
82
1
      account->charge(capacity_);
83
1
      account_ = account;
84
1
    }
85
1757886
  }
86

            
87
  /**
88
   * Create an immutable Slice that refers to an external buffer fragment.
89
   * @param fragment provides externally owned immutable data.
90
   */
91
  Slice(BufferFragment& fragment)
92
810291
      : capacity_(fragment.size()), storage_(nullptr),
93
810291
        base_(static_cast<uint8_t*>(const_cast<void*>(fragment.data()))),
94
810291
        reservable_(fragment.size()) {
95
810303
    releasor_ = [&fragment]() { fragment.done(); };
96
810291
  }
97

            
98
27
  Slice(Slice&& rhs) noexcept {
99
27
    capacity_ = rhs.capacity_;
100
27
    storage_ = std::move(rhs.storage_);
101
27
    base_ = rhs.base_;
102
27
    data_ = rhs.data_;
103
27
    reservable_ = rhs.reservable_;
104
27
    drain_trackers_ = std::move(rhs.drain_trackers_);
105
27
    account_ = std::move(rhs.account_);
106
27
    releasor_.swap(rhs.releasor_);
107

            
108
27
    rhs.capacity_ = 0;
109
27
    rhs.base_ = nullptr;
110
27
    rhs.data_ = 0;
111
27
    rhs.reservable_ = 0;
112
27
  }
113

            
114
31735866
  Slice& operator=(Slice&& rhs) noexcept {
115
31737324
    if (this != &rhs) {
116
31737324
      callAndClearDrainTrackersAndCharges();
117

            
118
31737324
      capacity_ = rhs.capacity_;
119
31737324
      storage_ = std::move(rhs.storage_);
120
31737324
      base_ = rhs.base_;
121
31737324
      data_ = rhs.data_;
122
31737324
      reservable_ = rhs.reservable_;
123
31737324
      drain_trackers_ = std::move(rhs.drain_trackers_);
124
31737324
      account_ = std::move(rhs.account_);
125
31737324
      if (releasor_) {
126
202969
        releasor_();
127
202969
      }
128
31737324
      releasor_ = rhs.releasor_;
129
31737324
      rhs.releasor_ = nullptr;
130

            
131
31737324
      rhs.capacity_ = 0;
132
31737324
      rhs.base_ = nullptr;
133
31737324
      rhs.data_ = 0;
134
31737324
      rhs.reservable_ = 0;
135
31737324
    }
136

            
137
31735866
    return *this;
138
31735866
  }
139

            
140
107257253
  ~Slice() {
141
107257253
    callAndClearDrainTrackersAndCharges();
142
107257253
    if (releasor_) {
143
607323
      releasor_();
144
607323
    }
145
107257253
  }
146

            
147
  /**
148
   * @return true if the data in the slice is mutable
149
   */
150
18
  bool isMutable() const { return storage_ != nullptr; }
151

            
152
  /**
153
   * @return true if content in this Slice can be coalesced into another Slice.
154
   */
155
3910430
  bool canCoalesce() const { return storage_ != nullptr; }
156

            
157
  /**
158
   * @return a pointer to the start of the usable content.
159
   */
160
5918834
  const uint8_t* data() const { return base_ + data_; }
161

            
162
  /**
163
   * @return a pointer to the start of the usable content.
164
   */
165
1891824
  uint8_t* data() { return base_ + data_; }
166

            
167
  /**
168
   * @return the size in bytes of the usable content.
169
   */
170
24318224
  uint64_t dataSize() const { return reservable_ - data_; }
171

            
172
  /**
173
   * Remove the first `size` bytes of usable content. Runs in O(1) time.
174
   * @param size number of bytes to remove. If greater than data_size(), the result is undefined.
175
   */
176
711072
  void drain(uint64_t size) {
177
711072
    ASSERT(data_ + size <= reservable_);
178
711072
    data_ += size;
179
711072
    if (data_ == reservable_) {
180
      // All the data in the slice has been drained. Reset the offsets so all
181
      // the data can be reused.
182
1
      data_ = 0;
183
1
      reservable_ = 0;
184
1
    }
185
711072
  }
186

            
187
  /**
188
   * @return the number of bytes available to be reserved.
189
   * @note Read-only implementations of Slice should return zero from this method.
190
   */
191
11540422
  uint64_t reservableSize() const {
192
11540422
    ASSERT(capacity_ >= reservable_);
193
11540422
    return capacity_ - reservable_;
194
11540422
  }
195

            
196
  /**
197
   * Reserve `size` bytes that the caller can populate with content. The caller SHOULD then
198
   * call commit() to add the newly populated content from the Reserved section to the Data
199
   * section.
200
   * @note If there is already an outstanding reservation (i.e., a reservation obtained
201
   *       from reserve() that has not been released by calling commit()), this method will
202
   *       return a new reservation that replaces it.
203
   * @param size the number of bytes to reserve. The Slice implementation MAY reserve
204
   *        fewer bytes than requested (for example, if it doesn't have enough room in the
205
   *        Reservable section to fulfill the whole request).
206
   * @return a tuple containing the address of the start of resulting reservation and the
207
   *         reservation size in bytes. If the address is null, the reservation failed.
208
   * @note Read-only implementations of Slice should return {nullptr, 0} from this method.
209
   */
210
1018229
  Reservation reserve(uint64_t size) {
211
1018229
    if (size == 0) {
212
1
      return {nullptr, 0};
213
1
    }
214
    // Verify the semantics that drain() enforces: if the slice is empty, either because
215
    // no data has been added or because all the added data has been drained, the data
216
    // section is at the very start of the slice.
217
1018228
    ASSERT(!(dataSize() == 0 && data_ > 0));
218
1018228
    uint64_t available_size = capacity_ - reservable_;
219
1018228
    if (available_size == 0) {
220
62
      return {nullptr, 0};
221
62
    }
222
1018166
    uint64_t reservation_size = std::min(size, available_size);
223
1018166
    void* reservation = &(base_[reservable_]);
224
1018166
    return {reservation, static_cast<size_t>(reservation_size)};
225
1018228
  }
226

            
227
  /**
228
   * Commit a Reservation that was previously obtained from a call to reserve().
229
   * The Reservation's size is added to the Data section.
230
   * @param reservation a reservation obtained from a previous call to reserve().
231
   *        If the reservation is not from this Slice, commit() will return false.
232
   *        If the caller is committing fewer bytes than provided by reserve(), it
233
   *        should change the len_ field of the reservation before calling commit().
234
   *        For example, if a caller reserve()s 4KB to do a nonblocking socket read,
235
   *        and the read only returns two bytes, the caller should set
236
   *        reservation.len_ = 2 and then call `commit(reservation)`.
237
   * @return whether the Reservation was successfully committed to the Slice.
238
   * @note template parameter `SafeCommit` can be used to disable memory range check.
239
   */
240
573213
  template <bool SafeCommit = true> bool commit(const Reservation& reservation) {
241
573213
    if constexpr (SafeCommit) {
242
9472
      if (static_cast<const uint8_t*>(reservation.mem_) != base_ + reservable_ ||
243
9472
          reservable_ + reservation.len_ > capacity_ || reservable_ >= capacity_) {
244
        // The reservation is not from this Slice.
245
3
        return false;
246
3
      }
247
571736
    } else {
248
563741
      ASSERT(static_cast<const uint8_t*>(reservation.mem_) == base_ + reservable_ &&
249
563741
             reservable_ + reservation.len_ <= capacity_);
250
563741
    }
251
9469
    reservable_ += reservation.len_;
252
573213
    return true;
253
573213
  }
254

            
255
  /**
256
   * Copy as much of the supplied data as possible to the end of the slice.
257
   * @param data start of the data to copy.
258
   * @param size number of bytes to copy.
259
   * @return number of bytes copied (may be a smaller than size, may even be zero).
260
   */
261
9622302
  uint64_t append(const void* data, uint64_t size) {
262
9622302
    uint64_t copy_size = std::min(size, reservableSize());
263
9622302
    if (copy_size == 0) {
264
140995
      return 0;
265
140995
    }
266
9481307
    uint8_t* dest = base_ + reservable_;
267
9481307
    reservable_ += copy_size;
268
    // NOLINTNEXTLINE(clang-analyzer-core.NullDereference)
269
9481307
    memcpy(dest, data, copy_size); // NOLINT(safe-memcpy)
270
9481307
    return copy_size;
271
9622302
  }
272

            
273
  /**
274
   * Copy as much of the supplied data as possible to the front of the slice.
275
   * If only part of the data will fit in the slice, the bytes from the _end_ are
276
   * copied.
277
   * @param data start of the data to copy.
278
   * @param size number of bytes to copy.
279
   * @return number of bytes copied (may be a smaller than size, may even be zero).
280
   */
281
  uint64_t prepend(const void* data, uint64_t size);
282

            
283
  /**
284
   * Describe the in-memory representation of the slice. For use
285
   * in tests that want to make assertions about the specific arrangement of
286
   * bytes in a slice.
287
   */
288
  struct SliceRepresentation {
289
    uint64_t data;
290
    uint64_t reservable;
291
    uint64_t capacity;
292
  };
293
63
  SliceRepresentation describeSliceForTest() const {
294
63
    return SliceRepresentation{dataSize(), reservableSize(), capacity_};
295
63
  }
296

            
297
  /**
298
   * Move all drain trackers and charges from the current slice to the destination slice.
299
   */
300
1227070
  void transferDrainTrackersTo(Slice& destination) {
301
1227070
    destination.drain_trackers_.splice(destination.drain_trackers_.end(), drain_trackers_);
302
1227070
    ASSERT(drain_trackers_.empty());
303
    // The releasor needn't to be transferred, and actually if there is releasor, this
304
    // slice can't coalesce. Then there won't be a chance to calling this method.
305
1227070
    ASSERT(releasor_ == nullptr);
306
1227070
  }
307

            
308
  /**
309
   * Add a drain tracker to the slice.
310
   */
311
786398
  void addDrainTracker(std::function<void()> drain_tracker) {
312
786398
    drain_trackers_.emplace_back(std::move(drain_tracker));
313
786398
  }
314

            
315
  /**
316
   * Call all drain trackers associated with the slice, then clear
317
   * the drain tracker list.
318
   */
319
138920455
  void callAndClearDrainTrackersAndCharges() {
320
138920455
    for (const auto& drain_tracker : drain_trackers_) {
321
786400
      drain_tracker();
322
786400
    }
323
138920455
    drain_trackers_.clear();
324

            
325
138920455
    if (account_) {
326
290
      account_->credit(capacity_);
327
290
      account_.reset();
328
290
    }
329
138920455
  }
330

            
331
  /**
332
   * Charges the provided account for the resources if these conditions hold:
333
   * - we're not already charging for this slice
334
   * - the given account is non-null
335
   * - the slice owns backing memory
336
   */
337
2683429
  void maybeChargeAccount(const BufferMemoryAccountSharedPtr& account) {
338
2683438
    if (account_ != nullptr || storage_ == nullptr || account == nullptr) {
339
2683371
      return;
340
2683371
    }
341
67
    account->charge(capacity_);
342
67
    account_ = account;
343
67
  }
344

            
345
  static constexpr uint32_t default_slice_size_ = 16384;
346

            
347
public:
348
  /**
349
   * Compute a slice size big enough to hold a specified amount of data.
350
   * @param data_size the minimum amount of data the slice must be able to store, in bytes.
351
   * @return a recommended slice size, in bytes.
352
   */
353
6882831
  static uint64_t sliceSize(uint64_t data_size) {
354
6882831
    static constexpr uint64_t PageSize = 4096;
355
6882831
    const uint64_t num_pages = (data_size + PageSize - 1) / PageSize;
356
6882831
    return num_pages * PageSize;
357
6882831
  }
358

            
359
  /**
360
   * Create new backend storage with min capacity. This method will create a recommended capacity
361
   * which will bigger or equal to the min capacity and create new backend storage based on the
362
   * recommended capacity.
363
   * @param min_capacity the min capacity of new created backend storage.
364
   * @return a backend storage for slice.
365
   */
366
2685618
  static inline SizedStorage newStorage(uint64_t min_capacity) {
367
2685618
    const uint64_t slice_size = sliceSize(min_capacity);
368
2685618
    return {StoragePtr{new uint8_t[slice_size]}, static_cast<size_t>(slice_size)};
369
2685618
  }
370

            
371
protected:
372
  /** Length of the byte array that base_ points to. This is also the offset in bytes from the start
373
   * of the slice to the end of the Reservable section. */
374
  uint64_t capacity_ = 0;
375

            
376
  /** Backing storage for mutable slices which own their own storage. This storage should never be
377
   * accessed directly; access base_ instead. */
378
  StoragePtr storage_;
379

            
380
  /** Start of the slice. Points to storage_ iff the slice owns its own storage. */
381
  uint8_t* base_{nullptr};
382

            
383
  /** Offset in bytes from the start of the slice to the start of the Data section. */
384
  uint64_t data_ = 0;
385

            
386
  /** Offset in bytes from the start of the slice to the start of the Reservable section which is
387
   * also the end of the Data section. */
388
  uint64_t reservable_ = 0;
389

            
390
  /** Hooks to execute when the slice is destroyed. */
391
  std::list<std::function<void()>> drain_trackers_;
392

            
393
  /** Account associated with this slice. This may be null. When
394
   * coalescing with another slice, we do not transfer over their account. */
395
  BufferMemoryAccountSharedPtr account_;
396

            
397
  /** The releasor for the BufferFragment */
398
  std::function<void()> releasor_;
399
};
400

            
401
class OwnedImpl;
402

            
403
class SliceDataImpl : public SliceData {
404
public:
405
10
  explicit SliceDataImpl(Slice&& slice) : slice_(std::move(slice)) {}
406

            
407
  // SliceData
408
8
  absl::Span<uint8_t> getMutableData() override {
409
8
    RELEASE_ASSERT(slice_.isMutable(), "Not allowed to call getMutableData if slice is immutable");
410
8
    return {slice_.data(), static_cast<absl::Span<uint8_t>::size_type>(slice_.dataSize())};
411
8
  }
412

            
413
private:
414
  friend OwnedImpl;
415
  Slice slice_;
416
};
417

            
418
/**
419
 * Queue of Slice that supports efficient read and write access to both
420
 * the front and the back of the queue.
421
 * @note This class has similar properties to std::deque<T>. The reason for using
422
 *       a custom deque implementation is that benchmark testing during development
423
 *       revealed that std::deque was too slow to reach performance parity with the
424
 *       prior evbuffer-based buffer implementation.
425
 */
426
class SliceDeque {
public:
  // Starts out using the fixed-size inline ring; switches to heap storage on growth.
  SliceDeque() : ring_(inline_ring_), capacity_(InlineRingCapacity) {}

  SliceDeque(SliceDeque&& rhs) noexcept {
    // This custom move constructor is needed so that ring_ will be updated properly.
    std::move(rhs.inline_ring_, rhs.inline_ring_ + InlineRingCapacity, inline_ring_);
    external_ring_ = std::move(rhs.external_ring_);
    // Point at whichever storage is active: the stolen heap ring, else our inline ring.
    ring_ = (external_ring_ != nullptr) ? external_ring_.get() : inline_ring_;
    start_ = rhs.start_;
    size_ = rhs.size_;
    capacity_ = rhs.capacity_;
  }

  SliceDeque& operator=(SliceDeque&& rhs) noexcept {
    // This custom assignment move operator is needed so that ring_ will be updated properly.
    std::move(rhs.inline_ring_, rhs.inline_ring_ + InlineRingCapacity, inline_ring_);
    external_ring_ = std::move(rhs.external_ring_);
    ring_ = (external_ring_ != nullptr) ? external_ring_.get() : inline_ring_;
    start_ = rhs.start_;
    size_ = rhs.size_;
    capacity_ = rhs.capacity_;
    return *this;
  }

  // Append a slice at the back of the deque, growing the ring first if it is full.
  void emplace_back(Slice&& slice) { // NOLINT(readability-identifier-naming)
    growRing();
    size_t index = internalIndex(size_);
    ring_[index] = std::move(slice);
    size_++;
  }

  // Insert a slice at the front of the deque, growing the ring first if it is full.
  // start_ wraps backwards around the ring before the store.
  void emplace_front(Slice&& slice) { // NOLINT(readability-identifier-naming)
    growRing();
    start_ = (start_ == 0) ? capacity_ - 1 : start_ - 1;
    ring_[start_] = std::move(slice);
    size_++;
  }

  bool empty() const { return size() == 0; }
  size_t size() const { return size_; }

  Slice& front() { return ring_[start_]; }
  const Slice& front() const { return ring_[start_]; }
  Slice& back() { return ring_[internalIndex(size_ - 1)]; }
  const Slice& back() const { return ring_[internalIndex(size_ - 1)]; }

  Slice& operator[](size_t i) {
    ASSERT(!empty());
    return ring_[internalIndex(i)];
  }
  const Slice& operator[](size_t i) const {
    ASSERT(!empty());
    return ring_[internalIndex(i)];
  }

  // Remove the first slice. No-op on an empty deque.
  void pop_front() { // NOLINT(readability-identifier-naming)
    if (size() == 0) {
      return;
    }
    // Overwrite with an empty slice so the popped slice's cleanup (via Slice's
    // move-assignment) runs immediately rather than lingering in the ring.
    front() = Slice();
    size_--;
    start_++;
    if (start_ == capacity_) {
      start_ = 0;
    }
  }

  // Remove the last slice. No-op on an empty deque.
  void pop_back() { // NOLINT(readability-identifier-naming)
    if (size() == 0) {
      return;
    }
    back() = Slice();
    size_--;
  }

  /**
   * Forward const iterator for SliceDeque.
   * @note this implementation currently supports the minimum functionality needed to support
   *       the `for (const auto& slice : slice_deque)` idiom.
   */
  class ConstIterator {
  public:
    const Slice& operator*() { return deque_[index_]; }

    ConstIterator operator++() {
      index_++;
      return *this;
    }

    bool operator!=(const ConstIterator& rhs) const {
      return &deque_ != &rhs.deque_ || index_ != rhs.index_;
    }

    friend class SliceDeque;

  private:
    ConstIterator(const SliceDeque& deque, size_t index) : deque_(deque), index_(index) {}
    const SliceDeque& deque_;
    size_t index_;
  };

  ConstIterator begin() const noexcept { return {*this, 0}; }

  ConstIterator end() const noexcept { return {*this, size_}; }

private:
  constexpr static size_t InlineRingCapacity = 8;

  // Map a logical index (0 == front) to a physical ring index, wrapping at capacity_.
  size_t internalIndex(size_t index) const {
    size_t internal_index = start_ + index;
    if (internal_index >= capacity_) {
      internal_index -= capacity_;
      ASSERT(internal_index < capacity_);
    }
    return internal_index;
  }

  // Double the ring capacity when full, moving existing slices into a new heap-backed
  // ring in front-to-back order (so start_ resets to 0).
  void growRing() {
    if (size_ < capacity_) {
      return;
    }
    const size_t new_capacity = capacity_ * 2;
    auto new_ring = std::make_unique<Slice[]>(new_capacity);
    size_t src = start_;
    size_t dst = 0;
    for (size_t i = 0; i < size_; i++) {
      new_ring[dst++] = std::move(ring_[src++]);
      if (src == capacity_) {
        src = 0;
      }
    }
    external_ring_.swap(new_ring);
    ring_ = external_ring_.get();
    start_ = 0;
    capacity_ = new_capacity;
  }

  Slice inline_ring_[InlineRingCapacity];
  std::unique_ptr<Slice[]> external_ring_;
  Slice* ring_; // points to start of either inline or external ring.
  size_t start_{0};
  size_t size_{0};
  size_t capacity_;
};
571

            
572
/**
573
 * An implementation of BufferFragment where a releasor callback is called when the data is
574
 * no longer needed.
575
 */
576
class BufferFragmentImpl : NonCopyable, public BufferFragment {
577
public:
578
  /**
579
   * Creates a new wrapper around the externally owned <data> of size <size>.
580
   * The caller must ensure <data> is valid until releasor() is called, or for the lifetime of the
581
   * fragment. releasor() is called with <data>, <size> and <this> to allow caller to delete
582
   * the fragment object.
583
   * @param data external data to reference
584
   * @param size size of data
585
   * @param releasor a callback function to be called when data is no longer needed.
586
   */
587
  BufferFragmentImpl(
588
      const void* data, size_t size,
589
      absl::AnyInvocable<void(const void*, size_t, const BufferFragmentImpl*)> releasor)
590
757595
      : data_(data), size_(size), releasor_(std::move(releasor)) {}
591

            
592
  // Buffer::BufferFragment
593
757590
  const void* data() const override { return data_; }
594
2272735
  size_t size() const override { return size_; }
595
757592
  void done() override {
596
757592
    if (releasor_) {
597
160901
      releasor_(data_, size_, this);
598
160901
    }
599
757592
  }
600

            
601
private:
602
  const void* const data_;
603
  const size_t size_;
604
  absl::AnyInvocable<void(const void*, size_t, const BufferFragmentImpl*)> releasor_;
605
};
606

            
607
// Buffer::Instance extension for implementations whose underlying memory may be
// accessed directly and therefore need a post-access hook.
class LibEventInstance : public Instance {
public:
  // Called after accessing the memory in buffer() directly to allow any post-processing.
  virtual void postProcess() PURE;
};
612

            
613
/**
614
 * Wrapper for uint64_t that asserts upon integer overflow and underflow.
615
 */
616
class OverflowDetectingUInt64 {
617
public:
618
23370948
  operator uint64_t() const { return value_; }
619

            
620
15583593
  OverflowDetectingUInt64& operator+=(uint64_t size) {
621
15583593
    uint64_t new_value = value_ + size;
622
15583593
    RELEASE_ASSERT(new_value >= value_, "64-bit unsigned integer overflowed");
623
15583593
    value_ = new_value;
624
15583593
    return *this;
625
15583593
  }
626

            
627
7815520
  OverflowDetectingUInt64& operator-=(uint64_t size) {
628
7815520
    RELEASE_ASSERT(value_ >= size, "unsigned integer underflowed");
629
7815520
    value_ -= size;
630
7815520
    return *this;
631
7815520
  }
632

            
633
private:
634
  uint64_t value_{0};
635
};
636

            
637
/**
638
 * Wraps an allocated and owned buffer.
639
 *
640
 * Note that due to the internals of move(), OwnedImpl is not
641
 * compatible with non-OwnedImpl buffers.
642
 */
643
class OwnedImpl : public LibEventInstance {
public:
  OwnedImpl();
  OwnedImpl(absl::string_view data);
  OwnedImpl(const Instance& data);
  OwnedImpl(const void* data, uint64_t size);
  OwnedImpl(BufferMemoryAccountSharedPtr account);

  // Buffer::Instance
  void addDrainTracker(std::function<void()> drain_tracker) override;
  void bindAccount(BufferMemoryAccountSharedPtr account) override;
  void add(const void* data, uint64_t size) override;
  void addBufferFragment(BufferFragment& fragment) override;
  void add(absl::string_view data) override;
  void add(const Instance& data) override;
  void prepend(absl::string_view data) override;
  void prepend(Instance& data) override;
  void copyOut(size_t start, uint64_t size, void* data) const override;
  uint64_t copyOutToSlices(uint64_t size, Buffer::RawSlice* slices,
                           uint64_t num_slice) const override;
  void drain(uint64_t size) override;
  RawSliceVector getRawSlices(absl::optional<uint64_t> max_slices = absl::nullopt) const override;
  RawSlice frontSlice() const override;
  SliceDataPtr extractMutableFrontSlice() override;
  uint64_t length() const override;
  void* linearize(uint32_t size) override;
  void move(Instance& rhs) override;
  void move(Instance& rhs, uint64_t length) override;
  void move(Instance& rhs, uint64_t length, bool reset_drain_trackers_and_accounting) override;
  Reservation reserveForRead() override;
  ReservationSingleSlice reserveSingleSlice(uint64_t length, bool separate_slice = false) override;
  ssize_t search(const void* data, uint64_t size, size_t start, size_t length) const override;
  bool startsWith(absl::string_view data) const override;
  std::string toString() const override;

  // LibEventInstance
  void postProcess() override;

  /**
   * Create a new slice at the end of the buffer, and copy the supplied content into it.
   * @param data start of the content to copy.
   *
   */
  virtual void appendSliceForTest(const void* data, uint64_t size);

  /**
   * Create a new slice at the end of the buffer, and copy the supplied string into it.
   * @param data the string to append to the buffer.
   */
  virtual void appendSliceForTest(absl::string_view data);

  /**
   * @return the BufferMemoryAccount bound to this buffer, if any.
   */
  BufferMemoryAccountSharedPtr getAccountForTest();

  // Does not implement watermarking.
  // TODO(antoniovicente) Implement watermarks by merging the OwnedImpl and WatermarkBuffer
  // implementations. Also, make high-watermark config a constructor argument.
  void setWatermarks(uint64_t, uint32_t) override { ASSERT(false, "watermarks not implemented."); }
  uint64_t highWatermark() const override { return 0; }
  bool highWatermarkTriggered() const override { return false; }

  /**
   * Describe the in-memory representation of the slices in the buffer. For use
   * in tests that want to make assertions about the specific arrangement of
   * bytes in the buffer.
   */
  std::vector<Slice::SliceRepresentation> describeSlicesForTest() const;

  /**
   * Create a reservation for reading with a non-default length. Used in benchmark tests.
   */
  Reservation reserveForReadWithLengthForTest(uint64_t length) {
    return reserveWithMaxLength(length);
  }

  size_t addFragments(absl::Span<const absl::string_view> fragments) override;

protected:
  // Upper bound for a default read reservation: all reservation slots filled with
  // default-sized slices.
  static constexpr uint64_t default_read_reservation_size_ =
      Reservation::MAX_SLICES_ * Slice::default_slice_size_;

  /**
   * Create a reservation with a maximum length.
   */
  Reservation reserveWithMaxLength(uint64_t max_length);

  void commit(uint64_t length, absl::Span<RawSlice> slices,
              ReservationSlicesOwnerPtr slices_owner) override;

private:
  /**
   * @param rhs another buffer
   * @return whether the rhs buffer is also an instance of OwnedImpl (or a subclass) that
   *         uses the same internal implementation as this buffer.
   */
  bool isSameBufferImpl(const Instance& rhs) const;

  void addImpl(const void* data, uint64_t size);
  void drainImpl(uint64_t size);

  /**
   * Moves contents of the `other_slice` by either taking its ownership or coalescing it
   * into an existing slice.
   * NOTE: the caller is responsible for draining the buffer that contains the `other_slice`.
   */
  void coalesceOrAddSlice(Slice&& other_slice);

  /** Ring buffer of slices. */
  SliceDeque slices_;

  /** Sum of the dataSize of all slices. */
  OverflowDetectingUInt64 length_;

  BufferMemoryAccountSharedPtr account_;

  // Base interface for objects that own the backing storages of a reservation.
  struct OwnedImplReservationSlicesOwner : public ReservationSlicesOwner {
    virtual absl::Span<Slice::SizedStorage> ownedStorages() PURE;
  };

  // Owner for multi-slice reservations. Recycles default-sized storages through a
  // thread-local free list to avoid repeated allocation/release.
  struct OwnedImplReservationSlicesOwnerMultiple : public OwnedImplReservationSlicesOwner {
  public:
    static constexpr uint32_t free_list_max_ = Buffer::Reservation::MAX_SLICES_;

    OwnedImplReservationSlicesOwnerMultiple() : free_list_ref_(free_list_) {}
    // Return any unused default-sized storages to the thread-local free list
    // (capped at free_list_max_ entries) instead of freeing them.
    ~OwnedImplReservationSlicesOwnerMultiple() override {
      for (auto r = owned_storages_.rbegin(); r != owned_storages_.rend(); r++) {
        if (r->mem_ != nullptr) {
          ASSERT(r->len_ == Slice::default_slice_size_);
          if (free_list_ref_.size() < free_list_max_) {
            free_list_ref_.push_back(std::move(r->mem_));
          }
        }
      }
    }

    // Obtain a default-sized storage, preferring a recycled one from the free list.
    Slice::SizedStorage newStorage() {
      ASSERT(Slice::sliceSize(Slice::default_slice_size_) == Slice::default_slice_size_);

      Slice::SizedStorage storage{nullptr, Slice::default_slice_size_};
      if (!free_list_ref_.empty()) {
        storage.mem_ = std::move(free_list_ref_.back());
        free_list_ref_.pop_back();
      } else {
        storage.mem_.reset(new uint8_t[Slice::default_slice_size_]);
      }

      return storage;
    }

    absl::Span<Slice::SizedStorage> ownedStorages() override {
      return absl::MakeSpan(owned_storages_);
    }

    absl::InlinedVector<Slice::SizedStorage, Buffer::Reservation::MAX_SLICES_> owned_storages_;

  private:
    // Thread local resolving introduces additional overhead. Initialize this reference once when
    // constructing the owner to reduce thread local resolving to improve performance.
    absl::InlinedVector<Slice::StoragePtr, free_list_max_>& free_list_ref_;

    // Simple thread local cache to reduce unnecessary memory allocation and release. This cache
    // is currently only used for multiple slices reservation because of the additional overhead
    // that thread local resolving would introduce.
    static thread_local absl::InlinedVector<Slice::StoragePtr, free_list_max_> free_list_;
  };

  // Owner for single-slice reservations; holds exactly one storage, no free list.
  struct OwnedImplReservationSlicesOwnerSingle : public OwnedImplReservationSlicesOwner {
    absl::Span<Slice::SizedStorage> ownedStorages() override {
      return absl::MakeSpan(&owned_storage_, 1);
    }

    Slice::SizedStorage owned_storage_;
  };
};
819

            
820
using BufferFragmentPtr = std::unique_ptr<BufferFragment>;
821

            
822
/**
823
 * An implementation of BufferFragment where a releasor callback is called when the data is
824
 * no longer needed. Copies data into internal buffer.
825
 */
826
class OwnedBufferFragmentImpl final : public BufferFragment, public InlineStorage {
public:
  using Releasor = std::function<void(const OwnedBufferFragmentImpl*)>;

  /**
   * Copies the data into internal buffer. The releasor is called when the data has been
   * fully drained or the buffer that contains this fragment is destroyed.
   * @param data external data to reference
   * @param releasor a callback function to be called when data is no longer needed.
   */

  static BufferFragmentPtr create(absl::string_view data, const Releasor& releasor) {
    // Sized placement new: allocates the object plus data.size() trailing bytes so the
    // copied payload lives inline in data_[] immediately after the object.
    return BufferFragmentPtr(new (sizeof(OwnedBufferFragmentImpl) + data.size())
                                 OwnedBufferFragmentImpl(data, releasor));
  }

  // Buffer::BufferFragment
  const void* data() const override { return data_; }
  size_t size() const override { return size_; }
  void done() override { releasor_(this); }

private:
  // Private: instances must be created via create() so the inline storage is sized correctly.
  OwnedBufferFragmentImpl(absl::string_view data, const Releasor& releasor)
      : releasor_(releasor), size_(data.size()) {
    ASSERT(releasor != nullptr);
    memcpy(data_, data.data(), data.size()); // NOLINT(safe-memcpy)
  }

  const Releasor releasor_;
  const size_t size_;
  // Flexible array member; actual length is size_, provided by the sized allocation in create().
  uint8_t data_[];
};
858

            
859
using OwnedBufferFragmentImplPtr = std::unique_ptr<OwnedBufferFragmentImpl>;
860

            
861
} // namespace Buffer
862
} // namespace Envoy