Line data Source code
1 : #pragma once 2 : 3 : #include <memory> 4 : 5 : #include "envoy/api/io_error.h" 6 : #include "envoy/api/os_sys_calls.h" 7 : #include "envoy/common/platform.h" 8 : #include "envoy/event/dispatcher.h" 9 : #include "envoy/network/address.h" 10 : #include "envoy/network/io_handle.h" 11 : 12 : #include "source/common/buffer/watermark_buffer.h" 13 : #include "source/common/common/logger.h" 14 : #include "source/common/network/io_socket_error_impl.h" 15 : #include "source/extensions/io_socket/user_space/file_event_impl.h" 16 : #include "source/extensions/io_socket/user_space/io_handle.h" 17 : 18 : namespace Envoy { 19 : namespace Extensions { 20 : namespace IoSocket { 21 : namespace UserSpace { 22 : 23 : // Align with Buffer::Reservation::MAX_SLICES_ 24 : constexpr uint32_t MAX_FRAGMENT = 8; 25 : // Align with Buffer::Slice::default_slice_size_ 26 : constexpr uint64_t FRAGMENT_SIZE = 16 * 1024; 27 : 28 : /** 29 : * Network::IoHandle implementation which provides a buffer as data source. It is designed to used 30 : * by Network::ConnectionImpl. Some known limitations include 31 : * 1. It doesn't include a file descriptor. Do not use "fdDoNotUse". 32 : * 2. It doesn't support socket options. Wrap this in ConnectionSocket and implement the socket 33 : * getter/setter options. 34 : * 3. It doesn't support UDP interface. 35 : * 4. The peer BufferedIoSocket must be scheduled in the same thread to avoid data race because 36 : * IoHandleImpl mutates the state of peer handle and no lock is introduced. 37 : */ 38 : class IoHandleImpl final : public Network::IoHandle, 39 : public UserSpace::IoHandle, 40 : protected Logger::Loggable<Logger::Id::io>, 41 : NonCopyable { 42 : public: 43 : ~IoHandleImpl() override; 44 : 45 : // Network::IoHandle 46 0 : os_fd_t fdDoNotUse() const override { 47 0 : ASSERT(false, "not supported"); 48 0 : return INVALID_SOCKET; 49 0 : } 50 : Api::IoCallUint64Result close() override; 51 : bool isOpen() const override; 52 : Api::IoCallUint64Result readv(uint64_t max_length, Buffer::RawSlice* slices, 53 : uint64_t num_slice) override; 54 : Api::IoCallUint64Result read(Buffer::Instance& buffer, 55 : absl::optional<uint64_t> max_length_opt) override; 56 : Api::IoCallUint64Result writev(const Buffer::RawSlice* slices, uint64_t num_slice) override; 57 : Api::IoCallUint64Result write(Buffer::Instance& buffer) override; 58 : Api::IoCallUint64Result sendmsg(const Buffer::RawSlice* slices, uint64_t num_slice, int flags, 59 : const Network::Address::Ip* self_ip, 60 : const Network::Address::Instance& peer_address) override; 61 : Api::IoCallUint64Result recvmsg(Buffer::RawSlice* slices, const uint64_t num_slice, 62 : uint32_t self_port, RecvMsgOutput& output) override; 63 : Api::IoCallUint64Result recvmmsg(RawSliceArrays& slices, uint32_t self_port, 64 : RecvMsgOutput& output) override; 65 : Api::IoCallUint64Result recv(void* buffer, size_t length, int flags) override; 66 : bool supportsMmsg() const override; 67 : bool supportsUdpGro() const override; 68 : Api::SysCallIntResult bind(Network::Address::InstanceConstSharedPtr address) override; 69 : Api::SysCallIntResult listen(int backlog) override; 70 : Network::IoHandlePtr accept(struct sockaddr* addr, socklen_t* addrlen) override; 71 : Api::SysCallIntResult connect(Network::Address::InstanceConstSharedPtr address) override; 72 : Api::SysCallIntResult setOption(int level, int optname, const void* optval, 73 : socklen_t optlen) override; 74 : Api::SysCallIntResult getOption(int level, int optname, void* optval, socklen_t* optlen) override; 75 : Api::SysCallIntResult ioctl(unsigned long, void*, unsigned long, void*, unsigned long, 76 : unsigned long*) override; 77 : Api::SysCallIntResult setBlocking(bool blocking) override; 78 : absl::optional<int> domain() override; 79 : Network::Address::InstanceConstSharedPtr localAddress() override; 80 : Network::Address::InstanceConstSharedPtr peerAddress() override; 81 : 82 : void initializeFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb, 83 : Event::FileTriggerType trigger, uint32_t events) override; 84 : Network::IoHandlePtr duplicate() override; 85 : void activateFileEvents(uint32_t events) override; 86 : void enableFileEvents(uint32_t events) override; 87 : void resetFileEvents() override; 88 : 89 : Api::SysCallIntResult shutdown(int how) override; 90 0 : absl::optional<std::chrono::milliseconds> lastRoundTripTime() override { return absl::nullopt; } 91 0 : absl::optional<uint64_t> congestionWindowInBytes() const override { return absl::nullopt; } 92 0 : absl::optional<std::string> interfaceName() override { return absl::nullopt; } 93 : 94 0 : void setWatermarks(uint32_t watermark) { pending_received_data_.setWatermarks(watermark); } 95 0 : void onBelowLowWatermark() { 96 0 : if (peer_handle_) { 97 0 : ENVOY_LOG(debug, "Socket {} switches to low watermark. Notify {}.", static_cast<void*>(this), 98 0 : static_cast<void*>(peer_handle_)); 99 0 : peer_handle_->onPeerBufferLowWatermark(); 100 0 : } 101 0 : } 102 0 : void onAboveHighWatermark() { 103 0 : // Low to high is checked by peer after peer writes data. 104 0 : } 105 : 106 : // UserSpace::IoHandle 107 0 : void setWriteEnd() override { 108 0 : receive_data_end_stream_ = true; 109 0 : setNewDataAvailable(); 110 0 : } 111 0 : void setNewDataAvailable() override { 112 0 : ENVOY_LOG(trace, "{} on socket {}", __FUNCTION__, static_cast<void*>(this)); 113 0 : if (user_file_event_) { 114 0 : user_file_event_->activateIfEnabled( 115 0 : Event::FileReadyType::Read | 116 : // Closed ready type is defined as `end of stream` 117 0 : (receive_data_end_stream_ ? Event::FileReadyType::Closed : 0)); 118 0 : } 119 0 : } 120 0 : void onPeerDestroy() override { 121 0 : peer_handle_ = nullptr; 122 0 : write_shutdown_ = true; 123 0 : } 124 0 : void onPeerBufferLowWatermark() override { 125 0 : if (user_file_event_) { 126 0 : user_file_event_->activateIfEnabled(Event::FileReadyType::Write); 127 0 : } 128 0 : } 129 0 : bool isWritable() const override { return !pending_received_data_.highWatermarkTriggered(); } 130 0 : bool isPeerShutDownWrite() const override { return receive_data_end_stream_; } 131 0 : bool isPeerWritable() const override { 132 0 : return peer_handle_ != nullptr && !peer_handle_->isPeerShutDownWrite() && 133 0 : peer_handle_->isWritable(); 134 0 : } 135 0 : Buffer::Instance* getWriteBuffer() override { return &pending_received_data_; } 136 : 137 : // `UserspaceIoHandle` 138 0 : bool isReadable() const override { 139 0 : return isPeerShutDownWrite() || pending_received_data_.length() > 0; 140 0 : } 141 : 142 : // Set the peer which will populate the owned pending_received_data. 143 0 : void setPeerHandle(UserSpace::IoHandle* writable_peer) { 144 : // Swapping writable peer is undefined behavior. 145 0 : ASSERT(!peer_handle_); 146 0 : ASSERT(!write_shutdown_); 147 0 : peer_handle_ = writable_peer; 148 0 : ENVOY_LOG(trace, "io handle {} set peer handle to {}.", static_cast<void*>(this), 149 0 : static_cast<void*>(writable_peer)); 150 0 : } 151 : 152 0 : PassthroughStateSharedPtr passthroughState() override { return passthrough_state_; } 153 : 154 : private: 155 : friend class IoHandleFactory; 156 : explicit IoHandleImpl(PassthroughStateSharedPtr passthrough_state = nullptr); 157 : 158 : static const Network::Address::InstanceConstSharedPtr& getCommonInternalAddress(); 159 : 160 : // Support isOpen() and close(). Network::IoHandle owner must invoke close() to avoid potential 161 : // resource leak. 162 : bool closed_{false}; 163 : 164 : // The attached file event with this socket. 165 : std::unique_ptr<FileEventImpl> user_file_event_; 166 : 167 : // True if pending_received_data_ is not addable. Note that pending_received_data_ may have 168 : // pending data to drain. 169 : bool receive_data_end_stream_{false}; 170 : 171 : // The buffer owned by this socket. This buffer is populated by the write operations of the peer 172 : // socket and drained by read operations of this socket. 173 : Buffer::WatermarkBuffer pending_received_data_; 174 : 175 : // Destination of the write(). The value remains non-null until the peer is closed. 176 : UserSpace::IoHandle* peer_handle_{nullptr}; 177 : 178 : // The flag whether the peer is valid. Any write attempt must check this flag. 179 : bool write_shutdown_{false}; 180 : 181 : // Shared state between peer handles. 182 : PassthroughStateSharedPtr passthrough_state_{nullptr}; 183 : }; 184 : 185 : class PassthroughStateImpl : public PassthroughState, public Logger::Loggable<Logger::Id::io> { 186 : public: 187 : void initialize(std::unique_ptr<envoy::config::core::v3::Metadata> metadata, 188 : const StreamInfo::FilterState::Objects& filter_state_objects) override; 189 : void mergeInto(envoy::config::core::v3::Metadata& metadata, 190 : StreamInfo::FilterState& filter_state) override; 191 : 192 : private: 193 : enum class State { Created, Initialized, Done }; 194 : State state_{State::Created}; 195 : std::unique_ptr<envoy::config::core::v3::Metadata> metadata_; 196 : StreamInfo::FilterState::Objects filter_state_objects_; 197 : }; 198 : 199 : using IoHandleImplPtr = std::unique_ptr<IoHandleImpl>; 200 : class IoHandleFactory { 201 : public: 202 0 : static std::pair<IoHandleImplPtr, IoHandleImplPtr> createIoHandlePair() { 203 0 : auto state = std::make_shared<PassthroughStateImpl>(); 204 0 : auto p = std::pair<IoHandleImplPtr, IoHandleImplPtr>{new IoHandleImpl(state), 205 0 : new IoHandleImpl(state)}; 206 0 : p.first->setPeerHandle(p.second.get()); 207 0 : p.second->setPeerHandle(p.first.get()); 208 0 : return p; 209 0 : } 210 : static std::pair<IoHandleImplPtr, IoHandleImplPtr> 211 0 : createBufferLimitedIoHandlePair(uint32_t buffer_size) { 212 0 : auto state = std::make_shared<PassthroughStateImpl>(); 213 0 : auto p = std::pair<IoHandleImplPtr, IoHandleImplPtr>{new IoHandleImpl(state), 214 0 : new IoHandleImpl(state)}; 215 : // This buffer watermark setting emulates the OS socket buffer parameter 216 : // `/proc/sys/net/ipv4/tcp_{r,w}mem`. 217 0 : p.first->setWatermarks(buffer_size); 218 0 : p.second->setWatermarks(buffer_size); 219 0 : p.first->setPeerHandle(p.second.get()); 220 0 : p.second->setPeerHandle(p.first.get()); 221 0 : return p; 222 0 : } 223 : }; 224 : } // namespace UserSpace 225 : } // namespace IoSocket 226 : } // namespace Extensions 227 : } // namespace Envoy