1
#pragma once
2

            
3
#include "envoy/common/io/io_uring.h"
4

            
5
#include "source/common/buffer/buffer_impl.h"
6
#include "source/common/common/linked_object.h"
7
#include "source/common/common/logger.h"
8
#include "source/common/io/io_uring_impl.h"
9

            
10
namespace Envoy {
11
namespace Io {
12

            
13
class ReadRequest : public Request {
14
public:
15
  ReadRequest(IoUringSocket& socket, uint32_t size);
16

            
17
  std::unique_ptr<uint8_t[]> buf_;
18
  std::unique_ptr<struct iovec> iov_;
19
};
20

            
21
class WriteRequest : public Request {
22
public:
23
  WriteRequest(IoUringSocket& socket, const Buffer::RawSliceVector& slices);
24

            
25
  std::unique_ptr<struct iovec[]> iov_;
26
};
27

            
28
// Forward declaration so that IoUringWorkerImpl can refer to socket entries before the class is
// defined further down in this header.
class IoUringSocketEntry;
// Owning pointer to a socket entry.
using IoUringSocketEntryPtr = std::unique_ptr<IoUringSocketEntry>;
30

            
31
// Implementation of the IoUringWorker interface.
//
// The worker holds an io_uring instance, the dispatcher it is associated with, and the list of
// sockets that have been added to it. It exposes methods to add sockets and to submit
// connect/read/write/close/cancel/shutdown requests for a socket.
class IoUringWorkerImpl : public IoUringWorker, private Logger::Loggable<Logger::Id::io> {
public:
  // Takes the io_uring parameters (`io_uring_size`, `use_submission_queue_polling`) instead of a
  // ready-made instance.
  IoUringWorkerImpl(uint32_t io_uring_size, bool use_submission_queue_polling,
                    uint32_t read_buffer_size, uint32_t write_timeout_ms,
                    Event::Dispatcher& dispatcher);
  // Takes ownership of an already created io_uring instance.
  IoUringWorkerImpl(IoUringPtr&& io_uring, uint32_t read_buffer_size, uint32_t write_timeout_ms,
                    Event::Dispatcher& dispatcher);
  ~IoUringWorkerImpl() override;

  // IoUringWorker
  IoUringSocket& addServerSocket(os_fd_t fd, Event::FileReadyCb cb,
                                 bool enable_close_event) override;
  IoUringSocket& addServerSocket(os_fd_t fd, Buffer::Instance& read_buf, Event::FileReadyCb cb,
                                 bool enable_close_event) override;
  IoUringSocket& addClientSocket(os_fd_t fd, Event::FileReadyCb cb,
                                 bool enable_close_event) override;

  Request* submitConnectRequest(IoUringSocket& socket,
                                const Network::Address::InstanceConstSharedPtr& address) override;
  Request* submitReadRequest(IoUringSocket& socket) override;
  Request* submitWriteRequest(IoUringSocket& socket, const Buffer::RawSliceVector& slices) override;
  Request* submitCloseRequest(IoUringSocket& socket) override;
  Request* submitCancelRequest(IoUringSocket& socket, Request* request_to_cancel) override;
  Request* submitShutdownRequest(IoUringSocket& socket, int how) override;

  Event::Dispatcher& dispatcher() override;

  // Remove a socket from this worker. Ownership of the removed entry is returned to the caller.
  IoUringSocketEntryPtr removeSocket(IoUringSocketEntry& socket);

  // Inject a request completion into the io_uring instance for a specific socket.
  void injectCompletion(IoUringSocket& socket, Request::RequestType type, int32_t result);

  // Return the number of sockets in this worker.
  // The container reports a size_t; the interface returns uint32_t, so the narrowing is spelled
  // out explicitly instead of happening implicitly.
  uint32_t getNumOfSockets() const override { return static_cast<uint32_t>(sockets_.size()); }

protected:
  // Add a socket to the worker and return a reference to the stored entry.
  IoUringSocketEntry& addSocket(IoUringSocketEntryPtr&& socket);
  void onFileEvent();
  void submit();

