Coverage Report

Created: 2025-11-11 06:48

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/pistache/src/common/transport.cc
Line
Count
Source
1
/*
2
 * SPDX-FileCopyrightText: 2017 Mathieu Stefani
3
 *
4
 * SPDX-License-Identifier: Apache-2.0
5
 */
6
7
/* transport.cc
8
   Mathieu Stefani, 02 July 2017
9
10
   TCP transport handling
11
12
*/
13
14
#include <pistache/eventmeth.h>
15
#include <pistache/pist_quote.h>
16
17
#ifdef _USE_LIBEVENT_LIKE_APPLE
18
19
#ifdef __NetBSD__
20
// For TCP_NODELAY
21
#include <netinet/in.h>
22
#include <netinet/tcp.h>
23
#include <sys/socket.h>
24
#else
25
#ifndef _IS_WINDOWS
26
// There is no TCP_NOPUSH/TCP_CORK in Windows or NetBSD
27
#include <netinet/tcp.h> // for TCP_NOPUSH
28
#endif
29
#endif // of ifdef __NetBSD__ ... else ...
30
31
#endif // of ifdef _USE_LIBEVENT_LIKE_APPLE
32
33
// ps_sendfile.h includes sys/uio.h in macOS, and sys/sendfile.h in Linux
34
#include <pistache/ps_sendfile.h>
35
36
#include PST_MISC_IO_HDR // unistd.h/lseek in BSD.
37
38
#include PIST_SOCKFNS_HDR // socket read, write and close
39
40
#ifndef _USE_LIBEVENT_LIKE_APPLE
41
// Note: sys/timerfd.h is linux-only (and certainly POSIX only)
42
#include <sys/timerfd.h>
43
#endif
44
45
#include <pistache/os.h>
46
#include <pistache/peer.h>
47
#include <pistache/tcp.h>
48
#include <pistache/transport.h>
49
#include <pistache/utils.h>
50
51
using std::to_string;
52
53
#ifdef _USE_LIBEVENT_LIKE_APPLE
54
#if defined(__NetBSD__) || defined(_IS_WINDOWS)
55
#define PS_USE_TCP_NODELAY 1
56
#endif
57
#endif
58
59
namespace Pistache::Tcp
60
{
61
    using namespace Polling;
62
63
    // Creates a transport for the given TCP handler. All real set-up is
    // delegated to init().
    Transport::Transport(const std::shared_ptr<Tcp::Handler>& handler)
#ifdef _USE_LIBEVENT_LIKE_APPLE
        : tcp_prot_num_(-1) // placeholder; init() stores the real value
#endif
    {
        this->init(handler);
    }
70
71
    // Stores the handler, tells the handler which transport it is associated
    // with and, in _USE_LIBEVENT_LIKE_APPLE builds, looks up the protocol
    // number later passed to getsockopt/setsockopt.
    void Transport::init(const std::shared_ptr<Tcp::Handler>& handler)
    {
        // Note: In the _USE_LIBEVENT case, EventMeth::init() called out of
        // EventMethFns::create

        handler_ = handler;
        handler_->associateTransport(this);

#ifdef _USE_LIBEVENT_LIKE_APPLE
        // SOL_TCP not defined in macOS Nov 2023, so ask the protocol
        // database for the TCP protocol number of this host instead.
        const struct protoent* tcpEntry = getprotobyname("tcp");

        if (tcpEntry)
        {
            tcp_prot_num_ = tcpEntry->p_proto;
        }
        else
        {
            // Lookup failed: fall back to 6, which is the usual value
            tcp_prot_num_ = 6;
        }
#endif
    }
87
88
    // Tears the transport down by removing every peer it still tracks.
    Transport::~Transport()
    {
        this->removeAllPeers();
    }
92
93
    std::shared_ptr<Aio::Handler> Transport::clone() const
94
0
    {
95
0
        return std::make_shared<Transport>(handler_->clone());
96
0
    }
97
98
    void Transport::flush()
99
0
    {
100
0
        handleWriteQueue(true);
101
0
    }
102
103
    // Binds the transport's three internal queues (writes, timers, peers)
    // and its notifier to `poller`. unregisterPoller() undoes the bindings
    // in the reverse order.
    void Transport::registerPoller(Polling::Epoll& poller)
104
0
    {
105
0
        PS_TIMEDBG_START_THIS;
106
107
0
        writesQueue.bind(poller);
108
0
        timersQueue.bind(poller);
109
0
        peersQueue.bind(poller);
110
0
        notifier.bind(poller);
111
112
#ifdef _USE_LIBEVENT
113
        // libevent builds also remember the poller's epoll-equivalent object
        epoll_fd = poller.getEventMethEpollEquiv();
114
#endif
115
0
    }
116
117
    // Detaches the notifier and the three internal queues from `poller`,
    // in the reverse of the order used by registerPoller().
    void Transport::unregisterPoller(Polling::Epoll& poller)
118
0
    {
119
0
        PS_TIMEDBG_START_THIS;
120
121
#ifdef _USE_LIBEVENT
122
        // Forget the epoll-equivalent object before unbinding anything
        epoll_fd = nullptr;
123
#endif
124
125
0
        notifier.unbind(poller);
126
0
        peersQueue.unbind(poller);
127
0
        timersQueue.unbind(poller);
128
0
        writesQueue.unbind(poller);
129
0
    }
130
131
    // Takes charge of a new peer.
    //
    // When the caller already runs on the thread recorded in this handler's
    // context the peer is handled directly; otherwise it is pushed onto
    // peersQueue. Afterwards an empty write queue is created in toWrite for
    // the peer's Fd, unless that Fd is PS_FD_EMPTY.
    void Transport::handleNewPeer(const std::shared_ptr<Tcp::Peer>& peer)
    {
        auto handlerCtx            = context();
        const bool onContextThread = (std::this_thread::get_id() == handlerCtx.thread());

        if (onContextThread)
        {
            PS_LOG_DEBUG("Not pushing to peersQueue, handling directly");
            handlePeer(peer);
        }
        else
        {
            PS_LOG_DEBUG("Pushing to peersQueue");

            PeerEntry queuedPeer(peer);
            peersQueue.push(std::move(queuedPeer));
        }

        // The lock is taken before the Fd is read and held while toWrite is
        // updated.
        Guard writeGuard(toWriteLock);
        Fd peerFd = peer->fd();
        if (peerFd != PS_FD_EMPTY)
        {
            toWrite.emplace(peerFd, std::deque<WriteEntry> {});
        }
        else
        {
            PS_LOG_DEBUG("Empty Fd");
        }
    }
159
160
#ifdef DEBUG
161
    // Debug helper: logs which entry became ready and every readiness flag
    // set on it (readable, writable, hangup), or "<unknown>" when none is
    // set. In non-libevent builds the entry's Fd number is logged as well.
    static void logFdAndNotifyOn(const Aio::FdSet::Entry& entry)
    {
#ifdef _USE_LIBEVENT
        std::string str("entry");
#else
        std::string str("fd ");

        std::stringstream ss;
        ss << (PS_NUM_CAST_TO_FD(entry.getTag().value()));
        str += ss.str();
#endif

        // Append each flag rather than keeping only the last one, so that
        // an entry that is e.g. both readable and writable is reported as
        // such.
        bool any_flag = false;

        if (entry.isReadable())
        {
            str += " readable";
            any_flag = true;
        }
        if (entry.isWritable())
        {
            str += " writable";
            any_flag = true;
        }
        if (entry.isHangup())
        {
            str += " hangup";
            any_flag = true;
        }
        if (!any_flag)
            str += " <unknown>";

        PS_LOG_DEBUG_ARGS("%s", str.c_str());
    }
187
188
#define PS_LOG_DBG_FD_AND_NOTIFY logFdAndNotifyOn(entry)
189
#else
190
#define PS_LOG_DBG_FD_AND_NOTIFY
191
#endif
192
193
    // Dispatches every ready entry in `fds`.
    //
    // Entries whose tag matches one of the internal queues (writes, timers,
    // peers) or the notifier are routed to the matching handle*() method.
    // Any other readable entry is treated as a peer Fd (incoming data) or a
    // timer Fd (timer fired); any other writable entry triggers an attempt
    // to drain that Fd's pending writes.
    void Transport::onReady(const Aio::FdSet& fds)
    {
        PS_LOG_DEBUG_ARGS("%d fds", fds.size());

        for (const auto& entry : fds)
        {
            PS_LOG_DBG_FD_AND_NOTIFY;

            if (entry.getTag() == writesQueue.tag())
            {
                PS_LOG_DEBUG("Write queue");
                handleWriteQueue();
            }
            else if (entry.getTag() == timersQueue.tag())
            {
                PS_LOG_DEBUG("Timers queue");
                handleTimerQueue();
            }
            else if (entry.getTag() == peersQueue.tag())
            {
                PS_LOG_DEBUG("Peers queue");
                handlePeerQueue();
            }
            else if (entry.getTag() == notifier.tag())
            {
                PS_LOG_DEBUG("notifier");
                handleNotify();
            }

            else if (entry.isReadable())
            {
                auto tag = entry.getTag();
                PS_LOG_DEBUG_ARGS("entry isReadable fd %" PIST_QUOTE(PS_FD_PRNTFCD),
                                  tag.value()); // TagValue type := Fd

                if (isPeerFd(tag))
                {
                    auto peer = getPeer(tag);
                    PS_LOG_DEBUG("handleIncoming");
                    handleIncoming(peer);
                }
                else if (isTimerFd(tag))
                {
                    auto it = timers.find(static_cast<decltype(timers)::key_type>(tag.value()));
                    if (it == std::end(timers))
                    {
                        // Defensive: never dereference an end iterator
                        PS_LOG_DEBUG("timer not found in timers");
                        continue;
                    }

                    // Copy the key before running the timer: handleTimer
                    // may run user code, so the iterator is not used again
                    // once it returns.
                    const auto timerKey = it->first;

                    PS_LOG_DEBUG("handleTimer");
                    handleTimer(std::move(it->second));

                    PS_LOG_DEBUG_ARGS("Timer %" PIST_QUOTE(PS_FD_PRNTFCD) " erasing from timers",
                                      timerKey);
                    timers.erase(timerKey);
                    PS_LOG_DEBUG_ARGS("Timer %" PIST_QUOTE(PS_FD_PRNTFCD) " erased from timers",
                                      timerKey);
                }
                else
                {
                    PS_LOG_DEBUG("neither peer nor timer");
                }
            }
            else if (entry.isWritable())
            {
                PS_LOG_DEBUG("isWritable");

                auto tag        = entry.getTag();
                FdConst fdconst = static_cast<FdConst>(tag.value());
                // Since fd is about to be written to, it isn't really const,
                // and we cast away the const
                Fd fd = PS_CAST_AWAY_CONST_FD(fdconst);

                {
                    // This lookup only feeds the debug log below;
                    // asyncWriteImpl does its own lookup under the lock.
                    Guard guard(toWriteLock);
                    auto it = toWrite.find(fd);
                    if (it == std::end(toWrite))
                    {
                        // This can happen if another thread triggered an explicit
                        // flush right when this thread woke up to write as well
                        PS_LOG_DEBUG("fd not in toWrite, nothing to write");
                    }
                }

                // Go back to read-only notification before writing
                reactor()->modifyFd(key(), fd, NotifyOn::Read, Polling::Mode::Edge);

                PS_LOG_DEBUG("asyncWriteImpl (drain queue)");
                // Try to drain the queue
                asyncWriteImpl(fd);
            }
        }
    }
281
282
    // Disables the timer registered under `fd`.
    //
    // Throws std::runtime_error when no timer is registered for `fd`.
    void Transport::disarmTimer(Fd fd)
    {
        PS_TIMEDBG_START_ARGS("fd %" PIST_QUOTE(PS_FD_PRNTFCD), fd);

        auto armed = timers.find(fd);
        if (armed != std::end(timers))
        {
            armed->second.disable();
            return;
        }

        throw std::runtime_error("Timer has not been armed");
    }
293
294
    void Transport::handleIncoming(const std::shared_ptr<Peer>& peer)
295
0
    {
296
0
        if (!peer)
297
0
        {
298
0
            PS_LOG_DEBUG("Null peer");
299
0
            return;
300
0
        }
301
302
0
        char buffer[Const::MaxBuffer] = { 0 };
303
304
0
        PST_SSIZE_T totalBytes = 0;
305
0
        em_socket_t fdactual   = peer->actualFd();
306
0
        if (fdactual < 0)
307
0
        {
308
0
            PS_LOG_DEBUG_ARGS("Peer %p has no actual Fd", peer.get());
309
0
            return;
310
0
        }
311
312
0
        for (;;)
313
0
        {
314
315
0
            PST_SSIZE_T bytes = -1;
316
0
            bool retry        = false;
317
318
#ifdef PISTACHE_USE_SSL
319
            if (peer->ssl() != nullptr)
320
            {
321
                PS_LOG_DEBUG("SSL_read");
322
323
                bytes = SSL_read(reinterpret_cast<SSL*>(peer->ssl()),
324
                                 buffer + totalBytes,
325
                                 static_cast<int>(Const::MaxBuffer - totalBytes));
326
                if (bytes <= 0)
327
                {
328
                    int ssl_get_error_res = SSL_get_error(
329
                        reinterpret_cast<SSL*>(peer->ssl()),
330
                        static_cast<int>(bytes));
331
                    PS_LOG_DEBUG_ARGS("SSL_read err %d, bytes %d", ssl_get_error_res, bytes);
332
333
                    switch (ssl_get_error_res)
334
                    {
335
                    case SSL_ERROR_SYSCALL:
336
                    case SSL_ERROR_SSL:
337
                        PS_LOG_DEBUG_ARGS("SSL_read clearing error queue, last 0x%08X", ERR_peek_last_error());
338
                        ERR_clear_error();
339
                        break;
340
341
                    case SSL_ERROR_WANT_READ:
342
                        bytes = -1; // To force handling of reset in the logic that follows.
343
                        retry = true;
344
                        break;
345
346
                    default:
347
                        break;
348
                    }
349
                }
350
            }
351
            else
352
            {
353
#endif /* PISTACHE_USE_SSL */
354
0
                PS_LOG_DEBUG("recv (read)");
355
0
                bytes = PST_SOCK_READ(fdactual, buffer + totalBytes,
356
0
                                      Const::MaxBuffer - totalBytes);
357
0
                if (bytes < 0)
358
0
                    retry = (errno == EAGAIN || errno == EWOULDBLOCK);
359
#ifdef PISTACHE_USE_SSL
360
            }
361
#endif /* PISTACHE_USE_SSL */
362
363
0
            PST_DBG_DECL_SE_ERR_P_EXTRA;
364
0
            PS_LOG_DEBUG_ARGS("Fd %" PIST_QUOTE(PS_FD_PRNTFCD) ", "
365
0
                                                               "bytes read %d, totalBytes %d, retry %s,"
366
0
                                                               "err %d %s",
367
0
                              peer->fd(), bytes, totalBytes,
368
0
                              (retry) ? "true" : "false",
369
0
                              (bytes < 0) ? errno : 0,
370
0
                              (bytes < 0) ? (PST_STRERROR_R_ERRNO) : "");
371
372
0
            if (bytes == -1)
373
0
            {
374
0
                if (retry)
375
0
                {
376
0
                    if (totalBytes > 0)
377
0
                    {
378
0
                        handler_->onInput(buffer, totalBytes, peer);
379
0
                    }
380
0
                }
381
0
                else
382
0
                {
383
0
                    handlePeerDisconnection(peer);
384
0
                }
385
0
                break;
386
0
            }
387
0
            else if (bytes == 0)
388
0
            {
389
0
                handlePeerDisconnection(peer);
390
0
                break;
391
0
            }
392
393
0
            else
394
0
            {
395
0
                handler_->onInput(buffer, bytes, peer);
396
0
            }
397
0
        }
398
0
    }
399
400
    void Transport::handlePeerDisconnection(const std::shared_ptr<Peer>& peer)
401
0
    {
402
0
        handler_->onDisconnection(peer);
403
404
0
        removePeer(peer);
405
0
    }
406
407
    void Transport::removePeer(const std::shared_ptr<Peer>& peer)
408
0
    {
409
0
        Fd fd = peer->fd();
410
0
        if (fd == PS_FD_EMPTY)
411
0
        {
412
0
            PS_LOG_DEBUG("Empty Fd");
413
0
            return;
414
0
        }
415
0
        {
416
            // See comment in transport.h on why peers_ must be mutex-protected
417
0
            std::lock_guard<std::mutex> l_guard(peers_mutex_);
418
419
0
            auto it = peers_.find(fd);
420
0
            if (it == std::end(peers_))
421
0
            {
422
0
                PS_LOG_WARNING_ARGS("peer %p not found in peers_", peer.get());
423
0
            }
424
0
            else
425
0
            {
426
0
                peers_.erase(it);
427
0
            }
428
0
        }
429
430
        // Don't rely on close deleting this FD from the epoll "interest" list.
431
        // This is needed in case the FD has been shared with another process.
432
        // Sharing should no longer happen by accident as SOCK_CLOEXEC is now set on
433
        // listener accept. This should then guarantee that the next call to
434
        // epoll_wait will not give us any events relating to this FD even if they
435
        // have been queued in the kernel since the last call to epoll_wait.
436
0
        Aio::Reactor* r = reactor();
437
0
        if (r) // or if r is NULL then reactor has been detached already
438
0
            r->removeFd(key(), fd);
439
440
0
        peer->closeFd();
441
0
    }
442
443
    // Discards any write queue held for `fd` and closes it. An empty Fd is
    // only logged.
    void Transport::closeFd(Fd fd)
    {
        const bool nothingToClose = (fd == PS_FD_EMPTY);
        if (nothingToClose)
        {
            PS_LOG_DEBUG("Trying to close empty Fd");
            return;
        }

        // toWriteLock stays held until the function returns, i.e. across
        // the close as well.
        Guard writeGuard(toWriteLock);

        // Clean up write buffers
        toWrite.erase(fd);

        CLOSE_FD(fd);
    }
456
457
    void Transport::removeAllPeers()
458
0
    {
459
0
        PS_TIMEDBG_START_THIS;
460
461
0
        for (;;)
462
0
        {
463
0
            std::shared_ptr<Peer> peer;
464
465
0
            { // encapsulate
466
0
                std::lock_guard<std::mutex> l_guard(peers_mutex_);
467
0
                auto it = peers_.begin();
468
0
                if (it == peers_.end())
469
0
                    break;
470
471
0
                peer = it->second;
472
0
                if (!peer)
473
0
                {
474
0
                    peers_.erase(it);
475
0
                    PS_LOG_DEBUG("peer NULL");
476
0
                    continue;
477
0
                }
478
0
            }
479
480
0
            removePeer(peer); // removePeer locks mutex, erases peer from peers_
481
0
        }
482
0
    }
483
484
    // Sends the write entries queued for `fd` in toWrite.
    //
    // Each pass of the outer loop takes toWriteLock, looks at the entry at
    // the front of fd's queue and tries to send it completely with
    // sendRawBuffer() (raw buffers) or sendFile() (file buffers). The outer
    // loop ends when:
    //  - fd has no queue in toWrite (returns immediately),
    //  - the queue is empty,
    //  - the queue becomes empty after an entry completes or is rejected,
    //  - the send would block (EAGAIN/EWOULDBLOCK): the unsent remainder is
    //    put back at the front of the queue and write notification is
    //    requested for fd,
    //  - the send fails with EBADF/EPIPE/ECONNRESET: fd's queue is erased.
    // Any other send error rejects the entry's deferred; a fully sent entry
    // resolves it with the total number of bytes written.
    void Transport::asyncWriteImpl(Fd fd)
485
0
    {
486
0
        PS_TIMEDBG_START_THIS;
487
488
0
        bool stop = false;
489
0
        while (!stop)
490
0
        {
491
0
            // Held for this pass of the loop, unless cleanUp() below
            // releases it earlier
            std::unique_lock<std::mutex> lock(toWriteLock);
492
493
0
            auto it = toWrite.find(fd);
494
495
            // cleanup will have been handled by handlePeerDisconnection
496
0
            if (it == std::end(toWrite))
497
0
            {
498
0
                PS_LOG_DEBUG_ARGS("Failed to find fd %" PIST_QUOTE(PS_FD_PRNTFCD), fd);
499
0
                return;
500
0
            }
501
0
            auto& wq = it->second;
502
0
            if (wq.empty())
503
0
            {
504
0
                PS_LOG_DEBUG("wq empty");
505
0
                break;
506
0
            }
507
508
0
            auto& entry = wq.front();
509
0
            int flags   = entry.flags;
510
#ifdef _USE_LIBEVENT_LIKE_APPLE
511
            bool msg_more_style = entry.msg_more_style;
512
#endif
513
0
            BufferHolder& buffer                  = entry.buffer;
514
0
            // Moved out of the queue entry so that it can still be settled
            // after the entry itself has been popped
            Async::Deferred<PST_SSIZE_T> deferred = std::move(entry.deferred);
515
516
0
            // Pops the front entry. If that empties the queue: erases fd
            // from toWrite, switches fd back to read-only notification and
            // ends the outer loop. Always releases the lock; `wq`, `entry`
            // and `buffer` must not be used after calling it.
            auto cleanUp = [&]() {
517
0
                wq.pop_front();
518
0
                if (wq.empty())
519
0
                {
520
0
                    PS_LOG_DEBUG_ARGS("Erasing fd %" PIST_QUOTE(PS_FD_PRNTFCD) " from toWrite", fd);
521
0
                    toWrite.erase(fd);
522
0
                    reactor()->modifyFd(key(), fd, NotifyOn::Read, Polling::Mode::Edge);
523
0
                    stop = true;
524
0
                }
525
0
                lock.unlock();
526
0
            };
527
528
0
            // Counting starts at the buffer's offset; presumably non-zero
            // for the remainder of an earlier partial write (see the
            // detach() call below) - TODO confirm
            size_t totalWritten = buffer.offset();
529
0
            for (;;)
530
0
            {
531
0
                PST_SSIZE_T bytesWritten = 0;
532
0
                auto len                 = buffer.size() - totalWritten;
533
534
0
                if (buffer.isRaw())
535
0
                {
536
0
                    PS_LOG_DEBUG_ARGS("sendRawBuffer fd %" PIST_QUOTE(PS_FD_PRNTFCD) ", len %d",
537
0
                                      fd, len);
538
539
0
                    auto raw        = buffer.raw();
540
0
                    const auto* ptr = raw.data().c_str() + totalWritten;
541
0
                    bytesWritten    = sendRawBuffer(fd, ptr, len, flags
542
#ifdef _USE_LIBEVENT_LIKE_APPLE
543
                                                 ,
544
                                                 msg_more_style
545
#endif
546
0
                    );
547
0
                }
548
0
                else
549
0
                {
550
0
                    PS_LOG_DEBUG_ARGS("sendFile fd %" PIST_QUOTE(PS_FD_PRNTFCD) ", len %d",
551
0
                                      fd, len);
552
553
0
                    auto file    = buffer.fd();
554
0
                    off_t offset = static_cast<off_t>(totalWritten);
555
0
                    bytesWritten = sendFile(fd, file, offset, len);
556
0
                }
557
0
                if (bytesWritten < 0)
558
0
                {
559
0
                    PST_DBG_DECL_SE_ERR_P_EXTRA;
560
0
                    PS_LOG_DEBUG_ARGS("fd %" PIST_QUOTE(PS_FD_PRNTFCD) " errno %d %s",
561
0
                                      fd, errno, PST_STRERROR_R_ERRNO);
562
563
0
                    // Would block: keep what is left for later and ask to
                    // be notified when fd is writable
                    if (errno == EAGAIN || errno == EWOULDBLOCK)
564
0
                    {
565
0
                        auto bufferHolder = buffer.detach(static_cast<off_t>(totalWritten));
566
567
                        // pop_front kills buffer - so we cannot continue loop or use buffer
568
                        // after this point
569
0
                        wq.pop_front();
570
0
                        wq.push_front(WriteEntry(std::move(deferred),
571
0
                                                 bufferHolder, fd, flags
572
#ifdef _USE_LIBEVENT_LIKE_APPLE
573
                                                 ,
574
                                                 msg_more_style
575
#endif
576
0
                                                 ));
577
0
                        reactor()->modifyFd(key(), fd, NotifyOn::Read | NotifyOn::Write,
578
0
                                            Polling::Mode::Edge);
579
0
                        stop = true;
580
0
                    }
581
                    // EBADF can happen when the HTTP parser, in the case of
582
                    // an error, closes fd before the entire request is processed.
583
                    // https://github.com/pistacheio/pistache/issues/501
584
0
                    // NOTE(review): on this path `deferred` is neither
                    // resolved nor rejected - confirm this is intended
                    else if (errno == EBADF || errno == EPIPE || errno == ECONNRESET)
585
0
                    {
586
0
                        PS_LOG_DEBUG_ARGS("fd %" PIST_QUOTE(PS_FD_PRNTFCD) " EBADF/EPIPE/ECONNRESET so erasing",
587
0
                                          fd);
588
0
                        wq.pop_front();
589
0
                        toWrite.erase(fd);
590
0
                        stop = true;
591
0
                    }
592
0
                    else
593
0
                    {
594
0
                        PS_LOG_DEBUG_ARGS("fd %" PIST_QUOTE(PS_FD_PRNTFCD) " rejecting write attempt", fd);
595
0
                        // cleanUp() releases the lock, so the rejection
                        // happens without toWriteLock held
                        cleanUp();
596
0
                        deferred.reject(Pistache::Error::system("Could not write data"));
597
0
                    }
598
0
                    break;
599
0
                }
600
0
                else
601
0
                {
602
0
                    totalWritten += bytesWritten;
603
0
                    if (totalWritten >= buffer.size())
604
0
                    {
605
0
                        if (buffer.isFile())
606
0
                        {
607
0
                            PS_LOG_DEBUG_ARGS("file ::close actual_fd %d",
608
0
                                              buffer.fd());
609
610
                            // done with the file buffer, nothing else knows
611
                            // whether to close it with the way the code is
612
                            // written.
613
0
                            PST_FILE_CLOSE(buffer.fd());
614
0
                        }
615
616
0
                        cleanUp();
617
618
                        // Cast to match the type of deferred template
619
                        // to avoid a BadType exception
620
0
                        deferred.resolve(static_cast<PST_SSIZE_T>(totalWritten));
621
0
                        break;
622
0
                    }
623
0
                }
624
0
            }
625
0
        }
626
0
    }
627
628
#ifdef _USE_LIBEVENT_LIKE_APPLE
629
    // Makes the socket behind `fd` hold back (msg_more_style == true) or not
    // hold back (false) outgoing data, emulating MSG_MORE with a socket
    // option: TCP_NODELAY when PS_USE_TCP_NODELAY is defined, otherwise
    // TCP_NOPUSH (Apple/BSD) or TCP_CORK.
    //
    // The current option value is read first and the option is only written
    // when it differs from the requested style.
    //
    // Throws std::runtime_error when getsockopt or setsockopt fails.
    void Transport::configureMsgMoreStyle(Fd fd, bool msg_more_style)
    {
        // PS_USE_TCP_NODELAY defined (or not) at top of file

        PST_SOCK_OPT_VAL_TYPICAL_T holding_back = 0;
        PST_SOCKLEN_T opt_len                   = sizeof(holding_back);
        int rc                                  = -1;

#ifdef PS_USE_TCP_NODELAY
        { // encapsulate
            PST_SOCK_OPT_VAL_TYPICAL_T no_delay = 0;

            rc = getsockopt(
                GET_ACTUAL_FD(fd), tcp_prot_num_, TCP_NODELAY,
                reinterpret_cast<PST_SOCK_OPT_VAL_PTR_T>(&no_delay), &opt_len);
            if (rc == 0)
                holding_back = !no_delay; // "no delay" means "not held back"
        }
#else
        rc = getsockopt(GET_ACTUAL_FD(fd), tcp_prot_num_,
#if defined __APPLE__ || defined _IS_BSD
                        TCP_NOPUSH,
#else
                        TCP_CORK,
#endif
                        &holding_back, &opt_len);
#endif // of ifdef PS_USE_TCP_NODELAY ... else

        if (rc != 0)
        {
            PST_DBG_DECL_SE_ERR_P_EXTRA;
            PS_LOG_DEBUG_ARGS("getsockopt failed for fd %p, actual fd %d, "
                              "errno %d, err %s",
                              fd, GET_ACTUAL_FD(fd), errno,
                              PST_STRERROR_R_ERRNO);

            throw std::runtime_error("getsockopt failed");
        }

        const bool currently_on = (holding_back != 0);
        if (currently_on == msg_more_style)
        {
#ifdef DEBUG
            PS_LOG_DEBUG_ARGS("MSG_MORE style is already %s",
                              msg_more_style ? "on" : "off");
#endif
            return;
        }

        PS_LOG_DEBUG_ARGS("Setting MSG_MORE style to %s",
                          (msg_more_style) ? "on" : "off");

#ifdef PS_USE_TCP_NODELAY
        // In NetBSD case we're getting/setting (or resetting) the
        // TCP_NODELAY socket option, which _stops_ data being held
        // prior to send, whereas in Linux, macOS, FreeBSD or
        // OpenBSD we're using TCP_CORK/TCP_NOPUSH which may
        // _cause_ data to be held prior to send. I.e. they're
        // opposites.
        PST_SOCK_OPT_VAL_TYPICAL_T optval = msg_more_style ? 0 : 1;
#else
        PST_SOCK_OPT_VAL_TYPICAL_T optval = msg_more_style ? 1 : 0;
#endif

        rc = setsockopt(
            GET_ACTUAL_FD(fd), tcp_prot_num_,
#ifdef PS_USE_TCP_NODELAY
            TCP_NODELAY,
#elif defined __APPLE__ || defined _IS_BSD
            TCP_NOPUSH,
#else
            TCP_CORK,
#endif
            reinterpret_cast<PST_SOCK_OPT_VAL_PTR_T>(&optval),
            opt_len);
        if (rc < 0)
            throw std::runtime_error("setsockopt failed");
    }
710
#endif // of ifdef _USE_LIBEVENT_LIKE_APPLE
711
712
    // Sends `len` bytes starting at `buffer` to the peer identified by `fd`.
    //
    // With PISTACHE_USE_SSL the peer is looked up in peers_; if it has an
    // SSL object the data goes through SSL_write and a failure is mapped to
    // an errno value (EAGAIN, ECONNRESET, ENOTCONN, EBADF or EINVAL) so that
    // the caller can treat SSL and plain sockets alike. Otherwise the data
    // is sent with PST_SOCK_SEND, using `flags` (plus MSG_NOSIGNAL outside
    // Windows).
    //
    // In _USE_LIBEVENT_LIKE_APPLE builds `msg_more_style` selects whether
    // the socket should hold data back, via configureMsgMoreStyle().
    //
    // Returns the value returned by SSL_write / PST_SOCK_SEND.
    // Throws std::runtime_error (SSL builds only) when no peer is registered
    // for `fd`.
    PST_SSIZE_T Transport::sendRawBuffer(Fd fd, const char* buffer, size_t len,
                                         int flags
#ifdef _USE_LIBEVENT_LIKE_APPLE
                                         ,
                                         bool msg_more_style
#endif
    )
    {
        PST_SSIZE_T bytesWritten = 0;

#ifdef PISTACHE_USE_SSL
        bool it_second_ssl_is_null = false;

        {
            // See comment in transport.h on why peers_ must be mutex-protected
            std::lock_guard<std::mutex> l_guard(peers_mutex_);

            auto it_ = peers_.find(fd);

            if (it_ == std::end(peers_))
                throw std::runtime_error(
                    "No peer found for fd: " + to_string(fd));

            it_second_ssl_is_null = (it_->second->ssl() == nullptr);

            if (!it_second_ssl_is_null)
            {
                auto ssl_ = static_cast<SSL*>(it_->second->ssl());
                PS_LOG_DEBUG_ARGS("SSL_write, len %d", static_cast<int>(len));

                bytesWritten = SSL_write(ssl_, buffer, static_cast<int>(len));
                if (bytesWritten <= 0)
                {
                    int ssl_get_error_res = SSL_get_error(
                        ssl_, static_cast<int>(bytesWritten));
                    PS_LOG_DEBUG_ARGS("SSL_write err %d, bytes %d", ssl_get_error_res, bytesWritten);

                    // Translate the SSL error into an errno value
                    switch (ssl_get_error_res)
                    {
                    case SSL_ERROR_WANT_WRITE:
                        errno = EAGAIN;
                        break;

                    case SSL_ERROR_ZERO_RETURN:
                        errno = ECONNRESET;
                        break;

                    case SSL_ERROR_WANT_CONNECT:
                    case SSL_ERROR_WANT_ACCEPT:
                        errno = ENOTCONN;
                        break;

                    case SSL_ERROR_SYSCALL:
                    case SSL_ERROR_SSL:
                        PS_LOG_DEBUG_ARGS("SSL_write clearing error queue, last 0x%08X", ERR_peek_last_error());
                        ERR_clear_error();
                        errno = EBADF;
                        break;

                    case SSL_ERROR_NONE:
                        PS_LOG_INFO("SSL_write returned <0, "
                                    "but SSL_get_error returned ERROR_NONE");
                        // Deliberately fall through to default, so that
                        // errno is set rather than left holding a stale
                        // value from an earlier call
                        [[fallthrough]];

                    default:
                        errno = EINVAL;
                        break;
                    }
                }
            }
        }

        if (it_second_ssl_is_null)
        {
#endif /* PISTACHE_USE_SSL */

#ifdef PS_USE_TCP_NODELAY
            // Note re: TCP_NODELAY. Per the Linux tcp man page, "setting this
            // option forces an explicit flush of pending output". However, we
            // don't want the waiting content to be sent until after the
            // _current_ send, which can then be included in the data being
            // flushed; i.e. we want to send any already-pending output, plus
            // the new output we're adding here with "send", in one go.
            // Accordingly, when TCP_NODELAY is used, if we are turning
            // TCP_NODELAY to OFF (i.e. msg_more_style is true), we want to do
            // so _before_ calling send; but if we are turning it ON, we want
            // to do so _after_ calling send.
            if (msg_more_style)
#endif
#ifdef _USE_LIBEVENT_LIKE_APPLE
                configureMsgMoreStyle(fd, msg_more_style);
#endif

            PS_LOG_DEBUG_ARGS("::send, fd %" PIST_QUOTE(PS_FD_PRNTFCD) ", actual_fd %d, len %d",
                              fd, GET_ACTUAL_FD(fd), static_cast<int>(len));

            bytesWritten =
#ifdef _IS_WINDOWS
                // Comparing with PST_SOCK_SEND below, there's no SIGPIPE in
                // Windows and MSG_NOSIGNAL is not defined in Windows
                PST_SOCK_SEND(GET_ACTUAL_FD(fd), buffer, len, flags);
#else
                // MSG_NOSIGNAL is used to prevent SIGPIPE on client
                // connection termination
                PST_SOCK_SEND(GET_ACTUAL_FD(fd), buffer, len,
                              flags | MSG_NOSIGNAL);
#endif
#ifdef _USE_LIBEVENT_LIKE_APPLE
            PS_LOG_DEBUG_ARGS("bytesWritten = %d, msg_more_style = %s",
                              bytesWritten, msg_more_style ? "on" : "off");
#else
            PS_LOG_DEBUG_ARGS("bytesWritten = %d", bytesWritten);
#endif

#ifdef PS_USE_TCP_NODELAY
            // See comment above on why configureMsgMoreStyle is done after
            // "send" in TCP_NODELAY case.
            if (!msg_more_style)
                configureMsgMoreStyle(fd, msg_more_style);
#endif

#ifdef PISTACHE_USE_SSL
        }
#endif /* PISTACHE_USE_SSL */

        return bytesWritten;
    }
840
841
#ifdef _IS_BSD
842
#define SENDFILE my_sendfile
843
#else
844
#define SENDFILE ::sendfile
845
#endif // ifdef _IS_BSD
846
847
    // Sends "len" bytes of the already-open file descriptor "file", starting
    // at "offset", to the connection identified by "fd".
    //
    // In PISTACHE_USE_SSL builds the peer is looked up in peers_ first: a peer
    // that carries an SSL object is served via SSL_sendfile, any other peer
    // falls through to PS_SENDFILE on the actual socket descriptor.
    //
    // Returns the byte count reported by the underlying call; the macOS
    // branch returns -1 when nothing could be sent.
    // Throws std::runtime_error (SSL builds only) when "fd" has no entry in
    // peers_.
    PST_SSIZE_T Transport::sendFile(Fd fd, int file, off_t offset, size_t len)
    {
        PST_SSIZE_T bytesWritten = 0;

#ifdef PISTACHE_USE_SSL
        bool it_second_ssl_is_null = false;

        {
            // See comment in transport.h on why peers_ must be mutex-protected
            std::lock_guard<std::mutex> l_guard(peers_mutex_);
            auto it_ = peers_.find(fd);

            if (it_ == std::end(peers_))
            {
                PS_LOG_WARNING_ARGS("No peer found for fd %" PIST_QUOTE(PS_FD_PRNTFCD) ", actual-fd %d",
                                    fd, GET_ACTUAL_FD(fd));

                throw std::runtime_error(
                    "No peer found for fd: " + to_string(fd));
            }
            it_second_ssl_is_null = (it_->second->ssl() == nullptr);

            if (!it_second_ssl_is_null)
            {
                // Cast so the argument matches the %d conversion
                PS_LOG_DEBUG_ARGS("SSL_sendfile, len %d", static_cast<int>(len));

                auto ssl_    = static_cast<SSL*>(it_->second->ssl());
                bytesWritten = SSL_sendfile(ssl_, file, &offset, len);
            }
        }

        if (it_second_ssl_is_null)
        {
#endif /* PISTACHE_USE_SSL */

#ifdef DEBUG
            const char* sendfile_fn_name = PIST_QUOTE(PS_SENDFILE);
#endif
            // Casts keep the size_t argument in step with the %d conversion
            PS_LOG_DEBUG_ARGS(
                "%s fd %" PIST_QUOTE(PS_FD_PRNTFCD) " actual-fd %d, file fd %d, len %d", sendfile_fn_name,
                fd, GET_ACTUAL_FD(fd), file, static_cast<int>(len));

#if defined(_USE_LIBEVENT_LIKE_APPLE) && !defined(PS_USE_TCP_NODELAY)
            // Not the TCP_NODELAY case: msg-more style is switched off
            // _before_ the file is sent. (In the TCP_NODELAY case the same
            // call is made _after_ sending - see further down.)
            configureMsgMoreStyle(fd, false /*msg_more_style*/);

            // !!!! Should we do configureMsgMoreStyle for SSL as well? And
            // same question in sendRawBuffer
#endif

#ifdef __APPLE__
            off_t len_as_off_t = static_cast<off_t>(len);

            // NB: The order of the first two parameters for ::sendfile are the
            // opposite way round in macOS vs. Linux
            // Also, in macOS sendfile returns zero for success, whereas in
            // Linux upon success it returns number of bytes written
            //
            // Although macOS sendfile man page is silent on the matter, by
            // experimentation it appears sendfile does not advance the file
            // position of "file", which is the same as the behavior described
            // in Linux sendfile man page
            const int sendfile_res = PS_SENDFILE(file, GET_ACTUAL_FD(fd),
                                                 offset, &len_as_off_t,
                                                 nullptr, // no new prefix/suffix content
                                                 0 /*reserved, must be zero*/);

            // On macOS an EAGAIN/EINTR failure can still have transmitted
            // data; the count is handed back through the len argument.
            // Reporting that count (as Linux sendfile does for a short write)
            // rather than -1 stops a retry from the same offset resending
            // bytes that are already on the wire.
            const bool sent_everything = (sendfile_res == 0);
            const bool sent_partially  = !sent_everything
                && (errno == EAGAIN || errno == EINTR)
                && (len_as_off_t > 0);

            if (sent_everything || sent_partially)
            {
                bytesWritten = static_cast<PST_SSIZE_T>(len_as_off_t);
                offset += len_as_off_t; // to match what Linux sendfile does
            }
            else
            {
                bytesWritten = -1;
            }

#else
            bytesWritten = PS_SENDFILE(GET_ACTUAL_FD(fd), file, &offset, len);
#endif
#ifdef PS_USE_TCP_NODELAY
            // See prior comment on why configureMsgMoreStyle is done after
            // "send" in TCP_NODELAY case.
            configureMsgMoreStyle(fd, false /*msg_more_style*/);
#endif

            PS_LOG_DEBUG_ARGS(
                "%s fd %" PIST_QUOTE(PS_FD_PRNTFCD) ", bytesWritten %d",
                sendfile_fn_name, fd, static_cast<int>(bytesWritten));

#ifdef PISTACHE_USE_SSL
        }
#endif /* PISTACHE_USE_SSL */

        return bytesWritten;
    }
945
946
    void Transport::armTimerMs(Fd fd, std::chrono::milliseconds value,
947
                               Async::Deferred<uint64_t> deferred)
948
0
    {
949
0
        PS_TIMEDBG_START_ARGS("Fd %" PIST_QUOTE(PS_FD_PRNTFCD), fd);
950
951
0
        auto ctx                   = context();
952
0
        const bool isInRightThread = std::this_thread::get_id() == ctx.thread();
953
0
        TimerEntry entry(fd, value, std::move(deferred));
954
955
0
        if (!isInRightThread)
956
0
        {
957
0
            PS_LOG_DEBUG_ARGS("Fd %" PIST_QUOTE(PS_FD_PRNTFCD) ", timersQueue.push",
958
0
                              fd);
959
0
            timersQueue.push(std::move(entry));
960
0
        }
961
0
        else
962
0
        {
963
0
            PS_LOG_DEBUG_ARGS("Fd %" PIST_QUOTE(PS_FD_PRNTFCD) ", armTimerMsImpl",
964
0
                              fd);
965
0
            armTimerMsImpl(std::move(entry));
966
0
        }
967
0
    }
968
969
    // Arms the timer described by "entry" and records it in "timers".
    //
    // Invoked directly by armTimerMs, and by handleTimerQueue for entries
    // that were queued.
    //
    // entry.deferred is rejected (and nothing is registered) when:
    //  - a timer with the same fd is already present in "timers", or
    //  - the underlying "set time" call reports failure (-1).
    // On success the fd is registered one-shot for Read with the reactor and
    // the entry is moved into "timers".
    void Transport::armTimerMsImpl(TimerEntry entry)
    {
#ifdef DEBUG
        Fd entry_fd = entry.fd;

#ifdef _USE_LIBEVENT
        PS_LOG_DEBUG_ARGS("entry.fd %" PIST_QUOTE(PS_FD_PRNTFCD) ", "
                          "fd->type %d",
                          entry_fd,
                          EventMethFns::getEmEventType(entry_fd));
#else
        PS_LOG_DEBUG_ARGS("entry.fd %" PIST_QUOTE(PS_FD_PRNTFCD),
                          entry_fd);
#endif

#endif

        auto it = timers.find(entry.fd);
        if (it != std::end(timers))
        {
            // The message text belongs to the format string; it must not be
            // separated from it by a comma, or it would be consumed as the
            // argument of the fd conversion.
            PS_LOG_DEBUG_ARGS("Fd %" PIST_QUOTE(PS_FD_PRNTFCD) ", timer already armed",
                              entry.fd);

            entry.deferred.reject(std::runtime_error("Timer is already armed"));
            return;
        }

        int res = -1;
#ifdef _USE_LIBEVENT
        assert(entry.fd != PS_FD_EMPTY);
        res = EventMethFns::setEmEventTime(entry.fd, &(entry.value));
#else
        itimerspec spec {};
        spec.it_interval.tv_sec  = 0; // one expiry only, no periodic reload
        spec.it_interval.tv_nsec = 0;

        // Split the timeout into whole seconds plus the sub-second remainder,
        // so that e.g. 1500ms arms a 1.5s timer rather than a 1s one.
        const auto whole_seconds = std::chrono::duration_cast<std::chrono::seconds>(entry.value);
        const auto leftover_ns   = std::chrono::duration_cast<std::chrono::nanoseconds>(
            entry.value - whole_seconds);

        spec.it_value.tv_sec  = static_cast<decltype(spec.it_value.tv_sec)>(whole_seconds.count());
        spec.it_value.tv_nsec = static_cast<decltype(spec.it_value.tv_nsec)>(leftover_ns.count());

        res = timerfd_settime(entry.fd, 0, &spec, nullptr);
#endif
        if (res == -1)
        {
            PST_DBG_DECL_SE_ERR_P_EXTRA;
            PS_LOG_DEBUG_ARGS("Fd %" PIST_QUOTE(PS_FD_PRNTFCD) ", errno %d %s",
                              entry.fd, errno, PST_STRERROR_R_ERRNO);

            entry.deferred.reject(Pistache::Error::system("Could not set timer time"));
            return;
        }

        reactor()->registerFdOneShot(key(), entry.fd, NotifyOn::Read,
                                     Polling::Mode::Edge);

        PS_LOG_DEBUG_ARGS("Timer %" PIST_QUOTE(PS_FD_PRNTFCD) " inserting into timers", entry.fd);
        timers.insert(std::make_pair(entry.fd, std::move(entry)));
    }
1036
1037
    void Transport::handleWriteQueue(bool flush)
1038
0
    {
1039
        // Let's drain the queue
1040
0
        for (;;)
1041
0
        {
1042
0
            auto write = writesQueue.popSafe();
1043
0
            if (!write)
1044
0
                break;
1045
1046
0
            auto fd = write->peerFd;
1047
0
            if (fd == PS_FD_EMPTY)
1048
0
                continue;
1049
0
            if (!isPeerFd(fd))
1050
0
                continue;
1051
1052
0
            {
1053
0
                Guard guard(toWriteLock);
1054
0
                toWrite[fd].push_back(std::move(*write));
1055
0
            }
1056
1057
0
            if (flush)
1058
0
                asyncWriteImpl(fd);
1059
0
            else
1060
0
                reactor()->modifyFd(key(), fd, NotifyOn::Read | NotifyOn::Write,
1061
0
                                    Polling::Mode::Edge);
1062
0
        }
1063
0
    }
1064
1065
    void Transport::handleTimerQueue()
1066
0
    {
1067
0
        PS_TIMEDBG_START_THIS;
1068
1069
0
        for (;;)
1070
0
        {
1071
0
            auto timer = timersQueue.popSafe();
1072
0
            if (!timer)
1073
0
                break;
1074
1075
0
            armTimerMsImpl(std::move(*timer));
1076
0
        }
1077
0
    }
1078
1079
    void Transport::handlePeerQueue()
1080
0
    {
1081
0
        PS_TIMEDBG_START_THIS;
1082
1083
0
        for (;;)
1084
0
        {
1085
0
            auto data = peersQueue.popSafe();
1086
0
            PS_LOG_DEBUG_ARGS("data %p", data.get());
1087
0
            if (!data)
1088
0
                break;
1089
1090
0
            handlePeer(data->peer);
1091
0
        }
1092
0
    }
1093
1094
    void Transport::handlePeer(const std::shared_ptr<Peer>& peer)
1095
0
    {
1096
0
        PS_TIMEDBG_START_THIS;
1097
1098
0
        Fd fd = peer->fd();
1099
0
        if (fd == PS_FD_EMPTY)
1100
0
        {
1101
0
            PS_LOG_DEBUG("Empty Fd");
1102
0
            return;
1103
0
        }
1104
1105
0
        {
1106
            // See comment in transport.h on why peers_ must be mutex-protected
1107
0
            std::lock_guard<std::mutex> l_guard(peers_mutex_);
1108
1109
0
            auto auto_insert_res_pr = peers_.insert(std::make_pair(fd, peer));
1110
0
            if (!auto_insert_res_pr.second)
1111
0
                PS_LOG_WARNING_ARGS("Failed to insert peer %p", peer.get());
1112
0
        }
1113
1114
0
        peer->associateTransport(this);
1115
1116
0
        handler_->onConnection(peer);
1117
0
        reactor()->registerFd(key(), fd, NotifyOn::Read | NotifyOn::Shutdown,
1118
0
                              Polling::Mode::Edge);
1119
0
    }
1120
1121
    void Transport::handleNotify()
1122
0
    {
1123
0
        PS_TIMEDBG_START_THIS;
1124
1125
0
        while (this->notifier.tryRead())
1126
0
            ;
1127
1128
0
        PST_RUSAGE now;
1129
1130
0
        auto res = PST_GETRUSAGE(
1131
#ifdef _USE_LIBEVENT_LIKE_APPLE
1132
            PST_RUSAGE_SELF, // usage for whole process, not just current
1133
                             // thread (macOS getrusage doesn't support
1134
                             // RUSAGE_THREAD)
1135
#else
1136
0
            RUSAGE_THREAD,
1137
0
#endif
1138
0
            &now);
1139
0
        if (res == -1)
1140
0
            loadRequest_.reject(std::runtime_error("Could not compute usage"));
1141
1142
0
        loadRequest_.resolve(now);
1143
0
        loadRequest_.clear();
1144
0
    }
1145
1146
    // Handles a ready timer: reads the 64-bit wake-up count from the timer
    // fd and settles entry.deferred with it.
    //
    // Nothing happens for an inactive entry, nor when the read reports
    // EAGAIN/EWOULDBLOCK. Any other read error, or a read of the wrong size,
    // rejects entry.deferred.
    void Transport::handleTimer(TimerEntry entry)
    {
        PS_TIMEDBG_START_THIS;

        if (!entry.isActive())
            return;

        uint64_t wakeup_count;

        auto bytes_read = READ_FD(entry.fd, &wakeup_count, sizeof wakeup_count);
        if (bytes_read == -1)
        {
            // Nothing to read yet is not an error
            if (errno == EAGAIN || errno == EWOULDBLOCK)
                return;

            entry.deferred.reject(
                Pistache::Error::system("Could not read timerfd"));
            return;
        }

        if (bytes_read != sizeof(wakeup_count))
        {
            entry.deferred.reject(
                Pistache::Error("Read invalid number of bytes for timer fd: " + std::to_string(GET_ACTUAL_FD(entry.fd))));
            return;
        }

        entry.deferred.resolve(wakeup_count);
    }
1177
1178
    // Reports whether "fdconst" is a key of peers_. Takes peers_mutex_ for
    // the duration of the lookup.
    bool Transport::isPeerFd(FdConst fdconst) const
    {
        PS_TIMEDBG_START_THIS;

        std::lock_guard<std::mutex> peers_lock(peers_mutex_);
        const bool is_known_peer = isPeerFdNoPeersMutexLock(fdconst);
        return is_known_peer;
    }
1185
1186
    // isPeerFdNoPeersMutexLock only when peers_mutex_ has been already locked
1187
    bool Transport::isPeerFdNoPeersMutexLock(FdConst fdconst) const
1188
0
    {
1189
0
        PS_TIMEDBG_START_THIS;
1190
1191
        // Can cast away const since we're not actually going to change fd
1192
0
        Fd fd = PS_CAST_AWAY_CONST_FD(fdconst);
1193
1194
0
        return peers_.find(fd) != std::end(peers_);
1195
0
    }
1196
1197
    // Reports whether "fdconst" is a key of "timers", logging the outcome.
    bool Transport::isTimerFd(FdConst fdconst) const
    {
        PS_TIMEDBG_START_THIS;

        // Casting away const is fine: the fd is only used as a lookup key
        Fd lookup_fd = PS_CAST_AWAY_CONST_FD(fdconst);

        const auto found      = timers.find(lookup_fd);
        const bool in_timers = !(found == std::end(timers));

        PS_LOG_DEBUG_ARGS("Fd %" PIST_QUOTE(PS_FD_PRNTFCD) " %s in timers",
                          fdconst, (in_timers ? "is" : "is not"));
        return in_timers;
    }
1209
1210
    // Tag overload: converts the tag's value to an FdConst and defers to
    // isPeerFd(FdConst).
    bool Transport::isPeerFd(Polling::Tag tag) const
    {
        PS_TIMEDBG_START_THIS;

        FdConst tagged_fd = static_cast<FdConst>(tag.value());
        return isPeerFd(tagged_fd);
    }
1216
    // Tag overload: converts the tag's value to an FdConst and defers to
    // isTimerFd(FdConst).
    bool Transport::isTimerFd(Polling::Tag tag) const
    {
        FdConst tagged_fd = static_cast<FdConst>(tag.value());
        return isTimerFd(tagged_fd);
    }
1220
1221
    // Returns the peer stored in peers_ under "fdconst".
    // Throws std::runtime_error when there is no such entry.
    std::shared_ptr<Peer> Transport::getPeer(FdConst fdconst)
    {
        PS_TIMEDBG_START_THIS;

        // Casting away const is fine: the fd is only used as a lookup key
        Fd lookup_fd = PS_CAST_AWAY_CONST_FD(fdconst);

        // See comment in transport.h on why peers_ must be mutex-protected
        std::lock_guard<std::mutex> peers_lock(peers_mutex_);

        const auto found = peers_.find(lookup_fd);
        if (found != std::end(peers_))
            return found->second;

        throw std::runtime_error("No peer found for fd: " + std::to_string(GET_ACTUAL_FD(lookup_fd)));
    }
1238
1239
    // Tag overload: converts the tag's value to an FdConst and defers to
    // getPeer(FdConst).
    std::shared_ptr<Peer> Transport::getPeer(Polling::Tag tag)
    {
        FdConst tagged_fd = static_cast<FdConst>(tag.value());
        return getPeer(tagged_fd);
    }
1243
1244
    std::deque<std::shared_ptr<Peer>> Transport::getAllPeer()
1245
0
    {
1246
0
        std::deque<std::shared_ptr<Peer>> dqPeers;
1247
1248
0
        {
1249
            // See comment in transport.h on why peers_ must be mutex-protected
1250
0
            std::lock_guard<std::mutex> l_guard(peers_mutex_);
1251
1252
0
            for (const auto& peerPair : peers_)
1253
0
            {
1254
0
                if (isPeerFdNoPeersMutexLock(peerPair.first))
1255
0
                {
1256
0
                    dqPeers.push_back(peerPair.second);
1257
0
                }
1258
0
            }
1259
0
        }
1260
1261
0
        return dqPeers;
1262
0
    }
1263
1264
} // namespace Pistache::Tcp