1
#include "source/common/io/io_uring_impl.h"
2

            
3
#include <sys/eventfd.h>
4

            
5
namespace Envoy {
6
namespace Io {
7

            
8
65
bool isIoUringSupported() {
9
65
  struct io_uring_params p {};
10
65
  struct io_uring ring;
11

            
12
65
  bool is_supported = io_uring_queue_init_params(2, &ring, &p) == 0;
13
65
  if (is_supported) {
14
65
    io_uring_queue_exit(&ring);
15
65
  }
16

            
17
65
  return is_supported;
18
65
}
19

            
20
// Initializes a kernel io_uring instance with `io_uring_size` SQ entries and a
// matching CQE scratch buffer. Aborts the process if the ring cannot be created.
IoUringImpl::IoUringImpl(uint32_t io_uring_size, bool use_submission_queue_polling)
    : cqes_(io_uring_size, nullptr) {
  struct io_uring_params params {};
  if (use_submission_queue_polling) {
    params.flags |= IORING_SETUP_SQPOLL;
  }
  // TODO (soulxu): According to the man page: `By default, the CQ ring will have twice the number
  // of entries as specified by entries for the SQ ring`. But currently we only use the same size
  // with SQ ring. We will figure out better handle of entries number in the future.
  const int rc = io_uring_queue_init_params(io_uring_size, &ring_, &params);
  RELEASE_ASSERT(rc == 0, fmt::format("unable to initialize io_uring: {}", errorDetails(-rc)));
}
32

            
33
66
// Releases the kernel ring and all resources associated with it.
IoUringImpl::~IoUringImpl() {
  io_uring_queue_exit(&ring_);
}
34

            
35
66
// Creates an eventfd and registers it with the ring so completions wake up the
// event loop. Returns the new eventfd. Aborts if creation or registration fails.
os_fd_t IoUringImpl::registerEventfd() {
  ASSERT(!isEventfdRegistered());
  // Mark the eventfd as non-blocking. since after injected completion is added. the eventfd
  // will be activated to trigger the event callback. For the case of only injected completion
  // is added and no actual iouring event. Then non-blocking can avoid the reading of eventfd
  // blocking.
  event_fd_ = eventfd(0, EFD_NONBLOCK);
  // eventfd(2) returns -1 on failure (e.g. EMFILE/ENFILE); without this check a
  // bad fd would be handed to io_uring_register_eventfd and stored in event_fd_.
  RELEASE_ASSERT(SOCKET_VALID(event_fd_),
                 fmt::format("unable to create eventfd: {}", errorDetails(errno)));
  int res = io_uring_register_eventfd(&ring_, event_fd_);
  RELEASE_ASSERT(res == 0, fmt::format("unable to register eventfd: {}", errorDetails(-res)));
  return event_fd_;
}
46

            
47
13
void IoUringImpl::unregisterEventfd() {
48
13
  ASSERT(isEventfdRegistered());
49
13
  int res = io_uring_unregister_eventfd(&ring_);
50
13
  RELEASE_ASSERT(res == 0, fmt::format("unable to unregister eventfd: {}", errorDetails(-res)));
51
13
  SET_SOCKET_INVALID(event_fd_);
52
13
}
53

            
54
16
// True when registerEventfd() has been called and not yet undone; tracked by
// whether the stored eventfd is a valid descriptor.
bool IoUringImpl::isEventfdRegistered() const {
  return SOCKET_VALID(event_fd_);
}
55

            
56
177
// Drains all currently available completions — both real CQEs from the kernel
// ring and locally injected completions — invoking `completion_cb` for each.
// The third callback argument distinguishes injected (true) from real (false)
// completions. Requires a registered eventfd.
void IoUringImpl::forEveryCompletion(const CompletionCb& completion_cb) {
  ASSERT(SOCKET_VALID(event_fd_));

  // Drain the eventfd counter first so the level-triggered wakeup is reset
  // before we consume the CQEs it signaled. The fd is non-blocking, so the
  // loop ends with EAGAIN once the counter is empty.
  eventfd_t v;
  while (true) {
    int ret = eventfd_read(event_fd_, &v);
    if (ret != 0) {
      ASSERT(errno == EAGAIN);
      break;
    }
  }

  // Grab up to cqes_.size() completions in one batch without blocking.
  unsigned count = io_uring_peek_batch_cqe(&ring_, cqes_.data(), cqes_.size());

  for (unsigned i = 0; i < count; ++i) {
    struct io_uring_cqe* cqe = cqes_[i];
    // user_data carries the Request* set at prepare time; `false` = real CQE.
    completion_cb(reinterpret_cast<Request*>(cqe->user_data), cqe->res, false);
  }

  // Tell the kernel all `count` CQEs have been consumed so the slots can be reused.
  io_uring_cq_advance(&ring_, count);

  ENVOY_LOG(trace, "the num of injected completion is {}", injected_completions_.size());
  // TODO(soulxu): Add bound here to avoid too many completion to stuck the thread too
  // long.
  // Iterate the injected completion.
  while (!injected_completions_.empty()) {
    auto& completion = injected_completions_.front();
    completion_cb(completion.user_data_, completion.result_, true);
    // The socket may closed in the completion_cb and all the related completions are
    // removed.
    if (injected_completions_.empty()) {
      // The callback cleared the list (e.g. via removeInjectedCompletion), so
      // `completion` is dangling — popping again would be invalid.
      break;
    }
    injected_completions_.pop_front();
  }
}
92

            
93
// Queues an accept(2) operation on `fd` into the submission ring. The peer
// address is written to `remote_addr`/`remote_addr_len` on completion, and
// `user_data` is echoed back in the matching CQE. Returns Failed when the SQ
// ring is full; the caller should submit() and retry.
IoUringResult IoUringImpl::prepareAccept(os_fd_t fd, struct sockaddr* remote_addr,
                                         socklen_t* remote_addr_len, Request* user_data) {
  // Fixed copy/paste bug: this trace message previously said "prepare close".
  ENVOY_LOG(trace, "prepare accept for fd = {}", fd);
  // TODO (soulxu): Handling the case of CQ ring is overflow.
  ASSERT(!(*(ring_.sq.kflags) & IORING_SQ_CQ_OVERFLOW));
  struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_);
  if (sqe == nullptr) {
    // Log on failure for consistency with prepareCancel/prepareShutdown.
    ENVOY_LOG(trace, "failed to prepare accept for fd = {}", fd);
    return IoUringResult::Failed;
  }

