/proc/self/cwd/source/common/network/listener_filter_buffer_impl.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "source/common/network/listener_filter_buffer_impl.h" |
2 | | |
3 | | #include <string> |
4 | | |
5 | | namespace Envoy { |
6 | | namespace Network { |
7 | | |
8 | | ListenerFilterBufferImpl::ListenerFilterBufferImpl(IoHandle& io_handle, |
9 | | Event::Dispatcher& dispatcher, |
10 | | ListenerFilterBufferOnCloseCb close_cb, |
11 | | ListenerFilterBufferOnDataCb on_data_cb, |
12 | | uint64_t buffer_size) |
13 | | : io_handle_(io_handle), dispatcher_(dispatcher), on_close_cb_(close_cb), |
14 | | on_data_cb_(on_data_cb), buffer_(std::make_unique<uint8_t[]>(buffer_size)), |
15 | 115 | base_(buffer_.get()), buffer_size_(buffer_size) { |
16 | | // If the buffer_size not greater than 0, it means that doesn't expect any data. |
17 | 115 | ASSERT(buffer_size > 0); |
18 | | |
19 | 115 | io_handle_.initializeFileEvent( |
20 | 605 | dispatcher_, [this](uint32_t events) { onFileEvent(events); }, |
21 | 115 | Event::PlatformDefaultTriggerType, Event::FileReadyType::Read | Event::FileReadyType::Closed); |
22 | 115 | } |
23 | | |
24 | 1.43k | const Buffer::ConstRawSlice ListenerFilterBufferImpl::rawSlice() const { |
25 | 1.43k | Buffer::ConstRawSlice slice; |
26 | 1.43k | slice.mem_ = base_; |
27 | 1.43k | slice.len_ = data_size_; |
28 | 1.43k | return slice; |
29 | 1.43k | } |
30 | | |
31 | 607 | bool ListenerFilterBufferImpl::drain(uint64_t length) { |
32 | 607 | if (length == 0) { |
33 | 380 | return true; |
34 | 380 | } |
35 | | |
36 | 227 | ASSERT(length <= data_size_); |
37 | | |
38 | 227 | uint64_t read_size = 0; |
39 | 454 | while (read_size < length) { |
40 | 227 | auto result = io_handle_.recv(base_, length - read_size, 0); |
41 | 227 | ENVOY_LOG(trace, "recv returned: {}", result.return_value_); |
42 | | |
43 | 227 | if (!result.ok()) { |
44 | | // `IoErrorCode::Again` isn't processed here, since |
45 | | // the data already in the socket buffer. |
46 | 0 | return false; |
47 | 0 | } |
48 | 227 | read_size += result.return_value_; |
49 | 227 | } |
50 | 227 | base_ += length; |
51 | 227 | data_size_ -= length; |
52 | 227 | return true; |
53 | 227 | } |
54 | | |
55 | 603 | PeekState ListenerFilterBufferImpl::peekFromSocket() { |
56 | | // Reset buffer base in case of draining changed base. |
57 | 603 | auto old_base = base_; |
58 | 603 | base_ = buffer_.get(); |
59 | 603 | const auto result = io_handle_.recv(base_, buffer_size_, MSG_PEEK); |
60 | 603 | ENVOY_LOG(trace, "recv returned: {}", result.return_value_); |
61 | | |
62 | 603 | if (!result.ok()) { |
63 | 378 | if (result.err_->getErrorCode() == Api::IoError::IoErrorCode::Again) { |
64 | 378 | ENVOY_LOG(trace, "recv return try again"); |
65 | 378 | base_ = old_base; |
66 | 378 | return PeekState::Again; |
67 | 378 | } |
68 | 0 | ENVOY_LOG(debug, "recv failed: {}: {}", static_cast<int>(result.err_->getErrorCode()), |
69 | 0 | result.err_->getErrorDetails()); |
70 | 0 | return PeekState::Error; |
71 | 378 | } |
72 | | // Remote closed |
73 | 225 | if (result.return_value_ == 0) { |
74 | 0 | ENVOY_LOG(debug, "recv failed: remote closed"); |
75 | 0 | return PeekState::RemoteClose; |
76 | 0 | } |
77 | 225 | data_size_ = result.return_value_; |
78 | 225 | ASSERT(data_size_ <= buffer_size_); |
79 | | |
80 | 225 | return PeekState::Done; |
81 | 225 | } |
82 | | |
83 | 0 | void ListenerFilterBufferImpl::resetCapacity(uint64_t size) { |
84 | 0 | buffer_ = std::make_unique<uint8_t[]>(size); |
85 | 0 | base_ = buffer_.get(); |
86 | 0 | buffer_size_ = size; |
87 | 0 | data_size_ = 0; |
88 | 0 | } |
89 | | |
90 | 2 | void ListenerFilterBufferImpl::activateFileEvent(uint32_t events) { |
91 | 2 | io_handle_.activateFileEvents(events); |
92 | 2 | } |
93 | | |
94 | 605 | void ListenerFilterBufferImpl::onFileEvent(uint32_t events) { |
95 | 605 | ENVOY_LOG(trace, "onFileEvent: {}", events); |
96 | | |
97 | 605 | if (events & Event::FileReadyType::Closed) { |
98 | 2 | on_close_cb_(false); |
99 | 2 | return; |
100 | 2 | } |
101 | | |
102 | 603 | ASSERT(events == Event::FileReadyType::Read); |
103 | | |
104 | 603 | auto state = peekFromSocket(); |
105 | 603 | if (state == PeekState::Done) { |
106 | 225 | on_data_cb_(*this); |
107 | 378 | } else if (state == PeekState::Error) { |
108 | 0 | on_close_cb_(true); |
109 | 378 | } else if (state == PeekState::RemoteClose) { |
110 | 0 | on_close_cb_(false); |
111 | 0 | } |
112 | | // Did nothing for `Api::IoError::IoErrorCode::Again` |
113 | 603 | } |
114 | | |
115 | | } // namespace Network |
116 | | } // namespace Envoy |