Coverage Report

Created: 2023-11-12 09:30

/proc/self/cwd/source/extensions/io_socket/user_space/io_handle_impl.h
Line
Count
Source (jump to first uncovered line)
1
#pragma once
2
3
#include <memory>
4
5
#include "envoy/api/io_error.h"
6
#include "envoy/api/os_sys_calls.h"
7
#include "envoy/common/platform.h"
8
#include "envoy/event/dispatcher.h"
9
#include "envoy/network/address.h"
10
#include "envoy/network/io_handle.h"
11
12
#include "source/common/buffer/watermark_buffer.h"
13
#include "source/common/common/logger.h"
14
#include "source/common/network/io_socket_error_impl.h"
15
#include "source/extensions/io_socket/user_space/file_event_impl.h"
16
#include "source/extensions/io_socket/user_space/io_handle.h"
17
18
namespace Envoy {
19
namespace Extensions {
20
namespace IoSocket {
21
namespace UserSpace {
22
23
// Align with Buffer::Reservation::MAX_SLICES_
24
constexpr uint32_t MAX_FRAGMENT = 8;
25
// Align with Buffer::Slice::default_slice_size_
26
constexpr uint64_t FRAGMENT_SIZE = 16 * 1024;
27
28
/**
29
 * Network::IoHandle implementation which provides a buffer as data source. It is designed to used
30
 * by Network::ConnectionImpl. Some known limitations include
31
 * 1. It doesn't include a file descriptor. Do not use "fdDoNotUse".
32
 * 2. It doesn't support socket options. Wrap this in ConnectionSocket and implement the socket
33
 * getter/setter options.
34
 * 3. It doesn't support UDP interface.
35
 * 4. The peer BufferedIoSocket must be scheduled in the same thread to avoid data race because
36
 *    IoHandleImpl mutates the state of peer handle and no lock is introduced.
37
 */
38
class IoHandleImpl final : public Network::IoHandle,
39
                           public UserSpace::IoHandle,
40
                           protected Logger::Loggable<Logger::Id::io>,
41
                           NonCopyable {
42
public:
43
  ~IoHandleImpl() override;
44
45
  // Network::IoHandle
46
0
  os_fd_t fdDoNotUse() const override {
47
0
    ASSERT(false, "not supported");
48
0
    return INVALID_SOCKET;
49
0
  }
50
  Api::IoCallUint64Result close() override;
51
  bool isOpen() const override;
52
  Api::IoCallUint64Result readv(uint64_t max_length, Buffer::RawSlice* slices,
53
                                uint64_t num_slice) override;
54
  Api::IoCallUint64Result read(Buffer::Instance& buffer,
55
                               absl::optional<uint64_t> max_length_opt) override;
56
  Api::IoCallUint64Result writev(const Buffer::RawSlice* slices, uint64_t num_slice) override;
57
  Api::IoCallUint64Result write(Buffer::Instance& buffer) override;
58
  Api::IoCallUint64Result sendmsg(const Buffer::RawSlice* slices, uint64_t num_slice, int flags,
59
                                  const Network::Address::Ip* self_ip,
60
                                  const Network::Address::Instance& peer_address) override;
61
  Api::IoCallUint64Result recvmsg(Buffer::RawSlice* slices, const uint64_t num_slice,
62
                                  uint32_t self_port, RecvMsgOutput& output) override;
63
  Api::IoCallUint64Result recvmmsg(RawSliceArrays& slices, uint32_t self_port,
64
                                   RecvMsgOutput& output) override;
65
  Api::IoCallUint64Result recv(void* buffer, size_t length, int flags) override;
66
  bool supportsMmsg() const override;
67
  bool supportsUdpGro() const override;
68
  Api::SysCallIntResult bind(Network::Address::InstanceConstSharedPtr address) override;
69
  Api::SysCallIntResult listen(int backlog) override;
70
  Network::IoHandlePtr accept(struct sockaddr* addr, socklen_t* addrlen) override;
71
  Api::SysCallIntResult connect(Network::Address::InstanceConstSharedPtr address) override;
72
  Api::SysCallIntResult setOption(int level, int optname, const void* optval,
73
                                  socklen_t optlen) override;
74
  Api::SysCallIntResult getOption(int level, int optname, void* optval, socklen_t* optlen) override;
75
  Api::SysCallIntResult ioctl(unsigned long, void*, unsigned long, void*, unsigned long,
76
                              unsigned long*) override;
77
  Api::SysCallIntResult setBlocking(bool blocking) override;
78
  absl::optional<int> domain() override;
79
  Network::Address::InstanceConstSharedPtr localAddress() override;
80
  Network::Address::InstanceConstSharedPtr peerAddress() override;
81
82
  void initializeFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb,
83
                           Event::FileTriggerType trigger, uint32_t events) override;
84
  Network::IoHandlePtr duplicate() override;
85
  void activateFileEvents(uint32_t events) override;
86
  void enableFileEvents(uint32_t events) override;
87
  void resetFileEvents() override;
88
89
  Api::SysCallIntResult shutdown(int how) override;
90
0
  absl::optional<std::chrono::milliseconds> lastRoundTripTime() override { return absl::nullopt; }
91
0
  absl::optional<uint64_t> congestionWindowInBytes() const override { return absl::nullopt; }
92
0
  absl::optional<std::string> interfaceName() override { return absl::nullopt; }
93
94
0
  void setWatermarks(uint32_t watermark) { pending_received_data_.setWatermarks(watermark); }
95
0
  void onBelowLowWatermark() {
96
0
    if (peer_handle_) {
97
0
      ENVOY_LOG(debug, "Socket {} switches to low watermark. Notify {}.", static_cast<void*>(this),
98
0
                static_cast<void*>(peer_handle_));
99
0
      peer_handle_->onPeerBufferLowWatermark();
100
0
    }
101
0
  }
102
0
  void onAboveHighWatermark() {
103
    // Low to high is checked by peer after peer writes data.
104
0
  }
105
106
  // UserSpace::IoHandle
107
0
  void setWriteEnd() override {
108
0
    receive_data_end_stream_ = true;
109
0
    setNewDataAvailable();
110
0
  }
111
0
  void setNewDataAvailable() override {
112
0
    ENVOY_LOG(trace, "{} on socket {}", __FUNCTION__, static_cast<void*>(this));
113
0
    if (user_file_event_) {
114
0
      user_file_event_->activateIfEnabled(
115
0
          Event::FileReadyType::Read |
116
          // Closed ready type is defined as `end of stream`
117
0
          (receive_data_end_stream_ ? Event::FileReadyType::Closed : 0));
118
0
    }
119
0
  }
120
0
  void onPeerDestroy() override {
121
0
    peer_handle_ = nullptr;
122
0
    write_shutdown_ = true;
123
0
  }
124
0
  void onPeerBufferLowWatermark() override {
125
0
    if (user_file_event_) {
126
0
      user_file_event_->activateIfEnabled(Event::FileReadyType::Write);
127
0
    }
128
0
  }
129
0
  bool isWritable() const override { return !pending_received_data_.highWatermarkTriggered(); }
130
0
  bool isPeerShutDownWrite() const override { return receive_data_end_stream_; }
131
0
  bool isPeerWritable() const override {
132
0
    return peer_handle_ != nullptr && !peer_handle_->isPeerShutDownWrite() &&
133
0
           peer_handle_->isWritable();
134
0
  }
135
0
  Buffer::Instance* getWriteBuffer() override { return &pending_received_data_; }
136
137
  // `UserspaceIoHandle`
138
0
  bool isReadable() const override {
139
0
    return isPeerShutDownWrite() || pending_received_data_.length() > 0;
140
0
  }
141
142
  // Set the peer which will populate the owned pending_received_data.
143
0
  void setPeerHandle(UserSpace::IoHandle* writable_peer) {
144
    // Swapping writable peer is undefined behavior.
145
0
    ASSERT(!peer_handle_);
146
0
    ASSERT(!write_shutdown_);
147
0
    peer_handle_ = writable_peer;
148
0
    ENVOY_LOG(trace, "io handle {} set peer handle to {}.", static_cast<void*>(this),
149
0
              static_cast<void*>(writable_peer));
150
0
  }
151
152
0
  PassthroughStateSharedPtr passthroughState() override { return passthrough_state_; }
153
154
private:
155
  friend class IoHandleFactory;
156
  explicit IoHandleImpl(PassthroughStateSharedPtr passthrough_state = nullptr);
157
158
  static const Network::Address::InstanceConstSharedPtr& getCommonInternalAddress();
159
160
  // Support isOpen() and close(). Network::IoHandle owner must invoke close() to avoid potential
161
  // resource leak.
162
  bool closed_{false};
163
164
  // The attached file event with this socket.
165
  std::unique_ptr<FileEventImpl> user_file_event_;
166
167
  // True if pending_received_data_ is not addable. Note that pending_received_data_ may have
168
  // pending data to drain.
169
  bool receive_data_end_stream_{false};
170
171
  // The buffer owned by this socket. This buffer is populated by the write operations of the peer
172
  // socket and drained by read operations of this socket.
173
  Buffer::WatermarkBuffer pending_received_data_;
174
175
  // Destination of the write(). The value remains non-null until the peer is closed.
176
  UserSpace::IoHandle* peer_handle_{nullptr};
177
178
  // The flag whether the peer is valid. Any write attempt must check this flag.
179
  bool write_shutdown_{false};
180
181
  // Shared state between peer handles.
182
  PassthroughStateSharedPtr passthrough_state_{nullptr};
183
};
184
185
class PassthroughStateImpl : public PassthroughState, public Logger::Loggable<Logger::Id::io> {
186
public:
187
  void initialize(std::unique_ptr<envoy::config::core::v3::Metadata> metadata,
188
                  const StreamInfo::FilterState::Objects& filter_state_objects) override;
189
  void mergeInto(envoy::config::core::v3::Metadata& metadata,
190
                 StreamInfo::FilterState& filter_state) override;
191
192
private:
193
  enum class State { Created, Initialized, Done };
194
  State state_{State::Created};
195
  std::unique_ptr<envoy::config::core::v3::Metadata> metadata_;
196
  StreamInfo::FilterState::Objects filter_state_objects_;
197
};
198
199
using IoHandleImplPtr = std::unique_ptr<IoHandleImpl>;
200
class IoHandleFactory {
201
public:
202
0
  static std::pair<IoHandleImplPtr, IoHandleImplPtr> createIoHandlePair() {
203
0
    auto state = std::make_shared<PassthroughStateImpl>();
204
0
    auto p = std::pair<IoHandleImplPtr, IoHandleImplPtr>{new IoHandleImpl(state),
205
0
                                                         new IoHandleImpl(state)};
206
0
    p.first->setPeerHandle(p.second.get());
207
0
    p.second->setPeerHandle(p.first.get());
208
0
    return p;
209
0
  }
210
  static std::pair<IoHandleImplPtr, IoHandleImplPtr>
211
0
  createBufferLimitedIoHandlePair(uint32_t buffer_size) {
212
0
    auto state = std::make_shared<PassthroughStateImpl>();
213
0
    auto p = std::pair<IoHandleImplPtr, IoHandleImplPtr>{new IoHandleImpl(state),
214
0
                                                         new IoHandleImpl(state)};
215
    // This buffer watermark setting emulates the OS socket buffer parameter
216
    // `/proc/sys/net/ipv4/tcp_{r,w}mem`.
217
0
    p.first->setWatermarks(buffer_size);
218
0
    p.second->setWatermarks(buffer_size);
219
0
    p.first->setPeerHandle(p.second.get());
220
0
    p.second->setPeerHandle(p.first.get());
221
0
    return p;
222
0
  }
223
};
224
} // namespace UserSpace
225
} // namespace IoSocket
226
} // namespace Extensions
227
} // namespace Envoy