/proc/self/cwd/source/extensions/io_socket/user_space/io_handle_impl.cc
Line | Count | Source (jump to first uncovered line) |
1 | | #include "source/extensions/io_socket/user_space/io_handle_impl.h" |
2 | | |
3 | | #include "envoy/buffer/buffer.h" |
4 | | #include "envoy/common/platform.h" |
5 | | |
6 | | #include "source/common/api/os_sys_calls_impl.h" |
7 | | #include "source/common/common/assert.h" |
8 | | #include "source/common/common/utility.h" |
9 | | #include "source/common/network/address_impl.h" |
10 | | #include "source/extensions/io_socket/user_space/file_event_impl.h" |
11 | | |
12 | | #include "absl/types/optional.h" |
13 | | |
14 | | namespace Envoy { |
15 | | |
16 | | namespace Extensions { |
17 | | namespace IoSocket { |
18 | | namespace UserSpace { |
19 | | namespace { |
20 | 0 | Api::SysCallIntResult makeInvalidSyscallResult() { |
21 | 0 | return Api::SysCallIntResult{-1, SOCKET_ERROR_NOT_SUP}; |
22 | 0 | } |
23 | | |
24 | | /** |
25 | | * Move at most max_length from src to dst. If the dst is close or beyond high watermark, move no |
26 | | * more than 16K. It's not an error if src buffer doesn't contain enough data. |
27 | | * @param dst supplies the buffer where the data is move to. |
28 | | * @param src supplies the buffer where the data is move from. |
29 | | * @param max_length supplies the max bytes the call can move. |
30 | | * @return number of bytes this call moves. |
31 | | */ |
32 | 0 | uint64_t moveUpTo(Buffer::Instance& dst, Buffer::Instance& src, uint64_t max_length) { |
33 | 0 | ASSERT(src.length() > 0); |
34 | 0 | if (dst.highWatermark() != 0) { |
35 | 0 | if (dst.length() < dst.highWatermark()) { |
36 | | // Move until high watermark so that high watermark is not triggered. |
37 | | // However, if dst buffer is near high watermark, move 16K to avoid the small fragment move. |
38 | 0 | max_length = std::min(max_length, |
39 | 0 | std::max<uint64_t>(FRAGMENT_SIZE, dst.highWatermark() - dst.length())); |
40 | 0 | } else { |
41 | | // Move at most 16K if the dst buffer is over high watermark. |
42 | 0 | max_length = std::min<uint64_t>(max_length, FRAGMENT_SIZE); |
43 | 0 | } |
44 | 0 | } |
45 | 0 | uint64_t res = std::min(max_length, src.length()); |
46 | 0 | dst.move(src, res, /*reset_drain_trackers_and_accounting=*/true); |
47 | 0 | return res; |
48 | 0 | } |
49 | | } // namespace |
50 | | |
// Returns the shared EnvoyInternalInstance address reported for every
// user-space handle; these handles have no real network endpoint.
const Network::Address::InstanceConstSharedPtr& IoHandleImpl::getCommonInternalAddress() {
  // Lazily construct the singleton on first call; later calls reuse it.
  CONSTRUCT_ON_FIRST_USE(Network::Address::InstanceConstSharedPtr,
                         std::make_shared<const Network::Address::EnvoyInternalInstance>(
                             "internal_address_for_user_space_io_handle"));
}
56 | | |
// Wires the receive buffer's watermark callbacks to this handle so buffer
// pressure is translated into the handle's watermark notifications. The
// third (overflow) callback is intentionally a no-op.
IoHandleImpl::IoHandleImpl(PassthroughStateSharedPtr passthrough_state)
    : pending_received_data_([&]() -> void { this->onBelowLowWatermark(); },
                             [&]() -> void { this->onAboveHighWatermark(); }, []() -> void {}),
      passthrough_state_(passthrough_state) {}
61 | | |
62 | 0 | IoHandleImpl::~IoHandleImpl() { |
63 | 0 | if (!closed_) { |
64 | 0 | close(); |
65 | 0 | } |
66 | 0 | } |
67 | | |
// Closes this end of the user-space socket pair: signals write-end and
// read-end shutdown to the peer (if still attached), tears down the event
// emulation, and marks the handle closed. Always reports success.
Api::IoCallUint64Result IoHandleImpl::close() {
  ASSERT(!closed_);
  // Release-build guard mirroring the debug-only ASSERT above: a double
  // close degrades to a no-op instead of re-notifying the peer.
  if (!closed_) {
    if (peer_handle_) {
      ENVOY_LOG(trace, "socket {} close before peer {} closes.", static_cast<void*>(this),
                static_cast<void*>(peer_handle_));
      // Notify the peer we won't write more data. shutdown(WRITE).
      peer_handle_->setWriteEnd();
      // Notify the peer that we no longer accept data. shutdown(RD).
      peer_handle_->onPeerDestroy();
      peer_handle_ = nullptr;
    } else {
      ENVOY_LOG(trace, "socket {} close after peer closed.", static_cast<void*>(this));
    }
  }
  if (user_file_event_) {
    // No event callback should be handled after close completes.
    user_file_event_.reset();
  }
  closed_ = true;
  return Api::ioCallUint64ResultNoError();
}
90 | | |
91 | 0 | bool IoHandleImpl::isOpen() const { return !closed_; } |
92 | | |
93 | | Api::IoCallUint64Result IoHandleImpl::readv(uint64_t max_length, Buffer::RawSlice* slices, |
94 | 0 | uint64_t num_slice) { |
95 | 0 | if (!isOpen()) { |
96 | 0 | return {0, Network::IoSocketError::create(SOCKET_ERROR_BADF)}; |
97 | 0 | } |
98 | 0 | if (pending_received_data_.length() == 0) { |
99 | 0 | if (receive_data_end_stream_) { |
100 | 0 | return {0, Api::IoError::none()}; |
101 | 0 | } else { |
102 | 0 | return {0, Network::IoSocketError::getIoSocketEagainError()}; |
103 | 0 | } |
104 | 0 | } |
105 | | // The read bytes can not exceed the provided buffer size or pending received size. |
106 | 0 | const auto max_bytes_to_read = std::min(pending_received_data_.length(), max_length); |
107 | 0 | uint64_t bytes_offset = 0; |
108 | 0 | for (uint64_t i = 0; i < num_slice && bytes_offset < max_length; i++) { |
109 | 0 | auto bytes_to_read_in_this_slice = |
110 | 0 | std::min(max_bytes_to_read - bytes_offset, uint64_t(slices[i].len_)); |
111 | | // Copy and drain, so pending_received_data_ always copy from offset 0. |
112 | 0 | pending_received_data_.copyOut(0, bytes_to_read_in_this_slice, slices[i].mem_); |
113 | 0 | pending_received_data_.drain(bytes_to_read_in_this_slice); |
114 | 0 | bytes_offset += bytes_to_read_in_this_slice; |
115 | 0 | } |
116 | 0 | const auto bytes_read = bytes_offset; |
117 | 0 | ASSERT(bytes_read <= max_bytes_to_read); |
118 | 0 | ENVOY_LOG(trace, "socket {} readv {} bytes", static_cast<void*>(this), bytes_read); |
119 | 0 | return {bytes_read, Api::IoError::none()}; |
120 | 0 | } |
121 | | |
122 | | Api::IoCallUint64Result IoHandleImpl::read(Buffer::Instance& buffer, |
123 | 0 | absl::optional<uint64_t> max_length_opt) { |
124 | | // Below value comes from Buffer::OwnedImpl::default_read_reservation_size_. |
125 | 0 | uint64_t max_length = max_length_opt.value_or(MAX_FRAGMENT * FRAGMENT_SIZE); |
126 | 0 | if (max_length == 0) { |
127 | 0 | return Api::ioCallUint64ResultNoError(); |
128 | 0 | } |
129 | 0 | if (!isOpen()) { |
130 | 0 | return {0, Network::IoSocketError::create(SOCKET_ERROR_BADF)}; |
131 | 0 | } |
132 | 0 | if (pending_received_data_.length() == 0) { |
133 | 0 | if (receive_data_end_stream_) { |
134 | 0 | return {0, Api::IoError::none()}; |
135 | 0 | } else { |
136 | 0 | return {0, Network::IoSocketError::getIoSocketEagainError()}; |
137 | 0 | } |
138 | 0 | } |
139 | 0 | const uint64_t bytes_to_read = moveUpTo(buffer, pending_received_data_, max_length); |
140 | 0 | return {bytes_to_read, Api::IoError::none()}; |
141 | 0 | } |
142 | | |
143 | 0 | Api::IoCallUint64Result IoHandleImpl::writev(const Buffer::RawSlice* slices, uint64_t num_slice) { |
144 | | // Empty input is allowed even though the peer is shutdown. |
145 | 0 | bool is_input_empty = true; |
146 | 0 | for (uint64_t i = 0; i < num_slice; i++) { |
147 | 0 | if (slices[i].mem_ != nullptr && slices[i].len_ != 0) { |
148 | 0 | is_input_empty = false; |
149 | 0 | break; |
150 | 0 | } |
151 | 0 | } |
152 | 0 | if (is_input_empty) { |
153 | 0 | return Api::ioCallUint64ResultNoError(); |
154 | 0 | } |
155 | 0 | if (!isOpen()) { |
156 | 0 | return {0, Network::IoSocketError::getIoSocketEbadfError()}; |
157 | 0 | } |
158 | | // Closed peer. |
159 | 0 | if (!peer_handle_) { |
160 | 0 | return {0, Network::IoSocketError::create(SOCKET_ERROR_INVAL)}; |
161 | 0 | } |
162 | | // Error: write after close. |
163 | 0 | if (peer_handle_->isPeerShutDownWrite()) { |
164 | | // TODO(lambdai): `EPIPE` or `ENOTCONN`. |
165 | 0 | return {0, Network::IoSocketError::create(SOCKET_ERROR_INVAL)}; |
166 | 0 | } |
167 | | // The peer is valid but temporarily does not accept new data. Likely due to flow control. |
168 | 0 | if (!peer_handle_->isWritable()) { |
169 | 0 | return {0, Network::IoSocketError::getIoSocketEagainError()}; |
170 | 0 | } |
171 | | |
172 | 0 | auto* const dest_buffer = peer_handle_->getWriteBuffer(); |
173 | | // Write along with iteration. Buffer guarantee the fragment is always append-able. |
174 | 0 | uint64_t bytes_written = 0; |
175 | 0 | for (uint64_t i = 0; i < num_slice && !dest_buffer->highWatermarkTriggered(); i++) { |
176 | 0 | if (slices[i].mem_ != nullptr && slices[i].len_ != 0) { |
177 | 0 | dest_buffer->add(slices[i].mem_, slices[i].len_); |
178 | 0 | bytes_written += slices[i].len_; |
179 | 0 | } |
180 | 0 | } |
181 | 0 | peer_handle_->setNewDataAvailable(); |
182 | 0 | ENVOY_LOG(trace, "socket {} writev {} bytes", static_cast<void*>(this), bytes_written); |
183 | 0 | return {bytes_written, Api::IoError::none()}; |
184 | 0 | } |
185 | | |
186 | 0 | Api::IoCallUint64Result IoHandleImpl::write(Buffer::Instance& buffer) { |
187 | | // Empty input is allowed even though the peer is shutdown. |
188 | 0 | if (buffer.length() == 0) { |
189 | 0 | return Api::ioCallUint64ResultNoError(); |
190 | 0 | } |
191 | 0 | if (!isOpen()) { |
192 | 0 | return {0, Network::IoSocketError::getIoSocketEbadfError()}; |
193 | 0 | } |
194 | | // Closed peer. |
195 | 0 | if (!peer_handle_) { |
196 | 0 | return {0, Network::IoSocketError::create(SOCKET_ERROR_INVAL)}; |
197 | 0 | } |
198 | | // Error: write after close. |
199 | 0 | if (peer_handle_->isPeerShutDownWrite()) { |
200 | | // TODO(lambdai): `EPIPE` or `ENOTCONN`. |
201 | 0 | return {0, Network::IoSocketError::create(SOCKET_ERROR_INVAL)}; |
202 | 0 | } |
203 | | // The peer is valid but temporarily does not accept new data. Likely due to flow control. |
204 | 0 | if (!peer_handle_->isWritable()) { |
205 | 0 | return {0, Network::IoSocketError::getIoSocketEagainError()}; |
206 | 0 | } |
207 | 0 | const uint64_t max_bytes_to_write = buffer.length(); |
208 | 0 | const uint64_t total_bytes_to_write = |
209 | 0 | moveUpTo(*peer_handle_->getWriteBuffer(), buffer, |
210 | | // Below value comes from Buffer::OwnedImpl::default_read_reservation_size_. |
211 | 0 | MAX_FRAGMENT * FRAGMENT_SIZE); |
212 | 0 | peer_handle_->setNewDataAvailable(); |
213 | 0 | ENVOY_LOG(trace, "socket {} write {} bytes of {}", static_cast<void*>(this), total_bytes_to_write, |
214 | 0 | max_bytes_to_write); |
215 | 0 | return {total_bytes_to_write, Api::IoError::none()}; |
216 | 0 | } |
217 | | |
// Datagram-style send is not applicable to this stream-like user-space
// handle; always reports an invalid-address error.
Api::IoCallUint64Result IoHandleImpl::sendmsg(const Buffer::RawSlice*, uint64_t, int,
                                              const Network::Address::Ip*,
                                              const Network::Address::Instance&) {
  return Network::IoSocketError::ioResultSocketInvalidAddress();
}
223 | | |
// Datagram-style receive is not applicable to this user-space handle;
// always reports an invalid-address error.
Api::IoCallUint64Result IoHandleImpl::recvmsg(Buffer::RawSlice*, const uint64_t, uint32_t,
                                              RecvMsgOutput&) {
  return Network::IoSocketError::ioResultSocketInvalidAddress();
}
228 | | |
// Batched datagram receive is not applicable to this user-space handle;
// always reports an invalid-address error.
Api::IoCallUint64Result IoHandleImpl::recvmmsg(RawSliceArrays&, uint32_t, RecvMsgOutput&) {
  return Network::IoSocketError::ioResultSocketInvalidAddress();
}
232 | | |
233 | 0 | Api::IoCallUint64Result IoHandleImpl::recv(void* buffer, size_t length, int flags) { |
234 | 0 | if (!isOpen()) { |
235 | 0 | return {0, Network::IoSocketError::getIoSocketEbadfError()}; |
236 | 0 | } |
237 | | // No data and the writer closed. |
238 | 0 | if (pending_received_data_.length() == 0) { |
239 | 0 | if (receive_data_end_stream_) { |
240 | 0 | return {0, Api::IoError::none()}; |
241 | 0 | } else { |
242 | 0 | return {0, Network::IoSocketError::getIoSocketEagainError()}; |
243 | 0 | } |
244 | 0 | } |
245 | | // Specify uint64_t since the latter length may not have the same type. |
246 | 0 | const auto max_bytes_to_read = std::min<uint64_t>(pending_received_data_.length(), length); |
247 | 0 | pending_received_data_.copyOut(0, max_bytes_to_read, buffer); |
248 | 0 | if (!(flags & MSG_PEEK)) { |
249 | 0 | pending_received_data_.drain(max_bytes_to_read); |
250 | 0 | } |
251 | 0 | return {max_bytes_to_read, Api::IoError::none()}; |
252 | 0 | } |
253 | | |
254 | 0 | bool IoHandleImpl::supportsMmsg() const { return false; } |
255 | | |
256 | 0 | bool IoHandleImpl::supportsUdpGro() const { return false; } |
257 | | |
// Binding has no meaning for an in-process socket pair; fails as unsupported.
Api::SysCallIntResult IoHandleImpl::bind(Network::Address::InstanceConstSharedPtr) {
  return makeInvalidSyscallResult();
}
261 | | |
262 | 0 | Api::SysCallIntResult IoHandleImpl::listen(int) { return makeInvalidSyscallResult(); } |
263 | | |
// Accept is unsupported (no listen support); reaching here is a bug and
// returns nullptr.
Network::IoHandlePtr IoHandleImpl::accept(struct sockaddr*, socklen_t*) {
  ENVOY_BUG(false, "unsupported call to accept");
  return nullptr;
}
268 | | |
269 | 0 | Api::SysCallIntResult IoHandleImpl::connect(Network::Address::InstanceConstSharedPtr address) { |
270 | 0 | if (peer_handle_ != nullptr) { |
271 | | // Buffered Io handle should always be considered as connected unless the server peer cannot be |
272 | | // found. Use write or read to determine if peer is closed. |
273 | 0 | return {0, 0}; |
274 | 0 | } else { |
275 | 0 | ENVOY_LOG(debug, "user namespace handle {} connect to previously closed peer {}.", |
276 | 0 | static_cast<void*>(this), address->asStringView()); |
277 | 0 | return Api::SysCallIntResult{-1, SOCKET_ERROR_INVAL}; |
278 | 0 | } |
279 | 0 | } |
280 | | |
// Socket options cannot be set on a user-space handle; fails as unsupported.
Api::SysCallIntResult IoHandleImpl::setOption(int, int, const void*, socklen_t) {
  return makeInvalidSyscallResult();
}
284 | | |
285 | | Api::SysCallIntResult IoHandleImpl::getOption(int level, int optname, void* optval, |
286 | 0 | socklen_t* optlen) { |
287 | | // Check result of connect(). It is either connected or closed. |
288 | 0 | if (level == SOL_SOCKET && optname == SO_ERROR) { |
289 | 0 | if (peer_handle_ != nullptr) { |
290 | | // The peer is valid at this comment. Consider it as connected. |
291 | 0 | *optlen = sizeof(int); |
292 | 0 | *static_cast<int*>(optval) = 0; |
293 | 0 | return Api::SysCallIntResult{0, 0}; |
294 | 0 | } else { |
295 | | // The peer is closed. Reset the option value to non-zero. |
296 | 0 | *optlen = sizeof(int); |
297 | 0 | *static_cast<int*>(optval) = SOCKET_ERROR_INVAL; |
298 | 0 | return Api::SysCallIntResult{0, 0}; |
299 | 0 | } |
300 | 0 | } |
301 | 0 | return makeInvalidSyscallResult(); |
302 | 0 | } |
303 | | |
// ioctl is not emulated for user-space handles; fails as unsupported.
Api::SysCallIntResult IoHandleImpl::ioctl(unsigned long, void*, unsigned long, void*, unsigned long,
                                          unsigned long*) {
  return makeInvalidSyscallResult();
}
308 | | |
309 | 0 | Api::SysCallIntResult IoHandleImpl::setBlocking(bool) { return makeInvalidSyscallResult(); } |
310 | | |
311 | 0 | absl::optional<int> IoHandleImpl::domain() { return absl::nullopt; } |
312 | | |
// All user-space handles report the shared internal address as their local
// address.
Network::Address::InstanceConstSharedPtr IoHandleImpl::localAddress() {
  return IoHandleImpl::getCommonInternalAddress();
}
316 | | |
// All user-space handles report the shared internal address as their peer
// address.
Network::Address::InstanceConstSharedPtr IoHandleImpl::peerAddress() {
  return IoHandleImpl::getCommonInternalAddress();
}
320 | | |
// Creates the emulated file event for this handle. May be called at most once
// per handle, and only with non-level triggers: there is no kernel fd to
// level-poll, so level triggering cannot be emulated natively.
void IoHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb,
                                       Event::FileTriggerType trigger, uint32_t events) {
  ASSERT(user_file_event_ == nullptr, "Attempting to initialize two `file_event_` for the same "
                                      "file descriptor. This is not allowed.");
  ASSERT(trigger != Event::FileTriggerType::Level, "Native level trigger is not supported.");
  user_file_event_ = std::make_unique<FileEventImpl>(dispatcher, cb, events, *this);
}
328 | | |
Network::IoHandlePtr IoHandleImpl::duplicate() {
  // duplicate() is supposed to be used on listener io handle while this implementation doesn't
  // support listen. Reaching here is a bug; callers receive nullptr.
  ENVOY_BUG(false, "unsupported call to duplicate");
  return nullptr;
}
335 | | |
336 | 0 | void IoHandleImpl::activateFileEvents(uint32_t events) { |
337 | 0 | if (user_file_event_) { |
338 | 0 | user_file_event_->activate(events); |
339 | 0 | } else { |
340 | 0 | ENVOY_BUG(false, "Null user_file_event_"); |
341 | 0 | } |
342 | 0 | } |
343 | | |
344 | 0 | void IoHandleImpl::enableFileEvents(uint32_t events) { |
345 | 0 | if (user_file_event_) { |
346 | 0 | user_file_event_->setEnabled(events); |
347 | 0 | } else { |
348 | 0 | ENVOY_BUG(false, "Null user_file_event_"); |
349 | 0 | } |
350 | 0 | } |
351 | | |
352 | 0 | void IoHandleImpl::resetFileEvents() { user_file_event_.reset(); } |
353 | | |
// Half-close emulation: only shutdown of the write direction is supported.
// Idempotent; the first call requires the peer to still be attached, and
// subsequent calls are no-ops. Always reports success.
Api::SysCallIntResult IoHandleImpl::shutdown(int how) {
  // Support only shutdown write.
  ASSERT(how == ENVOY_SHUT_WR);
  ASSERT(!closed_);
  if (!write_shutdown_) {
    ASSERT(peer_handle_);
    // Notify the peer we won't write more data.
    peer_handle_->setWriteEnd();
    write_shutdown_ = true;
  }
  return {0, 0};
}
366 | | |
// Stores metadata and filter-state objects to be handed across the internal
// connection. May be called at most once, and only before mergeInto() has
// consumed the state (enforced by the state-machine ASSERT).
void PassthroughStateImpl::initialize(
    std::unique_ptr<envoy::config::core::v3::Metadata> metadata,
    const StreamInfo::FilterState::Objects& filter_state_objects) {
  ASSERT(state_ == State::Created);
  metadata_ = std::move(metadata);
  filter_state_objects_ = filter_state_objects;
  state_ = State::Initialized;
}
// Merges any stored metadata into `metadata` and transfers the stored filter
// state objects into `filter_state` with connection lifespan, then releases
// the stored state and marks the passthrough Done (one-shot).
void PassthroughStateImpl::mergeInto(envoy::config::core::v3::Metadata& metadata,
                                     StreamInfo::FilterState& filter_state) {
  ASSERT(state_ == State::Created || state_ == State::Initialized);
  if (metadata_) {
    metadata.MergeFrom(*metadata_);
  }
  for (const auto& object : filter_state_objects_) {
    // This should not throw as stream info is new and filter objects are uniquely named.
    filter_state.setData(object.name_, object.data_, object.state_type_,
                         StreamInfo::FilterState::LifeSpan::Connection, object.stream_sharing_);
  }
  metadata_ = nullptr;
  filter_state_objects_.clear();
  state_ = State::Done;
}
390 | | } // namespace UserSpace |
391 | | } // namespace IoSocket |
392 | | } // namespace Extensions |
393 | | } // namespace Envoy |