Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/extensions/io_socket/user_space/io_handle_impl.cc
Line
Count
Source (jump to first uncovered line)
1
#include "source/extensions/io_socket/user_space/io_handle_impl.h"
2
3
#include "envoy/buffer/buffer.h"
4
#include "envoy/common/platform.h"
5
6
#include "source/common/api/os_sys_calls_impl.h"
7
#include "source/common/common/assert.h"
8
#include "source/common/common/utility.h"
9
#include "source/common/network/address_impl.h"
10
#include "source/extensions/io_socket/user_space/file_event_impl.h"
11
12
#include "absl/types/optional.h"
13
14
namespace Envoy {
15
16
namespace Extensions {
17
namespace IoSocket {
18
namespace UserSpace {
19
namespace {
20
0
Api::SysCallIntResult makeInvalidSyscallResult() {
21
0
  return Api::SysCallIntResult{-1, SOCKET_ERROR_NOT_SUP};
22
0
}
23
24
/**
25
 * Move at most max_length from src to dst. If the dst is close or beyond high watermark, move no
26
 * more than 16K. It's not an error if src buffer doesn't contain enough data.
27
 * @param dst supplies the buffer where the data is move to.
28
 * @param src supplies the buffer where the data is move from.
29
 * @param max_length supplies the max bytes the call can move.
30
 * @return number of bytes this call moves.
31
 */
32
0
uint64_t moveUpTo(Buffer::Instance& dst, Buffer::Instance& src, uint64_t max_length) {
33
0
  ASSERT(src.length() > 0);
34
0
  if (dst.highWatermark() != 0) {
35
0
    if (dst.length() < dst.highWatermark()) {
36
      // Move until high watermark so that high watermark is not triggered.
37
      // However, if dst buffer is near high watermark, move 16K to avoid the small fragment move.
38
0
      max_length = std::min(max_length,
39
0
                            std::max<uint64_t>(FRAGMENT_SIZE, dst.highWatermark() - dst.length()));
40
0
    } else {
41
      // Move at most 16K if the dst buffer is over high watermark.
42
0
      max_length = std::min<uint64_t>(max_length, FRAGMENT_SIZE);
43
0
    }
44
0
  }
45
0
  uint64_t res = std::min(max_length, src.length());
46
0
  dst.move(src, res, /*reset_drain_trackers_and_accounting=*/true);
47
0
  return res;
48
0
}
49
} // namespace
50
51
0
const Network::Address::InstanceConstSharedPtr& IoHandleImpl::getCommonInternalAddress() {
52
0
  CONSTRUCT_ON_FIRST_USE(Network::Address::InstanceConstSharedPtr,
53
0
                         std::make_shared<const Network::Address::EnvoyInternalInstance>(
54
0
                             "internal_address_for_user_space_io_handle"));
55
0
}
56
57
IoHandleImpl::IoHandleImpl(PassthroughStateSharedPtr passthrough_state)
58
0
    : pending_received_data_([&]() -> void { this->onBelowLowWatermark(); },
59
0
                             [&]() -> void { this->onAboveHighWatermark(); }, []() -> void {}),
60
0
      passthrough_state_(passthrough_state) {}
61
62
0
IoHandleImpl::~IoHandleImpl() {
63
0
  if (!closed_) {
64
0
    close();
65
0
  }
66
0
}
67
68
0
Api::IoCallUint64Result IoHandleImpl::close() {
69
0
  ASSERT(!closed_);
70
0
  if (!closed_) {
71
0
    if (peer_handle_) {
72
0
      ENVOY_LOG(trace, "socket {} close before peer {} closes.", static_cast<void*>(this),
73
0
                static_cast<void*>(peer_handle_));
74
      // Notify the peer we won't write more data. shutdown(WRITE).
75
0
      peer_handle_->setWriteEnd();
76
      // Notify the peer that we no longer accept data. shutdown(RD).
77
0
      peer_handle_->onPeerDestroy();
78
0
      peer_handle_ = nullptr;
79
0
    } else {
80
0
      ENVOY_LOG(trace, "socket {} close after peer closed.", static_cast<void*>(this));
81
0
    }
82
0
  }
83
0
  if (user_file_event_) {
84
    // No event callback should be handled after close completes.
85
0
    user_file_event_.reset();
86
0
  }
87
0
  closed_ = true;
88
0
  return Api::ioCallUint64ResultNoError();
89
0
}
90
91
0
bool IoHandleImpl::isOpen() const { return !closed_; }
92
93
Api::IoCallUint64Result IoHandleImpl::readv(uint64_t max_length, Buffer::RawSlice* slices,
94
0
                                            uint64_t num_slice) {
95
0
  if (!isOpen()) {
96
0
    return {0, Network::IoSocketError::create(SOCKET_ERROR_BADF)};
97
0
  }
98
0
  if (pending_received_data_.length() == 0) {
99
0
    if (receive_data_end_stream_) {
100
0
      return {0, Api::IoError::none()};
101
0
    } else {
102
0
      return {0, Network::IoSocketError::getIoSocketEagainError()};
103
0
    }
104
0
  }
105
  // The read bytes can not exceed the provided buffer size or pending received size.
106
0
  const auto max_bytes_to_read = std::min(pending_received_data_.length(), max_length);
107
0
  uint64_t bytes_offset = 0;
108
0
  for (uint64_t i = 0; i < num_slice && bytes_offset < max_length; i++) {
109
0
    auto bytes_to_read_in_this_slice =
110
0
        std::min(max_bytes_to_read - bytes_offset, uint64_t(slices[i].len_));
111
    // Copy and drain, so pending_received_data_ always copy from offset 0.
112
0
    pending_received_data_.copyOut(0, bytes_to_read_in_this_slice, slices[i].mem_);
113
0
    pending_received_data_.drain(bytes_to_read_in_this_slice);
114
0
    bytes_offset += bytes_to_read_in_this_slice;
115
0
  }
116
0
  const auto bytes_read = bytes_offset;
117
0
  ASSERT(bytes_read <= max_bytes_to_read);
118
0
  ENVOY_LOG(trace, "socket {} readv {} bytes", static_cast<void*>(this), bytes_read);
119
0
  return {bytes_read, Api::IoError::none()};
120
0
}
121
122
Api::IoCallUint64Result IoHandleImpl::read(Buffer::Instance& buffer,
123
0
                                           absl::optional<uint64_t> max_length_opt) {
124
  // Below value comes from Buffer::OwnedImpl::default_read_reservation_size_.
125
0
  uint64_t max_length = max_length_opt.value_or(MAX_FRAGMENT * FRAGMENT_SIZE);
126
0
  if (max_length == 0) {
127
0
    return Api::ioCallUint64ResultNoError();
128
0
  }
129
0
  if (!isOpen()) {
130
0
    return {0, Network::IoSocketError::create(SOCKET_ERROR_BADF)};
131
0
  }
132
0
  if (pending_received_data_.length() == 0) {
133
0
    if (receive_data_end_stream_) {
134
0
      return {0, Api::IoError::none()};
135
0
    } else {
136
0
      return {0, Network::IoSocketError::getIoSocketEagainError()};
137
0
    }
138
0
  }
139
0
  const uint64_t bytes_to_read = moveUpTo(buffer, pending_received_data_, max_length);
140
0
  return {bytes_to_read, Api::IoError::none()};
141
0
}
142
143
0
Api::IoCallUint64Result IoHandleImpl::writev(const Buffer::RawSlice* slices, uint64_t num_slice) {
144
  // Empty input is allowed even though the peer is shutdown.
145
0
  bool is_input_empty = true;
146
0
  for (uint64_t i = 0; i < num_slice; i++) {
147
0
    if (slices[i].mem_ != nullptr && slices[i].len_ != 0) {
148
0
      is_input_empty = false;
149
0
      break;
150
0
    }
151
0
  }
152
0
  if (is_input_empty) {
153
0
    return Api::ioCallUint64ResultNoError();
154
0
  }
155
0
  if (!isOpen()) {
156
0
    return {0, Network::IoSocketError::getIoSocketEbadfError()};
157
0
  }
158
  // Closed peer.
159
0
  if (!peer_handle_) {
160
0
    return {0, Network::IoSocketError::create(SOCKET_ERROR_INVAL)};
161
0
  }
162
  // Error: write after close.
163
0
  if (peer_handle_->isPeerShutDownWrite()) {
164
    // TODO(lambdai): `EPIPE` or `ENOTCONN`.
165
0
    return {0, Network::IoSocketError::create(SOCKET_ERROR_INVAL)};
166
0
  }
167
  // The peer is valid but temporarily does not accept new data. Likely due to flow control.
168
0
  if (!peer_handle_->isWritable()) {
169
0
    return {0, Network::IoSocketError::getIoSocketEagainError()};
170
0
  }
171
172
0
  auto* const dest_buffer = peer_handle_->getWriteBuffer();
173
  // Write along with iteration. Buffer guarantee the fragment is always append-able.
174
0
  uint64_t bytes_written = 0;
175
0
  for (uint64_t i = 0; i < num_slice && !dest_buffer->highWatermarkTriggered(); i++) {
176
0
    if (slices[i].mem_ != nullptr && slices[i].len_ != 0) {
177
0
      dest_buffer->add(slices[i].mem_, slices[i].len_);
178
0
      bytes_written += slices[i].len_;
179
0
    }
180
0
  }
181
0
  peer_handle_->setNewDataAvailable();
182
0
  ENVOY_LOG(trace, "socket {} writev {} bytes", static_cast<void*>(this), bytes_written);
183
0
  return {bytes_written, Api::IoError::none()};
184
0
}
185
186
0
Api::IoCallUint64Result IoHandleImpl::write(Buffer::Instance& buffer) {
187
  // Empty input is allowed even though the peer is shutdown.
188
0
  if (buffer.length() == 0) {
189
0
    return Api::ioCallUint64ResultNoError();
190
0
  }
191
0
  if (!isOpen()) {
192
0
    return {0, Network::IoSocketError::getIoSocketEbadfError()};
193
0
  }
194
  // Closed peer.
195
0
  if (!peer_handle_) {
196
0
    return {0, Network::IoSocketError::create(SOCKET_ERROR_INVAL)};
197
0
  }
198
  // Error: write after close.
199
0
  if (peer_handle_->isPeerShutDownWrite()) {
200
    // TODO(lambdai): `EPIPE` or `ENOTCONN`.
201
0
    return {0, Network::IoSocketError::create(SOCKET_ERROR_INVAL)};
202
0
  }
203
  // The peer is valid but temporarily does not accept new data. Likely due to flow control.
204
0
  if (!peer_handle_->isWritable()) {
205
0
    return {0, Network::IoSocketError::getIoSocketEagainError()};
206
0
  }
207
0
  const uint64_t max_bytes_to_write = buffer.length();
208
0
  const uint64_t total_bytes_to_write =
209
0
      moveUpTo(*peer_handle_->getWriteBuffer(), buffer,
210
               // Below value comes from Buffer::OwnedImpl::default_read_reservation_size_.
211
0
               MAX_FRAGMENT * FRAGMENT_SIZE);
212
0
  peer_handle_->setNewDataAvailable();
213
0
  ENVOY_LOG(trace, "socket {} write {} bytes of {}", static_cast<void*>(this), total_bytes_to_write,
214
0
            max_bytes_to_write);
215
0
  return {total_bytes_to_write, Api::IoError::none()};
216
0
}
217
218
Api::IoCallUint64Result IoHandleImpl::sendmsg(const Buffer::RawSlice*, uint64_t, int,
219
                                              const Network::Address::Ip*,
220
0
                                              const Network::Address::Instance&) {
221
0
  return Network::IoSocketError::ioResultSocketInvalidAddress();
222
0
}
223
224
Api::IoCallUint64Result IoHandleImpl::recvmsg(Buffer::RawSlice*, const uint64_t, uint32_t,
225
0
                                              RecvMsgOutput&) {
226
0
  return Network::IoSocketError::ioResultSocketInvalidAddress();
227
0
}
228
229
0
Api::IoCallUint64Result IoHandleImpl::recvmmsg(RawSliceArrays&, uint32_t, RecvMsgOutput&) {
230
0
  return Network::IoSocketError::ioResultSocketInvalidAddress();
231
0
}
232
233
0
Api::IoCallUint64Result IoHandleImpl::recv(void* buffer, size_t length, int flags) {
234
0
  if (!isOpen()) {
235
0
    return {0, Network::IoSocketError::getIoSocketEbadfError()};
236
0
  }
237
  // No data and the writer closed.
238
0
  if (pending_received_data_.length() == 0) {
239
0
    if (receive_data_end_stream_) {
240
0
      return {0, Api::IoError::none()};
241
0
    } else {
242
0
      return {0, Network::IoSocketError::getIoSocketEagainError()};
243
0
    }
244
0
  }
245
  // Specify uint64_t since the latter length may not have the same type.
246
0
  const auto max_bytes_to_read = std::min<uint64_t>(pending_received_data_.length(), length);
247
0
  pending_received_data_.copyOut(0, max_bytes_to_read, buffer);
248
0
  if (!(flags & MSG_PEEK)) {
249
0
    pending_received_data_.drain(max_bytes_to_read);
250
0
  }
251
0
  return {max_bytes_to_read, Api::IoError::none()};
252
0
}
253
254
0
bool IoHandleImpl::supportsMmsg() const { return false; }
255
256
0
bool IoHandleImpl::supportsUdpGro() const { return false; }
257
258
0
Api::SysCallIntResult IoHandleImpl::bind(Network::Address::InstanceConstSharedPtr) {
259
0
  return makeInvalidSyscallResult();
260
0
}
261
262
0
Api::SysCallIntResult IoHandleImpl::listen(int) { return makeInvalidSyscallResult(); }
263
264
0
Network::IoHandlePtr IoHandleImpl::accept(struct sockaddr*, socklen_t*) {
265
0
  ENVOY_BUG(false, "unsupported call to accept");
266
0
  return nullptr;
267
0
}
268
269
0
Api::SysCallIntResult IoHandleImpl::connect(Network::Address::InstanceConstSharedPtr address) {
270
0
  if (peer_handle_ != nullptr) {
271
    // Buffered Io handle should always be considered as connected unless the server peer cannot be
272
    // found. Use write or read to determine if peer is closed.
273
0
    return {0, 0};
274
0
  } else {
275
0
    ENVOY_LOG(debug, "user namespace handle {} connect to previously closed peer {}.",
276
0
              static_cast<void*>(this), address->asStringView());
277
0
    return Api::SysCallIntResult{-1, SOCKET_ERROR_INVAL};
278
0
  }
279
0
}
280
281
0
Api::SysCallIntResult IoHandleImpl::setOption(int, int, const void*, socklen_t) {
282
0
  return makeInvalidSyscallResult();
283
0
}
284
285
Api::SysCallIntResult IoHandleImpl::getOption(int level, int optname, void* optval,
286
0
                                              socklen_t* optlen) {
287
  // Check result of connect(). It is either connected or closed.
288
0
  if (level == SOL_SOCKET && optname == SO_ERROR) {
289
0
    if (peer_handle_ != nullptr) {
290
      // The peer is valid at this comment. Consider it as connected.
291
0
      *optlen = sizeof(int);
292
0
      *static_cast<int*>(optval) = 0;
293
0
      return Api::SysCallIntResult{0, 0};
294
0
    } else {
295
      // The peer is closed. Reset the option value to non-zero.
296
0
      *optlen = sizeof(int);
297
0
      *static_cast<int*>(optval) = SOCKET_ERROR_INVAL;
298
0
      return Api::SysCallIntResult{0, 0};
299
0
    }
300
0
  }
301
0
  return makeInvalidSyscallResult();
302
0
}
303
304
Api::SysCallIntResult IoHandleImpl::ioctl(unsigned long, void*, unsigned long, void*, unsigned long,
305
0
                                          unsigned long*) {
306
0
  return makeInvalidSyscallResult();
307
0
}
308
309
0
Api::SysCallIntResult IoHandleImpl::setBlocking(bool) { return makeInvalidSyscallResult(); }
310
311
0
absl::optional<int> IoHandleImpl::domain() { return absl::nullopt; }
312
313
0
Network::Address::InstanceConstSharedPtr IoHandleImpl::localAddress() {
314
0
  return IoHandleImpl::getCommonInternalAddress();
315
0
}
316
317
0
Network::Address::InstanceConstSharedPtr IoHandleImpl::peerAddress() {
318
0
  return IoHandleImpl::getCommonInternalAddress();
319
0
}
320
321
void IoHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb,
322
0
                                       Event::FileTriggerType trigger, uint32_t events) {
323
0
  ASSERT(user_file_event_ == nullptr, "Attempting to initialize two `file_event_` for the same "
324
0
                                      "file descriptor. This is not allowed.");
325
0
  ASSERT(trigger != Event::FileTriggerType::Level, "Native level trigger is not supported.");
326
0
  user_file_event_ = std::make_unique<FileEventImpl>(dispatcher, cb, events, *this);
327
0
}
328
329
0
Network::IoHandlePtr IoHandleImpl::duplicate() {
330
  // duplicate() is supposed to be used on listener io handle while this implementation doesn't
331
  // support listen.
332
0
  ENVOY_BUG(false, "unsupported call to duplicate");
333
0
  return nullptr;
334
0
}
335
336
0
void IoHandleImpl::activateFileEvents(uint32_t events) {
337
0
  if (user_file_event_) {
338
0
    user_file_event_->activate(events);
339
0
  } else {
340
0
    ENVOY_BUG(false, "Null user_file_event_");
341
0
  }
342
0
}
343
344
0
void IoHandleImpl::enableFileEvents(uint32_t events) {
345
0
  if (user_file_event_) {
346
0
    user_file_event_->setEnabled(events);
347
0
  } else {
348
0
    ENVOY_BUG(false, "Null user_file_event_");
349
0
  }
350
0
}
351
352
0
void IoHandleImpl::resetFileEvents() { user_file_event_.reset(); }
353
354
0
Api::SysCallIntResult IoHandleImpl::shutdown(int how) {
355
  // Support only shutdown write.
356
0
  ASSERT(how == ENVOY_SHUT_WR);
357
0
  ASSERT(!closed_);
358
0
  if (!write_shutdown_) {
359
0
    ASSERT(peer_handle_);
360
    // Notify the peer we won't write more data.
361
0
    peer_handle_->setWriteEnd();
362
0
    write_shutdown_ = true;
363
0
  }
364
0
  return {0, 0};
365
0
}
366
367
void PassthroughStateImpl::initialize(
368
    std::unique_ptr<envoy::config::core::v3::Metadata> metadata,
369
0
    const StreamInfo::FilterState::Objects& filter_state_objects) {
370
0
  ASSERT(state_ == State::Created);
371
0
  metadata_ = std::move(metadata);
372
0
  filter_state_objects_ = filter_state_objects;
373
0
  state_ = State::Initialized;
374
0
}
375
void PassthroughStateImpl::mergeInto(envoy::config::core::v3::Metadata& metadata,
376
0
                                     StreamInfo::FilterState& filter_state) {
377
0
  ASSERT(state_ == State::Created || state_ == State::Initialized);
378
0
  if (metadata_) {
379
0
    metadata.MergeFrom(*metadata_);
380
0
  }
381
0
  for (const auto& object : filter_state_objects_) {
382
    // This should not throw as stream info is new and filter objects are uniquely named.
383
0
    filter_state.setData(object.name_, object.data_, object.state_type_,
384
0
                         StreamInfo::FilterState::LifeSpan::Connection, object.stream_sharing_);
385
0
  }
386
0
  metadata_ = nullptr;
387
0
  filter_state_objects_.clear();
388
0
  state_ = State::Done;
389
0
}
390
} // namespace UserSpace
391
} // namespace IoSocket
392
} // namespace Extensions
393
} // namespace Envoy