/proc/self/cwd/source/extensions/io_socket/user_space/io_handle_impl.h
Line | Count | Source (jump to first uncovered line) |
1 | | #pragma once |
2 | | |
3 | | #include <memory> |
4 | | |
5 | | #include "envoy/api/io_error.h" |
6 | | #include "envoy/api/os_sys_calls.h" |
7 | | #include "envoy/common/platform.h" |
8 | | #include "envoy/event/dispatcher.h" |
9 | | #include "envoy/network/address.h" |
10 | | #include "envoy/network/io_handle.h" |
11 | | |
12 | | #include "source/common/buffer/watermark_buffer.h" |
13 | | #include "source/common/common/logger.h" |
14 | | #include "source/common/network/io_socket_error_impl.h" |
15 | | #include "source/extensions/io_socket/user_space/file_event_impl.h" |
16 | | #include "source/extensions/io_socket/user_space/io_handle.h" |
17 | | |
18 | | namespace Envoy { |
19 | | namespace Extensions { |
20 | | namespace IoSocket { |
21 | | namespace UserSpace { |
22 | | |
23 | | // Align with Buffer::Reservation::MAX_SLICES_ |
24 | | constexpr uint32_t MAX_FRAGMENT = 8; |
25 | | // Align with Buffer::Slice::default_slice_size_ |
26 | | constexpr uint64_t FRAGMENT_SIZE = 16 * 1024; |
27 | | |
28 | | /** |
29 | | * Network::IoHandle implementation which provides a buffer as data source. It is designed to used |
30 | | * by Network::ConnectionImpl. Some known limitations include |
31 | | * 1. It doesn't include a file descriptor. Do not use "fdDoNotUse". |
32 | | * 2. It doesn't support socket options. Wrap this in ConnectionSocket and implement the socket |
33 | | * getter/setter options. |
34 | | * 3. It doesn't support UDP interface. |
35 | | * 4. The peer BufferedIoSocket must be scheduled in the same thread to avoid data race because |
36 | | * IoHandleImpl mutates the state of peer handle and no lock is introduced. |
37 | | */ |
38 | | class IoHandleImpl final : public Network::IoHandle, |
39 | | public UserSpace::IoHandle, |
40 | | protected Logger::Loggable<Logger::Id::io>, |
41 | | NonCopyable { |
42 | | public: |
43 | | ~IoHandleImpl() override; |
44 | | |
45 | | // Network::IoHandle |
46 | 0 | os_fd_t fdDoNotUse() const override { |
47 | 0 | ASSERT(false, "not supported"); |
48 | 0 | return INVALID_SOCKET; |
49 | 0 | } |
50 | | Api::IoCallUint64Result close() override; |
51 | | bool isOpen() const override; |
52 | | Api::IoCallUint64Result readv(uint64_t max_length, Buffer::RawSlice* slices, |
53 | | uint64_t num_slice) override; |
54 | | Api::IoCallUint64Result read(Buffer::Instance& buffer, |
55 | | absl::optional<uint64_t> max_length_opt) override; |
56 | | Api::IoCallUint64Result writev(const Buffer::RawSlice* slices, uint64_t num_slice) override; |
57 | | Api::IoCallUint64Result write(Buffer::Instance& buffer) override; |
58 | | Api::IoCallUint64Result sendmsg(const Buffer::RawSlice* slices, uint64_t num_slice, int flags, |
59 | | const Network::Address::Ip* self_ip, |
60 | | const Network::Address::Instance& peer_address) override; |
61 | | Api::IoCallUint64Result recvmsg(Buffer::RawSlice* slices, const uint64_t num_slice, |
62 | | uint32_t self_port, RecvMsgOutput& output) override; |
63 | | Api::IoCallUint64Result recvmmsg(RawSliceArrays& slices, uint32_t self_port, |
64 | | RecvMsgOutput& output) override; |
65 | | Api::IoCallUint64Result recv(void* buffer, size_t length, int flags) override; |
66 | | bool supportsMmsg() const override; |
67 | | bool supportsUdpGro() const override; |
68 | | Api::SysCallIntResult bind(Network::Address::InstanceConstSharedPtr address) override; |
69 | | Api::SysCallIntResult listen(int backlog) override; |
70 | | Network::IoHandlePtr accept(struct sockaddr* addr, socklen_t* addrlen) override; |
71 | | Api::SysCallIntResult connect(Network::Address::InstanceConstSharedPtr address) override; |
72 | | Api::SysCallIntResult setOption(int level, int optname, const void* optval, |
73 | | socklen_t optlen) override; |
74 | | Api::SysCallIntResult getOption(int level, int optname, void* optval, socklen_t* optlen) override; |
75 | | Api::SysCallIntResult ioctl(unsigned long, void*, unsigned long, void*, unsigned long, |
76 | | unsigned long*) override; |
77 | | Api::SysCallIntResult setBlocking(bool blocking) override; |
78 | | absl::optional<int> domain() override; |
79 | | Network::Address::InstanceConstSharedPtr localAddress() override; |
80 | | Network::Address::InstanceConstSharedPtr peerAddress() override; |
81 | | |
82 | | void initializeFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb, |
83 | | Event::FileTriggerType trigger, uint32_t events) override; |
84 | | Network::IoHandlePtr duplicate() override; |
85 | | void activateFileEvents(uint32_t events) override; |
86 | | void enableFileEvents(uint32_t events) override; |
87 | | void resetFileEvents() override; |
88 | | |
89 | | Api::SysCallIntResult shutdown(int how) override; |
90 | 0 | absl::optional<std::chrono::milliseconds> lastRoundTripTime() override { return absl::nullopt; } |
91 | 0 | absl::optional<uint64_t> congestionWindowInBytes() const override { return absl::nullopt; } |
92 | 0 | absl::optional<std::string> interfaceName() override { return absl::nullopt; } |
93 | | |
94 | 0 | void setWatermarks(uint32_t watermark) { pending_received_data_.setWatermarks(watermark); } |
95 | 0 | void onBelowLowWatermark() { |
96 | 0 | if (peer_handle_) { |
97 | 0 | ENVOY_LOG(debug, "Socket {} switches to low watermark. Notify {}.", static_cast<void*>(this), |
98 | 0 | static_cast<void*>(peer_handle_)); |
99 | 0 | peer_handle_->onPeerBufferLowWatermark(); |
100 | 0 | } |
101 | 0 | } |
102 | 0 | void onAboveHighWatermark() { |
103 | | // Low to high is checked by peer after peer writes data. |
104 | 0 | } |
105 | | |
106 | | // UserSpace::IoHandle |
107 | 0 | void setWriteEnd() override { |
108 | 0 | receive_data_end_stream_ = true; |
109 | 0 | setNewDataAvailable(); |
110 | 0 | } |
111 | 0 | void setNewDataAvailable() override { |
112 | 0 | ENVOY_LOG(trace, "{} on socket {}", __FUNCTION__, static_cast<void*>(this)); |
113 | 0 | if (user_file_event_) { |
114 | 0 | user_file_event_->activateIfEnabled( |
115 | 0 | Event::FileReadyType::Read | |
116 | | // Closed ready type is defined as `end of stream` |
117 | 0 | (receive_data_end_stream_ ? Event::FileReadyType::Closed : 0)); |
118 | 0 | } |
119 | 0 | } |
120 | 0 | void onPeerDestroy() override { |
121 | 0 | peer_handle_ = nullptr; |
122 | 0 | write_shutdown_ = true; |
123 | 0 | } |
124 | 0 | void onPeerBufferLowWatermark() override { |
125 | 0 | if (user_file_event_) { |
126 | 0 | user_file_event_->activateIfEnabled(Event::FileReadyType::Write); |
127 | 0 | } |
128 | 0 | } |
129 | 0 | bool isWritable() const override { return !pending_received_data_.highWatermarkTriggered(); } |
130 | 0 | bool isPeerShutDownWrite() const override { return receive_data_end_stream_; } |
131 | 0 | bool isPeerWritable() const override { |
132 | 0 | return peer_handle_ != nullptr && !peer_handle_->isPeerShutDownWrite() && |
133 | 0 | peer_handle_->isWritable(); |
134 | 0 | } |
135 | 0 | Buffer::Instance* getWriteBuffer() override { return &pending_received_data_; } |
136 | | |
137 | | // `UserspaceIoHandle` |
138 | 0 | bool isReadable() const override { |
139 | 0 | return isPeerShutDownWrite() || pending_received_data_.length() > 0; |
140 | 0 | } |
141 | | |
142 | | // Set the peer which will populate the owned pending_received_data. |
143 | 0 | void setPeerHandle(UserSpace::IoHandle* writable_peer) { |
144 | | // Swapping writable peer is undefined behavior. |
145 | 0 | ASSERT(!peer_handle_); |
146 | 0 | ASSERT(!write_shutdown_); |
147 | 0 | peer_handle_ = writable_peer; |
148 | 0 | ENVOY_LOG(trace, "io handle {} set peer handle to {}.", static_cast<void*>(this), |
149 | 0 | static_cast<void*>(writable_peer)); |
150 | 0 | } |
151 | | |
152 | 0 | PassthroughStateSharedPtr passthroughState() override { return passthrough_state_; } |
153 | | |
154 | | private: |
155 | | friend class IoHandleFactory; |
156 | | explicit IoHandleImpl(PassthroughStateSharedPtr passthrough_state = nullptr); |
157 | | |
158 | | static const Network::Address::InstanceConstSharedPtr& getCommonInternalAddress(); |
159 | | |
160 | | // Support isOpen() and close(). Network::IoHandle owner must invoke close() to avoid potential |
161 | | // resource leak. |
162 | | bool closed_{false}; |
163 | | |
164 | | // The attached file event with this socket. |
165 | | std::unique_ptr<FileEventImpl> user_file_event_; |
166 | | |
167 | | // True if pending_received_data_ is not addable. Note that pending_received_data_ may have |
168 | | // pending data to drain. |
169 | | bool receive_data_end_stream_{false}; |
170 | | |
171 | | // The buffer owned by this socket. This buffer is populated by the write operations of the peer |
172 | | // socket and drained by read operations of this socket. |
173 | | Buffer::WatermarkBuffer pending_received_data_; |
174 | | |
175 | | // Destination of the write(). The value remains non-null until the peer is closed. |
176 | | UserSpace::IoHandle* peer_handle_{nullptr}; |
177 | | |
178 | | // The flag whether the peer is valid. Any write attempt must check this flag. |
179 | | bool write_shutdown_{false}; |
180 | | |
181 | | // Shared state between peer handles. |
182 | | PassthroughStateSharedPtr passthrough_state_{nullptr}; |
183 | | }; |
184 | | |
185 | | class PassthroughStateImpl : public PassthroughState, public Logger::Loggable<Logger::Id::io> { |
186 | | public: |
187 | | void initialize(std::unique_ptr<envoy::config::core::v3::Metadata> metadata, |
188 | | const StreamInfo::FilterState::Objects& filter_state_objects) override; |
189 | | void mergeInto(envoy::config::core::v3::Metadata& metadata, |
190 | | StreamInfo::FilterState& filter_state) override; |
191 | | |
192 | | private: |
193 | | enum class State { Created, Initialized, Done }; |
194 | | State state_{State::Created}; |
195 | | std::unique_ptr<envoy::config::core::v3::Metadata> metadata_; |
196 | | StreamInfo::FilterState::Objects filter_state_objects_; |
197 | | }; |
198 | | |
199 | | using IoHandleImplPtr = std::unique_ptr<IoHandleImpl>; |
200 | | class IoHandleFactory { |
201 | | public: |
202 | 0 | static std::pair<IoHandleImplPtr, IoHandleImplPtr> createIoHandlePair() { |
203 | 0 | auto state = std::make_shared<PassthroughStateImpl>(); |
204 | 0 | auto p = std::pair<IoHandleImplPtr, IoHandleImplPtr>{new IoHandleImpl(state), |
205 | 0 | new IoHandleImpl(state)}; |
206 | 0 | p.first->setPeerHandle(p.second.get()); |
207 | 0 | p.second->setPeerHandle(p.first.get()); |
208 | 0 | return p; |
209 | 0 | } |
210 | | static std::pair<IoHandleImplPtr, IoHandleImplPtr> |
211 | 0 | createBufferLimitedIoHandlePair(uint32_t buffer_size) { |
212 | 0 | auto state = std::make_shared<PassthroughStateImpl>(); |
213 | 0 | auto p = std::pair<IoHandleImplPtr, IoHandleImplPtr>{new IoHandleImpl(state), |
214 | 0 | new IoHandleImpl(state)}; |
215 | | // This buffer watermark setting emulates the OS socket buffer parameter |
216 | | // `/proc/sys/net/ipv4/tcp_{r,w}mem`. |
217 | 0 | p.first->setWatermarks(buffer_size); |
218 | 0 | p.second->setWatermarks(buffer_size); |
219 | 0 | p.first->setPeerHandle(p.second.get()); |
220 | 0 | p.second->setPeerHandle(p.first.get()); |
221 | 0 | return p; |
222 | 0 | } |
223 | | }; |
224 | | } // namespace UserSpace |
225 | | } // namespace IoSocket |
226 | | } // namespace Extensions |
227 | | } // namespace Envoy |