Line data Source code
1 : #include "source/extensions/io_socket/user_space/io_handle_impl.h"
2 :
3 : #include "envoy/buffer/buffer.h"
4 : #include "envoy/common/platform.h"
5 :
6 : #include "source/common/api/os_sys_calls_impl.h"
7 : #include "source/common/common/assert.h"
8 : #include "source/common/common/utility.h"
9 : #include "source/common/network/address_impl.h"
10 : #include "source/extensions/io_socket/user_space/file_event_impl.h"
11 :
12 : #include "absl/types/optional.h"
13 :
14 : namespace Envoy {
15 :
16 : namespace Extensions {
17 : namespace IoSocket {
18 : namespace UserSpace {
19 : namespace {
20 0 : Api::SysCallIntResult makeInvalidSyscallResult() {
21 0 : return Api::SysCallIntResult{-1, SOCKET_ERROR_NOT_SUP};
22 0 : }
23 :
24 : /**
25 : * Move at most max_length from src to dst. If the dst is close or beyond high watermark, move no
26 : * more than 16K. It's not an error if src buffer doesn't contain enough data.
27 : * @param dst supplies the buffer where the data is move to.
28 : * @param src supplies the buffer where the data is move from.
29 : * @param max_length supplies the max bytes the call can move.
30 : * @return number of bytes this call moves.
31 : */
32 0 : uint64_t moveUpTo(Buffer::Instance& dst, Buffer::Instance& src, uint64_t max_length) {
33 0 : ASSERT(src.length() > 0);
34 0 : if (dst.highWatermark() != 0) {
35 0 : if (dst.length() < dst.highWatermark()) {
36 : // Move until high watermark so that high watermark is not triggered.
37 : // However, if dst buffer is near high watermark, move 16K to avoid the small fragment move.
38 0 : max_length = std::min(max_length,
39 0 : std::max<uint64_t>(FRAGMENT_SIZE, dst.highWatermark() - dst.length()));
40 0 : } else {
41 : // Move at most 16K if the dst buffer is over high watermark.
42 0 : max_length = std::min<uint64_t>(max_length, FRAGMENT_SIZE);
43 0 : }
44 0 : }
45 0 : uint64_t res = std::min(max_length, src.length());
46 0 : dst.move(src, res, /*reset_drain_trackers_and_accounting=*/true);
47 0 : return res;
48 0 : }
49 : } // namespace
50 :
51 0 : const Network::Address::InstanceConstSharedPtr& IoHandleImpl::getCommonInternalAddress() {
52 0 : CONSTRUCT_ON_FIRST_USE(Network::Address::InstanceConstSharedPtr,
53 0 : std::make_shared<const Network::Address::EnvoyInternalInstance>(
54 0 : "internal_address_for_user_space_io_handle"));
55 0 : }
56 :
57 : IoHandleImpl::IoHandleImpl(PassthroughStateSharedPtr passthrough_state)
58 0 : : pending_received_data_([&]() -> void { this->onBelowLowWatermark(); },
59 0 : [&]() -> void { this->onAboveHighWatermark(); }, []() -> void {}),
60 0 : passthrough_state_(passthrough_state) {}
61 :
62 0 : IoHandleImpl::~IoHandleImpl() {
63 0 : if (!closed_) {
64 0 : close();
65 0 : }
66 0 : }
67 :
68 0 : Api::IoCallUint64Result IoHandleImpl::close() {
69 0 : ASSERT(!closed_);
70 0 : if (!closed_) {
71 0 : if (peer_handle_) {
72 0 : ENVOY_LOG(trace, "socket {} close before peer {} closes.", static_cast<void*>(this),
73 0 : static_cast<void*>(peer_handle_));
74 : // Notify the peer we won't write more data. shutdown(WRITE).
75 0 : peer_handle_->setWriteEnd();
76 : // Notify the peer that we no longer accept data. shutdown(RD).
77 0 : peer_handle_->onPeerDestroy();
78 0 : peer_handle_ = nullptr;
79 0 : } else {
80 0 : ENVOY_LOG(trace, "socket {} close after peer closed.", static_cast<void*>(this));
81 0 : }
82 0 : }
83 0 : if (user_file_event_) {
84 : // No event callback should be handled after close completes.
85 0 : user_file_event_.reset();
86 0 : }
87 0 : closed_ = true;
88 0 : return Api::ioCallUint64ResultNoError();
89 0 : }
90 :
91 0 : bool IoHandleImpl::isOpen() const { return !closed_; }
92 :
93 : Api::IoCallUint64Result IoHandleImpl::readv(uint64_t max_length, Buffer::RawSlice* slices,
94 0 : uint64_t num_slice) {
95 0 : if (!isOpen()) {
96 0 : return {0, Network::IoSocketError::create(SOCKET_ERROR_BADF)};
97 0 : }
98 0 : if (pending_received_data_.length() == 0) {
99 0 : if (receive_data_end_stream_) {
100 0 : return {0, Api::IoError::none()};
101 0 : } else {
102 0 : return {0, Network::IoSocketError::getIoSocketEagainError()};
103 0 : }
104 0 : }
105 : // The read bytes can not exceed the provided buffer size or pending received size.
106 0 : const auto max_bytes_to_read = std::min(pending_received_data_.length(), max_length);
107 0 : uint64_t bytes_offset = 0;
108 0 : for (uint64_t i = 0; i < num_slice && bytes_offset < max_length; i++) {
109 0 : auto bytes_to_read_in_this_slice =
110 0 : std::min(max_bytes_to_read - bytes_offset, uint64_t(slices[i].len_));
111 : // Copy and drain, so pending_received_data_ always copy from offset 0.
112 0 : pending_received_data_.copyOut(0, bytes_to_read_in_this_slice, slices[i].mem_);
113 0 : pending_received_data_.drain(bytes_to_read_in_this_slice);
114 0 : bytes_offset += bytes_to_read_in_this_slice;
115 0 : }
116 0 : const auto bytes_read = bytes_offset;
117 0 : ASSERT(bytes_read <= max_bytes_to_read);
118 0 : ENVOY_LOG(trace, "socket {} readv {} bytes", static_cast<void*>(this), bytes_read);
119 0 : return {bytes_read, Api::IoError::none()};
120 0 : }
121 :
122 : Api::IoCallUint64Result IoHandleImpl::read(Buffer::Instance& buffer,
123 0 : absl::optional<uint64_t> max_length_opt) {
124 : // Below value comes from Buffer::OwnedImpl::default_read_reservation_size_.
125 0 : uint64_t max_length = max_length_opt.value_or(MAX_FRAGMENT * FRAGMENT_SIZE);
126 0 : if (max_length == 0) {
127 0 : return Api::ioCallUint64ResultNoError();
128 0 : }
129 0 : if (!isOpen()) {
130 0 : return {0, Network::IoSocketError::create(SOCKET_ERROR_BADF)};
131 0 : }
132 0 : if (pending_received_data_.length() == 0) {
133 0 : if (receive_data_end_stream_) {
134 0 : return {0, Api::IoError::none()};
135 0 : } else {
136 0 : return {0, Network::IoSocketError::getIoSocketEagainError()};
137 0 : }
138 0 : }
139 0 : const uint64_t bytes_to_read = moveUpTo(buffer, pending_received_data_, max_length);
140 0 : return {bytes_to_read, Api::IoError::none()};
141 0 : }
142 :
143 0 : Api::IoCallUint64Result IoHandleImpl::writev(const Buffer::RawSlice* slices, uint64_t num_slice) {
144 : // Empty input is allowed even though the peer is shutdown.
145 0 : bool is_input_empty = true;
146 0 : for (uint64_t i = 0; i < num_slice; i++) {
147 0 : if (slices[i].mem_ != nullptr && slices[i].len_ != 0) {
148 0 : is_input_empty = false;
149 0 : break;
150 0 : }
151 0 : }
152 0 : if (is_input_empty) {
153 0 : return Api::ioCallUint64ResultNoError();
154 0 : }
155 0 : if (!isOpen()) {
156 0 : return {0, Network::IoSocketError::getIoSocketEbadfError()};
157 0 : }
158 : // Closed peer.
159 0 : if (!peer_handle_) {
160 0 : return {0, Network::IoSocketError::create(SOCKET_ERROR_INVAL)};
161 0 : }
162 : // Error: write after close.
163 0 : if (peer_handle_->isPeerShutDownWrite()) {
164 : // TODO(lambdai): `EPIPE` or `ENOTCONN`.
165 0 : return {0, Network::IoSocketError::create(SOCKET_ERROR_INVAL)};
166 0 : }
167 : // The peer is valid but temporarily does not accept new data. Likely due to flow control.
168 0 : if (!peer_handle_->isWritable()) {
169 0 : return {0, Network::IoSocketError::getIoSocketEagainError()};
170 0 : }
171 :
172 0 : auto* const dest_buffer = peer_handle_->getWriteBuffer();
173 : // Write along with iteration. Buffer guarantee the fragment is always append-able.
174 0 : uint64_t bytes_written = 0;
175 0 : for (uint64_t i = 0; i < num_slice && !dest_buffer->highWatermarkTriggered(); i++) {
176 0 : if (slices[i].mem_ != nullptr && slices[i].len_ != 0) {
177 0 : dest_buffer->add(slices[i].mem_, slices[i].len_);
178 0 : bytes_written += slices[i].len_;
179 0 : }
180 0 : }
181 0 : peer_handle_->setNewDataAvailable();
182 0 : ENVOY_LOG(trace, "socket {} writev {} bytes", static_cast<void*>(this), bytes_written);
183 0 : return {bytes_written, Api::IoError::none()};
184 0 : }
185 :
186 0 : Api::IoCallUint64Result IoHandleImpl::write(Buffer::Instance& buffer) {
187 : // Empty input is allowed even though the peer is shutdown.
188 0 : if (buffer.length() == 0) {
189 0 : return Api::ioCallUint64ResultNoError();
190 0 : }
191 0 : if (!isOpen()) {
192 0 : return {0, Network::IoSocketError::getIoSocketEbadfError()};
193 0 : }
194 : // Closed peer.
195 0 : if (!peer_handle_) {
196 0 : return {0, Network::IoSocketError::create(SOCKET_ERROR_INVAL)};
197 0 : }
198 : // Error: write after close.
199 0 : if (peer_handle_->isPeerShutDownWrite()) {
200 : // TODO(lambdai): `EPIPE` or `ENOTCONN`.
201 0 : return {0, Network::IoSocketError::create(SOCKET_ERROR_INVAL)};
202 0 : }
203 : // The peer is valid but temporarily does not accept new data. Likely due to flow control.
204 0 : if (!peer_handle_->isWritable()) {
205 0 : return {0, Network::IoSocketError::getIoSocketEagainError()};
206 0 : }
207 0 : const uint64_t max_bytes_to_write = buffer.length();
208 0 : const uint64_t total_bytes_to_write =
209 0 : moveUpTo(*peer_handle_->getWriteBuffer(), buffer,
210 : // Below value comes from Buffer::OwnedImpl::default_read_reservation_size_.
211 0 : MAX_FRAGMENT * FRAGMENT_SIZE);
212 0 : peer_handle_->setNewDataAvailable();
213 0 : ENVOY_LOG(trace, "socket {} write {} bytes of {}", static_cast<void*>(this), total_bytes_to_write,
214 0 : max_bytes_to_write);
215 0 : return {total_bytes_to_write, Api::IoError::none()};
216 0 : }
217 :
218 : Api::IoCallUint64Result IoHandleImpl::sendmsg(const Buffer::RawSlice*, uint64_t, int,
219 : const Network::Address::Ip*,
220 0 : const Network::Address::Instance&) {
221 0 : return Network::IoSocketError::ioResultSocketInvalidAddress();
222 0 : }
223 :
224 : Api::IoCallUint64Result IoHandleImpl::recvmsg(Buffer::RawSlice*, const uint64_t, uint32_t,
225 0 : RecvMsgOutput&) {
226 0 : return Network::IoSocketError::ioResultSocketInvalidAddress();
227 0 : }
228 :
229 0 : Api::IoCallUint64Result IoHandleImpl::recvmmsg(RawSliceArrays&, uint32_t, RecvMsgOutput&) {
230 0 : return Network::IoSocketError::ioResultSocketInvalidAddress();
231 0 : }
232 :
233 0 : Api::IoCallUint64Result IoHandleImpl::recv(void* buffer, size_t length, int flags) {
234 0 : if (!isOpen()) {
235 0 : return {0, Network::IoSocketError::getIoSocketEbadfError()};
236 0 : }
237 : // No data and the writer closed.
238 0 : if (pending_received_data_.length() == 0) {
239 0 : if (receive_data_end_stream_) {
240 0 : return {0, Api::IoError::none()};
241 0 : } else {
242 0 : return {0, Network::IoSocketError::getIoSocketEagainError()};
243 0 : }
244 0 : }
245 : // Specify uint64_t since the latter length may not have the same type.
246 0 : const auto max_bytes_to_read = std::min<uint64_t>(pending_received_data_.length(), length);
247 0 : pending_received_data_.copyOut(0, max_bytes_to_read, buffer);
248 0 : if (!(flags & MSG_PEEK)) {
249 0 : pending_received_data_.drain(max_bytes_to_read);
250 0 : }
251 0 : return {max_bytes_to_read, Api::IoError::none()};
252 0 : }
253 :
254 0 : bool IoHandleImpl::supportsMmsg() const { return false; }
255 :
256 0 : bool IoHandleImpl::supportsUdpGro() const { return false; }
257 :
258 0 : Api::SysCallIntResult IoHandleImpl::bind(Network::Address::InstanceConstSharedPtr) {
259 0 : return makeInvalidSyscallResult();
260 0 : }
261 :
262 0 : Api::SysCallIntResult IoHandleImpl::listen(int) { return makeInvalidSyscallResult(); }
263 :
264 0 : Network::IoHandlePtr IoHandleImpl::accept(struct sockaddr*, socklen_t*) {
265 0 : ENVOY_BUG(false, "unsupported call to accept");
266 0 : return nullptr;
267 0 : }
268 :
269 0 : Api::SysCallIntResult IoHandleImpl::connect(Network::Address::InstanceConstSharedPtr address) {
270 0 : if (peer_handle_ != nullptr) {
271 : // Buffered Io handle should always be considered as connected unless the server peer cannot be
272 : // found. Use write or read to determine if peer is closed.
273 0 : return {0, 0};
274 0 : } else {
275 0 : ENVOY_LOG(debug, "user namespace handle {} connect to previously closed peer {}.",
276 0 : static_cast<void*>(this), address->asStringView());
277 0 : return Api::SysCallIntResult{-1, SOCKET_ERROR_INVAL};
278 0 : }
279 0 : }
280 :
281 0 : Api::SysCallIntResult IoHandleImpl::setOption(int, int, const void*, socklen_t) {
282 0 : return makeInvalidSyscallResult();
283 0 : }
284 :
285 : Api::SysCallIntResult IoHandleImpl::getOption(int level, int optname, void* optval,
286 0 : socklen_t* optlen) {
287 : // Check result of connect(). It is either connected or closed.
288 0 : if (level == SOL_SOCKET && optname == SO_ERROR) {
289 0 : if (peer_handle_ != nullptr) {
290 : // The peer is valid at this comment. Consider it as connected.
291 0 : *optlen = sizeof(int);
292 0 : *static_cast<int*>(optval) = 0;
293 0 : return Api::SysCallIntResult{0, 0};
294 0 : } else {
295 : // The peer is closed. Reset the option value to non-zero.
296 0 : *optlen = sizeof(int);
297 0 : *static_cast<int*>(optval) = SOCKET_ERROR_INVAL;
298 0 : return Api::SysCallIntResult{0, 0};
299 0 : }
300 0 : }
301 0 : return makeInvalidSyscallResult();
302 0 : }
303 :
304 : Api::SysCallIntResult IoHandleImpl::ioctl(unsigned long, void*, unsigned long, void*, unsigned long,
305 0 : unsigned long*) {
306 0 : return makeInvalidSyscallResult();
307 0 : }
308 :
309 0 : Api::SysCallIntResult IoHandleImpl::setBlocking(bool) { return makeInvalidSyscallResult(); }
310 :
311 0 : absl::optional<int> IoHandleImpl::domain() { return absl::nullopt; }
312 :
313 0 : Network::Address::InstanceConstSharedPtr IoHandleImpl::localAddress() {
314 0 : return IoHandleImpl::getCommonInternalAddress();
315 0 : }
316 :
317 0 : Network::Address::InstanceConstSharedPtr IoHandleImpl::peerAddress() {
318 0 : return IoHandleImpl::getCommonInternalAddress();
319 0 : }
320 :
321 : void IoHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb,
322 0 : Event::FileTriggerType trigger, uint32_t events) {
323 0 : ASSERT(user_file_event_ == nullptr, "Attempting to initialize two `file_event_` for the same "
324 0 : "file descriptor. This is not allowed.");
325 0 : ASSERT(trigger != Event::FileTriggerType::Level, "Native level trigger is not supported.");
326 0 : user_file_event_ = std::make_unique<FileEventImpl>(dispatcher, cb, events, *this);
327 0 : }
328 :
329 0 : Network::IoHandlePtr IoHandleImpl::duplicate() {
330 : // duplicate() is supposed to be used on listener io handle while this implementation doesn't
331 : // support listen.
332 0 : ENVOY_BUG(false, "unsupported call to duplicate");
333 0 : return nullptr;
334 0 : }
335 :
336 0 : void IoHandleImpl::activateFileEvents(uint32_t events) {
337 0 : if (user_file_event_) {
338 0 : user_file_event_->activate(events);
339 0 : } else {
340 0 : ENVOY_BUG(false, "Null user_file_event_");
341 0 : }
342 0 : }
343 :
344 0 : void IoHandleImpl::enableFileEvents(uint32_t events) {
345 0 : if (user_file_event_) {
346 0 : user_file_event_->setEnabled(events);
347 0 : } else {
348 0 : ENVOY_BUG(false, "Null user_file_event_");
349 0 : }
350 0 : }
351 :
352 0 : void IoHandleImpl::resetFileEvents() { user_file_event_.reset(); }
353 :
354 0 : Api::SysCallIntResult IoHandleImpl::shutdown(int how) {
355 : // Support only shutdown write.
356 0 : ASSERT(how == ENVOY_SHUT_WR);
357 0 : ASSERT(!closed_);
358 0 : if (!write_shutdown_) {
359 0 : ASSERT(peer_handle_);
360 : // Notify the peer we won't write more data.
361 0 : peer_handle_->setWriteEnd();
362 0 : write_shutdown_ = true;
363 0 : }
364 0 : return {0, 0};
365 0 : }
366 :
367 : void PassthroughStateImpl::initialize(
368 : std::unique_ptr<envoy::config::core::v3::Metadata> metadata,
369 0 : const StreamInfo::FilterState::Objects& filter_state_objects) {
370 0 : ASSERT(state_ == State::Created);
371 0 : metadata_ = std::move(metadata);
372 0 : filter_state_objects_ = filter_state_objects;
373 0 : state_ = State::Initialized;
374 0 : }
375 : void PassthroughStateImpl::mergeInto(envoy::config::core::v3::Metadata& metadata,
376 0 : StreamInfo::FilterState& filter_state) {
377 0 : ASSERT(state_ == State::Created || state_ == State::Initialized);
378 0 : if (metadata_) {
379 0 : metadata.MergeFrom(*metadata_);
380 0 : }
381 0 : for (const auto& object : filter_state_objects_) {
382 : // This should not throw as stream info is new and filter objects are uniquely named.
383 0 : filter_state.setData(object.name_, object.data_, object.state_type_,
384 0 : StreamInfo::FilterState::LifeSpan::Connection, object.stream_sharing_);
385 0 : }
386 0 : metadata_ = nullptr;
387 0 : filter_state_objects_.clear();
388 0 : state_ = State::Done;
389 0 : }
390 : } // namespace UserSpace
391 : } // namespace IoSocket
392 : } // namespace Extensions
393 : } // namespace Envoy
|