1
#pragma once
2

            
3
#include <memory>
4

            
5
#include "envoy/api/io_error.h"
6
#include "envoy/api/os_sys_calls.h"
7
#include "envoy/common/platform.h"
8
#include "envoy/event/dispatcher.h"
9
#include "envoy/network/address.h"
10
#include "envoy/network/io_handle.h"
11

            
12
#include "source/common/buffer/watermark_buffer.h"
13
#include "source/common/common/logger.h"
14
#include "source/common/network/io_socket_error_impl.h"
15
#include "source/extensions/io_socket/user_space/file_event_impl.h"
16
#include "source/extensions/io_socket/user_space/io_handle.h"
17

            
18
namespace Envoy {
19
namespace Extensions {
20
namespace IoSocket {
21
namespace UserSpace {
22

            
23
// Maximum number of fragments; kept aligned with Buffer::Reservation::MAX_SLICES_.
constexpr uint32_t MAX_FRAGMENT = 8;
// Size of one fragment in bytes (16 KiB); kept aligned with
// Buffer::Slice::default_slice_size_.
constexpr uint64_t FRAGMENT_SIZE = 16 * 1024;
27

            
28
/**
29
 * Network::IoHandle implementation which provides a buffer as data source. It is designed to used
30
 * by Network::ConnectionImpl. Some known limitations include
31
 * 1. It doesn't include a file descriptor. Do not use "fdDoNotUse".
32
 * 2. It doesn't support socket options. Wrap this in ConnectionSocket and implement the socket
33
 * getter/setter options.
34
 * 3. It doesn't support UDP interface.
35
 * 4. The peer BufferedIoSocket must be scheduled in the same thread to avoid data race because
36
 *    IoHandleImpl mutates the state of peer handle and no lock is introduced.
37
 */
38
class IoHandleImpl final : public Network::IoHandle,
39
                           public UserSpace::IoHandle,
40
                           protected Logger::Loggable<Logger::Id::io>,
41
                           NonCopyable {
42
public:
43
  ~IoHandleImpl() override;
44

            
45
  // Network::IoHandle
46
  os_fd_t fdDoNotUse() const override {
47
    ASSERT(false, "not supported");
48
    return INVALID_SOCKET;
49
  }
50
  Api::IoCallUint64Result close() override;
51
  bool isOpen() const override;
52
  bool wasConnected() const override;
53
  Api::IoCallUint64Result readv(uint64_t max_length, Buffer::RawSlice* slices,
54
                                uint64_t num_slice) override;
55
  Api::IoCallUint64Result read(Buffer::Instance& buffer,
56
                               absl::optional<uint64_t> max_length_opt) override;
57
  Api::IoCallUint64Result writev(const Buffer::RawSlice* slices, uint64_t num_slice) override;
58
  Api::IoCallUint64Result write(Buffer::Instance& buffer) override;
59
  Api::IoCallUint64Result sendmsg(const Buffer::RawSlice* slices, uint64_t num_slice, int flags,
60
                                  const Network::Address::Ip* self_ip,
61
                                  const Network::Address::Instance& peer_address) override;
62
  Api::IoCallUint64Result recvmsg(Buffer::RawSlice* slices, const uint64_t num_slice,
63
                                  uint32_t self_port,
64
                                  const Network::IoHandle::UdpSaveCmsgConfig& udp_save_cmsg_config,
65
                                  RecvMsgOutput& output) override;
66
  Api::IoCallUint64Result recvmmsg(RawSliceArrays& slices, uint32_t self_port,
67
                                   const Network::IoHandle::UdpSaveCmsgConfig& udp_save_cmsg_config,
68
                                   RecvMsgOutput& output) override;
69
  Api::IoCallUint64Result recv(void* buffer, size_t length, int flags) override;
70
  bool supportsMmsg() const override;
71
  bool supportsUdpGro() const override;
72
  Api::SysCallIntResult bind(Network::Address::InstanceConstSharedPtr address) override;
73
  Api::SysCallIntResult listen(int backlog) override;
74
  Network::IoHandlePtr accept(struct sockaddr* addr, socklen_t* addrlen) override;
75
  Api::SysCallIntResult connect(Network::Address::InstanceConstSharedPtr address) override;
76
  Api::SysCallIntResult setOption(int level, int optname, const void* optval,
77
                                  socklen_t optlen) override;
78
  Api::SysCallIntResult getOption(int level, int optname, void* optval, socklen_t* optlen) override;
79
  Api::SysCallIntResult ioctl(unsigned long, void*, unsigned long, void*, unsigned long,
80
                              unsigned long*) override;
81
  Api::SysCallIntResult setBlocking(bool blocking) override;
82
  absl::optional<int> domain() override;
83
  absl::StatusOr<Network::Address::InstanceConstSharedPtr> localAddress() override;
84
  absl::StatusOr<Network::Address::InstanceConstSharedPtr> peerAddress() override;
85

            
86
  void initializeFileEvent(Event::Dispatcher& dispatcher, Event::FileReadyCb cb,
87
                           Event::FileTriggerType trigger, uint32_t events) override;
88
  Network::IoHandlePtr duplicate() override;
89
  void activateFileEvents(uint32_t events) override;
90
  void enableFileEvents(uint32_t events) override;
91
  void resetFileEvents() override;
92

            
93
  Api::SysCallIntResult shutdown(int how) override;
94
1
  absl::optional<std::chrono::milliseconds> lastRoundTripTime() override { return absl::nullopt; }
95
  absl::optional<uint64_t> congestionWindowInBytes() const override { return absl::nullopt; }
96
1
  absl::optional<std::string> interfaceName() override { return absl::nullopt; }
97

            
98
53
  void setWatermarks(uint32_t watermark) { pending_received_data_.setWatermarks(watermark); }
99
15
  void onBelowLowWatermark() {
100
15
    if (peer_handle_) {
101
15
      ENVOY_LOG(debug, "Socket {} switches to low watermark. Notify {}.", static_cast<void*>(this),
102
15
                static_cast<void*>(peer_handle_));
103
15
      peer_handle_->onPeerBufferLowWatermark();
104
15
    }
105
15
  }
106
18
  void onAboveHighWatermark() {
107
    // Low to high is checked by peer after peer populates the receive buffer.
108
18
  }
109

            
110
  // UserSpace::IoHandle
111
124
  void setEof() override {
112
124
    receive_data_end_stream_ = true;
113
124
    setNewDataAvailable();
114
124
  }
115
194
  void setNewDataAvailable() override {
116
194
    ENVOY_LOG(trace, "{} on socket {}", __FUNCTION__, static_cast<void*>(this));
117
194
    if (user_file_event_) {
118
89
      user_file_event_->activateIfEnabled(
119
89
          Event::FileReadyType::Read |
120
          // Closed ready type is defined as `end of stream`
121
89
          (receive_data_end_stream_ ? Event::FileReadyType::Closed : 0));
122
89
    }
123
194
  }
124
94
  void onPeerDestroy() override {
125
94
    peer_handle_ = nullptr;
126
94
    sent_eof_ = true;
127
94
  }
128
15
  void onPeerBufferLowWatermark() override {
129
15
    if (user_file_event_) {
130
11
      user_file_event_->activateIfEnabled(Event::FileReadyType::Write);
131
11
    }
132
15
  }
133
205
  bool canReceiveData() const override { return !pending_received_data_.highWatermarkTriggered(); }
134
365
  bool hasReceivedEof() const override { return receive_data_end_stream_; }
135
106
  bool isWritable() const override {
136
106
    return peer_handle_ != nullptr && !peer_handle_->hasReceivedEof() &&
137
106
           peer_handle_->canReceiveData();
138
106
  }
139
82
  Buffer::Instance* getReceiveBuffer() override { return &pending_received_data_; }
140

            
141
  // `UserspaceIoHandle`
142
99
  bool isReadable() const override {
143
99
    return hasReceivedEof() || pending_received_data_.length() > 0;
144
99
  }
145

            
146
  // Set the peer which will populate the owned pending_received_data.
147
188
  void setPeerHandle(UserSpace::IoHandle* peer_handle) {
148
    // Swapping peers is undefined behavior.
149
188
    ASSERT(!peer_handle_);
150
188
    ASSERT(!sent_eof_);
151
188
    peer_handle_ = peer_handle;
152
188
    ENVOY_LOG(trace, "io handle {} set peer handle to {}.", static_cast<void*>(this),
153
188
              static_cast<void*>(peer_handle));
154
188
  }
155

            
156
76
  PassthroughStateSharedPtr passthroughState() override { return passthrough_state_; }
157

            
158
private:
159
  friend class IoHandleFactory;
160
  explicit IoHandleImpl(PassthroughStateSharedPtr passthrough_state = nullptr);
161

            
162
  static const Network::Address::InstanceConstSharedPtr& getCommonInternalAddress();
163

            
164
  // Support isOpen() and close(). Network::IoHandle owner must invoke close() to avoid potential
165
  // resource leak.
166
  bool closed_{false};
167

            
168
  // The attached file event with this socket.
169
  std::unique_ptr<FileEventImpl> user_file_event_;
170

            
171
  // True if pending_received_data_ is not addable. Note that pending_received_data_ may have
172
  // pending data to drain.
173
  bool receive_data_end_stream_{false};
174

            
175
  // The buffer owned by this socket. This buffer is populated by the write operations of the peer
176
  // socket and drained by read operations of this socket.
177
  Buffer::WatermarkBuffer pending_received_data_;
178

            
179
  // write() calls will populate the receive buffer of the peer handle. Guaranteed to be non-null
180
  // until close() is called on either this handle or the peer handle.
181
  UserSpace::IoHandle* peer_handle_{nullptr};
182

            
183
  // Indicates whether this handle has sent EOF to the peer by calling setEof().
184
  bool sent_eof_{false};
185

            
186
  // Shared state between peer handles.
187
  PassthroughStateSharedPtr passthrough_state_{nullptr};
188
};
189

            
190
/**
 * PassthroughState implementation that holds a metadata object and a set of filter state objects.
 * The method bodies are defined out of line.
 */
class PassthroughStateImpl : public PassthroughState, public Logger::Loggable<Logger::Id::io> {
public:
  // NOTE(review): presumably stores the supplied metadata and filter state objects in the members
  // below -- confirm against the out-of-line definition.
  void initialize(std::unique_ptr<envoy::config::core::v3::Metadata> metadata,
                  const StreamInfo::FilterState::Objects& filter_state_objects) override;
  // NOTE(review): presumably merges the stored values into the supplied metadata and filter
  // state -- confirm against the out-of-line definition.
  void mergeInto(envoy::config::core::v3::Metadata& metadata,
                 StreamInfo::FilterState& filter_state) override;

protected:
  // Lifecycle marker; a new instance starts in Created.
  enum class State { Created, Initialized, Done };
  State state_{State::Created};
  std::unique_ptr<envoy::config::core::v3::Metadata> metadata_;
  StreamInfo::FilterState::Objects filter_state_objects_;
};
203

            
204
// Uniquely-owning pointer to an IoHandleImpl.
using IoHandleImplPtr = std::unique_ptr<IoHandleImpl>;
205
class IoHandleFactory {
206
public:
207
  /**
208
   * @return a pair of connected IoHandleImpl instances.
209
   * @param state optional existing value to use as the shared PassthroughState. If omitted, a
210
   * newly constructed PassthroughStateImpl will be used.
211
   */
212
  static std::pair<IoHandleImplPtr, IoHandleImplPtr>
213
  createIoHandlePair(PassthroughStatePtr state = nullptr);
214

            
215
  /**
216
   * @return a pair of connected IoHandleImpl instances with pre-configured watermarks.
217
   * @param buffer_size buffer watermark size in bytes
218
   * @param state optional existing value to use as the shared PassthroughState. If omitted, a
219
   * newly constructed PassthroughStateImpl will be used.
220
   */
221
  static std::pair<IoHandleImplPtr, IoHandleImplPtr>
222
  createBufferLimitedIoHandlePair(uint32_t buffer_size, PassthroughStatePtr state = nullptr);
223
};
224
} // namespace UserSpace
225
} // namespace IoSocket
226
} // namespace Extensions
227
} // namespace Envoy