Coverage Report

Created: 2024-10-13 14:14

/src/pistache/src/common/transport.cc
Line
Count
Source (jump to first uncovered line)
1
/*
2
 * SPDX-FileCopyrightText: 2017 Mathieu Stefani
3
 *
4
 * SPDX-License-Identifier: Apache-2.0
5
 */
6
7
/* transport.cc
8
   Mathieu Stefani, 02 July 2017
9
10
   TCP transport handling
11
12
*/
13
14
#include <pistache/eventmeth.h>
15
16
#ifdef _USE_LIBEVENT_LIKE_APPLE
17
// For sendfile(...) function
18
#include <sys/types.h>
19
#include <sys/uio.h>
20
21
#include <netinet/tcp.h> // for TCP_NOPUSH
22
#endif
23
24
#ifdef __linux__
25
#include <sys/sendfile.h>
26
#endif
27
28
#ifdef _IS_BSD
29
#include <unistd.h> // for lseek
30
#endif
31
32
#ifndef _USE_LIBEVENT_LIKE_APPLE
33
// Note: sys/timerfd.h is linux-only (and certainly POSIX only)
34
#include <sys/timerfd.h>
35
#endif
36
37
#include <pistache/os.h>
38
#include <pistache/peer.h>
39
#include <pistache/tcp.h>
40
#include <pistache/transport.h>
41
#include <pistache/utils.h>
42
43
using std::to_string;
44
45
namespace Pistache::Tcp
46
{
47
    using namespace Polling;
48
49
    // Constructs a transport for `handler`. All real setup is delegated to
    // init(); on _USE_LIBEVENT_LIKE_APPLE builds tcp_prot_num_ starts at -1
    // and is assigned its real value inside init().
    Transport::Transport(const std::shared_ptr<Tcp::Handler>& handler)
#ifdef _USE_LIBEVENT_LIKE_APPLE
        : tcp_prot_num_(-1)
#endif
    {
        this->init(handler);
    }
56
57
    // Stores `handler` in handler_ and tells it which transport it belongs
    // to. On _USE_LIBEVENT_LIKE_APPLE builds it also resolves the TCP
    // protocol number used later for socket options.
    void Transport::init(const std::shared_ptr<Tcp::Handler>& handler)
    {
        // Note: In the _USE_LIBEVENT case, EventMeth::init() is called out of
        // EventMethFns::create

        handler_ = handler;
        handler_->associateTransport(this);

#ifdef _USE_LIBEVENT_LIKE_APPLE
        // SOL_TCP is not defined in macOS (Nov 2023), so ask the protocol
        // database for the TCP protocol number on this host instead.
        const struct protoent* tcpProto = getprotobyname("tcp");
        if (tcpProto)
            tcp_prot_num_ = tcpProto->p_proto;
        else
            tcp_prot_num_ = 6; // it's usually 6...
#endif
    }
73
74
    // Tears the transport down by removing every peer it still tracks.
    Transport::~Transport()
    {
        this->removeAllPeers();
    }
78
79
    std::shared_ptr<Aio::Handler> Transport::clone() const
80
0
    {
81
0
        return std::make_shared<Transport>(handler_->clone());
82
0
    }
83
84
    void Transport::flush()
85
0
    {
86
0
        handleWriteQueue(true);
87
0
    }
88
89
    // Binds the writes, timers and peers queues and the notifier to
    // `poller`. On _USE_LIBEVENT builds the poller's epoll-equivalent handle
    // is also cached in epoll_fd.
    void Transport::registerPoller(Polling::Epoll& poller)
    {
        PS_TIMEDBG_START_THIS;

        // Bind order: writes, timers, peers, then the notifier
        // (unregisterPoller() unbinds in the opposite order).
        writesQueue.bind(poller);
        timersQueue.bind(poller);
        peersQueue.bind(poller);
        notifier.bind(poller);

#ifdef _USE_LIBEVENT
        epoll_fd = poller.getEventMethEpollEquiv();
#endif
    }
102
103
    // Reverses registerPoller(): clears the cached epoll-equivalent handle
    // (on _USE_LIBEVENT builds) and unbinds the notifier and the three
    // queues from `poller`.
    void Transport::unregisterPoller(Polling::Epoll& poller)
    {
        PS_TIMEDBG_START_THIS;

#ifdef _USE_LIBEVENT
        epoll_fd = NULL;
#endif

        // Unbind in the reverse of the order used by registerPoller().
        notifier.unbind(poller);
        peersQueue.unbind(poller);
        timersQueue.unbind(poller);
        writesQueue.unbind(poller);
    }
116
117
    // Takes charge of a newly accepted peer.
    //
    // When called on the thread reported by context().thread() the peer is
    // handled immediately; otherwise it is pushed onto peersQueue. In both
    // cases an empty write queue is then created for the peer's fd in
    // toWrite (under toWriteLock), unless the peer has no fd.
    void Transport::handleNewPeer(const std::shared_ptr<Tcp::Peer>& peer)
    {
        auto reactorCtx = context();
        if (std::this_thread::get_id() == reactorCtx.thread())
        {
            handlePeer(peer);
        }
        else
        {
            peersQueue.push(PeerEntry(peer));
        }

        Guard guard(toWriteLock);
        Fd peerFd = peer->fd();
        if (peerFd != PS_FD_EMPTY)
        {
            toWrite.emplace(peerFd, std::deque<WriteEntry> {});
        }
        else
        {
            PS_LOG_DEBUG("Empty Fd");
        }
    }
141
142
#ifdef DEBUG
    // Debug-only helper: logs which fd (or just "entry" on _USE_LIBEVENT
    // builds) was reported by the poller, followed by every readiness flag
    // set on it. The flags are appended one by one so an entry that is, say,
    // both readable and writable shows both; " <unknown>" is logged when none
    // of the three flags is set.
    static void logFdAndNotifyOn(const Aio::FdSet::Entry& entry)
    {
#ifdef _USE_LIBEVENT
        std::string str("entry");
#else
        std::string str("fd ");

        std::stringstream ss;
        ss << ((Fd)entry.getTag().value());
        str += ss.str();
#endif

        bool any_flag = false;

        if (entry.isReadable())
        {
            str += " readable";
            any_flag = true;
        }
        if (entry.isWritable())
        {
            str += " writable";
            any_flag = true;
        }
        if (entry.isHangup())
        {
            str += " hangup";
            any_flag = true;
        }
        if (!any_flag)
            str += " <unknown>";

        PS_LOG_DEBUG_ARGS("%s", str.c_str());
    }

// Expands to a call on the loop variable named `entry` at the point of use.
#define PS_LOG_DBG_FD_AND_NOTIFY logFdAndNotifyOn(entry)
#else
#define PS_LOG_DBG_FD_AND_NOTIFY
#endif
174
175
    // Dispatches every ready entry reported by the poller.
    //
    // Entries are matched first against the tags of the internal queues
    // (writes, timers, peers) and the notifier. Anything else is treated as
    // a peer or timer fd when readable, or as a socket with pending queued
    // writes when writable.
    //
    // Throws std::runtime_error if a writable fd has no entry in toWrite.
    void Transport::onReady(const Aio::FdSet& fds)
    {
        PS_LOG_DEBUG_ARGS("%d fds", fds.size());

        // NB: the loop variable must be called `entry`; the
        // PS_LOG_DBG_FD_AND_NOTIFY macro refers to it by name.
        for (const auto& entry : fds)
        {
            PS_LOG_DBG_FD_AND_NOTIFY;

            if (entry.getTag() == writesQueue.tag())
            {
                PS_LOG_DEBUG("Write queue");
                handleWriteQueue();
            }
            else if (entry.getTag() == timersQueue.tag())
            {
                PS_LOG_DEBUG("Timers queue");
                handleTimerQueue();
            }
            else if (entry.getTag() == peersQueue.tag())
            {
                PS_LOG_DEBUG("Peers queue");
                handlePeerQueue();
            }
            else if (entry.getTag() == notifier.tag())
            {
                PS_LOG_DEBUG("notifier");
                handleNotify();
            }

            else if (entry.isReadable())
            {
                auto tag = entry.getTag();
                PS_LOG_DEBUG_ARGS("entry isReadable fd %" PIST_QUOTE(PS_FD_PRNTFCD),
                                  ((Fd)tag.value()));

                if (isPeerFd(tag))
                {
                    auto peer = getPeer(tag);
                    PS_LOG_DEBUG("handleIncoming");
                    handleIncoming(peer);
                }
                else if (isTimerFd(tag))
                {
                    // Capture the key up front so that nothing below has to
                    // read it back through the iterator after handleTimer()
                    // has run.
                    const auto timerKey = static_cast<decltype(timers)::key_type>(tag.value());
                    auto it             = timers.find(timerKey);
                    if (it == std::end(timers))
                    {
                        // Defensive: never dereference end().
                        PS_LOG_DEBUG("timer not found in timers");
                    }
                    else
                    {
                        PS_LOG_DEBUG("handleTimer");
                        handleTimer(std::move(it->second));

                        PS_LOG_DEBUG_ARGS("Timer %" PIST_QUOTE(PS_FD_PRNTFCD) " erasing from timers",
                                          timerKey);
                        timers.erase(timerKey);
                        PS_LOG_DEBUG_ARGS("Timer %" PIST_QUOTE(PS_FD_PRNTFCD) " erased from timers",
                                          timerKey);
                    }
                }
                else
                {
                    PS_LOG_DEBUG("neither peer nor timer");
                }
            }
            else if (entry.isWritable())
            {
                PS_LOG_DEBUG("isWritable");

                auto tag        = entry.getTag();
                FdConst fdconst = static_cast<FdConst>(tag.value());
                // Since fd is about to be written to, it isn't really const,
                // and we cast away the const
                Fd fd = ((Fd)fdconst);

                {
                    // Only checks that a write queue exists; the lock is
                    // released before asyncWriteImpl() takes it again.
                    Guard guard(toWriteLock);
                    auto it = toWrite.find(fd);
                    if (it == std::end(toWrite))
                    {
                        throw std::runtime_error(
                            "Assertion Error: could not find write data");
                    }
                }

                // Switch back to read-only notification before draining.
                reactor()->modifyFd(key(), fd, NotifyOn::Read, Polling::Mode::Edge);

                PS_LOG_DEBUG("asyncWriteImpl (drain queue)");
                // Try to drain the queue
                asyncWriteImpl(fd);
            }
        }
    }
262
263
    // Disables the timer registered under `fd`.
    // Throws std::runtime_error when no such timer is present in `timers`.
    void Transport::disarmTimer(Fd fd)
    {
        PS_TIMEDBG_START_ARGS("fd %" PIST_QUOTE(PS_FD_PRNTFCD), fd);

        auto found = timers.find(fd);
        if (found == std::end(timers))
        {
            throw std::runtime_error("Timer has not been armed");
        }

        found->second.disable();
    }
274
275
    void Transport::handleIncoming(const std::shared_ptr<Peer>& peer)
276
0
    {
277
0
        if (!peer)
278
0
        {
279
0
            PS_LOG_DEBUG("Null peer");
280
0
            return;
281
0
        }
282
283
0
        char buffer[Const::MaxBuffer] = { 0 };
284
285
0
        ssize_t totalBytes = 0;
286
0
        int fdactual       = peer->actualFd();
287
0
        if (fdactual < 0)
288
0
        {
289
0
            PS_LOG_DEBUG_ARGS("Peer %p has no actual Fd", peer.get());
290
0
            return;
291
0
        }
292
293
0
        for (;;)
294
0
        {
295
296
0
            ssize_t bytes;
297
298
#ifdef PISTACHE_USE_SSL
299
            if (peer->ssl() != NULL)
300
            {
301
                PS_LOG_DEBUG("SSL_read");
302
303
                bytes = SSL_read((SSL*)peer->ssl(), buffer + totalBytes,
304
                                 static_cast<int>(Const::MaxBuffer - totalBytes));
305
            }
306
            else
307
            {
308
#endif /* PISTACHE_USE_SSL */
309
0
                PS_LOG_DEBUG("recv (read)");
310
0
                bytes = recv(fdactual, buffer + totalBytes, Const::MaxBuffer - totalBytes, 0);
311
#ifdef PISTACHE_USE_SSL
312
            }
313
#endif /* PISTACHE_USE_SSL */
314
315
0
            PS_LOG_DEBUG_ARGS("Fd %" PIST_QUOTE(PS_FD_PRNTFCD) ", "
316
0
                                                               "bytes read %d, totalBytes %d, "
317
0
                                                               "err %d %s",
318
0
                              peer->fd(), bytes, totalBytes,
319
0
                              (bytes < 0) ? errno : 0,
320
0
                              (bytes < 0) ? strerror(errno) : "");
321
322
0
            if (bytes == -1)
323
0
            {
324
0
                if (errno == EAGAIN || errno == EWOULDBLOCK)
325
0
                {
326
0
                    if (totalBytes > 0)
327
0
                    {
328
0
                        handler_->onInput(buffer, totalBytes, peer);
329
0
                    }
330
0
                }
331
0
                else
332
0
                {
333
0
                    handlePeerDisconnection(peer);
334
0
                }
335
0
                break;
336
0
            }
337
0
            else if (bytes == 0)
338
0
            {
339
0
                handlePeerDisconnection(peer);
340
0
                break;
341
0
            }
342
343
0
            else
344
0
            {
345
0
                handler_->onInput(buffer, bytes, peer);
346
0
            }
347
0
        }
348
0
    }
349
350
    void Transport::handlePeerDisconnection(const std::shared_ptr<Peer>& peer)
351
0
    {
352
0
        handler_->onDisconnection(peer);
353
354
0
        removePeer(peer);
355
0
    }
356
357
    void Transport::removePeer(const std::shared_ptr<Peer>& peer)
358
0
    {
359
0
        Fd fd = peer->fd();
360
0
        if (fd == PS_FD_EMPTY)
361
0
        {
362
0
            PS_LOG_DEBUG("Empty Fd");
363
0
            return;
364
0
        }
365
0
        {
366
            // See comment in transport.h on why peers_ must be mutex-protected
367
0
            std::lock_guard<std::mutex> l_guard(peers_mutex_);
368
369
0
            auto it = peers_.find(fd);
370
0
            if (it == std::end(peers_))
371
0
            {
372
0
                PS_LOG_WARNING_ARGS("peer %p not found in peers_", peer.get());
373
0
            }
374
0
            else
375
0
            {
376
0
                peers_.erase(it);
377
0
            }
378
0
        }
379
380
        // Don't rely on close deleting this FD from the epoll "interest" list.
381
        // This is needed in case the FD has been shared with another process.
382
        // Sharing should no longer happen by accident as SOCK_CLOEXEC is now set on
383
        // listener accept. This should then guarantee that the next call to
384
        // epoll_wait will not give us any events relating to this FD even if they
385
        // have been queued in the kernel since the last call to epoll_wait.
386
0
        Aio::Reactor* r = reactor();
387
0
        if (r) // or if r is NULL then reactor has been detached already
388
0
            r->removeFd(key(), fd);
389
390
0
        peer->closeFd();
391
0
    }
392
393
    // Drops any write queue held for `fd` and closes it, all under
    // toWriteLock. An empty fd is only logged.
    void Transport::closeFd(Fd fd)
    {
        if (fd != PS_FD_EMPTY)
        {
            Guard guard(toWriteLock);
            toWrite.erase(fd); // Clean up write buffers

            CLOSE_FD(fd);
        }
        else
        {
            PS_LOG_DEBUG("Trying to close empty Fd");
        }
    }
406
407
    void Transport::removeAllPeers()
408
0
    {
409
0
        PS_TIMEDBG_START_THIS;
410
411
0
        for (;;)
412
0
        {
413
0
            std::shared_ptr<Peer> peer;
414
415
0
            { // encapsulate
416
0
                std::lock_guard<std::mutex> l_guard(peers_mutex_);
417
0
                auto it = peers_.begin();
418
0
                if (it == peers_.end())
419
0
                    break;
420
421
0
                peer = it->second;
422
0
                if (!peer)
423
0
                {
424
0
                    peers_.erase(it);
425
0
                    PS_LOG_DEBUG("peer NULL");
426
0
                    continue;
427
0
                }
428
0
            }
429
430
0
            removePeer(peer); // removePeer locks mutex, erases peer from peers_
431
0
        }
432
0
    }
433
434
    // Drains the queue of pending writes for `fd`.
    //
    // Each outer iteration takes toWriteLock, picks the front WriteEntry of
    // the fd's queue and tries to send its buffer completely (raw buffers via
    // sendRawBuffer(), file buffers via sendFile()). Per entry, one of the
    // following happens:
    //  - fully written: the entry is popped (cleanUp) and its deferred is
    //    resolved with the total byte count;
    //  - EAGAIN / EWOULDBLOCK: the unsent remainder is re-queued at the front
    //    and the fd is registered for write notification; the outer loop then
    //    simply runs again;
    //  - EBADF / EPIPE / ECONNRESET: the whole queue for the fd is dropped;
    //  - any other error: the entry is popped and its deferred is rejected.
    // The function returns when the fd has no entry in toWrite, when its
    // queue is empty, or once `stop` has been set.
    void Transport::asyncWriteImpl(Fd fd)
    {
        PS_TIMEDBG_START_THIS;

        bool stop = false;
        while (!stop)
        {
            std::unique_lock<std::mutex> lock(toWriteLock);

            auto it = toWrite.find(fd);

            // cleanup will have been handled by handlePeerDisconnection
            if (it == std::end(toWrite))
            {
                PS_LOG_DEBUG_ARGS("Failed to find fd %" PIST_QUOTE(PS_FD_PRNTFCD), fd);
                return;
            }
            auto& wq = it->second;
            if (wq.empty())
            {
                PS_LOG_DEBUG("wq empty");
                break;
            }

            auto& entry = wq.front();
            int flags   = entry.flags;
#ifdef _USE_LIBEVENT_LIKE_APPLE
            bool msg_more_style = entry.msg_more_style;
#endif
            // `buffer` refers into the queue entry and dies with it on
            // pop_front(); `deferred` is moved out so it survives the pop.
            BufferHolder& buffer              = entry.buffer;
            Async::Deferred<ssize_t> deferred = std::move(entry.deferred);

            // Pops the current entry. When that empties the queue, the fd is
            // removed from toWrite, switched back to read-only notification
            // and the outer loop is told to stop. Always releases the lock so
            // the deferred can be settled without holding it.
            auto cleanUp = [&]() {
                wq.pop_front();
                if (wq.empty())
                {
                    PS_LOG_DEBUG_ARGS("Erasing fd %" PIST_QUOTE(PS_FD_PRNTFCD) " from toWrite", fd);
                    toWrite.erase(fd);
                    reactor()->modifyFd(key(), fd, NotifyOn::Read, Polling::Mode::Edge);
                    stop = true;
                }
                lock.unlock();
            };

            size_t totalWritten = buffer.offset();
            for (;;)
            {
                ssize_t bytesWritten = 0;
                auto len             = buffer.size() - totalWritten;

                if (buffer.isRaw())
                {
                    PS_LOG_DEBUG_ARGS("sendRawBuffer fd %" PIST_QUOTE(PS_FD_PRNTFCD) ", len %d",
                                      fd, len);

                    // Bind by const reference: avoids copying the raw buffer
                    // on every send attempt when raw() returns a reference,
                    // and is equally valid when it returns by value.
                    const auto& raw = buffer.raw();
                    const auto* ptr = raw.data().c_str() + totalWritten;
                    bytesWritten    = sendRawBuffer(fd, ptr, len, flags
#ifdef _USE_LIBEVENT_LIKE_APPLE
                                                 ,
                                                 msg_more_style
#endif
                    );
                }
                else
                {
                    PS_LOG_DEBUG_ARGS("sendFile fd %" PIST_QUOTE(PS_FD_PRNTFCD) ", len %d",
                                      fd, len);

                    auto file    = buffer.fd();
                    off_t offset = static_cast<off_t>(totalWritten);
                    bytesWritten = sendFile(fd, file, offset, len);
                }
                if (bytesWritten < 0)
                {
                    PS_LOG_DEBUG_ARGS("fd %" PIST_QUOTE(PS_FD_PRNTFCD) " errno %d %s",
                                      fd, errno, strerror(errno));

                    if (errno == EAGAIN || errno == EWOULDBLOCK)
                    {
                        auto bufferHolder = buffer.detach(totalWritten);

                        // pop_front kills buffer - so we cannot continue loop or use buffer
                        // after this point
                        wq.pop_front();
                        wq.push_front(WriteEntry(std::move(deferred),
                                                 bufferHolder, fd, flags
#ifdef _USE_LIBEVENT_LIKE_APPLE
                                                 ,
                                                 msg_more_style
#endif
                                                 ));
                        reactor()->modifyFd(key(), fd, NotifyOn::Read | NotifyOn::Write,
                                            Polling::Mode::Edge);
                    }
                    // EBADF can happen when the HTTP parser, in the case of
                    // an error, closes fd before the entire request is processed.
                    // https://github.com/pistacheio/pistache/issues/501
                    else if (errno == EBADF || errno == EPIPE || errno == ECONNRESET)
                    {
                        PS_LOG_DEBUG_ARGS("fd %" PIST_QUOTE(PS_FD_PRNTFCD) " EBADF/EPIPE/ECONNRESET so erasing",
                                          fd);
                        // NOTE(review): `deferred` is neither resolved nor
                        // rejected on this path - confirm that is intended.
                        wq.pop_front();
                        toWrite.erase(fd);
                        stop = true;
                    }
                    else
                    {
                        PS_LOG_DEBUG_ARGS("fd %" PIST_QUOTE(PS_FD_PRNTFCD) " rejecting write attempt", fd);
                        cleanUp();
                        deferred.reject(Pistache::Error::system("Could not write data"));
                    }
                    break;
                }
                else
                {
                    totalWritten += bytesWritten;
                    if (totalWritten >= buffer.size())
                    {
                        if (buffer.isFile())
                        {
                            PS_LOG_DEBUG_ARGS("file ::close actual_fd %d",
                                              buffer.fd());

                            // done with the file buffer, nothing else knows
                            // whether to close it with the way the code is
                            // written.
                            ::close(buffer.fd());
                        }

                        cleanUp();

                        // Cast to match the type of the Deferred template
                        // to avoid a BadType exception
                        deferred.resolve(static_cast<ssize_t>(totalWritten));
                        break;
                    }
                }
            }
        }
    }
575
576
#ifdef _USE_LIBEVENT_LIKE_APPLE
    // Makes the socket behind `fd` hold back (msg_more_style == true) or not
    // hold back (false) outgoing data, emulating MSG_MORE via a socket
    // option: TCP_NODELAY on NetBSD, TCP_NOPUSH on macOS / other BSDs,
    // TCP_CORK otherwise.
    //
    // The current option value is read first and the option is only written
    // when it differs from what is requested.
    //
    // Throws std::runtime_error if getsockopt or setsockopt fails.
    void Transport::configureMsgMoreStyle(Fd fd, bool msg_more_style)
    {
#ifdef __NetBSD__
        const int hold_option = TCP_NODELAY;
#elif defined __APPLE__ || defined _IS_BSD
        const int hold_option = TCP_NOPUSH;
#else
        const int hold_option = TCP_CORK;
#endif

        int current_value = 0;
        socklen_t len     = sizeof(current_value);
        const int get_res = getsockopt(GET_ACTUAL_FD(fd), tcp_prot_num_,
                                       hold_option, &current_value, &len);
        if (get_res != 0)
        {
            PS_LOG_DEBUG_ARGS("getsockopt failed for fd %p, actual fd %d, "
                              "errno %d, err %s",
                              fd, GET_ACTUAL_FD(fd), errno, strerror(errno));
            throw std::runtime_error("getsockopt failed");
        }

#ifdef __NetBSD__
        // TCP_NODELAY _stops_ data being held prior to send, i.e. it is the
        // opposite of TCP_CORK/TCP_NOPUSH, so invert what was read.
        const int tcp_no_push = !current_value;
#else
        const int tcp_no_push = current_value;
#endif

        const bool already_holding = (tcp_no_push != 0);
        if (already_holding == msg_more_style)
        {
#ifdef DEBUG
            PS_LOG_DEBUG_ARGS("MSG_MORE style is already %s",
                              (tcp_no_push) ? "on" : "off");
#endif
            return;
        }

        PS_LOG_DEBUG_ARGS("Setting MSG_MORE style to %s",
                          (msg_more_style) ? "on" : "off");

#ifdef __NetBSD__
        // Inverted for the same reason as above: holding data means turning
        // TCP_NODELAY off.
        int optval = msg_more_style ? 0 : 1;
#else
        int optval = msg_more_style ? 1 : 0;
#endif

        const int set_res = setsockopt(GET_ACTUAL_FD(fd), tcp_prot_num_,
                                       hold_option, &optval, len);
        if (set_res < 0)
            throw std::runtime_error("setsockopt failed");
    }
#endif // of ifdef _USE_LIBEVENT_LIKE_APPLE
651
652
    ssize_t Transport::sendRawBuffer(Fd fd, const char* buffer, size_t len,
653
                                     int flags
654
#ifdef _USE_LIBEVENT_LIKE_APPLE
655
                                     ,
656
                                     bool msg_more_style
657
#endif
658
    )
659
0
    {
660
0
        ssize_t bytesWritten = 0;
661
662
#ifdef PISTACHE_USE_SSL
663
        bool it_second_ssl_is_null = false;
664
665
        {
666
            // See comment in transport.h on why peers_ must be mutex-protected
667
            std::lock_guard<std::mutex> l_guard(peers_mutex_);
668
669
            auto it_ = peers_.find(fd);
670
671
            if (it_ == std::end(peers_))
672
                throw std::runtime_error(
673
                    "No peer found for fd: " + to_string(fd));
674
675
            it_second_ssl_is_null = (it_->second->ssl() == NULL);
676
677
            if (!it_second_ssl_is_null)
678
            {
679
                auto ssl_ = static_cast<SSL*>(it_->second->ssl());
680
                PS_LOG_DEBUG_ARGS("SSL_write, len %d", (int)len);
681
682
                bytesWritten = SSL_write(ssl_, buffer, static_cast<int>(len));
683
            }
684
        }
685
686
        if (it_second_ssl_is_null)
687
        {
688
#endif /* PISTACHE_USE_SSL */
689
            // MSG_NOSIGNAL is used to prevent SIGPIPE on client connection termination
690
691
#ifdef _USE_LIBEVENT_LIKE_APPLE
692
            configureMsgMoreStyle(fd, msg_more_style);
693
#endif
694
695
0
            PS_LOG_DEBUG_ARGS("::send, fd %" PIST_QUOTE(PS_FD_PRNTFCD) ", actual_fd %d, len %d",
696
0
                              fd, GET_ACTUAL_FD(fd), (int)len);
697
698
0
            bytesWritten = ::send(GET_ACTUAL_FD(fd), buffer, len,
699
0
                                  flags | MSG_NOSIGNAL);
700
701
0
            PS_LOG_DEBUG_ARGS("bytesWritten = %d", bytesWritten);
702
703
#ifdef PISTACHE_USE_SSL
704
        }
705
#endif /* PISTACHE_USE_SSL */
706
707
0
        return bytesWritten;
708
0
    }
709
710
#ifdef _IS_BSD
711
    // This is the sendfile function prototype found in Linux. However,
712
    // sendfile does not exist in OpenBSD, so we make our own.
713
    //
714
    // https://www.man7.org/linux/man-pages/man2/sendfile.2.html
715
    // Copies FROM "in_fd" TO "out_fd"
716
    // Returns number of bytes written on success, -1 with errno set on error
717
    //
718
    // If offset is not NULL, then sendfile() does not modify the file offset
719
    // of in_fd; otherwise the file offset is adjusted to reflect the number of
720
    // bytes read from in_fd.
721
    // Copies up to `count` bytes from `in_fd` to `out_fd` using read/write.
    //
    // Parameters:
    //   out_fd - descriptor written to.
    //   in_fd  - descriptor read from.
    //   offset - if non-null, reading starts at *offset, *offset is advanced
    //            by the number of bytes copied, and the file position of
    //            in_fd is put back to where it was on entry. If null, the
    //            current file position of in_fd is used and left advanced.
    //   count  - maximum number of bytes to copy; 0 means "copy until end of
    //            file".
    //
    // Returns the number of bytes written, or -1 with errno set on error.
    // Reads and writes that fail with EINTR/EAGAIN (or writes that write
    // nothing) are retried up to 256 times in a row before giving up with
    // errno = EIO. On error the file position of in_fd is not restored.
    ssize_t my_sendfile(int out_fd, int in_fd, off_t* offset, size_t count)
    {
        char buff[65536 + 16]; // 64KB is an efficient size for read/write
                               // blocks in most storage systems. The 16 bytes
                               // is to reduce risk of buffer overflow in the
                               // events of a bug.
        const size_t max_chunk = sizeof(buff) - 16;
        const int max_retries  = 256;

        // Remember whether a limit was requested at all, so that the limit
        // reaching zero is not mistaken for "no limit".
        const bool limited = (count != 0);

        int read_errors           = 0;
        int write_errors          = 0;
        ssize_t bytes_written_res = 0;
        bool failed               = false;

        off_t in_fd_start_pos = -1;

        if (offset)
        {
            in_fd_start_pos = lseek(in_fd, 0, SEEK_CUR);
            if (in_fd_start_pos < 0)
            {
                PS_LOG_DEBUG("lseek error");
                return (in_fd_start_pos);
            }

            if (lseek(in_fd, *offset, SEEK_SET) < 0)
            {
                PS_LOG_DEBUG("lseek error");
                return (-1);
            }
        }

        while (!limited || (count > 0))
        {
            // Never read more than is still wanted, so nothing has to be
            // "un-read" afterwards.
            const size_t bytes_to_read = limited ? std::min(max_chunk, count) : max_chunk;

            const ssize_t bytes_read = read(in_fd, &(buff[0]), bytes_to_read);
            if (bytes_read == 0) // End of file
                break;

            if (bytes_read < 0)
            {
                if ((errno == EINTR) || (errno == EAGAIN))
                {
                    PS_LOG_DEBUG("read-interrupted error");

                    read_errors++;
                    if (read_errors < max_retries)
                        continue;

                    PS_LOG_DEBUG("read-interrupted repeatedly error");
                    errno = EIO;
                }

                failed = true;
                break;
            }
            read_errors = 0;

            // Write out the whole chunk; `pending` counts down while
            // bytes_read keeps the chunk size for the bookkeeping below.
            const char* p   = &(buff[0]);
            ssize_t pending = bytes_read;
            while (pending > 0)
            {
                const ssize_t bytes_written = write(out_fd, p, pending);
                if (bytes_written <= 0)
                {
                    if ((bytes_written == 0) || (errno == EINTR) || (errno == EAGAIN))
                    {
                        PS_LOG_DEBUG("write-interrupted error");

                        write_errors++;
                        if (write_errors < max_retries)
                            continue;

                        PS_LOG_DEBUG("write-interrupted repeatedly error");
                        errno = EIO;
                    }

                    failed = true;
                    break;
                }
                write_errors = 0;

                pending -= bytes_written;
                p += bytes_written;
                bytes_written_res += bytes_written;
            }

            // A failed write must end the whole copy, not just this chunk.
            if (failed)
                break;

            if (offset)
                *offset += bytes_read;
            if (limited)
                count -= static_cast<size_t>(bytes_read);
        }

        if (failed)
            return (-1);

        // if offset non null, set in_fd file pos to pos from start of this
        // function
        if ((offset) && (lseek(in_fd, in_fd_start_pos, SEEK_SET) < 0))
        {
            PS_LOG_DEBUG("lseek error");
            return (-1);
        }

        return (bytes_written_res);
    }
834
835
#define SENDFILE my_sendfile    
836
#else
837
0
#define SENDFILE ::sendfile
838
#endif // ifdef _IS_BSD
839
840
    // Sends `len` bytes of the file `file`, starting at `offset`, to the
    // peer identified by `fd`.
    //
    // On PISTACHE_USE_SSL builds the peer is looked up in peers_ (under
    // peers_mutex_); if it carries an SSL object the transfer goes through
    // SSL_sendfile while the mutex is held. Otherwise the platform sendfile
    // is used: ::sendfile with the macOS argument order on __APPLE__,
    // SENDFILE elsewhere.
    //
    // Returns the number of bytes written, or a negative value on failure.
    // Throws std::runtime_error (SSL builds only) when no peer is registered
    // for `fd`.
    ssize_t Transport::sendFile(Fd fd, int file, off_t offset, size_t len)
    {
#ifdef PISTACHE_USE_SSL
        {
            // See comment in transport.h on why peers_ must be mutex-protected
            std::lock_guard<std::mutex> l_guard(peers_mutex_);

            auto peerIt = peers_.find(fd);
            if (peerIt == std::end(peers_))
            {
                PS_LOG_WARNING_ARGS("No peer for fd %" PIST_QUOTE(PS_FD_PRNTFCD), fd);
                PS_LOG_WARNING_ARGS("No peer found for fd %" PIST_QUOTE(PS_FD_PRNTFCD) ", actual-fd %d",
                                    fd, GET_ACTUAL_FD(fd));

                throw std::runtime_error(
                    "No peer found for fd: " + to_string(fd));
            }

            auto sslHandle = peerIt->second->ssl();
            if (sslHandle != NULL)
            {
                PS_LOG_DEBUG_ARGS("SSL_sendfile, len %d", len);

                return SSL_sendfile(static_cast<SSL*>(sslHandle), file,
                                    &offset, len);
            }
        }
#endif /* PISTACHE_USE_SSL */

#ifdef DEBUG
        const char* sendfile_fn_name = PIST_QUOTE(SENDFILE);
#endif
        PS_LOG_DEBUG_ARGS(
            "%s fd %" PIST_QUOTE(PS_FD_PRNTFCD) " actual-fd %d, file fd %d, len %d", sendfile_fn_name,
            fd, GET_ACTUAL_FD(fd), file, len);

#ifdef _USE_LIBEVENT_LIKE_APPLE
        // !!!! Should we do configureMsgMoreStyle for SSL as well? And
        // same question in sendRawBuffer
        configureMsgMoreStyle(fd, false /*msg_more_style*/);
#endif

        ssize_t bytesWritten = 0;

#ifdef __APPLE__
        // NB: The order of the first two parameters for ::sendfile is the
        // opposite way round in macOS vs. Linux. Also, in macOS sendfile
        // returns zero for success (the byte count comes back through the
        // length argument), whereas in Linux upon success it returns the
        // number of bytes written.
        //
        // Although the macOS sendfile man page is silent on the matter, by
        // experimentation it appears sendfile does not advance the file
        // position of "file", which is the same as the behavior described
        // in the Linux sendfile man page.
        off_t len_as_off_t = (off_t)len;

        int sendfile_res = ::sendfile(file, GET_ACTUAL_FD(fd),
                                      offset, &len_as_off_t,
                                      NULL, // no new prefix/suffix content
                                      0 /*reserved, must be zero*/);

        if (sendfile_res != 0)
        {
            bytesWritten = -1;
        }
        else
        {
            bytesWritten = (ssize_t)len_as_off_t;
            offset += len_as_off_t; // to match what Linux sendfile does
        }
#else
        bytesWritten = SENDFILE(GET_ACTUAL_FD(fd), file, &offset, len);
#endif

        PS_LOG_DEBUG_ARGS(
            "%s fd %" PIST_QUOTE(PS_FD_PRNTFCD) ", bytesWritten %d",
            sendfile_fn_name, fd, bytesWritten);

        return bytesWritten;
    }
930
931
    void Transport::armTimerMs(Fd fd, std::chrono::milliseconds value,
932
                               Async::Deferred<uint64_t> deferred)
933
0
    {
934
0
        PS_TIMEDBG_START_ARGS("Fd %" PIST_QUOTE(PS_FD_PRNTFCD), fd);
935
936
0
        auto ctx                   = context();
937
0
        const bool isInRightThread = std::this_thread::get_id() == ctx.thread();
938
0
        TimerEntry entry(fd, value, std::move(deferred));
939
940
0
        if (!isInRightThread)
941
0
        {
942
0
            PS_LOG_DEBUG_ARGS("Fd %" PIST_QUOTE(PS_FD_PRNTFCD) ", timersQueue.push",
943
0
                              fd);
944
0
            timersQueue.push(std::move(entry));
945
0
        }
946
0
        else
947
0
        {
948
0
            PS_LOG_DEBUG_ARGS("Fd %" PIST_QUOTE(PS_FD_PRNTFCD) ", armTimerMsImpl",
949
0
                              fd);
950
0
            armTimerMsImpl(std::move(entry));
951
0
        }
952
0
    }
953
954
    // Arms the timer described by `entry` and records it in `timers`.
    //
    // The entry's deferred is rejected (and nothing is registered) when a
    // timer for entry.fd is already present in `timers`, or when setting the
    // timer time fails. On success the fd is registered one-shot for read
    // notification and the entry is moved into `timers`.
    void Transport::armTimerMsImpl(TimerEntry entry)
    {
#ifdef DEBUG
        Fd entry_fd = entry.fd;

#ifdef _USE_LIBEVENT
        PS_LOG_DEBUG_ARGS("entry.fd %" PIST_QUOTE(PS_FD_PRNTFCD) ", "
                                                                 "fd->type %d",
                          entry_fd,
                          EventMethFns::getEmEventType(entry_fd));
#else
        PS_LOG_DEBUG_ARGS("entry.fd %" PIST_QUOTE(PS_FD_PRNTFCD),
                          entry_fd);
#endif

#endif

        if (timers.find(entry.fd) != std::end(timers))
        {
            // The message text is part of the format string; it must not be
            // passed as a separate argument
            PS_LOG_DEBUG_ARGS("Fd %" PIST_QUOTE(PS_FD_PRNTFCD) ", timer already armed",
                              entry.fd);

            entry.deferred.reject(std::runtime_error("Timer is already armed"));
            return;
        }

        int res = -1;
#ifdef _USE_LIBEVENT
        assert(entry.fd != PS_FD_EMPTY);
        res = EventMethFns::setEmEventTime(entry.fd, &(entry.value));
#else
        // Split the timeout into whole seconds plus the sub-second remainder
        // so that e.g. 1500ms is armed as 1s + 500000000ns rather than being
        // truncated to 1s
        const auto wholeSeconds = std::chrono::duration_cast<std::chrono::seconds>(entry.value);
        const auto subSecond    = std::chrono::duration_cast<std::chrono::nanoseconds>(entry.value - wholeSeconds);

        itimerspec spec;
        spec.it_interval.tv_sec  = 0; // no repeat interval
        spec.it_interval.tv_nsec = 0;
        spec.it_value.tv_sec     = wholeSeconds.count();
        spec.it_value.tv_nsec    = subSecond.count();

        res = timerfd_settime(entry.fd, 0, &spec, nullptr);
#endif
        if (res == -1)
        {
            PS_LOG_DEBUG_ARGS("Fd %" PIST_QUOTE(PS_FD_PRNTFCD) ",  ernno %d %s",
                              entry.fd, errno, strerror(errno));

            entry.deferred.reject(Pistache::Error::system("Could not set timer time"));
            return;
        }

        reactor()->registerFdOneShot(key(), entry.fd, NotifyOn::Read,
                                     Polling::Mode::Edge);

        PS_LOG_DEBUG_ARGS("Timer %" PIST_QUOTE(PS_FD_PRNTFCD) " inserting into timers", entry.fd);
        timers.insert(std::make_pair(entry.fd, std::move(entry)));
    }
1020
1021
    void Transport::handleWriteQueue(bool flush)
1022
0
    {
1023
        // Let's drain the queue
1024
0
        for (;;)
1025
0
        {
1026
0
            auto write = writesQueue.popSafe();
1027
0
            if (!write)
1028
0
                break;
1029
1030
0
            auto fd = write->peerFd;
1031
0
            if (fd == PS_FD_EMPTY)
1032
0
                continue;
1033
0
            if (!isPeerFd(fd))
1034
0
                continue;
1035
1036
0
            {
1037
0
                Guard guard(toWriteLock);
1038
0
                toWrite[fd].push_back(std::move(*write));
1039
0
            }
1040
1041
0
            reactor()->modifyFd(key(), fd, NotifyOn::Read | NotifyOn::Write,
1042
0
                                Polling::Mode::Edge);
1043
1044
0
            if (flush)
1045
0
                asyncWriteImpl(fd);
1046
0
        }
1047
0
    }
1048
1049
    void Transport::handleTimerQueue()
1050
0
    {
1051
0
        PS_TIMEDBG_START_THIS;
1052
1053
0
        for (;;)
1054
0
        {
1055
0
            auto timer = timersQueue.popSafe();
1056
0
            if (!timer)
1057
0
                break;
1058
1059
0
            armTimerMsImpl(std::move(*timer));
1060
0
        }
1061
0
    }
1062
1063
    void Transport::handlePeerQueue()
1064
0
    {
1065
0
        PS_TIMEDBG_START_THIS;
1066
1067
0
        for (;;)
1068
0
        {
1069
0
            auto data = peersQueue.popSafe();
1070
0
            PS_LOG_DEBUG_ARGS("data %p", data.get());
1071
0
            if (!data)
1072
0
                break;
1073
1074
0
            handlePeer(data->peer);
1075
0
        }
1076
0
    }
1077
1078
    void Transport::handlePeer(const std::shared_ptr<Peer>& peer)
1079
0
    {
1080
0
        PS_TIMEDBG_START_THIS;
1081
1082
0
        Fd fd = peer->fd();
1083
0
        if (fd == PS_FD_EMPTY)
1084
0
        {
1085
0
            PS_LOG_DEBUG("Empty Fd");
1086
0
            return;
1087
0
        }
1088
1089
0
        {
1090
            // See comment in transport.h on why peers_ must be mutex-protected
1091
0
            std::lock_guard<std::mutex> l_guard(peers_mutex_);
1092
1093
0
            auto auto_insert_res_pr = peers_.insert(std::make_pair(fd, peer));
1094
0
            if (!auto_insert_res_pr.second)
1095
0
                PS_LOG_WARNING_ARGS("Failed to insert peer %p", peer.get());
1096
0
        }
1097
1098
0
        peer->associateTransport(this);
1099
1100
0
        handler_->onConnection(peer);
1101
0
        reactor()->registerFd(key(), fd, NotifyOn::Read | NotifyOn::Shutdown,
1102
0
                              Polling::Mode::Edge);
1103
0
    }
1104
1105
    void Transport::handleNotify()
1106
0
    {
1107
0
        PS_TIMEDBG_START_THIS;
1108
1109
0
        while (this->notifier.tryRead())
1110
0
            ;
1111
1112
0
        rusage now;
1113
1114
0
        auto res = getrusage(
1115
#ifdef _USE_LIBEVENT_LIKE_APPLE
1116
            RUSAGE_SELF, // usage for whole process, not just current thread
1117
                         // (macOS getrusage doesn't support RUSAGE_THREAD)
1118
#else
1119
0
            RUSAGE_THREAD,
1120
0
#endif
1121
0
            &now);
1122
0
        if (res == -1)
1123
0
            loadRequest_.reject(std::runtime_error("Could not compute usage"));
1124
1125
0
        loadRequest_.resolve(now);
1126
0
        loadRequest_.clear();
1127
0
    }
1128
1129
    // Handles a fired timer: for an active entry, reads the 64-bit wakeup
    // count from the timer fd and resolves the entry's deferred with it.
    // A read that fails with EAGAIN/EWOULDBLOCK is silently ignored; any other
    // read failure or a short read rejects the deferred.
    void Transport::handleTimer(TimerEntry entry)
    {
        PS_TIMEDBG_START_THIS;

        if (!entry.isActive())
        {
            return;
        }

        uint64_t wakeupCount;
        auto bytesRead = READ_FD(entry.fd, &wakeupCount, sizeof wakeupCount);

        if (bytesRead == -1)
        {
            // Nothing to read yet: leave the deferred untouched
            if (errno == EAGAIN || errno == EWOULDBLOCK)
            {
                return;
            }

            entry.deferred.reject(
                Pistache::Error::system("Could not read timerfd"));
            return;
        }

        if (bytesRead != sizeof(wakeupCount))
        {
            entry.deferred.reject(
                Pistache::Error("Read invalid number of bytes for timer fd: " + std::to_string(GET_ACTUAL_FD(entry.fd))));
            return;
        }

        entry.deferred.resolve(wakeupCount);
    }
1160
1161
    // Returns true when `fdconst` is a key of peers_. Takes peers_mutex_ for
    // the duration of the lookup.
    bool Transport::isPeerFd(FdConst fdconst) const
    {
        PS_TIMEDBG_START_THIS;

        // Constness is dropped only so the mutex can be locked from this
        // const member; the mutex is released again before returning
        std::mutex& peersMutex = const_cast<Transport*>(this)->peers_mutex_;

        std::lock_guard<std::mutex> l_guard(peersMutex);
        return isPeerFdNoPeersMutexLock(fdconst);
    }
1172
1173
    // isPeerFdNoPeersMutexLock only when peers_mutex_ has been already locked
1174
    bool Transport::isPeerFdNoPeersMutexLock(FdConst fdconst) const
1175
0
    {
1176
0
        PS_TIMEDBG_START_THIS;
1177
1178
        // Can cast away const since we're not actually going to change fd
1179
0
        Fd fd = ((Fd)fdconst);
1180
1181
0
        return peers_.find(fd) != std::end(peers_);
1182
0
    }
1183
1184
    // Returns true when `fdconst` is a key of `timers`; the outcome is also
    // written to the debug log.
    bool Transport::isTimerFd(FdConst fdconst) const
    {
        PS_TIMEDBG_START_THIS;

        // The const is cast away purely to form the lookup key; the fd itself
        // is not modified
        Fd lookupFd      = ((Fd)fdconst);
        const bool found = (timers.find(lookupFd) != std::end(timers));

        PS_LOG_DEBUG_ARGS("Fd %" PIST_QUOTE(PS_FD_PRNTFCD) " %s in timers",
                          fdconst, (found ? "is" : "is not"));
        return found;
    }
1196
1197
    // Tag overload: converts the tag's value to an fd and forwards to
    // isPeerFd(FdConst).
    bool Transport::isPeerFd(Polling::Tag tag) const
    {
        PS_TIMEDBG_START_THIS;

        FdConst tagFd = static_cast<FdConst>(tag.value());
        return isPeerFd(tagFd);
    }
1203
    // Tag overload: converts the tag's value to an fd and forwards to
    // isTimerFd(FdConst).
    bool Transport::isTimerFd(Polling::Tag tag) const
    {
        FdConst tagFd = static_cast<FdConst>(tag.value());
        return isTimerFd(tagFd);
    }
1207
1208
    // Returns the peer stored in peers_ under `fdconst`.
    // Throws std::runtime_error when no such peer exists.
    std::shared_ptr<Peer> Transport::getPeer(FdConst fdconst)
    {
        PS_TIMEDBG_START_THIS;

        // The const is cast away purely to form the lookup key; the fd itself
        // is not modified
        Fd lookupFd = ((Fd)fdconst);

        // See comment in transport.h on why peers_ must be mutex-protected
        std::lock_guard<std::mutex> l_guard(peers_mutex_);

        auto peerIt = peers_.find(lookupFd);
        if (peerIt != std::end(peers_))
        {
            return peerIt->second;
        }

        throw std::runtime_error("No peer found for fd: " + std::to_string(GET_ACTUAL_FD(lookupFd)));
    }
1225
1226
    // Tag overload: converts the tag's value to an fd and forwards to
    // getPeer(FdConst).
    std::shared_ptr<Peer> Transport::getPeer(Polling::Tag tag)
    {
        FdConst tagFd = static_cast<FdConst>(tag.value());
        return getPeer(tagFd);
    }
1230
1231
    std::deque<std::shared_ptr<Peer>> Transport::getAllPeer()
1232
0
    {
1233
0
        std::deque<std::shared_ptr<Peer>> dqPeers;
1234
1235
0
        {
1236
            // See comment in transport.h on why peers_ must be mutex-protected
1237
0
            std::lock_guard<std::mutex> l_guard(peers_mutex_);
1238
1239
0
            for (const auto& peerPair : peers_)
1240
0
            {
1241
0
                if (isPeerFdNoPeersMutexLock(peerPair.first))
1242
0
                {
1243
0
                    dqPeers.push_back(peerPair.second);
1244
0
                }
1245
0
            }
1246
0
        }
1247
1248
0
        return dqPeers;
1249
0
    }
1250
1251
} // namespace Pistache::Tcp