LCOV - code coverage report
Current view: top level - source/extensions/io_socket/user_space - io_handle_impl.h (source / functions) Hit Total Coverage
Test: coverage.dat Lines: 0 75 0.0 %
Date: 2024-01-05 06:35:25 Functions: 0 20 0.0 %

          Line data    Source code
       1             : #pragma once
       2             : 
       3             : #include <memory>
       4             : 
       5             : #include "envoy/api/io_error.h"
       6             : #include "envoy/api/os_sys_calls.h"
       7             : #include "envoy/common/platform.h"
       8             : #include "envoy/event/dispatcher.h"
       9             : #include "envoy/network/address.h"
      10             : #include "envoy/network/io_handle.h"
      11             : 
      12             : #include "source/common/buffer/watermark_buffer.h"
      13             : #include "source/common/common/logger.h"
      14             : #include "source/common/network/io_socket_error_impl.h"
      15             : #include "source/extensions/io_socket/user_space/file_event_impl.h"
      16             : #include "source/extensions/io_socket/user_space/io_handle.h"
      17             : 
      18             : namespace Envoy {
      19             : namespace Extensions {
      20             : namespace IoSocket {
      21             : namespace UserSpace {
      22             : 
      23             : // Align with Buffer::Reservation::MAX_SLICES_
      24             : constexpr uint32_t MAX_FRAGMENT = 8;
      25             : // Align with Buffer::Slice::default_slice_size_
      26             : constexpr uint64_t FRAGMENT_SIZE = 16 * 1024;
      27             : 
      28             : /**
      29             :  * Network::IoHandle implementation which provides a buffer as data source. It is designed to used
      30             :  * by Network::ConnectionImpl. Some known limitations include
      31             :  * 1. It doesn't include a file descriptor. Do not use "fdDoNotUse".
      32             :  * 2. It doesn't support socket options. Wrap this in ConnectionSocket and implement the socket
      33             :  * getter/setter options.
      34             :  * 3. It doesn't support UDP interface.
      35             :  * 4. The peer BufferedIoSocket must be scheduled in the same thread to avoid data race because
      36             :  *    IoHandleImpl mutates the state of peer handle and no lock is introduced.
      37             :  */
      38             : class IoHandleImpl final : public Network::IoHandle,
      39             :                            public UserSpace::IoHandle,
      40             :                            protected Logger::Loggable<Logger::Id::io>,
      41             :                            NonCopyable {
      42             : public:
      43             :   ~IoHandleImpl() override;
      44             : 
      45             :   // Network::IoHandle
      46           0 :   os_fd_t fdDoNotUse() const override {
      47           0 :     ASSERT(false, "not supported");
      48           0 :     return INVALID_SOCKET;
      49           0 :   }
      50             :   Api::IoCallUint64Result close() override;
      51             :   bool isOpen() const override;
      52             :   Api::IoCallUint64Result readv(uint64_t max_length, Buffer::RawSlice* slices,
      53             :                                 uint64_t num_slice) override;
      54             :   Api::IoCallUint64Result read(Buffer::Instance& buffer,
      55             :                                absl::optional<uint64_t> max_length_opt) override;
      56             :   Api::IoCallUint64Result writev(const Buffer::RawSlice* slices, uint64_t num_slice) override;
      57             :   Api::IoCallUint64Result write(Buffer::Instance& buffer) override;
      58             :   Api::IoCallUint64Result sendmsg(const Buffer::RawSlice* slices, uint64_t num_slice, int flags,
      59             :                                   const Network::Address::Ip* self_ip,
      60             :                                   const Network::Address::Instance& peer_address) override;
      61             :   Api::IoCallUint64Result recvmsg(Buffer::RawSlice* slices, const uint64_t num_slice,
      62             :                                   uint32_t self_port, RecvMsgOutput& output) override;
      63             :   Api::IoCallUint64Result recvmmsg(RawSliceArrays& slices, uint32_t self_port,
      64             :                                    RecvMsgOutput& output) override;
      65             :   Api::IoCallUint64Result recv(void* buffer, size_t length, int flags) override;
      66             :   bool supportsMmsg() const override;
      67             :   bool supportsUdpGro() const override;
      68             :   Api::SysCallIntResult bind(Network::Address::InstanceConstSharedPtr address) override;
      69             :   Api::SysCallIntResult listen(int backlog) override;
      70             :   Network::IoHandlePtr accept(struct sockaddr* addr, socklen_t* addrlen) override;
      71             :   Api::SysCallIntResult connect(Network::Address::InstanceConstSharedPtr address) override;
      72             :   Api::SysCallIntResult setOption(int level, int optname, const void* optval,
      73             :                                   socklen_t optlen) override;
      74             :   Api::SysCallIntResult getOption(int level, int optname, void* optval, socklen_t* optlen) override;
      75             :   Api::SysCallIntResult ioctl(unsigned long, void*, unsigned long, void*, unsigned long,
      76             :                               unsigned long*) override;
      77             :   Api::SysCallIntResult setBlocking(bool blocking) override;
      78             :   absl::optional<int> domain() override;
      79             :   Network::Address::InstanceConstSharedPtr localAddress() override;
      80             :   Network::Address::InstanceConstSharedPtr peerAddress() override;
      81             : 
      82             :   void initializeFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb,
      83             :                            Event::FileTriggerType trigger, uint32_t events) override;
      84             :   Network::IoHandlePtr duplicate() override;
      85             :   void activateFileEvents(uint32_t events) override;
      86             :   void enableFileEvents(uint32_t events) override;
      87             :   void resetFileEvents() override;
      88             : 
      89             :   Api::SysCallIntResult shutdown(int how) override;
      90           0 :   absl::optional<std::chrono::milliseconds> lastRoundTripTime() override { return absl::nullopt; }
      91           0 :   absl::optional<uint64_t> congestionWindowInBytes() const override { return absl::nullopt; }
      92           0 :   absl::optional<std::string> interfaceName() override { return absl::nullopt; }
      93             : 
      94           0 :   void setWatermarks(uint32_t watermark) { pending_received_data_.setWatermarks(watermark); }
      95           0 :   void onBelowLowWatermark() {
      96           0 :     if (peer_handle_) {
      97           0 :       ENVOY_LOG(debug, "Socket {} switches to low watermark. Notify {}.", static_cast<void*>(this),
      98           0 :                 static_cast<void*>(peer_handle_));
      99           0 :       peer_handle_->onPeerBufferLowWatermark();
     100           0 :     }
     101           0 :   }
     102           0 :   void onAboveHighWatermark() {
     103           0 :     // Low to high is checked by peer after peer writes data.
     104           0 :   }
     105             : 
     106             :   // UserSpace::IoHandle
     107           0 :   void setWriteEnd() override {
     108           0 :     receive_data_end_stream_ = true;
     109           0 :     setNewDataAvailable();
     110           0 :   }
     111           0 :   void setNewDataAvailable() override {
     112           0 :     ENVOY_LOG(trace, "{} on socket {}", __FUNCTION__, static_cast<void*>(this));
     113           0 :     if (user_file_event_) {
     114           0 :       user_file_event_->activateIfEnabled(
     115           0 :           Event::FileReadyType::Read |
     116             :           // Closed ready type is defined as `end of stream`
     117           0 :           (receive_data_end_stream_ ? Event::FileReadyType::Closed : 0));
     118           0 :     }
     119           0 :   }
     120           0 :   void onPeerDestroy() override {
     121           0 :     peer_handle_ = nullptr;
     122           0 :     write_shutdown_ = true;
     123           0 :   }
     124           0 :   void onPeerBufferLowWatermark() override {
     125           0 :     if (user_file_event_) {
     126           0 :       user_file_event_->activateIfEnabled(Event::FileReadyType::Write);
     127           0 :     }
     128           0 :   }
     129           0 :   bool isWritable() const override { return !pending_received_data_.highWatermarkTriggered(); }
     130           0 :   bool isPeerShutDownWrite() const override { return receive_data_end_stream_; }
     131           0 :   bool isPeerWritable() const override {
     132           0 :     return peer_handle_ != nullptr && !peer_handle_->isPeerShutDownWrite() &&
     133           0 :            peer_handle_->isWritable();
     134           0 :   }
     135           0 :   Buffer::Instance* getWriteBuffer() override { return &pending_received_data_; }
     136             : 
     137             :   // `UserspaceIoHandle`
     138           0 :   bool isReadable() const override {
     139           0 :     return isPeerShutDownWrite() || pending_received_data_.length() > 0;
     140           0 :   }
     141             : 
     142             :   // Set the peer which will populate the owned pending_received_data.
     143           0 :   void setPeerHandle(UserSpace::IoHandle* writable_peer) {
     144             :     // Swapping writable peer is undefined behavior.
     145           0 :     ASSERT(!peer_handle_);
     146           0 :     ASSERT(!write_shutdown_);
     147           0 :     peer_handle_ = writable_peer;
     148           0 :     ENVOY_LOG(trace, "io handle {} set peer handle to {}.", static_cast<void*>(this),
     149           0 :               static_cast<void*>(writable_peer));
     150           0 :   }
     151             : 
     152           0 :   PassthroughStateSharedPtr passthroughState() override { return passthrough_state_; }
     153             : 
     154             : private:
     155             :   friend class IoHandleFactory;
     156             :   explicit IoHandleImpl(PassthroughStateSharedPtr passthrough_state = nullptr);
     157             : 
     158             :   static const Network::Address::InstanceConstSharedPtr& getCommonInternalAddress();
     159             : 
     160             :   // Support isOpen() and close(). Network::IoHandle owner must invoke close() to avoid potential
     161             :   // resource leak.
     162             :   bool closed_{false};
     163             : 
     164             :   // The attached file event with this socket.
     165             :   std::unique_ptr<FileEventImpl> user_file_event_;
     166             : 
     167             :   // True if pending_received_data_ is not addable. Note that pending_received_data_ may have
     168             :   // pending data to drain.
     169             :   bool receive_data_end_stream_{false};
     170             : 
     171             :   // The buffer owned by this socket. This buffer is populated by the write operations of the peer
     172             :   // socket and drained by read operations of this socket.
     173             :   Buffer::WatermarkBuffer pending_received_data_;
     174             : 
     175             :   // Destination of the write(). The value remains non-null until the peer is closed.
     176             :   UserSpace::IoHandle* peer_handle_{nullptr};
     177             : 
     178             :   // The flag whether the peer is valid. Any write attempt must check this flag.
     179             :   bool write_shutdown_{false};
     180             : 
     181             :   // Shared state between peer handles.
     182             :   PassthroughStateSharedPtr passthrough_state_{nullptr};
     183             : };
     184             : 
     185             : class PassthroughStateImpl : public PassthroughState, public Logger::Loggable<Logger::Id::io> {
     186             : public:
     187             :   void initialize(std::unique_ptr<envoy::config::core::v3::Metadata> metadata,
     188             :                   const StreamInfo::FilterState::Objects& filter_state_objects) override;
     189             :   void mergeInto(envoy::config::core::v3::Metadata& metadata,
     190             :                  StreamInfo::FilterState& filter_state) override;
     191             : 
     192             : private:
     193             :   enum class State { Created, Initialized, Done };
     194             :   State state_{State::Created};
     195             :   std::unique_ptr<envoy::config::core::v3::Metadata> metadata_;
     196             :   StreamInfo::FilterState::Objects filter_state_objects_;
     197             : };
     198             : 
     199             : using IoHandleImplPtr = std::unique_ptr<IoHandleImpl>;
     200             : class IoHandleFactory {
     201             : public:
     202           0 :   static std::pair<IoHandleImplPtr, IoHandleImplPtr> createIoHandlePair() {
     203           0 :     auto state = std::make_shared<PassthroughStateImpl>();
     204           0 :     auto p = std::pair<IoHandleImplPtr, IoHandleImplPtr>{new IoHandleImpl(state),
     205           0 :                                                          new IoHandleImpl(state)};
     206           0 :     p.first->setPeerHandle(p.second.get());
     207           0 :     p.second->setPeerHandle(p.first.get());
     208           0 :     return p;
     209           0 :   }
     210             :   static std::pair<IoHandleImplPtr, IoHandleImplPtr>
     211           0 :   createBufferLimitedIoHandlePair(uint32_t buffer_size) {
     212           0 :     auto state = std::make_shared<PassthroughStateImpl>();
     213           0 :     auto p = std::pair<IoHandleImplPtr, IoHandleImplPtr>{new IoHandleImpl(state),
     214           0 :                                                          new IoHandleImpl(state)};
     215             :     // This buffer watermark setting emulates the OS socket buffer parameter
     216             :     // `/proc/sys/net/ipv4/tcp_{r,w}mem`.
     217           0 :     p.first->setWatermarks(buffer_size);
     218           0 :     p.second->setWatermarks(buffer_size);
     219           0 :     p.first->setPeerHandle(p.second.get());
     220           0 :     p.second->setPeerHandle(p.first.get());
     221           0 :     return p;
     222           0 :   }
     223             : };
     224             : } // namespace UserSpace
     225             : } // namespace IoSocket
     226             : } // namespace Extensions
     227             : } // namespace Envoy

Generated by: LCOV version 1.15