  io_uring_prep_accept(sqe, fd, remote_addr, remote_addr_len, 0);
  io_uring_sqe_set_data(sqe, user_data);
  return IoUringResult::Ok;
}
107

            
108
// Queues a connect(2) to `address` on `fd`; `user_data` is echoed back in the
// completion. Returns Failed when no SQE slot is available.
IoUringResult IoUringImpl::prepareConnect(os_fd_t fd,
                                          const Network::Address::InstanceConstSharedPtr& address,
                                          Request* user_data) {
  ENVOY_LOG(trace, "prepare connect for fd = {}", fd);
  // TODO (soulxu): Handling the case of CQ ring is overflow.
  ASSERT(!(*(ring_.sq.kflags) & IORING_SQ_CQ_OVERFLOW));
  struct io_uring_sqe* entry = io_uring_get_sqe(&ring_);
  if (entry == nullptr) {
    // Submission queue is full; the caller must submit() and try again.
    return IoUringResult::Failed;
  }

  io_uring_prep_connect(entry, fd, address->sockAddr(), address->sockAddrLen());
  io_uring_sqe_set_data(entry, user_data);
  return IoUringResult::Ok;
}
123

            
124
// Queues a vectored read of `nr_vecs` buffers from `fd` at `offset`;
// `user_data` is echoed back in the completion. Returns Failed when no SQE
// slot is available.
IoUringResult IoUringImpl::prepareReadv(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs,
                                        off_t offset, Request* user_data) {
  ENVOY_LOG(trace, "prepare readv for fd = {}", fd);
  // TODO (soulxu): Handling the case of CQ ring is overflow.
  ASSERT(!(*(ring_.sq.kflags) & IORING_SQ_CQ_OVERFLOW));
  struct io_uring_sqe* entry = io_uring_get_sqe(&ring_);
  if (entry == nullptr) {
    // Submission queue is full; the caller must submit() and try again.
    return IoUringResult::Failed;
  }

  io_uring_prep_readv(entry, fd, iovecs, nr_vecs, offset);
  io_uring_sqe_set_data(entry, user_data);
  return IoUringResult::Ok;
}
138

            
139
// Queues a vectored write of `nr_vecs` buffers to `fd` at `offset`;
// `user_data` is echoed back in the completion. Returns Failed when no SQE
// slot is available.
IoUringResult IoUringImpl::prepareWritev(os_fd_t fd, const struct iovec* iovecs, unsigned nr_vecs,
                                         off_t offset, Request* user_data) {
  ENVOY_LOG(trace, "prepare writev for fd = {}", fd);
  // TODO (soulxu): Handling the case of CQ ring is overflow.
  ASSERT(!(*(ring_.sq.kflags) & IORING_SQ_CQ_OVERFLOW));
  struct io_uring_sqe* entry = io_uring_get_sqe(&ring_);
  if (entry == nullptr) {
    // Submission queue is full; the caller must submit() and try again.
    return IoUringResult::Failed;
  }

  io_uring_prep_writev(entry, fd, iovecs, nr_vecs, offset);
  io_uring_sqe_set_data(entry, user_data);
  return IoUringResult::Ok;
}
153

            
154
46
// Queues a close(2) of `fd`; `user_data` is echoed back in the completion.
// Returns Failed when no SQE slot is available.
IoUringResult IoUringImpl::prepareClose(os_fd_t fd, Request* user_data) {
  ENVOY_LOG(trace, "prepare close for fd = {}", fd);
  // TODO (soulxu): Handling the case of CQ ring is overflow.
  ASSERT(!(*(ring_.sq.kflags) & IORING_SQ_CQ_OVERFLOW));
  struct io_uring_sqe* entry = io_uring_get_sqe(&ring_);
  if (entry == nullptr) {
    // Submission queue is full; the caller must submit() and try again.
    return IoUringResult::Failed;
  }

  io_uring_prep_close(entry, fd);
  io_uring_sqe_set_data(entry, user_data);
  return IoUringResult::Ok;
}
167

            
168
31
// Queues a cancellation targeting the in-flight request whose user data is
// `cancelling_user_data`; `user_data` identifies this cancel op itself in its
// completion. Returns Failed when no SQE slot is available.
IoUringResult IoUringImpl::prepareCancel(Request* cancelling_user_data, Request* user_data) {
  ENVOY_LOG(trace, "prepare cancels for user data = {}", fmt::ptr(cancelling_user_data));
  // TODO (soulxu): Handling the case of CQ ring is overflow.
  ASSERT(!(*(ring_.sq.kflags) & IORING_SQ_CQ_OVERFLOW));
  struct io_uring_sqe* entry = io_uring_get_sqe(&ring_);
  if (entry == nullptr) {
    ENVOY_LOG(trace, "failed to prepare cancel for user data = {}", fmt::ptr(cancelling_user_data));
    return IoUringResult::Failed;
  }

  io_uring_prep_cancel(entry, cancelling_user_data, 0);
  io_uring_sqe_set_data(entry, user_data);
  return IoUringResult::Ok;
}
182

            
183
7
// Queues a shutdown(2) of `fd` with mode `how` (SHUT_RD/SHUT_WR/SHUT_RDWR);
// `user_data` is echoed back in the completion. Returns Failed when no SQE
// slot is available.
IoUringResult IoUringImpl::prepareShutdown(os_fd_t fd, int how, Request* user_data) {
  ENVOY_LOG(trace, "prepare shutdown for fd = {}, how = {}", fd, how);
  // TODO (soulxu): Handling the case of CQ ring is overflow.
  ASSERT(!(*(ring_.sq.kflags) & IORING_SQ_CQ_OVERFLOW));
  struct io_uring_sqe* entry = io_uring_get_sqe(&ring_);
  if (entry == nullptr) {
    ENVOY_LOG(trace, "failed to prepare shutdown for fd = {}", fd);
    return IoUringResult::Failed;
  }

  io_uring_prep_shutdown(entry, fd, how);
  io_uring_sqe_set_data(entry, user_data);
  return IoUringResult::Ok;
}
197

            
198
272
// Pushes all prepared SQEs to the kernel. Returns Busy when the kernel reports
// EBUSY (completion queue backpressure), otherwise Ok; any other error aborts.
IoUringResult IoUringImpl::submit() {
  const int rc = io_uring_submit(&ring_);
  RELEASE_ASSERT(rc >= 0 || rc == -EBUSY, "unable to submit io_uring queue entries");
  if (rc == -EBUSY) {
    return IoUringResult::Busy;
  }
  return IoUringResult::Ok;
}
203

            
204
43
// Appends a locally fabricated completion for `fd` that forEveryCompletion()
// will deliver with its injected flag set to true.
void IoUringImpl::injectCompletion(os_fd_t fd, Request* user_data, int32_t result) {
  injected_completions_.emplace_back(fd, user_data, result);
  ENVOY_LOG(trace, "inject completion, fd = {}, req = {}, num injects = {}", fd,
            fmt::ptr(user_data), injected_completions_.size());
}
209

            
210
49
// Drops every pending injected completion that belongs to `fd`, freeing the
// Request attached to each one before it is erased.
void IoUringImpl::removeInjectedCompletion(os_fd_t fd) {
  ENVOY_LOG(trace, "remove injected completions for fd = {}, size = {}", fd,
            injected_completions_.size());
  injected_completions_.remove_if([fd](InjectedCompletion& pending) {
    const bool matches = (fd == pending.fd_);
    if (matches) {
      // Release the user data before remove this completion.
      delete reinterpret_cast<Request*>(pending.user_data_);
    }
    return matches;
  });
}
221

            
222
} // namespace Io
223
} // namespace Envoy