1
#include "source/extensions/io_socket/user_space/io_handle_impl.h"
2

            
3
#include "envoy/buffer/buffer.h"
4
#include "envoy/common/platform.h"
5

            
6
#include "source/common/api/os_sys_calls_impl.h"
7
#include "source/common/common/assert.h"
8
#include "source/common/common/utility.h"
9
#include "source/common/network/address_impl.h"
10
#include "source/extensions/io_socket/user_space/file_event_impl.h"
11

            
12
#include "absl/types/optional.h"
13

            
14
namespace Envoy {
15

            
16
namespace Extensions {
17
namespace IoSocket {
18
namespace UserSpace {
19
namespace {
20
7
// Canonical "operation not supported" result returned by all the socket-style
// entry points that have no meaning for an in-process, user-space handle.
Api::SysCallIntResult makeInvalidSyscallResult() {
  // -1 with ENOTSUP-equivalent errno: there is no real OS socket behind this handle.
  return {-1, SOCKET_ERROR_NOT_SUP};
}
23

            
24
/**
25
 * Move at most max_length from src to dst. If the dst is close or beyond high watermark, move no
26
 * more than 16K. It's not an error if src buffer doesn't contain enough data.
27
 * @param dst supplies the buffer where the data is move to.
28
 * @param src supplies the buffer where the data is move from.
29
 * @param max_length supplies the max bytes the call can move.
30
 * @return number of bytes this call moves.
31
 */
32
116
uint64_t moveUpTo(Buffer::Instance& dst, Buffer::Instance& src, uint64_t max_length) {
33
116
  ASSERT(src.length() > 0);
34
116
  if (dst.highWatermark() != 0) {
35
92
    if (dst.length() < dst.highWatermark()) {
36
      // Move until high watermark so that high watermark is not triggered.
37
      // However, if dst buffer is near high watermark, move 16K to avoid the small fragment move.
38
90
      max_length = std::min(max_length,
39
90
                            std::max<uint64_t>(FRAGMENT_SIZE, dst.highWatermark() - dst.length()));
40
90
    } else {
41
      // Move at most 16K if the dst buffer is over high watermark.
42
2
      max_length = std::min<uint64_t>(max_length, FRAGMENT_SIZE);
43
2
    }
44
92
  }
45
116
  uint64_t res = std::min(max_length, src.length());
46
116
  dst.move(src, res, /*reset_drain_trackers_and_accounting=*/true);
47
116
  return res;
48
116
}
49
} // namespace
50

            
51
8
// Returns the lazily-constructed singleton internal address shared by every
// user-space io handle; localAddress() and peerAddress() both report it.
const Network::Address::InstanceConstSharedPtr& IoHandleImpl::getCommonInternalAddress() {
  CONSTRUCT_ON_FIRST_USE(Network::Address::InstanceConstSharedPtr,
                         std::make_shared<const Network::Address::EnvoyInternalInstance>(
                             "internal_address_for_user_space_io_handle"));
}
56

            
57
// Wires the watermark buffer's callbacks back to this handle and stores the
// shared passthrough state used to hand metadata/filter state across the pair.
IoHandleImpl::IoHandleImpl(PassthroughStateSharedPtr passthrough_state)
    : pending_received_data_([this]() -> void { onBelowLowWatermark(); },
                             [this]() -> void { onAboveHighWatermark(); }, []() -> void {}),
      // Move the by-value shared_ptr into the member: avoids an unnecessary
      // atomic refcount increment/decrement pair.
      passthrough_state_(std::move(passthrough_state)) {}
61

            
62
188
// Closes the handle if the owner did not do so explicitly, so the peer is
// always notified (EOF + destroy) before this side disappears.
IoHandleImpl::~IoHandleImpl() {
  if (!closed_) {
    close();
  }
}
67

            
68
188
// Closes this end of the pipe. Double close is a bug caught by the ASSERT in
// debug builds; the `if` below additionally guards release builds. An attached
// peer is told both that no more data will arrive (EOF, i.e. shutdown(WRITE))
// and that this side no longer accepts data (destroy, i.e. shutdown(RD)).
// Always reports success.
Api::IoCallUint64Result IoHandleImpl::close() {
  ASSERT(!closed_);
  if (!closed_) {
    if (peer_handle_) {
      ENVOY_LOG(trace, "socket {} close before peer {} closes.", static_cast<void*>(this),
                static_cast<void*>(peer_handle_));
      // Notify the peer that it will not receive more data. shutdown(WRITE).
      peer_handle_->setEof();
      // Notify the peer that we no longer accept data. shutdown(RD).
      peer_handle_->onPeerDestroy();
      peer_handle_ = nullptr;
    } else {
      ENVOY_LOG(trace, "socket {} close after peer closed.", static_cast<void*>(this));
    }
  }
  if (user_file_event_) {
    // No event callback should be handled after close completes.
    user_file_event_.reset();
  }
  closed_ = true;
  return Api::ioCallUint64ResultNoError();
}
90

            
91
1229
// Open until close() flips closed_; there is no intermediate half-open state.
bool IoHandleImpl::isOpen() const { return !closed_; }
92

            
93
// Always false: this handle does not track a previously-connected state.
bool IoHandleImpl::wasConnected() const { return false; }
94

            
95
// Scatter-read: copies buffered bytes into the caller-provided slices and
// drains what was copied. Returns EBADF after close, 0/no-error at EOF,
// EAGAIN when no data is buffered yet.
Api::IoCallUint64Result IoHandleImpl::readv(uint64_t max_length, Buffer::RawSlice* slices,
                                            uint64_t num_slice) {
  if (!isOpen()) {
    return {0, Network::IoSocketError::create(SOCKET_ERROR_BADF)};
  }
  if (pending_received_data_.length() == 0) {
    if (receive_data_end_stream_) {
      return {0, Api::IoError::none()};
    } else {
      return {0, Network::IoSocketError::getIoSocketEagainError()};
    }
  }
  // The read bytes can not exceed the provided buffer size or pending received size.
  const auto max_bytes_to_read = std::min(pending_received_data_.length(), max_length);
  uint64_t bytes_offset = 0;
  // Bound the loop by max_bytes_to_read (not max_length): once the pending data
  // is exhausted, further iterations would only perform zero-byte copies.
  for (uint64_t i = 0; i < num_slice && bytes_offset < max_bytes_to_read; i++) {
    auto bytes_to_read_in_this_slice =
        std::min(max_bytes_to_read - bytes_offset, uint64_t(slices[i].len_));
    // Copy and drain, so pending_received_data_ always copy from offset 0.
    pending_received_data_.copyOut(0, bytes_to_read_in_this_slice, slices[i].mem_);
    pending_received_data_.drain(bytes_to_read_in_this_slice);
    bytes_offset += bytes_to_read_in_this_slice;
  }
  const auto bytes_read = bytes_offset;
  ASSERT(bytes_read <= max_bytes_to_read);
  ENVOY_LOG(trace, "socket {} readv {} bytes", static_cast<void*>(this), bytes_read);
  return {bytes_read, Api::IoError::none()};
}
123

            
124
// Moves up to max_length_opt bytes (default: one full read reservation) from
// the pending receive buffer into `buffer`, honoring the destination's
// watermarks via moveUpTo(). Returns EBADF after close, 0/no-error at EOF,
// EAGAIN when no data is buffered yet.
Api::IoCallUint64Result IoHandleImpl::read(Buffer::Instance& buffer,
                                           absl::optional<uint64_t> max_length_opt) {
  // The default mirrors Buffer::OwnedImpl::default_read_reservation_size_.
  const uint64_t max_length = max_length_opt.value_or(MAX_FRAGMENT * FRAGMENT_SIZE);
  if (max_length == 0) {
    return Api::ioCallUint64ResultNoError();
  }
  if (!isOpen()) {
    return {0, Network::IoSocketError::create(SOCKET_ERROR_BADF)};
  }
  if (pending_received_data_.length() == 0) {
    // Nothing buffered: EOF if the writer already shut down, otherwise retry later.
    if (receive_data_end_stream_) {
      return {0, Api::IoError::none()};
    }
    return {0, Network::IoSocketError::getIoSocketEagainError()};
  }
  const uint64_t bytes_moved = moveUpTo(buffer, pending_received_data_, max_length);
  return {bytes_moved, Api::IoError::none()};
}
144

            
145
11
// Vectored write into the peer's receive buffer. Whole slices are appended
// until the peer's high watermark triggers, so the final slice may overshoot
// the watermark slightly.
Api::IoCallUint64Result IoHandleImpl::writev(const Buffer::RawSlice* slices, uint64_t num_slice) {
  // A write carrying no bytes succeeds even if the peer is shut down.
  bool has_payload = false;
  for (uint64_t i = 0; i < num_slice; ++i) {
    if (slices[i].mem_ != nullptr && slices[i].len_ != 0) {
      has_payload = true;
      break;
    }
  }
  if (!has_payload) {
    return Api::ioCallUint64ResultNoError();
  }
  if (!isOpen()) {
    return {0, Network::IoSocketError::getIoSocketEbadfError()};
  }
  // The peer handle is gone.
  if (!peer_handle_) {
    return {0, Network::IoSocketError::create(SOCKET_ERROR_INVAL)};
  }
  // Writing after the peer received EOF is an error.
  if (peer_handle_->hasReceivedEof()) {
    // TODO(lambdai): `EPIPE` or `ENOTCONN`.
    return {0, Network::IoSocketError::create(SOCKET_ERROR_INVAL)};
  }
  // The peer is alive but flow control currently rejects new data.
  if (!peer_handle_->canReceiveData()) {
    return {0, Network::IoSocketError::getIoSocketEagainError()};
  }

  auto* const dest_buffer = peer_handle_->getReceiveBuffer();
  uint64_t bytes_written = 0;
  // Append slice by slice; the buffer guarantees fragments are always append-able.
  // Stop as soon as the destination reaches its high watermark.
  for (uint64_t i = 0; i < num_slice && !dest_buffer->highWatermarkTriggered(); ++i) {
    const auto& slice = slices[i];
    if (slice.mem_ == nullptr || slice.len_ == 0) {
      continue;
    }
    dest_buffer->add(slice.mem_, slice.len_);
    bytes_written += slice.len_;
  }
  peer_handle_->setNewDataAvailable();
  ENVOY_LOG(trace, "socket {} writev {} bytes", static_cast<void*>(this), bytes_written);
  return {bytes_written, Api::IoError::none()};
}
187

            
188
74
// Moves bytes from `buffer` into the peer's receive buffer, respecting the
// peer's watermark-based flow control via moveUpTo(). Mirrors the precondition
// checks in writev(): empty write is a no-op, EBADF after close, EINVAL when
// the peer is gone or already received EOF, EAGAIN when the peer cannot
// currently accept data.
Api::IoCallUint64Result IoHandleImpl::write(Buffer::Instance& buffer) {
  // Empty input is allowed even though the peer is shutdown.
  if (buffer.length() == 0) {
    return Api::ioCallUint64ResultNoError();
  }
  if (!isOpen()) {
    return {0, Network::IoSocketError::getIoSocketEbadfError()};
  }
  // Closed peer.
  if (!peer_handle_) {
    return {0, Network::IoSocketError::create(SOCKET_ERROR_INVAL)};
  }
  // Error: write after close.
  if (peer_handle_->hasReceivedEof()) {
    // TODO(lambdai): `EPIPE` or `ENOTCONN`.
    return {0, Network::IoSocketError::create(SOCKET_ERROR_INVAL)};
  }
  // The peer is valid but temporarily does not accept new data. Likely due to flow control.
  if (!peer_handle_->canReceiveData()) {
    return {0, Network::IoSocketError::getIoSocketEagainError()};
  }
  const uint64_t max_bytes_to_write = buffer.length();
  const uint64_t total_bytes_to_write =
      moveUpTo(*peer_handle_->getReceiveBuffer(), buffer,
               // Below value comes from Buffer::OwnedImpl::default_read_reservation_size_.
               MAX_FRAGMENT * FRAGMENT_SIZE);
  // Notify the peer that new data is available.
  peer_handle_->setNewDataAvailable();
  ENVOY_LOG(trace, "socket {} write {} bytes of {}", static_cast<void*>(this), total_bytes_to_write,
            max_bytes_to_write);
  return {total_bytes_to_write, Api::IoError::none()};
}
219

            
220
// Datagram-oriented send is not supported on user-space pipes.
Api::IoCallUint64Result IoHandleImpl::sendmsg(const Buffer::RawSlice*, uint64_t, int,
                                              const Network::Address::Ip*,
                                              const Network::Address::Instance&) {
  return Network::IoSocketError::ioResultSocketInvalidAddress();
}
225

            
226
// Datagram-oriented receive is not supported on user-space pipes.
Api::IoCallUint64Result IoHandleImpl::recvmsg(Buffer::RawSlice*, const uint64_t, uint32_t,
                                              const Network::IoHandle::UdpSaveCmsgConfig&,
                                              RecvMsgOutput&) {
  return Network::IoSocketError::ioResultSocketInvalidAddress();
}
231

            
232
// Batched datagram receive is not supported on user-space pipes.
Api::IoCallUint64Result IoHandleImpl::recvmmsg(RawSliceArrays&, uint32_t,
                                               const Network::IoHandle::UdpSaveCmsgConfig&,
                                               RecvMsgOutput&) {
  return Network::IoSocketError::ioResultSocketInvalidAddress();
}
237

            
238
31
// Copies up to `length` buffered bytes into `buffer`. With MSG_PEEK the
// pending data is left in place; otherwise the copied bytes are drained.
// Returns EBADF after close, 0/no-error at EOF, EAGAIN when no data is
// buffered yet.
Api::IoCallUint64Result IoHandleImpl::recv(void* buffer, size_t length, int flags) {
  if (!isOpen()) {
    return {0, Network::IoSocketError::getIoSocketEbadfError()};
  }
  if (pending_received_data_.length() == 0) {
    // No buffered data: EOF if the writer closed, otherwise ask the caller to retry.
    if (receive_data_end_stream_) {
      return {0, Api::IoError::none()};
    }
    return {0, Network::IoSocketError::getIoSocketEagainError()};
  }
  // Use uint64_t explicitly since `length` may have a different width.
  const uint64_t bytes_to_copy = std::min<uint64_t>(pending_received_data_.length(), length);
  pending_received_data_.copyOut(0, bytes_to_copy, buffer);
  const bool is_peek = (flags & MSG_PEEK) != 0;
  if (!is_peek) {
    pending_received_data_.drain(bytes_to_copy);
  }
  return {bytes_to_copy, Api::IoError::none()};
}
258

            
259
1
// recvmmsg() is not implemented for user-space handles.
bool IoHandleImpl::supportsMmsg() const { return false; }
260

            
261
1
// UDP GRO is not implemented for user-space handles.
bool IoHandleImpl::supportsUdpGro() const { return false; }
262

            
263
1
// Binding has no meaning for an in-process pipe; report "not supported".
Api::SysCallIntResult IoHandleImpl::bind(Network::Address::InstanceConstSharedPtr) {
  return makeInvalidSyscallResult();
}
266

            
267
1
// Listening is not supported; handle pairs are created directly by IoHandleFactory.
Api::SysCallIntResult IoHandleImpl::listen(int) { return makeInvalidSyscallResult(); }
268

            
269
1
// Accept is never expected on this handle; ENVOY_BUG flags any call and
// nullptr is returned.
Network::IoHandlePtr IoHandleImpl::accept(struct sockaddr*, socklen_t*) {
  ENVOY_BUG(false, "unsupported call to accept");
  return nullptr;
}
273

            
274
27
// Emulates connect(2). A user-space handle is considered connected as long as
// its peer is alive; callers detect later peer closure through read/write.
Api::SysCallIntResult IoHandleImpl::connect(Network::Address::InstanceConstSharedPtr address) {
  if (peer_handle_ == nullptr) {
    // The server peer is gone; connecting to it can never succeed.
    ENVOY_LOG(debug, "user namespace handle {} connect to previously closed peer {}.",
              static_cast<void*>(this), address->asStringView());
    return Api::SysCallIntResult{-1, SOCKET_ERROR_INVAL};
  }
  // Buffered io handle should always be considered as connected unless the
  // server peer cannot be found.
  return {0, 0};
}
285

            
286
1
// Socket options cannot be set on a user-space handle.
Api::SysCallIntResult IoHandleImpl::setOption(int, int, const void*, socklen_t) {
  return makeInvalidSyscallResult();
}
289

            
290
// Only SO_ERROR is emulated: it reports the result of connect(), i.e. whether
// the peer handle is still alive. Every other option is unsupported.
Api::SysCallIntResult IoHandleImpl::getOption(int level, int optname, void* optval,
                                              socklen_t* optlen) {
  if (level == SOL_SOCKET && optname == SO_ERROR) {
    *optlen = sizeof(int);
    // Zero while the peer is valid (connected); a non-zero error value once
    // the peer has been closed.
    *static_cast<int*>(optval) = (peer_handle_ != nullptr) ? 0 : SOCKET_ERROR_INVAL;
    return Api::SysCallIntResult{0, 0};
  }
  return makeInvalidSyscallResult();
}
308

            
309
// ioctl is not supported on user-space handles.
Api::SysCallIntResult IoHandleImpl::ioctl(unsigned long, void*, unsigned long, void*, unsigned long,
                                          unsigned long*) {
  return makeInvalidSyscallResult();
}
313

            
314
2
// Toggling blocking mode is not supported on user-space handles.
Api::SysCallIntResult IoHandleImpl::setBlocking(bool) { return makeInvalidSyscallResult(); }
315

            
316
1
// No socket domain (AF_*) applies to a user-space handle.
absl::optional<int> IoHandleImpl::domain() { return absl::nullopt; }
317

            
318
4
// Reports the shared placeholder internal address; both ends of a pair report
// the same value.
absl::StatusOr<Network::Address::InstanceConstSharedPtr> IoHandleImpl::localAddress() {
  return IoHandleImpl::getCommonInternalAddress();
}
321

            
322
4
// Reports the shared placeholder internal address, identical to localAddress().
absl::StatusOr<Network::Address::InstanceConstSharedPtr> IoHandleImpl::peerAddress() {
  return IoHandleImpl::getCommonInternalAddress();
}
325

            
326
// Creates the user-space file event that delivers readiness callbacks for this
// handle. May be called at most once per handle, and native level trigger is
// rejected (see ASSERTs below).
void IoHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb,
                                       Event::FileTriggerType trigger, uint32_t events) {
  ASSERT(user_file_event_ == nullptr, "Attempting to initialize two `file_event_` for the same "
                                      "file descriptor. This is not allowed.");
  ASSERT(trigger != Event::FileTriggerType::Level, "Native level trigger is not supported.");
  user_file_event_ = std::make_unique<FileEventImpl>(dispatcher, cb, events, *this);
}
333

            
334
1
Network::IoHandlePtr IoHandleImpl::duplicate() {
  // duplicate() is supposed to be used on listener io handle while this implementation doesn't
  // support listen. Flag the unexpected call and return nullptr.
  ENVOY_BUG(false, "unsupported call to duplicate");
  return nullptr;
}
340

            
341
76
void IoHandleImpl::activateFileEvents(uint32_t events) {
342
76
  if (user_file_event_) {
343
75
    user_file_event_->activate(events);
344
75
  } else {
345
1
    ENVOY_BUG(false, "Null user_file_event_");
346
1
  }
347
76
}
348

            
349
59
void IoHandleImpl::enableFileEvents(uint32_t events) {
350
59
  if (user_file_event_) {
351
58
    user_file_event_->setEnabled(events);
352
58
  } else {
353
1
    ENVOY_BUG(false, "Null user_file_event_");
354
1
  }
355
59
}
356

            
357
8
// Destroys the file event; no further readiness callbacks will be delivered.
void IoHandleImpl::resetFileEvents() { user_file_event_.reset(); }
358

            
359
28
// Emulates shutdown(2). Only ENVOY_SHUT_WR is supported: the peer is told it
// will receive no more data. Safe to call repeatedly; EOF is sent only once.
Api::SysCallIntResult IoHandleImpl::shutdown(int how) {
  // Support only shutdown write.
  ASSERT(how == ENVOY_SHUT_WR);
  ASSERT(!closed_);
  if (!sent_eof_) {
    ASSERT(peer_handle_);
    // Notify the peer that it will not receive more data.
    peer_handle_->setEof();
    sent_eof_ = true;
  }
  return {0, 0};
}
371

            
372
// Stores the metadata and filter state objects to be handed over to the other
// end of the handle pair. Must be called at most once (asserted), while the
// state is still in Created.
void PassthroughStateImpl::initialize(
    std::unique_ptr<envoy::config::core::v3::Metadata> metadata,
    const StreamInfo::FilterState::Objects& filter_state_objects) {
  ASSERT(state_ == State::Created);
  metadata_ = std::move(metadata);
  filter_state_objects_ = filter_state_objects;
  state_ = State::Initialized;
}
380
// Merges any stored metadata and filter state objects into the given targets,
// then releases the stored state and transitions to Done so the passthrough
// cannot be consumed twice.
void PassthroughStateImpl::mergeInto(envoy::config::core::v3::Metadata& metadata,
                                     StreamInfo::FilterState& filter_state) {
  ASSERT(state_ == State::Created || state_ == State::Initialized);
  if (metadata_) {
    metadata.MergeFrom(*metadata_);
  }
  for (const auto& object : filter_state_objects_) {
    // This should not throw as stream info is new and filter objects are uniquely named.
    filter_state.setData(object.name_, object.data_, object.state_type_,
                         StreamInfo::FilterState::LifeSpan::Connection, object.stream_sharing_);
  }
  metadata_ = nullptr;
  filter_state_objects_.clear();
  state_ = State::Done;
}
395

            
396
std::pair<IoHandleImplPtr, IoHandleImplPtr>
397
71
IoHandleFactory::createIoHandlePair(PassthroughStatePtr state) {
398
71
  PassthroughStateSharedPtr shared_state;
399
71
  if (state != nullptr) {
400
1
    shared_state = std::move(state);
401
70
  } else {
402
70
    shared_state = std::make_shared<PassthroughStateImpl>();
403
70
  }
404
71
  auto p = std::pair<IoHandleImplPtr, IoHandleImplPtr>{new IoHandleImpl(shared_state),
405
71
                                                       new IoHandleImpl(shared_state)};
406
71
  p.first->setPeerHandle(p.second.get());
407
71
  p.second->setPeerHandle(p.first.get());
408
71
  return p;
409
71
}
410

            
411
std::pair<IoHandleImplPtr, IoHandleImplPtr>
412
23
IoHandleFactory::createBufferLimitedIoHandlePair(uint32_t buffer_size, PassthroughStatePtr state) {
413
23
  PassthroughStateSharedPtr shared_state;
414
23
  if (state != nullptr) {
415
1
    shared_state = std::move(state);
416
23
  } else {
417
22
    shared_state = std::make_shared<PassthroughStateImpl>();
418
22
  }
419
23
  auto p = std::pair<IoHandleImplPtr, IoHandleImplPtr>{new IoHandleImpl(shared_state),
420
23
                                                       new IoHandleImpl(shared_state)};
421
  // This buffer watermark setting emulates the OS socket buffer parameter
422
  // `/proc/sys/net/ipv4/tcp_{r,w}mem`.
423
23
  p.first->setWatermarks(buffer_size);
424
23
  p.second->setWatermarks(buffer_size);
425
23
  p.first->setPeerHandle(p.second.get());
426
23
  p.second->setPeerHandle(p.first.get());
427
23
  return p;
428
23
}
429

            
430
} // namespace UserSpace
431
} // namespace IoSocket
432
} // namespace Extensions
433
} // namespace Envoy