  // The io_uring instance.
  IoUringPtr io_uring_;
  // Values passed in at construction time.
  const uint32_t read_buffer_size_;
  const uint32_t write_timeout_ms_;
  // The dispatcher this worker is running on.
  Event::Dispatcher& dispatcher_;
  // The file event of the io_uring's eventfd.
  Event::FileEventPtr file_event_{nullptr};
  // All the sockets in this worker.
  std::list<IoUringSocketEntryPtr> sockets_;
  // Marks whether delayed submission is enabled.
  // The worker delays submitting requests that are issued from within a request completion
  // callback.
  bool delay_submit_{false};
};
88

            
89
class IoUringSocketEntry : public IoUringSocket,
90
                           public LinkedObject<IoUringSocketEntry>,
91
                           public Event::DeferredDeletable,
92
                           protected Logger::Loggable<Logger::Id::io> {
93
public:
94
  IoUringSocketEntry(os_fd_t fd, IoUringWorkerImpl& parent, Event::FileReadyCb cb,
95
                     bool enable_close_event);
96

            
97
  // IoUringSocket
98
16
  IoUringWorker& getIoUringWorker() const override { return parent_; }
99
268
  os_fd_t fd() const override { return fd_; }
100

            
101
54
  void close(bool, IoUringSocketOnClosedCb cb = nullptr) override {
102
54
    status_ = Closed;
103
54
    on_closed_cb_ = cb;
104
54
  }
105
65
  void enableRead() override { status_ = ReadEnabled; }
106
10
  void disableRead() override { status_ = ReadDisabled; }
107
18
  void enableCloseEvent(bool enable) override { enable_close_event_ = enable; }
108
  void connect(const Network::Address::InstanceConstSharedPtr&) override { PANIC("not implement"); }
109

            
110
1
  void onAccept(Request*, int32_t, bool injected) override {
111
1
    if (injected && (injected_completions_ & static_cast<uint8_t>(Request::RequestType::Accept))) {
112
1
      injected_completions_ &= ~static_cast<uint8_t>(Request::RequestType::Accept);
113
1
    }
114
1
  }
115
25
  void onConnect(Request*, int32_t, bool injected) override {
116
25
    if (injected && (injected_completions_ & static_cast<uint8_t>(Request::RequestType::Connect))) {
117
1
      injected_completions_ &= ~static_cast<uint8_t>(Request::RequestType::Connect);
118
1
    }
119
25
  }
120
74
  void onRead(Request*, int32_t, bool injected) override {
121
74
    if (injected && (injected_completions_ & static_cast<uint8_t>(Request::RequestType::Read))) {
122
6
      injected_completions_ &= ~static_cast<uint8_t>(Request::RequestType::Read);
123
6
    }
124
74
  }
125
42
  void onWrite(Request*, int32_t, bool injected) override {
126
42
    if (injected && (injected_completions_ & static_cast<uint8_t>(Request::RequestType::Write))) {
127
3
      injected_completions_ &= ~static_cast<uint8_t>(Request::RequestType::Write);
128
3
    }
129
42
  }
130
48
  void onClose(Request*, int32_t, bool injected) override {
131
48
    if (injected && (injected_completions_ & static_cast<uint8_t>(Request::RequestType::Close))) {
132
1
      injected_completions_ &= ~static_cast<uint8_t>(Request::RequestType::Close);
133
1
    }
134
48
  }
135
37
  void onCancel(Request*, int32_t, bool injected) override {
136
37
    if (injected && (injected_completions_ & static_cast<uint8_t>(Request::RequestType::Cancel))) {
137
1
      injected_completions_ &= ~static_cast<uint8_t>(Request::RequestType::Cancel);
138
1
    }
139
37
  }
140
6
  void onShutdown(Request*, int32_t, bool injected) override {
141
6
    if (injected &&
142
6
        (injected_completions_ & static_cast<uint8_t>(Request::RequestType::Shutdown))) {
143
1
      injected_completions_ &= ~static_cast<uint8_t>(Request::RequestType::Shutdown);
144
1
    }
145
6
  }
146
  void injectCompletion(Request::RequestType type) override;
147

            
148
15
  IoUringSocketStatus getStatus() const override { return status_; }
149

            
150
63
  const OptRef<ReadParam>& getReadParam() const override { return read_param_; }
151
31
  const OptRef<WriteParam>& getWriteParam() const override { return write_param_; }
152

            
153
15
  void setFileReadyCb(Event::FileReadyCb cb) override { cb_ = std::move(cb); }
154

            
155
protected:
156
  /**
157
   * For the socket to remove itself from the IoUringWorker and defer deletion.
158
   */
159
  void cleanup();
160
  void onReadCompleted();
161
  void onWriteCompleted();
162
  void onRemoteClose();
163

            
164
  os_fd_t fd_{INVALID_SOCKET};
165
  IoUringWorkerImpl& parent_;
166
  // This records already injected completion request type to
167
  // avoid duplicated injections.
168
  uint8_t injected_completions_{0};
169
  // The current status of socket.
170
  IoUringSocketStatus status_{Initialized};
171
  // Deliver the remote close as file read event or file close event.
172
  bool enable_close_event_{false};
173
  // The callback will be invoked when close request is done.
174
  IoUringSocketOnClosedCb on_closed_cb_{nullptr};
175
  // This object stores the data get from read request.
176
  OptRef<ReadParam> read_param_;
177
  // This object stores the data get from write request.
178
  OptRef<WriteParam> write_param_;
179

            
180
  Event::FileReadyCb cb_;
181
};
182

            
183
class IoUringServerSocket : public IoUringSocketEntry {
184
public:
185
  IoUringServerSocket(os_fd_t fd, IoUringWorkerImpl& parent, Event::FileReadyCb cb,
186
                      uint32_t write_timeout_ms, bool enable_close_event);
187
  IoUringServerSocket(os_fd_t fd, Buffer::Instance& read_buf, IoUringWorkerImpl& parent,
188
                      Event::FileReadyCb cb, uint32_t write_timeout_ms, bool enable_close_event);
189
  ~IoUringServerSocket() override;
190

            
191
  // IoUringSocket
192
  void close(bool keep_fd_open, IoUringSocketOnClosedCb cb = nullptr) override;
193
  void enableRead() override;
194
  void disableRead() override;
195
  void write(Buffer::Instance& data) override;
196
  uint64_t write(const Buffer::RawSlice* slices, uint64_t num_slice) override;
197
  void shutdown(int how) override;
198
  void onClose(Request* req, int32_t result, bool injected) override;
199
  void onRead(Request* req, int32_t result, bool injected) override;
200
  void onWrite(Request* req, int32_t result, bool injected) override;
201
  void onShutdown(Request* req, int32_t result, bool injected) override;
202
  void onCancel(Request* req, int32_t result, bool injected) override;
203

            
204
  Buffer::OwnedImpl& getReadBuffer() { return read_buf_; }
205

            
206
protected:
207
  // Since the write of IoUringSocket is async, there may have write request is on the fly when
208
  // close the socket. This timeout is setting for a time to wait the write request done.
209
  const uint32_t write_timeout_ms_;
210
  // For read. iouring socket will read sequentially in the order of buf_ and read_error_. Unless
211
  // the buf_ is empty, the read_error_ will not be past to the handler. There is an exception that
212
  // when enable_close_event_ is set, the remote close read_error_(0) will always be past to the
213
  // handler.
214
  Request* read_req_{};
215
  // TODO (soulxu): Add water mark here.
216
  Buffer::OwnedImpl read_buf_;
217
  absl::optional<int32_t> read_error_;
218

            
219
  // TODO (soulxu): We need water mark for write buffer.
220
  // The upper layer will think the buffer released when the data copy into this write buffer.
221
  // This leads to the `IntegrationTest.TestFloodUpstreamErrors` timeout, since the http layer
222
  // always think the response is write successful, so flood protection is never kicked.
223
  //
224
  // For write. iouring socket will write sequentially in the order of write_buf_ and shutdown_
225
  // Unless the write_buf_ is empty, the shutdown operation will not be performed.
226
  Buffer::OwnedImpl write_buf_;
227
  // shutdown_ has 3 states. A absl::nullopt indicates the socket has not been shutdown, a false
228
  // value represents the socket wants to be shutdown but the shutdown has not been performed or
229
  // completed, and a true value means the socket has been shutdown.
230
  absl::optional<bool> shutdown_{};
231
  // If there is in progress write_or_shutdown_req_ during closing, a write timeout timer may be
232
  // setup to cancel the write_or_shutdown_req_, either a write request or a shutdown request. So
233
  // we can make sure all SQEs bounding to the iouring socket is completed and the socket can be
234
  // closed successfully.
235
  Request* write_or_shutdown_req_{nullptr};
236
  Event::TimerPtr write_timeout_timer_{nullptr};
237
  // Whether keep the fd open when close the IoUringSocket.
238
  bool keep_fd_open_{false};
239
  // This is used for tracking the read's cancel request.
240
  Request* read_cancel_req_{nullptr};
241
  // This is used for tracking the write or shutdown's cancel request.
242
  Request* write_or_shutdown_cancel_req_{nullptr};
243
  // This is used for tracking the close request.
244
  Request* close_req_{nullptr};
245

            
246
  void closeInternal();
247
  void submitReadRequest();
248
  void submitWriteOrShutdownRequest();
249
  void moveReadDataToBuffer(Request* req, size_t data_length);
250
  void onReadCompleted(int32_t result);
251
  void onWriteCompleted(int32_t result);
252
};
253

            
254
class IoUringClientSocket : public IoUringServerSocket {
255
public:
256
  IoUringClientSocket(os_fd_t fd, IoUringWorkerImpl& parent, Event::FileReadyCb cb,
257
                      uint32_t write_timeout_ms, bool enable_close_event);
258

            
259
  void connect(const Network::Address::InstanceConstSharedPtr& address) override;
260
  void onConnect(Request* req, int32_t result, bool injected) override;
261
};
262

            
263
} // namespace Io
264
} // namespace Envoy