LCOV - code coverage report
Current view: top level - source/extensions/io_socket/user_space - io_handle_impl.cc (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 0 281 0.0 %
Date: 2024-01-05 06:35:25 Functions: 0 39 0.0 %

          Line data    Source code
       1             : #include "source/extensions/io_socket/user_space/io_handle_impl.h"
       2             : 
       3             : #include "envoy/buffer/buffer.h"
       4             : #include "envoy/common/platform.h"
       5             : 
       6             : #include "source/common/api/os_sys_calls_impl.h"
       7             : #include "source/common/common/assert.h"
       8             : #include "source/common/common/utility.h"
       9             : #include "source/common/network/address_impl.h"
      10             : #include "source/extensions/io_socket/user_space/file_event_impl.h"
      11             : 
      12             : #include "absl/types/optional.h"
      13             : 
      14             : namespace Envoy {
      15             : 
      16             : namespace Extensions {
      17             : namespace IoSocket {
      18             : namespace UserSpace {
      19             : namespace {
      20           0 : Api::SysCallIntResult makeInvalidSyscallResult() {
      21           0 :   return Api::SysCallIntResult{-1, SOCKET_ERROR_NOT_SUP};
      22           0 : }
      23             : 
      24             : /**
      25             :  * Move at most max_length from src to dst. If the dst is close or beyond high watermark, move no
      26             :  * more than 16K. It's not an error if src buffer doesn't contain enough data.
      27             :  * @param dst supplies the buffer where the data is move to.
      28             :  * @param src supplies the buffer where the data is move from.
      29             :  * @param max_length supplies the max bytes the call can move.
      30             :  * @return number of bytes this call moves.
      31             :  */
      32           0 : uint64_t moveUpTo(Buffer::Instance& dst, Buffer::Instance& src, uint64_t max_length) {
      33           0 :   ASSERT(src.length() > 0);
      34           0 :   if (dst.highWatermark() != 0) {
      35           0 :     if (dst.length() < dst.highWatermark()) {
      36             :       // Move until high watermark so that high watermark is not triggered.
      37             :       // However, if dst buffer is near high watermark, move 16K to avoid the small fragment move.
      38           0 :       max_length = std::min(max_length,
      39           0 :                             std::max<uint64_t>(FRAGMENT_SIZE, dst.highWatermark() - dst.length()));
      40           0 :     } else {
      41             :       // Move at most 16K if the dst buffer is over high watermark.
      42           0 :       max_length = std::min<uint64_t>(max_length, FRAGMENT_SIZE);
      43           0 :     }
      44           0 :   }
      45           0 :   uint64_t res = std::min(max_length, src.length());
      46           0 :   dst.move(src, res, /*reset_drain_trackers_and_accounting=*/true);
      47           0 :   return res;
      48           0 : }
      49             : } // namespace
      50             : 
      51           0 : const Network::Address::InstanceConstSharedPtr& IoHandleImpl::getCommonInternalAddress() {
      52           0 :   CONSTRUCT_ON_FIRST_USE(Network::Address::InstanceConstSharedPtr,
      53           0 :                          std::make_shared<const Network::Address::EnvoyInternalInstance>(
      54           0 :                              "internal_address_for_user_space_io_handle"));
      55           0 : }
      56             : 
      57             : IoHandleImpl::IoHandleImpl(PassthroughStateSharedPtr passthrough_state)
      58           0 :     : pending_received_data_([&]() -> void { this->onBelowLowWatermark(); },
      59           0 :                              [&]() -> void { this->onAboveHighWatermark(); }, []() -> void {}),
      60           0 :       passthrough_state_(passthrough_state) {}
      61             : 
      62           0 : IoHandleImpl::~IoHandleImpl() {
      63           0 :   if (!closed_) {
      64           0 :     close();
      65           0 :   }
      66           0 : }
      67             : 
      68           0 : Api::IoCallUint64Result IoHandleImpl::close() {
      69           0 :   ASSERT(!closed_);
      70           0 :   if (!closed_) {
      71           0 :     if (peer_handle_) {
      72           0 :       ENVOY_LOG(trace, "socket {} close before peer {} closes.", static_cast<void*>(this),
      73           0 :                 static_cast<void*>(peer_handle_));
      74             :       // Notify the peer we won't write more data. shutdown(WRITE).
      75           0 :       peer_handle_->setWriteEnd();
      76             :       // Notify the peer that we no longer accept data. shutdown(RD).
      77           0 :       peer_handle_->onPeerDestroy();
      78           0 :       peer_handle_ = nullptr;
      79           0 :     } else {
      80           0 :       ENVOY_LOG(trace, "socket {} close after peer closed.", static_cast<void*>(this));
      81           0 :     }
      82           0 :   }
      83           0 :   if (user_file_event_) {
      84             :     // No event callback should be handled after close completes.
      85           0 :     user_file_event_.reset();
      86           0 :   }
      87           0 :   closed_ = true;
      88           0 :   return Api::ioCallUint64ResultNoError();
      89           0 : }
      90             : 
      91           0 : bool IoHandleImpl::isOpen() const { return !closed_; }
      92             : 
      93             : Api::IoCallUint64Result IoHandleImpl::readv(uint64_t max_length, Buffer::RawSlice* slices,
      94           0 :                                             uint64_t num_slice) {
      95           0 :   if (!isOpen()) {
      96           0 :     return {0, Network::IoSocketError::create(SOCKET_ERROR_BADF)};
      97           0 :   }
      98           0 :   if (pending_received_data_.length() == 0) {
      99           0 :     if (receive_data_end_stream_) {
     100           0 :       return {0, Api::IoError::none()};
     101           0 :     } else {
     102           0 :       return {0, Network::IoSocketError::getIoSocketEagainError()};
     103           0 :     }
     104           0 :   }
     105             :   // The read bytes can not exceed the provided buffer size or pending received size.
     106           0 :   const auto max_bytes_to_read = std::min(pending_received_data_.length(), max_length);
     107           0 :   uint64_t bytes_offset = 0;
     108           0 :   for (uint64_t i = 0; i < num_slice && bytes_offset < max_length; i++) {
     109           0 :     auto bytes_to_read_in_this_slice =
     110           0 :         std::min(max_bytes_to_read - bytes_offset, uint64_t(slices[i].len_));
     111             :     // Copy and drain, so pending_received_data_ always copy from offset 0.
     112           0 :     pending_received_data_.copyOut(0, bytes_to_read_in_this_slice, slices[i].mem_);
     113           0 :     pending_received_data_.drain(bytes_to_read_in_this_slice);
     114           0 :     bytes_offset += bytes_to_read_in_this_slice;
     115           0 :   }
     116           0 :   const auto bytes_read = bytes_offset;
     117           0 :   ASSERT(bytes_read <= max_bytes_to_read);
     118           0 :   ENVOY_LOG(trace, "socket {} readv {} bytes", static_cast<void*>(this), bytes_read);
     119           0 :   return {bytes_read, Api::IoError::none()};
     120           0 : }
     121             : 
     122             : Api::IoCallUint64Result IoHandleImpl::read(Buffer::Instance& buffer,
     123           0 :                                            absl::optional<uint64_t> max_length_opt) {
     124             :   // Below value comes from Buffer::OwnedImpl::default_read_reservation_size_.
     125           0 :   uint64_t max_length = max_length_opt.value_or(MAX_FRAGMENT * FRAGMENT_SIZE);
     126           0 :   if (max_length == 0) {
     127           0 :     return Api::ioCallUint64ResultNoError();
     128           0 :   }
     129           0 :   if (!isOpen()) {
     130           0 :     return {0, Network::IoSocketError::create(SOCKET_ERROR_BADF)};
     131           0 :   }
     132           0 :   if (pending_received_data_.length() == 0) {
     133           0 :     if (receive_data_end_stream_) {
     134           0 :       return {0, Api::IoError::none()};
     135           0 :     } else {
     136           0 :       return {0, Network::IoSocketError::getIoSocketEagainError()};
     137           0 :     }
     138           0 :   }
     139           0 :   const uint64_t bytes_to_read = moveUpTo(buffer, pending_received_data_, max_length);
     140           0 :   return {bytes_to_read, Api::IoError::none()};
     141           0 : }
     142             : 
     143           0 : Api::IoCallUint64Result IoHandleImpl::writev(const Buffer::RawSlice* slices, uint64_t num_slice) {
     144             :   // Empty input is allowed even though the peer is shutdown.
     145           0 :   bool is_input_empty = true;
     146           0 :   for (uint64_t i = 0; i < num_slice; i++) {
     147           0 :     if (slices[i].mem_ != nullptr && slices[i].len_ != 0) {
     148           0 :       is_input_empty = false;
     149           0 :       break;
     150           0 :     }
     151           0 :   }
     152           0 :   if (is_input_empty) {
     153           0 :     return Api::ioCallUint64ResultNoError();
     154           0 :   }
     155           0 :   if (!isOpen()) {
     156           0 :     return {0, Network::IoSocketError::getIoSocketEbadfError()};
     157           0 :   }
     158             :   // Closed peer.
     159           0 :   if (!peer_handle_) {
     160           0 :     return {0, Network::IoSocketError::create(SOCKET_ERROR_INVAL)};
     161           0 :   }
     162             :   // Error: write after close.
     163           0 :   if (peer_handle_->isPeerShutDownWrite()) {
     164             :     // TODO(lambdai): `EPIPE` or `ENOTCONN`.
     165           0 :     return {0, Network::IoSocketError::create(SOCKET_ERROR_INVAL)};
     166           0 :   }
     167             :   // The peer is valid but temporarily does not accept new data. Likely due to flow control.
     168           0 :   if (!peer_handle_->isWritable()) {
     169           0 :     return {0, Network::IoSocketError::getIoSocketEagainError()};
     170           0 :   }
     171             : 
     172           0 :   auto* const dest_buffer = peer_handle_->getWriteBuffer();
     173             :   // Write along with iteration. Buffer guarantee the fragment is always append-able.
     174           0 :   uint64_t bytes_written = 0;
     175           0 :   for (uint64_t i = 0; i < num_slice && !dest_buffer->highWatermarkTriggered(); i++) {
     176           0 :     if (slices[i].mem_ != nullptr && slices[i].len_ != 0) {
     177           0 :       dest_buffer->add(slices[i].mem_, slices[i].len_);
     178           0 :       bytes_written += slices[i].len_;
     179           0 :     }
     180           0 :   }
     181           0 :   peer_handle_->setNewDataAvailable();
     182           0 :   ENVOY_LOG(trace, "socket {} writev {} bytes", static_cast<void*>(this), bytes_written);
     183           0 :   return {bytes_written, Api::IoError::none()};
     184           0 : }
     185             : 
     186           0 : Api::IoCallUint64Result IoHandleImpl::write(Buffer::Instance& buffer) {
     187             :   // Empty input is allowed even though the peer is shutdown.
     188           0 :   if (buffer.length() == 0) {
     189           0 :     return Api::ioCallUint64ResultNoError();
     190           0 :   }
     191           0 :   if (!isOpen()) {
     192           0 :     return {0, Network::IoSocketError::getIoSocketEbadfError()};
     193           0 :   }
     194             :   // Closed peer.
     195           0 :   if (!peer_handle_) {
     196           0 :     return {0, Network::IoSocketError::create(SOCKET_ERROR_INVAL)};
     197           0 :   }
     198             :   // Error: write after close.
     199           0 :   if (peer_handle_->isPeerShutDownWrite()) {
     200             :     // TODO(lambdai): `EPIPE` or `ENOTCONN`.
     201           0 :     return {0, Network::IoSocketError::create(SOCKET_ERROR_INVAL)};
     202           0 :   }
     203             :   // The peer is valid but temporarily does not accept new data. Likely due to flow control.
     204           0 :   if (!peer_handle_->isWritable()) {
     205           0 :     return {0, Network::IoSocketError::getIoSocketEagainError()};
     206           0 :   }
     207           0 :   const uint64_t max_bytes_to_write = buffer.length();
     208           0 :   const uint64_t total_bytes_to_write =
     209           0 :       moveUpTo(*peer_handle_->getWriteBuffer(), buffer,
     210             :                // Below value comes from Buffer::OwnedImpl::default_read_reservation_size_.
     211           0 :                MAX_FRAGMENT * FRAGMENT_SIZE);
     212           0 :   peer_handle_->setNewDataAvailable();
     213           0 :   ENVOY_LOG(trace, "socket {} write {} bytes of {}", static_cast<void*>(this), total_bytes_to_write,
     214           0 :             max_bytes_to_write);
     215           0 :   return {total_bytes_to_write, Api::IoError::none()};
     216           0 : }
     217             : 
     218             : Api::IoCallUint64Result IoHandleImpl::sendmsg(const Buffer::RawSlice*, uint64_t, int,
     219             :                                               const Network::Address::Ip*,
     220           0 :                                               const Network::Address::Instance&) {
     221           0 :   return Network::IoSocketError::ioResultSocketInvalidAddress();
     222           0 : }
     223             : 
     224             : Api::IoCallUint64Result IoHandleImpl::recvmsg(Buffer::RawSlice*, const uint64_t, uint32_t,
     225           0 :                                               RecvMsgOutput&) {
     226           0 :   return Network::IoSocketError::ioResultSocketInvalidAddress();
     227           0 : }
     228             : 
     229           0 : Api::IoCallUint64Result IoHandleImpl::recvmmsg(RawSliceArrays&, uint32_t, RecvMsgOutput&) {
     230           0 :   return Network::IoSocketError::ioResultSocketInvalidAddress();
     231           0 : }
     232             : 
     233           0 : Api::IoCallUint64Result IoHandleImpl::recv(void* buffer, size_t length, int flags) {
     234           0 :   if (!isOpen()) {
     235           0 :     return {0, Network::IoSocketError::getIoSocketEbadfError()};
     236           0 :   }
     237             :   // No data and the writer closed.
     238           0 :   if (pending_received_data_.length() == 0) {
     239           0 :     if (receive_data_end_stream_) {
     240           0 :       return {0, Api::IoError::none()};
     241           0 :     } else {
     242           0 :       return {0, Network::IoSocketError::getIoSocketEagainError()};
     243           0 :     }
     244           0 :   }
     245             :   // Specify uint64_t since the latter length may not have the same type.
     246           0 :   const auto max_bytes_to_read = std::min<uint64_t>(pending_received_data_.length(), length);
     247           0 :   pending_received_data_.copyOut(0, max_bytes_to_read, buffer);
     248           0 :   if (!(flags & MSG_PEEK)) {
     249           0 :     pending_received_data_.drain(max_bytes_to_read);
     250           0 :   }
     251           0 :   return {max_bytes_to_read, Api::IoError::none()};
     252           0 : }
     253             : 
     254           0 : bool IoHandleImpl::supportsMmsg() const { return false; }
     255             : 
     256           0 : bool IoHandleImpl::supportsUdpGro() const { return false; }
     257             : 
     258           0 : Api::SysCallIntResult IoHandleImpl::bind(Network::Address::InstanceConstSharedPtr) {
     259           0 :   return makeInvalidSyscallResult();
     260           0 : }
     261             : 
     262           0 : Api::SysCallIntResult IoHandleImpl::listen(int) { return makeInvalidSyscallResult(); }
     263             : 
     264           0 : Network::IoHandlePtr IoHandleImpl::accept(struct sockaddr*, socklen_t*) {
     265           0 :   ENVOY_BUG(false, "unsupported call to accept");
     266           0 :   return nullptr;
     267           0 : }
     268             : 
     269           0 : Api::SysCallIntResult IoHandleImpl::connect(Network::Address::InstanceConstSharedPtr address) {
     270           0 :   if (peer_handle_ != nullptr) {
     271             :     // Buffered Io handle should always be considered as connected unless the server peer cannot be
     272             :     // found. Use write or read to determine if peer is closed.
     273           0 :     return {0, 0};
     274           0 :   } else {
     275           0 :     ENVOY_LOG(debug, "user namespace handle {} connect to previously closed peer {}.",
     276           0 :               static_cast<void*>(this), address->asStringView());
     277           0 :     return Api::SysCallIntResult{-1, SOCKET_ERROR_INVAL};
     278           0 :   }
     279           0 : }
     280             : 
     281           0 : Api::SysCallIntResult IoHandleImpl::setOption(int, int, const void*, socklen_t) {
     282           0 :   return makeInvalidSyscallResult();
     283           0 : }
     284             : 
     285             : Api::SysCallIntResult IoHandleImpl::getOption(int level, int optname, void* optval,
     286           0 :                                               socklen_t* optlen) {
     287             :   // Check result of connect(). It is either connected or closed.
     288           0 :   if (level == SOL_SOCKET && optname == SO_ERROR) {
     289           0 :     if (peer_handle_ != nullptr) {
     290             :       // The peer is valid at this comment. Consider it as connected.
     291           0 :       *optlen = sizeof(int);
     292           0 :       *static_cast<int*>(optval) = 0;
     293           0 :       return Api::SysCallIntResult{0, 0};
     294           0 :     } else {
     295             :       // The peer is closed. Reset the option value to non-zero.
     296           0 :       *optlen = sizeof(int);
     297           0 :       *static_cast<int*>(optval) = SOCKET_ERROR_INVAL;
     298           0 :       return Api::SysCallIntResult{0, 0};
     299           0 :     }
     300           0 :   }
     301           0 :   return makeInvalidSyscallResult();
     302           0 : }
     303             : 
     304             : Api::SysCallIntResult IoHandleImpl::ioctl(unsigned long, void*, unsigned long, void*, unsigned long,
     305           0 :                                           unsigned long*) {
     306           0 :   return makeInvalidSyscallResult();
     307           0 : }
     308             : 
     309           0 : Api::SysCallIntResult IoHandleImpl::setBlocking(bool) { return makeInvalidSyscallResult(); }
     310             : 
     311           0 : absl::optional<int> IoHandleImpl::domain() { return absl::nullopt; }
     312             : 
     313           0 : Network::Address::InstanceConstSharedPtr IoHandleImpl::localAddress() {
     314           0 :   return IoHandleImpl::getCommonInternalAddress();
     315           0 : }
     316             : 
     317           0 : Network::Address::InstanceConstSharedPtr IoHandleImpl::peerAddress() {
     318           0 :   return IoHandleImpl::getCommonInternalAddress();
     319           0 : }
     320             : 
     321             : void IoHandleImpl::initializeFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb,
     322           0 :                                        Event::FileTriggerType trigger, uint32_t events) {
     323           0 :   ASSERT(user_file_event_ == nullptr, "Attempting to initialize two `file_event_` for the same "
     324           0 :                                       "file descriptor. This is not allowed.");
     325           0 :   ASSERT(trigger != Event::FileTriggerType::Level, "Native level trigger is not supported.");
     326           0 :   user_file_event_ = std::make_unique<FileEventImpl>(dispatcher, cb, events, *this);
     327           0 : }
     328             : 
     329           0 : Network::IoHandlePtr IoHandleImpl::duplicate() {
     330             :   // duplicate() is supposed to be used on listener io handle while this implementation doesn't
     331             :   // support listen.
     332           0 :   ENVOY_BUG(false, "unsupported call to duplicate");
     333           0 :   return nullptr;
     334           0 : }
     335             : 
     336           0 : void IoHandleImpl::activateFileEvents(uint32_t events) {
     337           0 :   if (user_file_event_) {
     338           0 :     user_file_event_->activate(events);
     339           0 :   } else {
     340           0 :     ENVOY_BUG(false, "Null user_file_event_");
     341           0 :   }
     342           0 : }
     343             : 
     344           0 : void IoHandleImpl::enableFileEvents(uint32_t events) {
     345           0 :   if (user_file_event_) {
     346           0 :     user_file_event_->setEnabled(events);
     347           0 :   } else {
     348           0 :     ENVOY_BUG(false, "Null user_file_event_");
     349           0 :   }
     350           0 : }
     351             : 
     352           0 : void IoHandleImpl::resetFileEvents() { user_file_event_.reset(); }
     353             : 
     354           0 : Api::SysCallIntResult IoHandleImpl::shutdown(int how) {
     355             :   // Support only shutdown write.
     356           0 :   ASSERT(how == ENVOY_SHUT_WR);
     357           0 :   ASSERT(!closed_);
     358           0 :   if (!write_shutdown_) {
     359           0 :     ASSERT(peer_handle_);
     360             :     // Notify the peer we won't write more data.
     361           0 :     peer_handle_->setWriteEnd();
     362           0 :     write_shutdown_ = true;
     363           0 :   }
     364           0 :   return {0, 0};
     365           0 : }
     366             : 
     367             : void PassthroughStateImpl::initialize(
     368             :     std::unique_ptr<envoy::config::core::v3::Metadata> metadata,
     369           0 :     const StreamInfo::FilterState::Objects& filter_state_objects) {
     370           0 :   ASSERT(state_ == State::Created);
     371           0 :   metadata_ = std::move(metadata);
     372           0 :   filter_state_objects_ = filter_state_objects;
     373           0 :   state_ = State::Initialized;
     374           0 : }
     375             : void PassthroughStateImpl::mergeInto(envoy::config::core::v3::Metadata& metadata,
     376           0 :                                      StreamInfo::FilterState& filter_state) {
     377           0 :   ASSERT(state_ == State::Created || state_ == State::Initialized);
     378           0 :   if (metadata_) {
     379           0 :     metadata.MergeFrom(*metadata_);
     380           0 :   }
     381           0 :   for (const auto& object : filter_state_objects_) {
     382             :     // This should not throw as stream info is new and filter objects are uniquely named.
     383           0 :     filter_state.setData(object.name_, object.data_, object.state_type_,
     384           0 :                          StreamInfo::FilterState::LifeSpan::Connection, object.stream_sharing_);
     385           0 :   }
     386           0 :   metadata_ = nullptr;
     387           0 :   filter_state_objects_.clear();
     388           0 :   state_ = State::Done;
     389           0 : }
     390             : } // namespace UserSpace
     391             : } // namespace IoSocket
     392             : } // namespace Extensions
     393             : } // namespace Envoy

Generated by: LCOV version 1.15