1
#include "source/common/io/io_uring_worker_impl.h"
2

            
3
namespace Envoy {
4
namespace Io {
5

            
6
// A read request owns a single contiguous buffer of `size` bytes and a
// one-entry iovec pointing at it, ready to hand to prepareReadv().
ReadRequest::ReadRequest(IoUringSocket& socket, uint32_t size)
    : Request(RequestType::Read, socket), buf_(std::make_unique<uint8_t[]>(size)),
      iov_(std::make_unique<struct iovec>()) {
  // Wire the iovec to the freshly allocated buffer.
  iov_->iov_len = size;
  iov_->iov_base = buf_.get();
}
12

            
13
// A write request mirrors the caller's slice vector into an iovec array so
// the kernel can gather-write them in one submission.
WriteRequest::WriteRequest(IoUringSocket& socket, const Buffer::RawSliceVector& slices)
    : Request(RequestType::Write, socket), iov_(std::make_unique<struct iovec[]>(slices.size())) {
  size_t idx = 0;
  for (const auto& slice : slices) {
    iov_[idx].iov_base = slice.mem_;
    iov_[idx].iov_len = slice.len_;
    ++idx;
  }
}
20

            
21
// Base socket entry: records the fd, the owning worker, whether remote close
// should be delivered as a Closed event, and the file-ready callback.
IoUringSocketEntry::IoUringSocketEntry(os_fd_t fd, IoUringWorkerImpl& parent, Event::FileReadyCb cb,
                                       bool enable_close_event)
    : fd_(fd), parent_(parent), enable_close_event_(enable_close_event), cb_(std::move(cb)) {}
24

            
25
55
void IoUringSocketEntry::cleanup() {
26
55
  IoUringSocketEntryPtr socket = parent_.removeSocket(*this);
27
55
  parent_.dispatcher().deferredDelete(std::move(socket));
28
55
}
29

            
30
15
// Injects a fake completion of the given type for this socket. Used to
// deliver events (e.g. pending read data) without a real kernel completion.
// The result code is -EAGAIN to distinguish injected from real completions.
void IoUringSocketEntry::injectCompletion(Request::RequestType type) {
  // Avoid injecting the same completion type multiple times.
  // `injected_completions_` is a bitmask keyed by the request-type value.
  if (injected_completions_ & static_cast<uint8_t>(type)) {
    ENVOY_LOG(trace,
              "ignore injected completion since there already has one, injected_completions_: {}, "
              "type: {}",
              injected_completions_, static_cast<uint8_t>(type));
    return;
  }
  injected_completions_ |= static_cast<uint8_t>(type);
  parent_.injectCompletion(*this, type, -EAGAIN);
}
42

            
43
26
// Invokes the file-ready callback with a Read event. The caller is expected
// to have populated the read param (buffer + result) before calling this.
void IoUringSocketEntry::onReadCompleted() {
  ENVOY_LOG(trace,
            "calling event callback since pending read buf has {} size data, data = {}, "
            "fd = {}",
            getReadParam()->buf_.length(), getReadParam()->buf_.toString(), fd_);
  THROW_IF_NOT_OK(cb_(Event::FileReadyType::Read));
}
50

            
51
28
// Invokes the file-ready callback with a Write event. The caller is expected
// to have populated the write param (result) before calling this.
void IoUringSocketEntry::onWriteCompleted() {
  ENVOY_LOG(trace, "call event callback for write since result = {}", getWriteParam()->result_);
  THROW_IF_NOT_OK(cb_(Event::FileReadyType::Write));
}
55

            
56
5
// Delivers a remote-close notification to the handler as a Closed event.
void IoUringSocketEntry::onRemoteClose() {
  ENVOY_LOG(trace, "onRemoteClose fd = {}", fd_);
  THROW_IF_NOT_OK(cb_(Event::FileReadyType::Closed));
}
60

            
61
// Convenience constructor: builds the IoUringImpl internally and delegates to
// the injectable-ring constructor below (which also aids testing).
IoUringWorkerImpl::IoUringWorkerImpl(uint32_t io_uring_size, bool use_submission_queue_polling,
                                     uint32_t read_buffer_size, uint32_t write_timeout_ms,
                                     Event::Dispatcher& dispatcher)
    : IoUringWorkerImpl(std::make_unique<IoUringImpl>(io_uring_size, use_submission_queue_polling),
                        read_buffer_size, write_timeout_ms, dispatcher) {}
66

            
67
// Main constructor: registers an eventfd with the ring and watches it via the
// dispatcher so completions are drained (onFileEvent) on the event loop.
IoUringWorkerImpl::IoUringWorkerImpl(IoUringPtr&& io_uring, uint32_t read_buffer_size,
                                     uint32_t write_timeout_ms, Event::Dispatcher& dispatcher)
    : io_uring_(std::move(io_uring)), read_buffer_size_(read_buffer_size),
      write_timeout_ms_(write_timeout_ms), dispatcher_(dispatcher) {
  const os_fd_t event_fd = io_uring_->registerEventfd();
  // We only care about the read event of Eventfd, since we only receive the
  // event here.
  file_event_ = dispatcher_.createFileEvent(
      event_fd,
      [this](uint32_t) {
        onFileEvent();
        return absl::OkStatus();
      },
      Event::PlatformDefaultTriggerType, Event::FileReadyType::Read);
}
82

            
83
67
// Destructor: initiates close on every socket that is not already closing,
// then pumps completions until all sockets have drained and removed
// themselves, and finally flushes the dispatcher's deferred-delete list.
IoUringWorkerImpl::~IoUringWorkerImpl() {
  ENVOY_LOG(trace, "destruct io uring worker, existing sockets = {}", sockets_.size());

  for (auto& socket : sockets_) {
    if (socket->getStatus() != Closed) {
      socket->close(false);
    }
  }

  // Each onFileEvent() call drains completions; sockets unlink themselves
  // from `sockets_` as their close requests complete.
  while (!sockets_.empty()) {
    ENVOY_LOG(trace, "still left {} sockets are not closed", sockets_.size());
    for (auto& socket : sockets_) {
      ENVOY_LOG(trace, "the socket fd = {} not closed", socket->fd());
    }
    onFileEvent();
  }

  // Sockets were handed to deferredDelete() in cleanup(); destroy them now.
  dispatcher_.clearDeferredDeleteList();
}
102

            
103
// Creates a server socket for an accepted fd, starts reading immediately, and
// registers it with this worker.
IoUringSocket& IoUringWorkerImpl::addServerSocket(os_fd_t fd, Event::FileReadyCb cb,
                                                  bool enable_close_event) {
  ENVOY_LOG(trace, "add server socket, fd = {}", fd);
  auto server_socket = std::make_unique<IoUringServerSocket>(
      fd, *this, std::move(cb), write_timeout_ms_, enable_close_event);
  server_socket->enableRead();
  return addSocket(std::move(server_socket));
}
111

            
112
// Variant used when migrating an existing socket: seeds the new server socket
// with buffered-but-unconsumed read data, then starts reading.
IoUringSocket& IoUringWorkerImpl::addServerSocket(os_fd_t fd, Buffer::Instance& read_buf,
                                                  Event::FileReadyCb cb, bool enable_close_event) {
  ENVOY_LOG(trace, "add server socket through existing socket, fd = {}", fd);
  auto server_socket = std::make_unique<IoUringServerSocket>(
      fd, read_buf, *this, std::move(cb), write_timeout_ms_, enable_close_event);
  server_socket->enableRead();
  return addSocket(std::move(server_socket));
}
120

            
121
// Creates a client socket and registers it with this worker. Unlike server
// sockets, reading is deferred until the connect completes.
IoUringSocket& IoUringWorkerImpl::addClientSocket(os_fd_t fd, Event::FileReadyCb cb,
                                                  bool enable_close_event) {
  ENVOY_LOG(trace, "add client socket, fd = {}", fd);
  // The client socket should not be read enabled until it is connected.
  auto client_socket = std::make_unique<IoUringClientSocket>(
      fd, *this, std::move(cb), write_timeout_ms_, enable_close_event);
  return addSocket(std::move(client_socket));
}
129

            
130
94
// Accessor for the dispatcher this worker's event handling runs on.
Event::Dispatcher& IoUringWorkerImpl::dispatcher() {
  return dispatcher_;
}
131

            
132
55
// Takes ownership of the socket entry, appending it to the worker's socket
// list, and returns a reference to the stored entry.
IoUringSocketEntry& IoUringWorkerImpl::addSocket(IoUringSocketEntryPtr&& socket) {
  IoUringSocketEntry& entry = *socket;
  LinkedList::moveIntoListBack(std::move(socket), sockets_);
  return entry;
}
136

            
137
// Submits an async connect for `socket` to `address`. Ownership of the
// returned Request stays with the completion machinery (deleted in
// onFileEvent after the completion is dispatched).
Request*
IoUringWorkerImpl::submitConnectRequest(IoUringSocket& socket,
                                        const Network::Address::InstanceConstSharedPtr& address) {
  Request* req = new Request(Request::RequestType::Connect, socket);

  ENVOY_LOG(trace, "submit connect request, fd = {}, req = {}", socket.fd(), fmt::ptr(req));

  auto res = io_uring_->prepareConnect(socket.fd(), address, req);
  if (res == IoUringResult::Failed) {
    // The submission queue is full: flush it and retry once.
    // TODO(rojkov): handle `EBUSY` in case the completion queue is never reaped.
    submit();
    res = io_uring_->prepareConnect(socket.fd(), address, req);
    RELEASE_ASSERT(res == IoUringResult::Ok, "unable to prepare connect");
  }
  submit();
  return req;
}
154

            
155
69
// Submits a single-iovec readv of `read_buffer_size_` bytes for `socket`.
// The request owns the destination buffer; it is deleted after completion.
Request* IoUringWorkerImpl::submitReadRequest(IoUringSocket& socket) {
  ReadRequest* req = new ReadRequest(socket, read_buffer_size_);

  ENVOY_LOG(trace, "submit read request, fd = {}, read req = {}", socket.fd(), fmt::ptr(req));

  auto res = io_uring_->prepareReadv(socket.fd(), req->iov_.get(), 1, 0, req);
  if (res == IoUringResult::Failed) {
    // The submission queue is full: flush it and retry once.
    // TODO(rojkov): handle `EBUSY` in case the completion queue is never reaped.
    submit();
    res = io_uring_->prepareReadv(socket.fd(), req->iov_.get(), 1, 0, req);
    RELEASE_ASSERT(res == IoUringResult::Ok, "unable to prepare readv");
  }
  submit();
  return req;
}
170

            
171
// Submits a gather-write of `slices` for `socket`. The WriteRequest copies
// the slice metadata (not the data) into its own iovec array, so the caller's
// slice vector may go out of scope, but the underlying buffers must live
// until the completion arrives.
Request* IoUringWorkerImpl::submitWriteRequest(IoUringSocket& socket,
                                               const Buffer::RawSliceVector& slices) {
  WriteRequest* req = new WriteRequest(socket, slices);

  ENVOY_LOG(trace, "submit write request, fd = {}, req = {}", socket.fd(), fmt::ptr(req));

  auto res = io_uring_->prepareWritev(socket.fd(), req->iov_.get(), slices.size(), 0, req);
  if (res == IoUringResult::Failed) {
    // The submission queue is full: flush it and retry once.
    // TODO(rojkov): handle `EBUSY` in case the completion queue is never reaped.
    submit();
    res = io_uring_->prepareWritev(socket.fd(), req->iov_.get(), slices.size(), 0, req);
    RELEASE_ASSERT(res == IoUringResult::Ok, "unable to prepare writev");
  }
  submit();
  return req;
}
187

            
188
50
// Submits an async close of the socket's fd. The request is deleted after
// its completion is dispatched in onFileEvent().
Request* IoUringWorkerImpl::submitCloseRequest(IoUringSocket& socket) {
  Request* req = new Request(Request::RequestType::Close, socket);

  ENVOY_LOG(trace, "submit close request, fd = {}, close req = {}", socket.fd(), fmt::ptr(req));

  auto res = io_uring_->prepareClose(socket.fd(), req);
  if (res == IoUringResult::Failed) {
    // The submission queue is full: flush it and retry once.
    // TODO(rojkov): handle `EBUSY` in case the completion queue is never reaped.
    submit();
    res = io_uring_->prepareClose(socket.fd(), req);
    RELEASE_ASSERT(res == IoUringResult::Ok, "unable to prepare close");
  }
  submit();
  return req;
}
203

            
204
37
// Submits a cancellation targeting `request_to_cancel`. Note the cancelled
// request still produces its own completion (typically -ECANCELED), so both
// requests are reaped and deleted via onFileEvent().
Request* IoUringWorkerImpl::submitCancelRequest(IoUringSocket& socket, Request* request_to_cancel) {
  Request* req = new Request(Request::RequestType::Cancel, socket);

  ENVOY_LOG(trace, "submit cancel request, fd = {}, cancel req = {}, req to cancel = {}",
            socket.fd(), fmt::ptr(req), fmt::ptr(request_to_cancel));

  auto res = io_uring_->prepareCancel(request_to_cancel, req);
  if (res == IoUringResult::Failed) {
    // The submission queue is full: flush it and retry once.
    // TODO(rojkov): handle `EBUSY` in case the completion queue is never reaped.
    submit();
    res = io_uring_->prepareCancel(request_to_cancel, req);
    RELEASE_ASSERT(res == IoUringResult::Ok, "unable to prepare cancel");
  }
  submit();
  return req;
}
220

            
221
7
// Submits an async shutdown(2) of the socket with mode `how`.
// The request is deleted after its completion is dispatched in onFileEvent().
Request* IoUringWorkerImpl::submitShutdownRequest(IoUringSocket& socket, int how) {
  Request* req = new Request(Request::RequestType::Shutdown, socket);

  ENVOY_LOG(trace, "submit shutdown request, fd = {}, shutdown req = {}", socket.fd(),
            fmt::ptr(req));

  auto res = io_uring_->prepareShutdown(socket.fd(), how, req);
  if (res == IoUringResult::Failed) {
    // The submission queue is full: flush it and retry once.
    // TODO(rojkov): handle `EBUSY` in case the completion queue is never reaped.
    submit();
    res = io_uring_->prepareShutdown(socket.fd(), how, req);
    // Fixed assertion message: previously said "unable to prepare cancel",
    // copy-pasted from submitCancelRequest.
    RELEASE_ASSERT(res == IoUringResult::Ok, "unable to prepare shutdown");
  }
  submit();
  return req;
}
237

            
238
55
// Unlinks `socket` from the worker's socket list and returns ownership to
// the caller. Any pending injected completions for the fd are dropped first
// so they cannot be dispatched against a removed socket.
IoUringSocketEntryPtr IoUringWorkerImpl::removeSocket(IoUringSocketEntry& socket) {
  // Remove all the injection completion for this socket.
  io_uring_->removeInjectedCompletion(socket.fd());
  return socket.removeFromList(sockets_);
}
243

            
244
// Queues a synthetic completion with the given result for `socket` and
// activates the eventfd file event so onFileEvent() will run and drain it.
// The Request is deleted when the completion is dispatched.
void IoUringWorkerImpl::injectCompletion(IoUringSocket& socket, Request::RequestType type,
                                         int32_t result) {
  Request* req = new Request(type, socket);
  io_uring_->injectCompletion(socket.fd(), req, result);
  file_event_->activate(Event::FileReadyType::Read);
}
250

            
251
175
void IoUringWorkerImpl::onFileEvent() {
252
175
  ENVOY_LOG(trace, "io uring worker, on file event");
253
175
  delay_submit_ = true;
254
223
  io_uring_->forEveryCompletion([](Request* req, int32_t result, bool injected) {
255
223
    ENVOY_LOG(trace, "receive request completion, type = {}, req = {}",
256
223
              static_cast<uint8_t>(req->type()), fmt::ptr(req));
257
223
    ASSERT(req != nullptr);
258

            
259
223
    switch (req->type()) {
260
1
    case Request::RequestType::Accept:
261
1
      ENVOY_LOG(trace, "receive accept request completion, fd = {}, req = {}", req->socket().fd(),
262
1
                fmt::ptr(req));
263
1
      req->socket().onAccept(req, result, injected);
264
1
      break;
265
23
    case Request::RequestType::Connect:
266
23
      ENVOY_LOG(trace, "receive connect request completion, fd = {}, req = {}", req->socket().fd(),
267
23
                fmt::ptr(req));
268
23
      req->socket().onConnect(req, result, injected);
269
23
      break;
270
71
    case Request::RequestType::Read:
271
71
      ENVOY_LOG(trace, "receive Read request completion, fd = {}, req = {}", req->socket().fd(),
272
71
                fmt::ptr(req));
273
71
      req->socket().onRead(req, result, injected);
274
71
      break;
275
40
    case Request::RequestType::Write:
276
40
      ENVOY_LOG(trace, "receive write request completion, fd = {}, req = {}", req->socket().fd(),
277
40
                fmt::ptr(req));
278
40
      req->socket().onWrite(req, result, injected);
279
40
      break;
280
48
    case Request::RequestType::Close:
281
48
      ENVOY_LOG(trace, "receive close request completion, fd = {}, req = {}", req->socket().fd(),
282
48
                fmt::ptr(req));
283
48
      req->socket().onClose(req, result, injected);
284
48
      break;
285
35
    case Request::RequestType::Cancel:
286
35
      ENVOY_LOG(trace, "receive cancel request completion, fd = {}, req = {}", req->socket().fd(),
287
35
                fmt::ptr(req));
288
35
      req->socket().onCancel(req, result, injected);
289
35
      break;
290
5
    case Request::RequestType::Shutdown:
291
5
      ENVOY_LOG(trace, "receive shutdown request completion, fd = {}, req = {}", req->socket().fd(),
292
5
                fmt::ptr(req));
293
5
      req->socket().onShutdown(req, result, injected);
294
5
      break;
295
223
    }
296

            
297
223
    delete req;
298
223
  });
299
175
  delay_submit_ = false;
300
175
  submit();
301
175
}
302

            
303
383
void IoUringWorkerImpl::submit() {
304
383
  if (!delay_submit_) {
305
298
    io_uring_->submit();
306
298
  }
307
383
}
308

            
309
// Server socket over an accepted fd. `write_timeout_ms` bounds how long a
// pending write/shutdown is allowed to linger once close() is requested.
IoUringServerSocket::IoUringServerSocket(os_fd_t fd, IoUringWorkerImpl& parent,
                                         Event::FileReadyCb cb, uint32_t write_timeout_ms,
                                         bool enable_close_event)
    : IoUringSocketEntry(fd, parent, std::move(cb), enable_close_event),
      write_timeout_ms_(write_timeout_ms) {}
314

            
315
// Migration constructor: adopts data already read by a previous socket so it
// is delivered to the handler before any new kernel reads.
IoUringServerSocket::IoUringServerSocket(os_fd_t fd, Buffer::Instance& read_buf,
                                         IoUringWorkerImpl& parent, Event::FileReadyCb cb,
                                         uint32_t write_timeout_ms, bool enable_close_event)
    : IoUringSocketEntry(fd, parent, std::move(cb), enable_close_event),
      write_timeout_ms_(write_timeout_ms) {
  read_buf_.move(read_buf);
}
322

            
323
56
// Cancel the write-timeout timer (created lazily in close()) so it cannot
// fire against a destroyed socket.
IoUringServerSocket::~IoUringServerSocket() {
  if (write_timeout_timer_) {
    write_timeout_timer_->disableTimer();
  }
}
328

            
329
54
// Initiates close. If in-flight read/write requests exist, the actual close
// is deferred: the read is cancelled immediately, while a pending
// write/shutdown is given `write_timeout_ms_` to complete before being
// cancelled by a timer. closeInternal() runs once everything has drained.
// @param keep_fd_open when true the fd is not closed and remaining read data
//        is handed back via `cb` (used for socket migration).
void IoUringServerSocket::close(bool keep_fd_open, IoUringSocketOnClosedCb cb) {
  ENVOY_LOG(trace, "close the socket, fd = {}, status = {}", fd_, static_cast<int>(status_));

  IoUringSocketEntry::close(keep_fd_open, cb);
  keep_fd_open_ = keep_fd_open;

  // Delay close until read request and write (or shutdown) request are drained.
  if (read_req_ == nullptr && write_or_shutdown_req_ == nullptr) {
    closeInternal();
    return;
  }

  if (read_req_ != nullptr) {
    ENVOY_LOG(trace, "cancel the read request, fd = {}", fd_);
    read_cancel_req_ = parent_.submitCancelRequest(*this, read_req_);
  }

  if (write_or_shutdown_req_ != nullptr) {
    ENVOY_LOG(trace, "delay cancel the write request, fd = {}", fd_);
    // With a zero timeout the write is never force-cancelled; close waits for
    // its natural completion.
    if (write_timeout_ms_ > 0) {
      write_timeout_timer_ = parent_.dispatcher().createTimer([this]() {
        // Re-check: the write may have completed before the timer fired.
        if (write_or_shutdown_req_ != nullptr) {
          ENVOY_LOG(trace, "cancel the write or shutdown request, fd = {}", fd_);
          write_or_shutdown_cancel_req_ =
              parent_.submitCancelRequest(*this, write_or_shutdown_req_);
        }
      });
      write_timeout_timer_->enableTimer(std::chrono::milliseconds(write_timeout_ms_));
    }
  }
}
360

            
361
65
// Enables read delivery. If data (or a stored read error) is already pending
// from a previous read, delivery happens via an injected completion instead
// of a fresh kernel read, so nothing is lost or reordered.
void IoUringServerSocket::enableRead() {
  IoUringSocketEntry::enableRead();
  ENVOY_LOG(trace, "enable read, fd = {}", fd_);

  // Continue processing read buffer remained by the previous read.
  if (read_buf_.length() > 0 || read_error_.has_value()) {
    ENVOY_LOG(trace, "continue reading from socket, fd = {}, size = {}", fd_, read_buf_.length());
    injectCompletion(Request::RequestType::Read);
    return;
  }

  submitReadRequest();
}
374

            
375
10
// Disables read delivery. Only the status flips; any in-flight read request
// is left to complete (its result is buffered, not dropped).
void IoUringServerSocket::disableRead() {
  IoUringSocketEntry::disableRead();
}
376

            
377
7
// Buffers `data` for asynchronous writing. Not allowed after shutdown().
void IoUringServerSocket::write(Buffer::Instance& data) {
  ENVOY_LOG(trace, "write, buffer size = {}, fd = {}", data.length(), fd_);
  ASSERT(!shutdown_.has_value());

  // We need to reset the drain trackers, since the write and close is async in
  // the iouring. When the write is actually finished the above layer may already
  // release the drain trackers.
  write_buf_.move(data, data.length(), true);

  submitWriteOrShutdownRequest();
}
388

            
389
7
// Copies `num_slice` raw slices into the internal write buffer and kicks off
// an async write. Returns the number of bytes accepted (always the full
// amount). Not allowed after shutdown().
uint64_t IoUringServerSocket::write(const Buffer::RawSlice* slices, uint64_t num_slice) {
  ENVOY_LOG(trace, "write, num_slices = {}, fd = {}", num_slice, fd_);
  ASSERT(!shutdown_.has_value());

  uint64_t total_copied = 0;
  for (uint64_t idx = 0; idx < num_slice; ++idx) {
    const Buffer::RawSlice& slice = slices[idx];
    write_buf_.add(slice.mem_, slice.len_);
    total_copied += slice.len_;
  }

  submitWriteOrShutdownRequest();
  return total_copied;
}
402

            
403
6
// Requests a write-side shutdown. `shutdown_` transitions: nullopt = not
// requested, false = requested but not completed, true = completed. The
// shutdown is queued behind any buffered write data.
void IoUringServerSocket::shutdown(int how) {
  ENVOY_LOG(trace, "shutdown the socket, fd = {}, how = {}", fd_, how);
  ASSERT(how == SHUT_WR);
  shutdown_ = false;
  submitWriteOrShutdownRequest();
}
409

            
410
47
// Completion of the close request: the fd is closed, so the socket unlinks
// itself from the worker and schedules its own deferred deletion.
void IoUringServerSocket::onClose(Request* req, int32_t result, bool injected) {
  IoUringSocketEntry::onClose(req, result, injected);
  ASSERT(!injected);
  cleanup();
}
415

            
416
36
// Completion of a cancel request issued during close(). Clears whichever
// cancel slot this request occupied, then finishes the close once every
// outstanding request (read, write/shutdown and their cancels) has drained.
void IoUringServerSocket::onCancel(Request* req, int32_t result, bool injected) {
  IoUringSocketEntry::onCancel(req, result, injected);
  ASSERT(!injected);
  if (read_cancel_req_ == req) {
    read_cancel_req_ = nullptr;
  }
  if (write_or_shutdown_cancel_req_ == req) {
    write_or_shutdown_cancel_req_ = nullptr;
  }
  if (status_ == Closed && write_or_shutdown_req_ == nullptr && read_req_ == nullptr &&
      write_or_shutdown_cancel_req_ == nullptr) {
    closeInternal();
  }
}
430

            
431
20
// Transfers ownership of the read request's raw buffer into `read_buf_`
// without copying: the buffer is wrapped in a BufferFragment whose done
// callback frees both the byte array and the fragment itself.
void IoUringServerSocket::moveReadDataToBuffer(Request* req, size_t data_length) {
  ReadRequest* read_req = static_cast<ReadRequest*>(req);
  Buffer::BufferFragment* fragment = new Buffer::BufferFragmentImpl(
      // release() detaches the array from the request so it is not freed when
      // the request is deleted in onFileEvent().
      read_req->buf_.release(), data_length,
      [](const void* data, size_t, const Buffer::BufferFragmentImpl* this_fragment) {
        delete[] reinterpret_cast<const uint8_t*>(data);
        delete this_fragment;
      });
  read_buf_.addBufferFragment(*fragment);
}
441

            
442
26
// Publishes the read result to the handler: exposes (read_buf_, result) via
// read_param_ only for the duration of the callback, then clears it. The
// handler may consume part of read_buf_; leftovers are redelivered later.
void IoUringServerSocket::onReadCompleted(int32_t result) {
  ENVOY_LOG(trace, "read from socket, fd = {}, result = {}", fd_, result);
  ReadParam param{read_buf_, result};
  read_param_ = param;
  IoUringSocketEntry::onReadCompleted();
  read_param_ = absl::nullopt;
  ENVOY_LOG(trace, "after read from socket, fd = {}, remain = {}", fd_, read_buf_.length());
}
450

            
451
// TODO(zhxie): concern submit multiple read requests or submit read request in advance to improve
452
// performance in the next iteration.
453
72
// Handles a read completion (real or injected). Overall flow:
//   1. For real completions while closing with nothing else in flight:
//      finish the close (preserving data when the fd is kept open).
//   2. Stash the result: data into read_buf_, errors into read_error_
//      (-ECANCELED is dropped — it is our own cancel, not a socket error).
//   3. Deliver data/errors to the handler per the current status, mapping a
//      remote close (result 0) to Read, Closed, or Write depending on
//      enable_close_event_ and status.
//   4. Re-arm a kernel read where appropriate.
void IoUringServerSocket::onRead(Request* req, int32_t result, bool injected) {
  IoUringSocketEntry::onRead(req, result, injected);

  ENVOY_LOG(trace,
            "onRead with result {}, fd = {}, injected = {}, status_ = {}, enable_close_event = {}",
            result, fd_, injected, static_cast<int>(status_), enable_close_event_);
  if (!injected) {
    read_req_ = nullptr;
    // If the socket is going to close, discard all results.
    if (status_ == Closed && write_or_shutdown_req_ == nullptr && read_cancel_req_ == nullptr &&
        write_or_shutdown_cancel_req_ == nullptr) {
      // Keep-fd-open (migration) still wants the data that arrived.
      if (result > 0 && keep_fd_open_) {
        moveReadDataToBuffer(req, result);
      }
      closeInternal();
      return;
    }
  }

  // Move read data from request to buffer or store the error.
  if (result > 0) {
    moveReadDataToBuffer(req, result);
  } else {
    // -ECANCELED is the echo of our own cancel request; not a real error.
    if (result != -ECANCELED) {
      read_error_ = result;
    }
  }

  // Discard calling back since the socket is not ready or closed.
  if (status_ == Initialized || status_ == Closed) {
    return;
  }

  // If the socket is enabled and there are bytes to read, notify the handler.
  if (status_ == ReadEnabled) {
    if (read_buf_.length() > 0) {
      onReadCompleted(static_cast<int32_t>(read_buf_.length()));
    } else if (read_error_.has_value() && read_error_ < 0) {
      onReadCompleted(read_error_.value());
      read_error_.reset();
    }
    // Handle remote closed at last.
    // Depending on the event listened to, calling different event back to handle remote closed.
    // * events & (Read | Closed): Callback Closed,
    // * events & (Read)         : Callback Read,
    // * events & (Closed)       : Callback Closed,
    // * ...else                 : Callback Write.
    if (read_error_.has_value() && read_error_ == 0 && !enable_close_event_) {
      ENVOY_LOG(trace, "read remote closed from socket, fd = {}", fd_);
      onReadCompleted(read_error_.value());
      read_error_.reset();
      return;
    }
  }

  // If `enable_close_event_` is true, then deliver the remote close as close event.
  if (read_error_.has_value() && read_error_ == 0) {
    if (enable_close_event_) {
      ENVOY_LOG(trace,
                "remote closed and close event enabled, raise the close event, fd = "
                "{}, result = {}",
                fd_, read_error_.value());
      status_ = RemoteClosed;
      IoUringSocketEntry::onRemoteClose();
      read_error_.reset();
      return;
    } else {
      // In this case, the closed event isn't listened and the status is disabled.
      // It means we can't raise the closed or read event. So we only can raise the
      // write event.
      ENVOY_LOG(trace,
                "remote closed and close event disabled, raise the write event, fd = "
                "{}, result = {}",
                fd_, read_error_.value());
      status_ = RemoteClosed;
      onWriteCompleted(0);
      read_error_.reset();
      return;
    }
  }

  // The socket may be not readable during handler onRead callback, check it again here.
  if (status_ == ReadEnabled) {
    // If the read error is zero, it means remote close, then needn't new request.
    if (!read_error_.has_value() || read_error_.value() != 0) {
      // Submit a read request for the next read.
      submitReadRequest();
    }
  } else if (status_ == ReadDisabled) {
    // Since error in a disabled socket will not be handled by the handler, stop submit read
    // request if there is any error.
    if (!read_error_.has_value()) {
      // Submit a read request for monitoring the remote close event, otherwise there is no
      // way to know the connection is closed by the remote.
      submitReadRequest();
    }
  }
}
551

            
552
28
// Publishes the write result to the handler: exposes `result` via
// write_param_ only for the duration of the callback, then clears it.
void IoUringServerSocket::onWriteCompleted(int32_t result) {
  WriteParam param{result};
  write_param_ = param;
  IoUringSocketEntry::onWriteCompleted();
  write_param_ = absl::nullopt;
}
558

            
559
41
// Handles a write completion. Injected completions (used e.g. to relay
// connect results) are forwarded to the handler as-is. For real completions,
// successfully written bytes are drained from write_buf_; failures drain
// everything and surface as remote-close (-EPIPE) or a write error. Finally
// any remaining buffered data / pending shutdown is (re)submitted.
void IoUringServerSocket::onWrite(Request* req, int32_t result, bool injected) {
  IoUringSocketEntry::onWrite(req, result, injected);

  ENVOY_LOG(trace, "onWrite with result {}, fd = {}, injected = {}, status_ = {}", result, fd_,
            injected, static_cast<int>(status_));
  if (!injected) {
    write_or_shutdown_req_ = nullptr;
  }

  // Notify the handler directly since it is an injected request.
  if (injected) {
    ENVOY_LOG(trace,
              "there is a inject event, and same time we have regular write request, fd = {}", fd_);
    // There is case where write injection may come after shutdown or close which should be ignored
    // since the I/O handle or connection may be released after closing.
    if (!shutdown_.has_value() && status_ != Closed) {
      onWriteCompleted(result);
    }
    return;
  }

  if (result > 0) {
    write_buf_.drain(result);
    ENVOY_LOG(trace, "drain write buf, drain size = {}, fd = {}", result, fd_);
  } else {
    // Drain all write buf since the write failed.
    write_buf_.drain(write_buf_.length());
    if (!shutdown_.has_value() && status_ != Closed) {
      status_ = RemoteClosed;
      // -EPIPE means the peer closed its read side: report as remote close;
      // any other error is delivered through the write callback.
      if (result == -EPIPE) {
        IoUringSocketEntry::onRemoteClose();
      } else {
        onWriteCompleted(result);
      }
    }
  }

  submitWriteOrShutdownRequest();
}
598

            
599
5
// Handles a shutdown completion: marks shutdown as done (`shutdown_` = true)
// and re-runs the write/shutdown pipeline, which may now proceed to close.
void IoUringServerSocket::onShutdown(Request* req, int32_t result, bool injected) {
  IoUringSocketEntry::onShutdown(req, result, injected);

  ENVOY_LOG(trace, "onShutdown with result {}, fd = {}, injected = {}", result, fd_, injected);
  ASSERT(!injected);
  write_or_shutdown_req_ = nullptr;
  shutdown_ = true;

  submitWriteOrShutdownRequest();
}
609

            
610
54
// Finalizes the close once all outstanding requests have drained. When the
// fd is kept open (migration), remaining read data is handed to the on-closed
// callback and the entry is cleaned up without closing the fd; otherwise a
// close request is submitted (guarded so it is only submitted once).
void IoUringServerSocket::closeInternal() {
  if (keep_fd_open_) {
    if (on_closed_cb_) {
      on_closed_cb_(read_buf_);
    }
    cleanup();
    return;
  }
  if (close_req_ == nullptr) {
    close_req_ = parent_.submitCloseRequest(*this);
  }
}
622

            
623
84
void IoUringServerSocket::submitReadRequest() {
624
84
  if (!read_req_) {
625
68
    read_req_ = parent_.submitReadRequest(*this);
626
68
  }
627
84
}
628

            
629
38
// Drives the single write/shutdown slot, in priority order: flush buffered
// write data first, then perform a requested-but-not-done shutdown
// (shutdown_ == false), and finally — if the socket is closing and nothing
// else is in flight — complete the close. No-op while a request is pending.
void IoUringServerSocket::submitWriteOrShutdownRequest() {
  if (!write_or_shutdown_req_) {
    if (write_buf_.length() > 0) {
      // IOV_MAX caps the iovec count accepted by a single writev.
      Buffer::RawSliceVector slices = write_buf_.getRawSlices(IOV_MAX);
      ENVOY_LOG(trace, "submit write request, write_buf size = {}, num_iovecs = {}, fd = {}",
                write_buf_.length(), slices.size(), fd_);
      write_or_shutdown_req_ = parent_.submitWriteRequest(*this, slices);
    } else if (shutdown_.has_value() && !shutdown_.value()) {
      write_or_shutdown_req_ = parent_.submitShutdownRequest(*this, SHUT_WR);
    } else if (status_ == Closed && read_req_ == nullptr && read_cancel_req_ == nullptr &&
               write_or_shutdown_cancel_req_ == nullptr) {
      closeInternal();
    }
  }
}
644

            
645
// Client socket: same behavior as the server socket except connect() must be
// called (and complete) before reads are enabled.
IoUringClientSocket::IoUringClientSocket(os_fd_t fd, IoUringWorkerImpl& parent,
                                         Event::FileReadyCb cb, uint32_t write_timeout_ms,
                                         bool enable_close_event)
    // Move the callback instead of copying it (Event::FileReadyCb is an
    // std::function; every other ctor call site in this file moves it).
    : IoUringServerSocket(fd, parent, std::move(cb), write_timeout_ms, enable_close_event) {}
649

            
650
24
// Starts an async connect to `address`. The connect request is stored in the
// read slot so a subsequent close() can cancel it via the read-cancel path.
void IoUringClientSocket::connect(const Network::Address::InstanceConstSharedPtr& address) {
  // Reuse read request since there is no read on connecting and connect is cancellable.
  ASSERT(read_req_ == nullptr);
  read_req_ = parent_.submitConnectRequest(*this, address);
}
655

            
656
24
// Handles the connect completion. On success (result == 0) reading is
// enabled; in all non-closed cases the result is relayed to the I/O handle
// as an injected Write completion. If the socket was closed while
// connecting, the close is completed instead and no callback fires.
void IoUringClientSocket::onConnect(Request* req, int32_t result, bool injected) {
  IoUringSocketEntry::onConnect(req, result, injected);
  ASSERT(!injected);
  ENVOY_LOG(trace, "onConnect with result {}, fd = {}, injected = {}, status_ = {}", result, fd_,
            injected, static_cast<int>(status_));

  // The connect request occupied the read slot (see connect()).
  read_req_ = nullptr;
  // Socket may be closed on connecting like binding error. In this situation we may not callback
  // on connecting completion.
  if (status_ == Closed) {
    if (write_or_shutdown_req_ == nullptr && read_cancel_req_ == nullptr &&
        write_or_shutdown_cancel_req_ == nullptr) {
      closeInternal();
    }
    return;
  }

  if (result == 0) {
    enableRead();
  }
  // Calls parent injectCompletion() directly since we want to send connect result back to the IO
  // handle.
  parent_.injectCompletion(*this, Request::RequestType::Write, result);
}
680

            
681
} // namespace Io
682
} // namespace Envoy