#pragma once

#include <algorithm>
#include <cstdint>
#include <deque>
#include <list>
#include <memory>
#include <string>
#include <vector>

#include "envoy/buffer/buffer.h"
#include "envoy/http/stream_reset_handler.h"

#include "source/common/common/assert.h"
#include "source/common/common/non_copyable.h"
#include "source/common/common/utility.h"
#include "source/common/event/libevent.h"

#include "absl/functional/any_invocable.h"
namespace Envoy {
namespace Buffer {

/**
 * A Slice manages a contiguous block of bytes.
 * The block is arranged like this:
 *                   |<- dataSize() ->|<- reservableSize() ->|
 * +-----------------+----------------+----------------------+
 * | Drained         | Data           | Reservable           |
 * | Unused space    | Usable content | New content can be   |
 * | that formerly   |                | added here with      |
 * | was in the Data |                | reserve()/commit()   |
 * | section         |                | or append()          |
 * +-----------------+----------------+----------------------+
 * ^                 ^                ^                      ^
 * |                 |                |                      |
 * base_             base_ + data_    base_ + reservable_    base_ + capacity_
 */
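// Worked example of the offsets (illustrative, not part of the original header):
// a freshly allocated 4096-byte slice that has had 100 bytes appended and then
// 25 bytes drained ends up with data_ == 25, reservable_ == 100 and
// capacity_ == 4096, so dataSize() == 75 and reservableSize() == 3996.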
class Slice {
public:
  using Reservation = RawSlice;
  using StoragePtr = std::unique_ptr<uint8_t[]>;

  struct SizedStorage {
    StoragePtr mem_{};
    size_t len_{};
  };

  /**
   * Create an empty Slice with 0 capacity.
   */
  Slice() = default;

  /**
   * Create an empty mutable Slice that owns its storage, which it charges to the provided account,
   * if any.
   * @param min_capacity number of bytes of space the slice should have. Actual capacity is rounded
   * up to the next multiple of 4KB.
   * @param account the account to charge.
   */
  Slice(uint64_t min_capacity, const BufferMemoryAccountSharedPtr& account)
      : capacity_(sliceSize(min_capacity)), storage_(new uint8_t[capacity_]),
        base_(storage_.get()) {
    if (account) {
      account->charge(capacity_);
      account_ = account;
    }
  }

  /**
   * Create a mutable Slice from pre-allocated backing storage, which it charges to the provided
   * account, if any.
   * @param storage backing storage for the slice.
   * @param used_size the number of bytes of storage that are already in use.
   * @param account the account to charge.
   */
  Slice(SizedStorage storage, uint64_t used_size, const BufferMemoryAccountSharedPtr& account)
      : capacity_(storage.len_), storage_(std::move(storage.mem_)), base_(storage_.get()),
        reservable_(used_size) {
    ASSERT(sliceSize(capacity_) == capacity_);
    ASSERT(reservable_ <= capacity_);

    if (account) {
      account->charge(capacity_);
      account_ = account;
    }
  }

  /**
   * Create an immutable Slice that refers to an external buffer fragment.
   * @param fragment provides externally owned immutable data.
   */
  Slice(BufferFragment& fragment)
      : capacity_(fragment.size()), storage_(nullptr),
        base_(static_cast<uint8_t*>(const_cast<void*>(fragment.data()))),
        reservable_(fragment.size()) {
    releasor_ = [&fragment]() { fragment.done(); };
  }

  Slice(Slice&& rhs) noexcept {
    capacity_ = rhs.capacity_;
    storage_ = std::move(rhs.storage_);
    base_ = rhs.base_;
    data_ = rhs.data_;
    reservable_ = rhs.reservable_;
    drain_trackers_ = std::move(rhs.drain_trackers_);
    account_ = std::move(rhs.account_);
    releasor_.swap(rhs.releasor_);

    rhs.capacity_ = 0;
    rhs.base_ = nullptr;
    rhs.data_ = 0;
    rhs.reservable_ = 0;
  }

  Slice& operator=(Slice&& rhs) noexcept {
    if (this != &rhs) {
      callAndClearDrainTrackersAndCharges();

      capacity_ = rhs.capacity_;
      storage_ = std::move(rhs.storage_);
      base_ = rhs.base_;
      data_ = rhs.data_;
      reservable_ = rhs.reservable_;
      drain_trackers_ = std::move(rhs.drain_trackers_);
      account_ = std::move(rhs.account_);
      if (releasor_) {
        releasor_();
      }
      releasor_ = rhs.releasor_;
      rhs.releasor_ = nullptr;

      rhs.capacity_ = 0;
      rhs.base_ = nullptr;
      rhs.data_ = 0;
      rhs.reservable_ = 0;
    }

    return *this;
  }

  ~Slice() {
    callAndClearDrainTrackersAndCharges();
    if (releasor_) {
      releasor_();
    }
  }

  /**
   * @return true if the data in the slice is mutable.
   */
  bool isMutable() const { return storage_ != nullptr; }

  /**
   * @return true if content in this Slice can be coalesced into another Slice.
   */
  bool canCoalesce() const { return storage_ != nullptr; }

  /**
   * @return a pointer to the start of the usable content.
   */
  const uint8_t* data() const { return base_ + data_; }

  /**
   * @return a pointer to the start of the usable content.
   */
  uint8_t* data() { return base_ + data_; }

  /**
   * @return the size in bytes of the usable content.
   */
  uint64_t dataSize() const { return reservable_ - data_; }

  /**
   * Remove the first `size` bytes of usable content. Runs in O(1) time.
   * @param size number of bytes to remove. If greater than dataSize(), the result is undefined.
   */
  void drain(uint64_t size) {
    ASSERT(data_ + size <= reservable_);
    data_ += size;
    if (data_ == reservable_) {
      // All the data in the slice has been drained. Reset the offsets so all
      // the data can be reused.
      data_ = 0;
      reservable_ = 0;
    }
  }
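
  // Sketch of the reset behavior above (illustrative values only):
  //
  //   Slice s(10, nullptr);   // capacity rounded up to 4096
  //   s.append("abcdef", 6);  // data_ == 0, reservable_ == 6
  //   s.drain(4);             // data_ == 4, dataSize() == 2
  //   s.drain(2);             // slice now empty: data_ and reservable_ reset
  //                           // to 0, so the full 4096 bytes are reusable.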

  /**
   * @return the number of bytes available to be reserved.
   * @note Read-only implementations of Slice should return zero from this method.
   */
  uint64_t reservableSize() const {
    ASSERT(capacity_ >= reservable_);
    return capacity_ - reservable_;
  }

  /**
   * Reserve `size` bytes that the caller can populate with content. The caller SHOULD then
   * call commit() to add the newly populated content from the Reserved section to the Data
   * section.
   * @note If there is already an outstanding reservation (i.e., a reservation obtained
   * from reserve() that has not been released by calling commit()), this method will
   * return a new reservation that replaces it.
   * @param size the number of bytes to reserve. The Slice implementation MAY reserve
   * fewer bytes than requested (for example, if it doesn't have enough room in the
   * Reservable section to fulfill the whole request).
   * @return a tuple containing the address of the start of the resulting reservation and the
   * reservation size in bytes. If the address is null, the reservation failed.
   * @note Read-only implementations of Slice should return {nullptr, 0} from this method.
   */
  Reservation reserve(uint64_t size) {
    if (size == 0) {
      return {nullptr, 0};
    }
    // Verify the semantics that drain() enforces: if the slice is empty, either because
    // no data has been added or because all the added data has been drained, the data
    // section is at the very start of the slice.
    ASSERT(!(dataSize() == 0 && data_ > 0));
    uint64_t available_size = capacity_ - reservable_;
    if (available_size == 0) {
      return {nullptr, 0};
    }
    uint64_t reservation_size = std::min(size, available_size);
    void* reservation = &(base_[reservable_]);
    return {reservation, static_cast<size_t>(reservation_size)};
  }

  /**
   * Commit a Reservation that was previously obtained from a call to reserve().
   * The Reservation's size is added to the Data section.
   * @param reservation a reservation obtained from a previous call to reserve().
   * If the reservation is not from this Slice, commit() will return false.
   * If the caller is committing fewer bytes than provided by reserve(), it
   * should change the len_ field of the reservation before calling commit().
   * For example, if a caller reserve()s 4KB to do a nonblocking socket read,
   * and the read only returns two bytes, the caller should set
   * reservation.len_ = 2 and then call `commit(reservation)`.
   * @return whether the Reservation was successfully committed to the Slice.
   * @note template parameter `SafeCommit` can be used to disable memory range check.
   */
  template <bool SafeCommit = true> bool commit(const Reservation& reservation) {
    if constexpr (SafeCommit) {
      if (static_cast<const uint8_t*>(reservation.mem_) != base_ + reservable_ ||
          reservable_ + reservation.len_ > capacity_ || reservable_ >= capacity_) {
        // The reservation is not from this Slice.
        return false;
      }
    } else {
      ASSERT(static_cast<const uint8_t*>(reservation.mem_) == base_ + reservable_ &&
             reservable_ + reservation.len_ <= capacity_);
    }
    reservable_ += reservation.len_;
    return true;
  }
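
  // Usage sketch for the reserve()/commit() pair (illustrative; assumes only the
  // API above, with memcpy from <cstring>):
  //
  //   Slice slice(100, nullptr);                 // capacity rounded up to 4096
  //   Slice::Reservation r = slice.reserve(10);  // r.mem_ == base_ + reservable_
  //   memcpy(r.mem_, "hello", 5);
  //   r.len_ = 5;                                // commit fewer bytes than reserved
  //   ASSERT(slice.commit(r));                   // Data section now holds "hello"
  //   ASSERT(slice.dataSize() == 5);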

  /**
   * Copy as much of the supplied data as possible to the end of the slice.
   * @param data start of the data to copy.
   * @param size number of bytes to copy.
   * @return number of bytes copied (may be smaller than size, may even be zero).
   */
  uint64_t append(const void* data, uint64_t size) {
    uint64_t copy_size = std::min(size, reservableSize());
    if (copy_size == 0) {
      return 0;
    }
    uint8_t* dest = base_ + reservable_;
    reservable_ += copy_size;
    // NOLINTNEXTLINE(clang-analyzer-core.NullDereference)
    memcpy(dest, data, copy_size); // NOLINT(safe-memcpy)
    return copy_size;
  }
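
  // Partial-copy example (illustrative): appending 5000 bytes to an empty
  // 4096-byte slice copies only reservableSize() == 4096 bytes and returns
  // 4096; the caller is expected to place the remaining 904 bytes elsewhere,
  // e.g. in a new slice.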

  /**
   * Copy as much of the supplied data as possible to the front of the slice.
   * If only part of the data will fit in the slice, the bytes from the _end_ are
   * copied.
   * @param data start of the data to copy.
   * @param size number of bytes to copy.
   * @return number of bytes copied (may be smaller than size, may even be zero).
   */
  uint64_t prepend(const void* data, uint64_t size) {
    const uint8_t* src = static_cast<const uint8_t*>(data);
    uint64_t copy_size;
    if (dataSize() == 0) {
      // There is nothing in the slice, so put the data at the very end in case the caller
      // later tries to prepend anything else in front of it.
      copy_size = std::min(size, reservableSize());
      reservable_ = capacity_;
      data_ = capacity_ - copy_size;
    } else {
      if (data_ == 0) {
        // There is content in the slice, and no space in front of it to write anything.
        return 0;
      }
      // Write into the space in front of the slice's current content.
      copy_size = std::min(size, data_);
      data_ -= copy_size;
    }
    memcpy(base_ + data_, src + size - copy_size, copy_size); // NOLINT(safe-memcpy)
    return copy_size;
  }
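
  // Prepend walkthrough (illustrative values):
  //
  //   Slice s(16, nullptr);    // 4096-byte slice, empty
  //   s.prepend("world", 5);   // empty slice: data placed at the very end,
  //                            // data_ == 4091, reservable_ == 4096
  //   s.prepend("hello ", 6);  // written into the space in front, data_ == 4085
  //   // The slice now reads "hello world". Had the front space been smaller
  //   // than the input, only the input's *last* bytes would have been kept.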

  /**
   * Describe the in-memory representation of the slice. For use
   * in tests that want to make assertions about the specific arrangement of
   * bytes in a slice.
   */
  struct SliceRepresentation {
    uint64_t data;
    uint64_t reservable;
    uint64_t capacity;
  };
  SliceRepresentation describeSliceForTest() const {
    return SliceRepresentation{dataSize(), reservableSize(), capacity_};
  }

  /**
   * Move all drain trackers and charges from the current slice to the destination slice.
   */
  void transferDrainTrackersTo(Slice& destination) {
    destination.drain_trackers_.splice(destination.drain_trackers_.end(), drain_trackers_);
    ASSERT(drain_trackers_.empty());
    // The releasor doesn't need to be transferred: a slice that has a releasor can't be
    // coalesced, so this method is never called on one.
    ASSERT(releasor_ == nullptr);
  }

  /**
   * Add a drain tracker to the slice.
   */
  void addDrainTracker(std::function<void()> drain_tracker) {
    drain_trackers_.emplace_back(std::move(drain_tracker));
  }
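
  // Drain-tracker semantics (illustrative sketch): trackers fire exactly once,
  // when the slice is destroyed or overwritten by move-assignment, whichever
  // comes first.
  //
  //   int drained = 0;
  //   Slice s(64, nullptr);
  //   s.addDrainTracker([&drained] { ++drained; });
  //   s = Slice();  // move-assignment calls callAndClearDrainTrackersAndCharges(),
  //                 // so drained == 1 here, before end of scope.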

  /**
   * Call all drain trackers associated with the slice, then clear
   * the drain tracker list.
   */
  void callAndClearDrainTrackersAndCharges() {
    for (const auto& drain_tracker : drain_trackers_) {
      drain_tracker();
    }
    drain_trackers_.clear();

    if (account_) {
      account_->credit(capacity_);
      account_.reset();
    }
  }

  /**
   * Charges the provided account for the resources if these conditions hold:
   * - we're not already charging for this slice
   * - the given account is non-null
   * - the slice owns backing memory
   */
  void maybeChargeAccount(const BufferMemoryAccountSharedPtr& account) {
    if (account_ != nullptr || storage_ == nullptr || account == nullptr) {
      return;
    }
    account->charge(capacity_);
    account_ = account;
  }

  static constexpr uint32_t default_slice_size_ = 16384;

public:
  /**
   * Compute a slice size big enough to hold a specified amount of data.
   * @param data_size the minimum amount of data the slice must be able to store, in bytes.
   * @return a recommended slice size, in bytes.
   */
  static uint64_t sliceSize(uint64_t data_size) {
    static constexpr uint64_t PageSize = 4096;
    const uint64_t num_pages = (data_size + PageSize - 1) / PageSize;
    return num_pages * PageSize;
  }
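
  // Worked examples of the page rounding (illustrative):
  //   sliceSize(0)    == 0     // zero bytes need zero pages
  //   sliceSize(1)    == 4096  // one byte still occupies a whole page
  //   sliceSize(4096) == 4096  // exact multiples are unchanged
  //   sliceSize(4097) == 8192  // one byte over spills into a second page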

  /**
   * Create new backing storage with at least the given capacity. This method computes a
   * recommended capacity that is greater than or equal to `min_capacity`, and allocates
   * backing storage of that size.
   * @param min_capacity the minimum capacity of the new backing storage, in bytes.
   * @return a backing storage for a slice.
   */
  static inline SizedStorage newStorage(uint64_t min_capacity) {
    const uint64_t slice_size = sliceSize(min_capacity);
    return {StoragePtr{new uint8_t[slice_size]}, static_cast<size_t>(slice_size)};
  }

protected:
  /** Length of the byte array that base_ points to. This is also the offset in bytes from the start
   * of the slice to the end of the Reservable section. */
  uint64_t capacity_ = 0;

  /** Backing storage for mutable slices which own their own storage. This storage should never be
   * accessed directly; access base_ instead. */
  StoragePtr storage_;

  /** Start of the slice. Points to storage_ iff the slice owns its own storage. */
  uint8_t* base_{nullptr};

  /** Offset in bytes from the start of the slice to the start of the Data section. */
  uint64_t data_ = 0;

  /** Offset in bytes from the start of the slice to the start of the Reservable section which is
   * also the end of the Data section. */
  uint64_t reservable_ = 0;

  /** Hooks to execute when the slice is destroyed. */
  std::list<std::function<void()>> drain_trackers_;

  /** Account associated with this slice. This may be null. When
   * coalescing with another slice, we do not transfer over their account. */
  BufferMemoryAccountSharedPtr account_;

  /** The releasor for the BufferFragment. */
  std::function<void()> releasor_;
};

class OwnedImpl;

class SliceDataImpl : public SliceData {
public:
  explicit SliceDataImpl(Slice&& slice) : slice_(std::move(slice)) {}

  // SliceData
  absl::Span<uint8_t> getMutableData() override {
    RELEASE_ASSERT(slice_.isMutable(), "Not allowed to call getMutableData if slice is immutable");
    return {slice_.data(), static_cast<absl::Span<uint8_t>::size_type>(slice_.dataSize())};
  }

private:
  friend OwnedImpl;
  Slice slice_;
};

/**
 * Queue of Slice that supports efficient read and write access to both
 * the front and the back of the queue.
 * @note This class has similar properties to std::deque<T>. The reason for using
 * a custom deque implementation is that benchmark testing during development
 * revealed that std::deque was too slow to reach performance parity with the
 * prior evbuffer-based buffer implementation.
 */
class SliceDeque {
public:
  SliceDeque() : ring_(inline_ring_), capacity_(InlineRingCapacity) {}

  SliceDeque(SliceDeque&& rhs) noexcept {
    // This custom move constructor is needed so that ring_ will be updated properly.
    std::move(rhs.inline_ring_, rhs.inline_ring_ + InlineRingCapacity, inline_ring_);
    external_ring_ = std::move(rhs.external_ring_);
    ring_ = (external_ring_ != nullptr) ? external_ring_.get() : inline_ring_;
    start_ = rhs.start_;
    size_ = rhs.size_;
    capacity_ = rhs.capacity_;
  }

  SliceDeque& operator=(SliceDeque&& rhs) noexcept {
    // This custom move assignment operator is needed so that ring_ will be updated properly.
    std::move(rhs.inline_ring_, rhs.inline_ring_ + InlineRingCapacity, inline_ring_);
    external_ring_ = std::move(rhs.external_ring_);
    ring_ = (external_ring_ != nullptr) ? external_ring_.get() : inline_ring_;
    start_ = rhs.start_;
    size_ = rhs.size_;
    capacity_ = rhs.capacity_;
    return *this;
  }

  void emplace_back(Slice&& slice) { // NOLINT(readability-identifier-naming)
    growRing();
    size_t index = internalIndex(size_);
    ring_[index] = std::move(slice);
    size_++;
  }

  void emplace_front(Slice&& slice) { // NOLINT(readability-identifier-naming)
    growRing();
    start_ = (start_ == 0) ? capacity_ - 1 : start_ - 1;
    ring_[start_] = std::move(slice);
    size_++;
  }

  bool empty() const { return size() == 0; }
  size_t size() const { return size_; }

  Slice& front() { return ring_[start_]; }
  const Slice& front() const { return ring_[start_]; }
  Slice& back() { return ring_[internalIndex(size_ - 1)]; }
  const Slice& back() const { return ring_[internalIndex(size_ - 1)]; }

  Slice& operator[](size_t i) {
    ASSERT(!empty());
    return ring_[internalIndex(i)];
  }
  const Slice& operator[](size_t i) const {
    ASSERT(!empty());
    return ring_[internalIndex(i)];
  }

  void pop_front() { // NOLINT(readability-identifier-naming)
    if (size() == 0) {
      return;
    }
    front() = Slice();
    size_--;
    start_++;
    if (start_ == capacity_) {
      start_ = 0;
    }
  }

  void pop_back() { // NOLINT(readability-identifier-naming)
    if (size() == 0) {
      return;
    }
    back() = Slice();
    size_--;
  }

  /**
   * Forward const iterator for SliceDeque.
   * @note this implementation currently supports the minimum functionality needed to support
   * the `for (const auto& slice : slice_deque)` idiom.
   */
  class ConstIterator {
  public:
    const Slice& operator*() { return deque_[index_]; }

    ConstIterator operator++() {
      index_++;
      return *this;
    }

    bool operator!=(const ConstIterator& rhs) const {
      return &deque_ != &rhs.deque_ || index_ != rhs.index_;
    }

    friend class SliceDeque;

  private:
    ConstIterator(const SliceDeque& deque, size_t index) : deque_(deque), index_(index) {}
    const SliceDeque& deque_;
    size_t index_;
  };

  ConstIterator begin() const noexcept { return {*this, 0}; }

  ConstIterator end() const noexcept { return {*this, size_}; }

private:
  constexpr static size_t InlineRingCapacity = 8;

  size_t internalIndex(size_t index) const {
    size_t internal_index = start_ + index;
    if (internal_index >= capacity_) {
      internal_index -= capacity_;
      ASSERT(internal_index < capacity_);
    }
    return internal_index;
  }

  void growRing() {
    if (size_ < capacity_) {
      return;
    }
    const size_t new_capacity = capacity_ * 2;
    auto new_ring = std::make_unique<Slice[]>(new_capacity);
    size_t src = start_;
    size_t dst = 0;
    for (size_t i = 0; i < size_; i++) {
      new_ring[dst++] = std::move(ring_[src++]);
      if (src == capacity_) {
        src = 0;
      }
    }
    external_ring_.swap(new_ring);
    ring_ = external_ring_.get();
    start_ = 0;
    capacity_ = new_capacity;
  }

  Slice inline_ring_[InlineRingCapacity];
  std::unique_ptr<Slice[]> external_ring_;
  Slice* ring_; // points to start of either inline or external ring.
  size_t start_{0};
  size_t size_{0};
  size_t capacity_;
};
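
// Ring-index walkthrough (illustrative): with the inline capacity of 8 and
// start_ == 6, four stored slices map onto the ring as
//   logical index: 0 1 2 3
//   ring_ index:   6 7 0 1   (internalIndex() wraps modulo capacity_)
// When a push would exceed capacity_, growRing() doubles the ring and re-packs
// the live slices starting at index 0, so pushes stay amortized O(1) without
// per-push allocation in the steady state.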

/**
 * An implementation of BufferFragment where a releasor callback is called when the data is
 * no longer needed.
 */
class BufferFragmentImpl : NonCopyable, public BufferFragment {
public:
  /**
   * Creates a new wrapper around the externally owned <data> of size <size>.
   * The caller must ensure <data> is valid until releasor() is called, or for the lifetime of the
   * fragment. releasor() is called with <data>, <size> and <this> to allow caller to delete
   * the fragment object.
   * @param data external data to reference
   * @param size size of data
   * @param releasor a callback function to be called when data is no longer needed.
   */
  BufferFragmentImpl(
      const void* data, size_t size,
      absl::AnyInvocable<void(const void*, size_t, const BufferFragmentImpl*)> releasor)
      : data_(data), size_(size), releasor_(std::move(releasor)) {}

  // Buffer::BufferFragment
  const void* data() const override { return data_; }
  size_t size() const override { return size_; }
  void done() override {
    if (releasor_) {
      releasor_(data_, size_, this);
    }
  }

private:
  const void* const data_;
  const size_t size_;
  absl::AnyInvocable<void(const void*, size_t, const BufferFragmentImpl*)> releasor_;
};
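
// Usage sketch (illustrative; `buffer` stands for any Buffer::Instance, e.g. an
// OwnedImpl declared below):
//
//   static const char kBody[] = "immutable payload";
//   auto* fragment = new BufferFragmentImpl(
//       kBody, sizeof(kBody) - 1,
//       [](const void*, size_t, const BufferFragmentImpl* self) { delete self; });
//   buffer.addBufferFragment(*fragment);  // zero-copy: the buffer references
//                                         // kBody until the slice is drained,
//                                         // at which point done() runs the
//                                         // releasor and deletes the fragment.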

class LibEventInstance : public Instance {
public:
  // Called after accessing the memory in buffer() directly to allow any post-processing.
  virtual void postProcess() PURE;
};

/**
 * Wrapper for uint64_t that asserts upon integer overflow and underflow.
 */
class OverflowDetectingUInt64 {
public:
  operator uint64_t() const { return value_; }

  OverflowDetectingUInt64& operator+=(uint64_t size) {
    uint64_t new_value = value_ + size;
    RELEASE_ASSERT(new_value >= value_, "64-bit unsigned integer overflowed");
    value_ = new_value;
    return *this;
  }

  OverflowDetectingUInt64& operator-=(uint64_t size) {
    RELEASE_ASSERT(value_ >= size, "unsigned integer underflowed");
    value_ -= size;
    return *this;
  }

private:
  uint64_t value_{0};
};
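
// Behavior sketch (illustrative): a drop-in uint64_t counter that turns silent
// wraparound into a crash.
//
//   OverflowDetectingUInt64 len;
//   len += 10;         // value is 10
//   len -= 4;          // value is 6
//   uint64_t v = len;  // implicit conversion, v == 6
//   len -= 7;          // would trip RELEASE_ASSERT("unsigned integer underflowed")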

/**
 * Wraps an allocated and owned buffer.
 *
 * Note that due to the internals of move(), OwnedImpl is not
 * compatible with non-OwnedImpl buffers.
 */
class OwnedImpl : public LibEventInstance {
public:
  OwnedImpl();
  OwnedImpl(absl::string_view data);
  OwnedImpl(const Instance& data);
  OwnedImpl(const void* data, uint64_t size);
  OwnedImpl(BufferMemoryAccountSharedPtr account);

  // Buffer::Instance
  void addDrainTracker(std::function<void()> drain_tracker) override;
  void bindAccount(BufferMemoryAccountSharedPtr account) override;
  void add(const void* data, uint64_t size) override;
  void addBufferFragment(BufferFragment& fragment) override;
  void add(absl::string_view data) override;
  void add(const Instance& data) override;
  void prepend(absl::string_view data) override;
  void prepend(Instance& data) override;
  void copyOut(size_t start, uint64_t size, void* data) const override;
  uint64_t copyOutToSlices(uint64_t size, Buffer::RawSlice* slices,
                           uint64_t num_slice) const override;
  void drain(uint64_t size) override;
  RawSliceVector getRawSlices(absl::optional<uint64_t> max_slices = absl::nullopt) const override;
  RawSlice frontSlice() const override;
  SliceDataPtr extractMutableFrontSlice() override;
  uint64_t length() const override;
  void* linearize(uint32_t size) override;
  void move(Instance& rhs) override;
  void move(Instance& rhs, uint64_t length) override;
  void move(Instance& rhs, uint64_t length, bool reset_drain_trackers_and_accounting) override;
  Reservation reserveForRead() override;
  ReservationSingleSlice reserveSingleSlice(uint64_t length, bool separate_slice = false) override;
  ssize_t search(const void* data, uint64_t size, size_t start, size_t length) const override;
  bool startsWith(absl::string_view data) const override;
  std::string toString() const override;

  // LibEventInstance
  void postProcess() override;

  /**
   * Create a new slice at the end of the buffer, and copy the supplied content into it.
   * @param data start of the content to copy.
   * @param size number of bytes to copy.
   */
  virtual void appendSliceForTest(const void* data, uint64_t size);

  /**
   * Create a new slice at the end of the buffer, and copy the supplied string into it.
   * @param data the string to append to the buffer.
   */
  virtual void appendSliceForTest(absl::string_view data);

  /**
   * @return the BufferMemoryAccount bound to this buffer, if any.
   */
  BufferMemoryAccountSharedPtr getAccountForTest();

  // Does not implement watermarking.
  // TODO(antoniovicente) Implement watermarks by merging the OwnedImpl and WatermarkBuffer
  // implementations. Also, make high-watermark config a constructor argument.
  void setWatermarks(uint32_t, uint32_t) override { ASSERT(false, "watermarks not implemented."); }
  uint32_t highWatermark() const override { return 0; }
  bool highWatermarkTriggered() const override { return false; }

  /**
   * Describe the in-memory representation of the slices in the buffer. For use
   * in tests that want to make assertions about the specific arrangement of
   * bytes in the buffer.
   */
  std::vector<Slice::SliceRepresentation> describeSlicesForTest() const;

  /**
   * Create a reservation for reading with a non-default length. Used in benchmark tests.
   */
  Reservation reserveForReadWithLengthForTest(uint64_t length) {
    return reserveWithMaxLength(length);
  }

  size_t addFragments(absl::Span<const absl::string_view> fragments) override;

protected:
  static constexpr uint64_t default_read_reservation_size_ =
      Reservation::MAX_SLICES_ * Slice::default_slice_size_;

  /**
   * Create a reservation with a maximum length.
   */
  Reservation reserveWithMaxLength(uint64_t max_length);

  void commit(uint64_t length, absl::Span<RawSlice> slices,
              ReservationSlicesOwnerPtr slices_owner) override;

private:
  /**
   * @param rhs another buffer
   * @return whether the rhs buffer is also an instance of OwnedImpl (or a subclass) that
   * uses the same internal implementation as this buffer.
   */
  bool isSameBufferImpl(const Instance& rhs) const;

  void addImpl(const void* data, uint64_t size);
  void drainImpl(uint64_t size);

  /**
   * Moves contents of the `other_slice` by either taking its ownership or coalescing it
   * into an existing slice.
   * NOTE: the caller is responsible for draining the buffer that contains the `other_slice`.
   */
  void coalesceOrAddSlice(Slice&& other_slice);

  /** Ring buffer of slices. */
  SliceDeque slices_;

  /** Sum of the dataSize of all slices. */
  OverflowDetectingUInt64 length_;

  BufferMemoryAccountSharedPtr account_;

  struct OwnedImplReservationSlicesOwner : public ReservationSlicesOwner {
    virtual absl::Span<Slice::SizedStorage> ownedStorages() PURE;
  };

  struct OwnedImplReservationSlicesOwnerMultiple : public OwnedImplReservationSlicesOwner {
  public:
    static constexpr uint32_t free_list_max_ = Buffer::Reservation::MAX_SLICES_;

    OwnedImplReservationSlicesOwnerMultiple() : free_list_ref_(free_list_) {}
    ~OwnedImplReservationSlicesOwnerMultiple() override {
      for (auto r = owned_storages_.rbegin(); r != owned_storages_.rend(); r++) {
        if (r->mem_ != nullptr) {
          ASSERT(r->len_ == Slice::default_slice_size_);
          if (free_list_ref_.size() < free_list_max_) {
            free_list_ref_.push_back(std::move(r->mem_));
          }
        }
      }
    }

    Slice::SizedStorage newStorage() {
      ASSERT(Slice::sliceSize(Slice::default_slice_size_) == Slice::default_slice_size_);

      Slice::SizedStorage storage{nullptr, Slice::default_slice_size_};
      if (!free_list_ref_.empty()) {
        storage.mem_ = std::move(free_list_ref_.back());
        free_list_ref_.pop_back();
      } else {
        storage.mem_.reset(new uint8_t[Slice::default_slice_size_]);
      }

      return storage;
    }

    absl::Span<Slice::SizedStorage> ownedStorages() override {
      return absl::MakeSpan(owned_storages_);
    }

    absl::InlinedVector<Slice::SizedStorage, Buffer::Reservation::MAX_SLICES_> owned_storages_;

  private:
    // Resolving a thread_local variable has measurable overhead, so this reference is
    // initialized once per owner to avoid repeated thread-local lookups.
    absl::InlinedVector<Slice::StoragePtr, free_list_max_>& free_list_ref_;

    // Simple thread-local cache that recycles slice storage to avoid unnecessary allocation
    // and release. It is currently used only for multi-slice reservations, where the saving
    // outweighs the thread-local resolution overhead.
    static thread_local absl::InlinedVector<Slice::StoragePtr, free_list_max_> free_list_;
  };

  struct OwnedImplReservationSlicesOwnerSingle : public OwnedImplReservationSlicesOwner {
    absl::Span<Slice::SizedStorage> ownedStorages() override {
      return absl::MakeSpan(&owned_storage_, 1);
    }

    Slice::SizedStorage owned_storage_;
  };
};
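
// End-to-end sketch of the public API (illustrative only):
//
//   Buffer::OwnedImpl src("hello ");
//   Buffer::OwnedImpl dst;
//   dst.move(src);     // O(1): whole slices change owner, no byte copy
//   dst.add("world");  // may reuse the last slice's Reservable section or
//                      // append a new slice
//   ASSERT(dst.toString() == "hello world");
//   dst.drain(6);      // front slices are advanced/popped in O(1) per slice
//   ASSERT(dst.toString() == "world");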

using BufferFragmentPtr = std::unique_ptr<BufferFragment>;

/**
 * An implementation of BufferFragment where a releasor callback is called when the data is
 * no longer needed. Copies the data into an internal buffer.
 */
class OwnedBufferFragmentImpl final : public BufferFragment, public InlineStorage {
public:
  using Releasor = std::function<void(const OwnedBufferFragmentImpl*)>;

  /**
   * Copies the data into an internal buffer. The releasor is called when the data has been
   * fully drained or the buffer that contains this fragment is destroyed.
   * @param data external data to reference
   * @param releasor a callback function to be called when data is no longer needed.
   */
  static BufferFragmentPtr create(absl::string_view data, const Releasor& releasor) {
    return BufferFragmentPtr(new (sizeof(OwnedBufferFragmentImpl) + data.size())
                                 OwnedBufferFragmentImpl(data, releasor));
  }

  // Buffer::BufferFragment
  const void* data() const override { return data_; }
  size_t size() const override { return size_; }
  void done() override { releasor_(this); }

private:
  OwnedBufferFragmentImpl(absl::string_view data, const Releasor& releasor)
      : releasor_(releasor), size_(data.size()) {
    ASSERT(releasor != nullptr);
    memcpy(data_, data.data(), data.size()); // NOLINT(safe-memcpy)
  }

  const Releasor releasor_;
  const size_t size_;
  uint8_t data_[];
};

using OwnedBufferFragmentImplPtr = std::unique_ptr<OwnedBufferFragmentImpl>;
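
// Usage sketch (illustrative): unlike BufferFragmentImpl above, create() copies
// `data` into trailing inline storage sized at allocation time, so the caller's
// source buffer may be freed immediately afterwards.
//
//   auto fragment = OwnedBufferFragmentImpl::create(
//       "copied payload",
//       [](const OwnedBufferFragmentImpl*) { /* fragment fully drained */ });
//   buffer.addBufferFragment(*fragment);  // `buffer` is a hypothetical Instance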

} // namespace Buffer
} // namespace Envoy
|