#pragma once

#include <cstdint>
#include <functional>
#include <memory>

#include "envoy/common/pure.h"
#include "envoy/network/address.h"
#include "envoy/thread_local/thread_local.h"

namespace Envoy {
10
namespace Io {
class IoUringSocket;

/**
 * Base class of every I/O request submitted to io_uring. A request records which kind of
 * operation it is and which IoUringSocket issued it.
 */
class Request {
public:
  /**
   * Kind of io_uring operation a request represents. Every value occupies its own bit.
   */
  enum class RequestType : uint8_t {
    Accept = 1 << 0,
    Connect = 1 << 1,
    Read = 1 << 2,
    Write = 1 << 3,
    Close = 1 << 4,
    Cancel = 1 << 5,
    Shutdown = 1 << 6,
  };

  /**
   * @param request_type the kind of operation this request performs.
   * @param owner the io_uring socket that issued this request.
   */
  Request(RequestType request_type, IoUringSocket& owner) : type_{request_type}, socket_{owner} {}
  virtual ~Request() = default;

  /**
   * @return the kind of operation this request performs.
   */
  RequestType type() const { return type_; }

  /**
   * @return the io_uring socket this request belongs to.
   */
  IoUringSocket& socket() const { return socket_; }

private:
  RequestType type_;
  IoUringSocket& socket_;
};

/**
51
 * Callback invoked when iterating over entries in the completion queue.
52
 * @param user_data is any data attached to an entry submitted to the submission
53
 * queue.
54
 * @param result is a return code of submitted system call.
55
 * @param injected indicates whether the completion is injected or not.
56
 */
57
using CompletionCb = std::function<void(Request* user_data, int32_t result, bool injected)>;
58

            
59
/**
60
 * Callback for releasing the user data.
61
 * @param user_data the pointer to the user data.
62
 */
63
using InjectedCompletionUserDataReleasor = std::function<void(Request* user_data)>;
64

            
65
enum class IoUringResult { Ok, Busy, Failed };
66

            
67
/**
68
 * Abstract wrapper around `io_uring`.
69
 */
70
class IoUring {
71
public:
72
80
  virtual ~IoUring() = default;
73

            
74
  /**
75
   * Registers an eventfd file descriptor for the ring and returns it.
76
   * It can be used for integration with event loops.
77
   */
78
  virtual os_fd_t registerEventfd() PURE;
79

            
80
  /**
81
   * Resets the eventfd file descriptor for the ring.
82
   */
83
  virtual void unregisterEventfd() PURE;
84

            
85
  /**
86
   * Returns true if an eventfd file descriptor is registered with the ring.
87
   */
88
  virtual bool isEventfdRegistered() const PURE;
89

            
90
  /**
91
   * Iterates over entries in the completion queue, calls the given callback for
92
   * every entry and marks them consumed.
93
   */
94
  virtual void forEveryCompletion(const CompletionCb& completion_cb) PURE;
95

            
96
  /**
97
   * Prepares an accept system call and puts it into the submission queue.
98
   * Returns IoUringResult::Failed in case the submission queue is full already
99
   * and IoUringResult::Ok otherwise.
100
   */
101
  virtual IoUringResult prepareAccept(os_fd_t fd, struct sockaddr* remote_addr,
102
                                      socklen_t* remote_addr_len, Request* user_data) PURE;
103

            
104
  /**
105
   * Prepares a connect system call and puts it into the submission queue.
106
   * Returns IoUringResult::Failed in case the submission queue is full already
107
   * and IoUringResult::Ok otherwise.
108
   */
109
  virtual IoUringResult prepareConnect(os_fd_t fd,
110
                                       const Network::Address::InstanceConstSharedPtr& address,
111
                                       Request* user_data) PURE;
112

            
113
  /**
114
   * Prepares a readv system call and puts it into the submission queue.
115
   * Returns IoUringResult::Failed in case the submission queue is full already
116
   * and IoUringResult::Ok otherwise.
117
   */
118
  virtual IoUringResult prepareReadv(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs,
119
                                     off_t offset, Request* user_data) PURE;
120

            
121
  /**
122
   * Prepares a writev system call and puts it into the submission queue.
123
   * Returns IoUringResult::Failed in case the submission queue is full already
124
   * and IoUringResult::Ok otherwise.
125
   */
126
  virtual IoUringResult prepareWritev(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs,
127
                                      off_t offset, Request* user_data) PURE;
128

            
129
  /**
130
   * Prepares a close system call and puts it into the submission queue.
131
   * Returns IoUringResult::Failed in case the submission queue is full already
132
   * and IoUringResult::Ok otherwise.
133
   */
134
  virtual IoUringResult prepareClose(os_fd_t fd, Request* user_data) PURE;
135

            
136
  /**
137
   * Prepares a cancellation and puts it into the submission queue.
138
   * Returns IoUringResult::Failed in case the submission queue is full already
139
   * and IoUringResult::Ok otherwise.
140
   */
141
  virtual IoUringResult prepareCancel(Request* cancelling_user_data, Request* user_data) PURE;
142

            
143
  /**
144
   * Prepares a shutdown operation and puts it into the submission queue.
145
   * Returns IoUringResult::Failed in case the submission queue is full already
146
   * and IoUringResult::Ok otherwise.
147
   */
148
  virtual IoUringResult prepareShutdown(os_fd_t fd, int how, Request* user_data) PURE;
149

            
150
  /**
151
   * Submits the entries in the submission queue to the kernel using the
152
   * `io_uring_enter()` system call.
153
   * Returns IoUringResult::Ok in case of success and may return
154
   * IoUringResult::Busy if we over commit the number of requests. In the latter
155
   * case the application should drain the completion queue by handling some completions
156
   * with the forEveryCompletion() method and try again.
157
   */
158
  virtual IoUringResult submit() PURE;
159

            
160
  /**
161
   * Inject a request completion into the io_uring. Those completions will be iterated
162
   * when calling the `forEveryCompletion`. This is used to inject an emulated iouring
163
   * request completion by the upper-layer, then trigger the request completion processing.
164
   * it is used to emulate an activation of READ/WRITE/CLOSED event on the specific file
165
   * descriptor by the IoSocketHandle.
166
   * @param fd is the file descriptor of this completion refer to.
167
   * @param user_data is the user data related to this completion.
168
   * @param result is request result for this completion.
169
   */
170
  virtual void injectCompletion(os_fd_t fd, Request* user_data, int32_t result) PURE;
171

            
172
  /**
173
   * Remove all the injected completions for the specific file descriptor. This is used
174
   * to cleanup all the injected completions when a socket closed and remove from the iouring.
175
   * @param fd is used to refer to the completions will be removed.
176
   */
177
  virtual void removeInjectedCompletion(os_fd_t fd) PURE;
178
};
179

            
180
using IoUringPtr = std::unique_ptr<IoUring>;
181
class IoUringWorker;

/**
 * Status of an IoUringSocket, as reported by IoUringSocket::getStatus().
 * Values are spelled out explicitly; they match the implicit numbering 0..4.
 */
enum IoUringSocketStatus {
  // The socket has been set up.
  Initialized = 0,
  // Reads are enabled on the socket.
  ReadEnabled = 1,
  // Reads are disabled on the socket.
  ReadDisabled = 2,
  // The peer has closed the connection (presumably detected by a read request; confirm).
  RemoteClosed = 3,
  // The socket has been closed.
  Closed = 4,
};

/**
195
 * A callback will be invoked when a close requested done on the socket.
196
 */
197
using IoUringSocketOnClosedCb = std::function<void(Buffer::Instance& read_buffer)>;
198

            
199
/**
200
 * The data returned from the read request.
201
 */
202
struct ReadParam {
203
  Buffer::Instance& buf_;
204
  int32_t result_;
205
};
206

            
207
/**
 * Data produced by a completed write request.
 */
struct WriteParam {
  // Raw result of the write request. Defaults to 0 so that a default-initialized WriteParam
  // (`WriteParam p;`) never carries an indeterminate value; the struct remains an aggregate, so
  // `WriteParam{result}` keeps working.
  int32_t result_{0};
};

/**
215
 * Abstract for each socket.
216
 */
217
class IoUringSocket {
218
public:
219
79
  virtual ~IoUringSocket() = default;
220

            
221
  /**
222
   * Get the IoUringWorker this socket bind to.
223
   */
224
  virtual IoUringWorker& getIoUringWorker() const PURE;
225

            
226
  /**
227
   * Return the raw fd.
228
   */
229
  virtual os_fd_t fd() const PURE;
230

            
231
  /**
232
   * Close the socket.
233
   * @param keep_fd_open indicates the file descriptor of the socket will be closed or not in the
234
   * end. The value of `true` is used for destroy the IoUringSocket but keep the file descriptor
235
   * open. This is used for migrating the IoUringSocket between worker threads.
236
   * @param cb will be invoked when the close request is done. This is also used for migrating the
237
   * IoUringSocket between worker threads.
238
   */
239
  virtual void close(bool keep_fd_open, IoUringSocketOnClosedCb cb = nullptr) PURE;
240

            
241
  /**
242
   * Enable the read on the socket. The socket will be begin to submit the read request and deliver
243
   * read event when the request is done. This is used when the socket is listening on the file read
244
   * event.
245
   */
246
  virtual void enableRead() PURE;
247

            
248
  /**
249
   * Disable the read on the socket. The socket stops to submit the read request, although the
250
   * existing read request won't be canceled and no read event will be delivered. This is used when
251
   * the socket isn't listening on the file read event.
252
   */
253
  virtual void disableRead() PURE;
254

            
255
  /**
256
   * Enable close event. This is used for the case the socket is listening on the file close event.
257
   * Then a remote close is found by a read request will be delivered as a file close event.
258
   */
259
  virtual void enableCloseEvent(bool enable) PURE;
260

            
261
  /**
262
   * Connect to an address.
263
   * @param address the peer of address which is connected to.
264
   */
265
  virtual void connect(const Network::Address::InstanceConstSharedPtr& address) PURE;
266

            
267
  /**
268
   * Write data to the socket.
269
   * @param data is going to write.
270
   */
271
  virtual void write(Buffer::Instance& data) PURE;
272

            
273
  /**
274
   * Write data to the socket.
275
   * @param slices includes the data to write.
276
   * @param num_slice the number of slices.
277
   */
278
  virtual uint64_t write(const Buffer::RawSlice* slices, uint64_t num_slice) PURE;
279

            
280
  /**
281
   * Shutdown the socket.
282
   * @param how is SHUT_RD, SHUT_WR and SHUT_RDWR.
283
   */
284
  virtual void shutdown(int how) PURE;
285

            
286
  /**
287
   * On accept request completed.
288
   * TODO (soulxu): wrap the raw result into a type. It can be `IoCallUint64Result`.
289
   * @param req the AcceptRequest object which is as request user data.
290
   * @param result the result of operation in the request.
291
   * @param injected indicates the completion is injected or not.
292
   */
293
  virtual void onAccept(Request* req, int32_t result, bool injected) PURE;
294

            
295
  /**
296
   * On connect request completed.
297
   * TODO (soulxu): wrap the raw result into a type. It can be `IoCallUint64Result`.
298
   * @param req the request object which is as request user data.
299
   * @param result the result of operation in the request.
300
   * @param injected indicates the completion is injected or not.
301
   */
302
  virtual void onConnect(Request* req, int32_t result, bool injected) PURE;
303

            
304
  /**
305
   * On read request completed.
306
   * TODO (soulxu): wrap the raw result into a type. It can be `IoCallUint64Result`.
307
   * @param req the ReadRequest object which is as request user data.
308
   * @param result the result of operation in the request.
309
   * @param injected indicates the completion is injected or not.
310
   */
311
  virtual void onRead(Request* req, int32_t result, bool injected) PURE;
312

            
313
  /**
314
   * On write request completed.
315
   * TODO (soulxu): wrap the raw result into a type. It can be `IoCallUint64Result`.
316
   * @param req the WriteRequest object which is as request user data.
317
   * @param result the result of operation in the request.
318
   * @param injected indicates the completion is injected or not.
319
   */
320
  virtual void onWrite(Request* req, int32_t result, bool injected) PURE;
321

            
322
  /**
323
   * On close request completed.
324
   * TODO (soulxu): wrap the raw result into a type. It can be `IoCallUint64Result`.
325
   * @param req the request object which is as request user data.
326
   * @param result the result of operation in the request.
327
   * @param injected indicates the completion is injected or not.
328
   */
329
  virtual void onClose(Request* req, int32_t result, bool injected) PURE;
330

            
331
  /**
332
   * On cancel request completed.
333
   * TODO (soulxu): wrap the raw result into a type. It can be `IoCallUint64Result`.
334
   * @param req the request object which is as request user data.
335
   * @param result the result of operation in the request.
336
   * @param injected indicates the completion is injected or not.
337
   */
338
  virtual void onCancel(Request* req, int32_t result, bool injected) PURE;
339

            
340
  /**
341
   * On shutdown request completed.
342
   * TODO (soulxu): wrap the raw result into a type. It can be `IoCallUint64Result`.
343
   * @param req the request object which is as request user data.
344
   * @param result the result of operation in the request.
345
   * @param injected indicates the completion is injected or not.
346
   */
347
  virtual void onShutdown(Request* req, int32_t result, bool injected) PURE;
348

            
349
  /**
350
   * Inject a request completion to the io uring instance.
351
   * @param type the request type of injected completion.
352
   */
353
  virtual void injectCompletion(Request::RequestType type) PURE;
354

            
355
  /**
356
   * Return the current status of IoUringSocket.
357
   * @return the status.
358
   */
359
  virtual IoUringSocketStatus getStatus() const PURE;
360

            
361
  /**
362
   * Return the data get from the read request.
363
   * @return Only return valid ReadParam when the callback is invoked with
364
   * `Event::FileReadyType::Read`, otherwise `absl::nullopt` returned.
365
   */
366
  virtual const OptRef<ReadParam>& getReadParam() const PURE;
367
  /**
368
   * Return the data get from the write request.
369
   * @return Only return valid WriteParam when the callback is invoked with
370
   * `Event::FileReadyType::Write`, otherwise `absl::nullopt` returned.
371
   */
372
  virtual const OptRef<WriteParam>& getWriteParam() const PURE;
373

            
374
  /**
375
   * Set the callback when file ready event triggered.
376
   * @param cb the callback function.
377
   */
378
  virtual void setFileReadyCb(Event::FileReadyCb cb) PURE;
379
};
380

            
381
using IoUringSocketPtr = std::unique_ptr<IoUringSocket>;
382

            
383
/**
384
 * Abstract for per-thread worker.
385
 */
386
class IoUringWorker : public ThreadLocal::ThreadLocalObject {
387
public:
388
75
  ~IoUringWorker() override = default;
389

            
390
  /**
391
   * Add a server socket to the worker.
392
   */
393
  virtual IoUringSocket& addServerSocket(os_fd_t fd, Event::FileReadyCb cb,
394
                                         bool enable_close_event) PURE;
395

            
396
  /**
397
   * Add a server socket from an existing socket from another thread.
398
   */
399
  virtual IoUringSocket& addServerSocket(os_fd_t fd, Buffer::Instance& read_buf,
400
                                         Event::FileReadyCb cb, bool enable_close_event) PURE;
401

            
402
  /**
403
   * Add a client socket to the worker.
404
   */
405
  virtual IoUringSocket& addClientSocket(os_fd_t fd, Event::FileReadyCb cb,
406
                                         bool enable_close_event) PURE;
407

            
408
  /**
409
   * Return the current thread's dispatcher.
410
   */
411
  virtual Event::Dispatcher& dispatcher() PURE;
412

            
413
  /**
414
   * Submit a connect request for a socket.
415
   */
416
  virtual Request*
417
  submitConnectRequest(IoUringSocket& socket,
418
                       const Network::Address::InstanceConstSharedPtr& address) PURE;
419

            
420
  /**
421
   * Submit a read request for a socket.
422
   */
423
  virtual Request* submitReadRequest(IoUringSocket& socket) PURE;
424

            
425
  /**
426
   * Submit a write request for a socket.
427
   */
428
  virtual Request* submitWriteRequest(IoUringSocket& socket,
429
                                      const Buffer::RawSliceVector& slices) PURE;
430

            
431
  /**
432
   * Submit a close request for a socket.
433
   */
434
  virtual Request* submitCloseRequest(IoUringSocket& socket) PURE;
435

            
436
  /**
437
   * Submit a cancel request for a socket.
438
   */
439
  virtual Request* submitCancelRequest(IoUringSocket& socket, Request* request_to_cancel) PURE;
440

            
441
  /**
442
   * Submit a shutdown request for a socket.
443
   */
444
  virtual Request* submitShutdownRequest(IoUringSocket& socket, int how) PURE;
445

            
446
  /**
447
   * Return the number of sockets in the worker.
448
   */
449
  virtual uint32_t getNumOfSockets() const PURE;
450
};
451

            
452
/**
453
 * Abstract factory for IoUringWorker wrappers.
454
 */
455
class IoUringWorkerFactory {
456
public:
457
36
  virtual ~IoUringWorkerFactory() = default;
458

            
459
  /**
460
   * Returns the current thread's IoUringWorker. If the thread have not registered a IoUringWorker,
461
   * an absl::nullopt will be returned.
462
   */
463
  virtual OptRef<IoUringWorker> getIoUringWorker() PURE;
464

            
465
  /**
466
   * Initializes a IoUringWorkerFactory upon server readiness. The method is used to set the TLS.
467
   */
468
  virtual void onWorkerThreadInitialized() PURE;
469

            
470
  /**
471
   * Indicates whether the current thread has been registered for a IoUringWorker.
472
   */
473
  virtual bool currentThreadRegistered() PURE;
474
};
475

            
476
} // namespace Io
477
} // namespace Envoy