Coverage Report

Created: 2025-11-11 06:48

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/pistache/src/common/reactor.cc
Line
Count
Source
1
/*
2
 * SPDX-FileCopyrightText: 2016 Mathieu Stefani
3
 *
4
 * SPDX-License-Identifier: Apache-2.0
5
 */
6
7
/*
8
   Mathieu Stefani, 15 juin 2016
9
10
   Implementation of the Reactor
11
*/
12
13
#include <pistache/reactor.h>
14
15
#include <array>
16
#include <atomic>
17
#include <memory>
18
#include <mutex>
19
#include <string>
20
#include <unordered_map>
21
#include <vector>
22
23
#include <pistache/pist_quote.h>
24
#include <pistache/pist_timelog.h>
25
26
#ifdef _IS_BSD
27
// For pthread_set_name_np
28
#include PST_THREAD_HDR
29
#ifndef __NetBSD__
30
#include <pthread_np.h>
31
#endif
32
#endif
33
34
#ifdef _IS_WINDOWS
35
#include <windows.h> // Needed for PST_THREAD_HDR (processthreadsapi.h)
36
#include PST_THREAD_HDR // for SetThreadDescription
37
#endif
38
39
#ifdef _IS_WINDOWS
40
static std::atomic_bool lLoggedSetThreadDescriptionFail = false;
41
#ifdef __MINGW32__
42
43
#include <windows.h>
44
#include <libloaderapi.h> // for GetProcAddress and GetModuleHandleA
45
typedef HRESULT (WINAPI *TSetThreadDescription)(HANDLE, PCWSTR);
46
47
static std::atomic_bool lSetThreadDescriptionLoaded = false;
48
static std::mutex lSetThreadDescriptionLoadMutex;
49
static TSetThreadDescription lSetThreadDescriptionPtr = nullptr;
50
51
TSetThreadDescription getSetThreadDescriptionPtr()
52
{
53
    if (lSetThreadDescriptionLoaded)
54
        return(lSetThreadDescriptionPtr);
55
56
    GUARD_AND_DBG_LOG(lSetThreadDescriptionLoadMutex);
57
    if (lSetThreadDescriptionLoaded)
58
        return(lSetThreadDescriptionPtr);
59
60
    HMODULE hKernelBase = GetModuleHandleA("KernelBase.dll");
61
62
    if (!hKernelBase)
63
    {
64
        PS_LOG_WARNING(
65
            "Failed to get KernelBase.dll for SetThreadDescription");
66
        lSetThreadDescriptionLoaded = true;
67
        return(nullptr);
68
    }
69
70
    FARPROC set_thread_desc_fpptr =
71
        GetProcAddress(hKernelBase, "SetThreadDescription");
72
73
    // We do the cast in two steps, otherwise mingw-gcc complains about
74
    // incompatible types
75
    void * set_thread_desc_vptr =
76
        reinterpret_cast<void *>(set_thread_desc_fpptr);
77
    lSetThreadDescriptionPtr =
78
        reinterpret_cast<TSetThreadDescription>(set_thread_desc_vptr);
79
80
    lSetThreadDescriptionLoaded = true;
81
    if (!lSetThreadDescriptionPtr)
82
    {
83
        PS_LOG_WARNING(
84
            "Failed to get SetThreadDescription from KernelBase.dll");
85
    }
86
    return(lSetThreadDescriptionPtr);
87
}
88
#endif // of ifdef __MINGW32__
89
#endif // of ifdef _IS_WINDOWS
90
91
using namespace std::string_literals;
92
93
namespace Pistache::Aio
94
{
95
96
    class Reactor::Impl
97
    {
98
    public:
99
        Impl(Reactor* reactor)
100
0
            : reactor_(reactor)
101
0
        { }
102
103
0
        virtual ~Impl() = default;
104
105
        virtual Reactor::Key addHandler(
106
            const std::shared_ptr<Handler>& handler, bool setKey)
107
            = 0;
108
109
        virtual void detachFromReactor(
110
            const std::shared_ptr<Handler>& handler)
111
            = 0;
112
113
        virtual void detachAndRemoveAllHandlers() = 0;
114
115
        virtual std::vector<std::shared_ptr<Handler>>
116
        handlers(const Reactor::Key& key) const = 0;
117
118
        virtual void registerFd(const Reactor::Key& key, Fd fd,
119
                                Polling::NotifyOn interest, Polling::Tag tag,
120
                                Polling::Mode mode = Polling::Mode::Level)
121
            = 0;
122
123
        virtual void registerFdOneShot(const Reactor::Key& key, Fd fd,
124
                                       Polling::NotifyOn interest, Polling::Tag tag,
125
                                       Polling::Mode mode = Polling::Mode::Level)
126
            = 0;
127
128
        virtual void modifyFd(const Reactor::Key& key, Fd fd,
129
                              Polling::NotifyOn interest, Polling::Tag tag,
130
                              Polling::Mode mode = Polling::Mode::Level)
131
            = 0;
132
133
        virtual void removeFd(const Reactor::Key& key, Fd fd) = 0;
134
135
        virtual void runOnce() = 0;
136
        virtual void run()     = 0;
137
138
        virtual void shutdown() = 0;
139
140
        Reactor* reactor_;
141
    };
142
143
    /* Synchronous implementation of the reactor that polls in the context
144
     * of the same thread
145
     */
146
    class SyncImpl : public Reactor::Impl
147
    {
148
    public:
149
        explicit SyncImpl(Reactor* reactor)
150
0
            : Reactor::Impl(reactor)
151
0
            , handlers_()
152
0
            , shutdown_()
153
0
            , shutdownFd()
154
0
            , poller()
155
0
        {
156
0
            shutdownFd.bind(poller);
157
0
        }
158
159
        Reactor::Key addHandler(const std::shared_ptr<Handler>& handler,
160
                                bool setKey = true) override
161
0
        {
162
0
            handler->registerPoller(poller);
163
164
0
            handler->reactor_ = reactor_;
165
166
0
            std::mutex& poller_reg_unreg_mutex(poller.reg_unreg_mutex_);
167
0
            GUARD_AND_DBG_LOG(poller_reg_unreg_mutex);
168
0
            auto key = handlers_.add(handler);
169
0
            if (setKey)
170
0
                handler->key_ = key;
171
172
0
            return key;
173
0
        }
174
175
        // poller.reg_unreg_mutex_ must be locked before calling
176
        void detachFromReactor(const std::shared_ptr<Handler>& handler)
177
            override
178
0
        {
179
0
            PS_TIMEDBG_START_THIS;
180
181
0
            handler->unregisterPoller(poller);
182
0
            handler->reactor_ = nullptr;
183
0
        }
184
185
        void detachAndRemoveAllHandlers() override
186
0
        {
187
0
            std::mutex& poller_reg_unreg_mutex(poller.reg_unreg_mutex_);
188
0
            GUARD_AND_DBG_LOG(poller_reg_unreg_mutex);
189
190
0
            handlers_.forEachHandler([this](
191
0
                                         const std::shared_ptr<Handler> handler) {
192
0
                detachFromReactor(handler);
193
0
            });
194
195
0
            handlers_.removeAll();
196
0
        }
197
198
        std::shared_ptr<Handler> handler(const Reactor::Key& key) const
199
0
        {
200
0
            return handlers_.at(static_cast<size_t>(key.data()));
201
0
        }
202
203
        std::vector<std::shared_ptr<Handler>>
204
        handlers(const Reactor::Key& key) const override
205
0
        {
206
0
            std::vector<std::shared_ptr<Handler>> res;
207
208
0
            res.push_back(handler(key));
209
0
            return res;
210
0
        }
211
212
#ifdef DEBUG
213
        static void logNotifyOn(Fd fd, Polling::NotifyOn interest)
214
        {
215
            std::string str("Fd ");
216
217
            std::stringstream ss;
218
            ss << fd;
219
            str += ss.str();
220
221
            if ((static_cast<unsigned int>(interest)) & (static_cast<unsigned int>(Polling::NotifyOn::Read)))
222
                str += " read";
223
            if ((static_cast<unsigned int>(interest)) & (static_cast<unsigned int>(Polling::NotifyOn::Write)))
224
                str += " write";
225
            if ((static_cast<unsigned int>(interest)) & (static_cast<unsigned int>(Polling::NotifyOn::Hangup)))
226
                str += " hangup";
227
            if ((static_cast<unsigned int>(interest)) & (static_cast<unsigned int>(Polling::NotifyOn::Shutdown)))
228
                str += " shutdown";
229
230
            PS_LOG_DEBUG_ARGS("%s", str.c_str());
231
        }
232
233
#define PS_LOG_DBG_NOTIFY_ON logNotifyOn(fd, interest)
234
#else
235
#define PS_LOG_DBG_NOTIFY_ON
236
#endif
237
238
        void registerFd(const Reactor::Key& key, Fd fd, Polling::NotifyOn interest,
239
                        Polling::Tag tag,
240
                        Polling::Mode mode = Polling::Mode::Level) override
241
0
        {
242
243
0
            auto pollTag = encodeTag(key, tag);
244
0
            PS_LOG_DBG_NOTIFY_ON;
245
0
            poller.addFd(fd, Flags<Polling::NotifyOn>(interest), pollTag, mode);
246
0
        }
247
248
        void registerFdOneShot(const Reactor::Key& key, Fd fd,
249
                               Polling::NotifyOn interest, Polling::Tag tag,
250
                               Polling::Mode mode = Polling::Mode::Level) override
251
0
        {
252
0
            PS_TIMEDBG_START_ARGS("Fd %" PIST_QUOTE(PS_FD_PRNTFCD), fd);
253
254
0
            auto pollTag = encodeTag(key, tag);
255
0
            PS_LOG_DBG_NOTIFY_ON;
256
0
            poller.addFdOneShot(fd, Flags<Polling::NotifyOn>(interest), pollTag, mode);
257
0
        }
258
259
        void modifyFd(const Reactor::Key& key, Fd fd, Polling::NotifyOn interest,
260
                      Polling::Tag tag,
261
                      Polling::Mode mode = Polling::Mode::Level) override
262
0
        {
263
264
0
            auto pollTag = encodeTag(key, tag);
265
0
            poller.rearmFd(fd, Flags<Polling::NotifyOn>(interest), pollTag, mode);
266
0
        }
267
268
        void removeFd(const Reactor::Key& /*key*/, Fd fd) override
269
0
        {
270
0
            PS_TIMEDBG_START_ARGS("Reactor %p, Fd %" PIST_QUOTE(PS_FD_PRNTFCD),
271
0
                                  this, fd);
272
273
0
            poller.removeFd(fd);
274
0
        }
275
276
        void runOnce() override
277
0
        {
278
0
            PS_TIMEDBG_START;
279
280
0
            if (handlers_.empty())
281
0
                throw std::runtime_error("You need to set at least one handler");
282
283
0
            for (;;)
284
0
            {
285
0
                PS_TIMEDBG_START;
286
0
                { // encapsulate l_guard(poller.reg_unreg_mutex_)
287
                  // See comment in class Epoll regarding reg_unreg_mutex_
288
289
0
                    std::mutex&
290
0
                        poller_reg_unreg_mutex(poller.reg_unreg_mutex_);
291
0
                    GUARD_AND_DBG_LOG(poller_reg_unreg_mutex);
292
293
0
                    std::vector<Polling::Event> events;
294
0
                    int ready_fds = poller.poll(events);
295
296
0
                    switch (ready_fds)
297
0
                    {
298
0
                    case -1:
299
0
                        break;
300
0
                    case 0:
301
0
                        break;
302
0
                    default:
303
0
                        {
304
0
                            if (shutdown_)
305
0
                                return;
306
307
0
                            GUARD_AND_DBG_LOG(shutdown_mutex_);
308
0
                            if (shutdown_)
309
0
                                return;
310
311
0
                            handleFds(std::move(events));
312
0
                        }
313
0
                    }
314
0
                }
315
0
            }
316
0
        }
317
318
        void run() override
319
0
        {
320
0
            PS_TIMEDBG_START;
321
322
            // Note: poller_reg_unreg_mutex is already locked (by
323
            // Listener::run()) before calling here, so it is safe to call
324
            // handlers_.forEachHandler here
325
326
0
            handlers_.forEachHandler([](const std::shared_ptr<Handler> handler) {
327
0
                handler->context_.tid = std::this_thread::get_id();
328
0
            });
329
330
0
            while (!shutdown_)
331
0
            {
332
0
                PS_TIMEDBG_START;
333
0
                runOnce();
334
0
            }
335
0
        }
336
337
        void shutdown() override
338
0
        {
339
0
            PS_TIMEDBG_START_THIS;
340
341
0
            shutdown_.store(true);
342
343
0
            GUARD_AND_DBG_LOG(shutdown_mutex_);
344
0
            shutdownFd.notify();
345
0
        }
346
347
0
        static constexpr size_t MaxHandlers() { return HandlerList::MaxHandlers; }
348
349
    private:
350
        static Polling::Tag encodeTag(const Reactor::Key& key, Polling::Tag tag)
351
0
        {
352
0
            auto value = tag.value();
353
0
            return HandlerList::encodeTag(key, value);
354
0
        }
355
356
        static std::pair<size_t, Polling::TagValue>
357
        decodeTag(const Polling::Tag& tag)
358
0
        {
359
0
            return HandlerList::decodeTag(tag);
360
0
        }
361
362
        void handleFds(std::vector<Polling::Event> events) const
363
0
        {
364
            // Fast-path: if we only have one handler, do not bother scanning the fds to
365
            // find the right handlers
366
0
            if (handlers_.size() == 1)
367
0
                handlers_.at(0)->onReady(FdSet(std::move(events)));
368
0
            else
369
0
            {
370
0
                std::unordered_map<std::shared_ptr<Handler>, std::vector<Polling::Event>>
371
0
                    fdHandlers;
372
373
0
                for (auto& event : events)
374
0
                {
375
0
                    size_t index;
376
0
                    Polling::TagValue value;
377
378
0
                    std::tie(index, value) = decodeTag(event.tag);
379
0
                    auto handler_          = handlers_.at(index);
380
0
                    auto& evs              = fdHandlers.at(handler_);
381
0
                    evs.push_back(std::move(event));
382
0
                }
383
384
0
                for (auto& data : fdHandlers)
385
0
                {
386
0
                    data.first->onReady(FdSet(std::move(data.second)));
387
0
                }
388
0
            }
389
0
        }
390
391
        struct HandlerList
392
        {
393
394
            // We are using the highest 8 bits of the fd to encode the index of the
395
            // handler, which gives us a maximum of 2**8 - 1 handler, 255
396
            static constexpr size_t HandlerBits  = 8;
397
            static constexpr size_t HandlerShift = sizeof(uint64_t) - HandlerBits;
398
            static constexpr uint64_t DataMask   = uint64_t(-1) >> HandlerBits;
399
400
            static constexpr size_t MaxHandlers = (1 << HandlerBits) - 1;
401
402
            HandlerList()
403
0
                : handlers()
404
                , index_()
405
0
            {
406
0
                std::fill(std::begin(handlers), std::end(handlers), nullptr);
407
0
            }
408
409
            HandlerList(const HandlerList& other)            = delete;
410
            HandlerList& operator=(const HandlerList& other) = delete;
411
412
            // poller.reg_unreg_mutex_ must be locked before calling
413
            Reactor::Key add(const std::shared_ptr<Handler>& handler)
414
0
            {
415
0
                if (index_ == MaxHandlers)
416
0
                    throw std::runtime_error("Maximum handlers reached");
417
418
0
                Reactor::Key key(index_);
419
0
                handlers.at(index_++) = handler;
420
421
0
                return key;
422
0
            }
423
424
            // poller.reg_unreg_mutex_ must be locked before calling
425
            void removeAll()
426
0
            {
427
0
                index_ = 0;
428
0
                handlers.fill(nullptr);
429
0
            }
430
431
            // poller.reg_unreg_mutex_ must be locked before calling
432
            std::shared_ptr<Handler> at(size_t index) const
433
0
            {
434
0
                if (index >= index_)
435
0
                    throw std::runtime_error("Attempting to retrieve invalid handler");
436
0
                return handlers.at(index);
437
0
            }
438
439
0
            bool empty() const { return index_ == 0; }
440
441
0
            size_t size() const { return index_; }
442
443
            // Note that in the _USE_LIBEVENT case the the tag has type "struct
444
            // event *" but in fact may be that pointer with high bits set to
445
            // the value of "index". So in the _USE_LIBEVENT case we must be
446
            // careful to mask out those high bits to retrieve the actual
447
            // pointer, just as, in the non-_USE_LIBEVENT case, we have to mask
448
            // those high bits to retrieve the actual file descriptor.
449
            static Polling::Tag encodeTag(const Reactor::Key& key,
450
                                          Polling::TagValueConst value)
451
0
            {
452
0
                auto index = key.data();
453
                // The reason why we are using the most significant bits to
454
                // encode the index of the handler is that in the fast path, we
455
                // won't need to shift the value to retrieve the fd if there is
456
                // only one handler as all the bits will already be set to 0.
457
0
                auto encodedValue                =
458
0
                    (index << HandlerShift) |
459
0
                    PS_FD_CAST_TO_UNUM(uint64_t, static_cast<Fd>(value));
460
0
                Polling::TagValue encodedValueTV =
461
0
                    static_cast<Polling::TagValue>(PS_NUM_CAST_TO_FD(encodedValue));
462
0
                return Polling::Tag(encodedValueTV);
463
0
            }
464
465
            static std::pair<size_t, Polling::TagValue>
466
            decodeTag(const Polling::Tag& tag)
467
0
            {
468
0
                auto value                      = tag.valueU64();
469
0
                size_t index                    = value >> HandlerShift;
470
0
                uint64_t maskedValue            = value & DataMask;
471
0
                Polling::TagValue maskedValueTV =
472
0
                    static_cast<Polling::TagValue>(PS_NUM_CAST_TO_FD(maskedValue));
473
474
0
                return std::make_pair(index, maskedValueTV);
475
0
            }
476
477
            // poller.reg_unreg_mutex_ must be locked before calling
478
            template <typename Func>
479
            void forEachHandler(Func func) const
480
0
            {
481
0
                for (size_t i = 0; i < index_; ++i)
482
0
                    func(handlers.at(i));
483
0
            }
Unexecuted instantiation: void Pistache::Aio::SyncImpl::HandlerList::forEachHandler<Pistache::Aio::SyncImpl::detachAndRemoveAllHandlers()::{lambda(std::__1::shared_ptr<Pistache::Aio::Handler>)#1}>(Pistache::Aio::SyncImpl::detachAndRemoveAllHandlers()::{lambda(std::__1::shared_ptr<Pistache::Aio::Handler>)#1}) const
Unexecuted instantiation: void Pistache::Aio::SyncImpl::HandlerList::forEachHandler<Pistache::Aio::SyncImpl::run()::{lambda(std::__1::shared_ptr<Pistache::Aio::Handler>)#1}>(Pistache::Aio::SyncImpl::run()::{lambda(std::__1::shared_ptr<Pistache::Aio::Handler>)#1}) const
484
485
        private:
486
            std::array<std::shared_ptr<Handler>, MaxHandlers> handlers;
487
            size_t index_;
488
        };
489
490
        HandlerList handlers_;
491
492
        std::mutex shutdown_mutex_;
493
        std::atomic<bool> shutdown_;
494
        NotifyFd shutdownFd;
495
496
        Polling::Epoll poller;
497
    };
498
499
    /* Asynchronous implementation of the reactor that spawns a number N of threads
500
     * and creates a polling fd per thread
501
     *
502
     * Implementation detail:
503
     *
504
     *  Here is how it works: the implementation simply starts a synchronous variant
505
     *  of the implementation in its own std::thread. When adding an handler, it
506
     * will add a clone() of the handler to every worker (thread), and assign its
507
     * own key to the handler. Here is where things start to get interesting. Here
508
     * is how the key encoding works for every handler:
509
     *
510
     *  [     handler idx      ] [       worker idx         ]
511
     *  ------------------------ ----------------------------
512
     *       ^ 32 bits                   ^ 32 bits
513
     *  -----------------------------------------------------
514
     *                       ^ 64 bits
515
     *
516
     * Since we have up to 64 bits of data for every key, we encode the index of the
517
     * handler that has been assigned by the SyncImpl in the upper 32 bits, and
518
     * encode the index of the worker thread in the lowest 32 bits.
519
     *
520
     * When registering a fd for a given key, the AsyncImpl then knows which worker
521
     * to use by looking at the lowest 32 bits of the Key's data. The SyncImpl will
522
     * then use the highest 32 bits to retrieve the index of the handler.
523
     */
524
525
    class AsyncImpl : public Reactor::Impl
526
    {
527
    public:
528
        static constexpr uint32_t KeyMarker = 0xBADB0B;
529
530
        AsyncImpl(Reactor* reactor,
531
                  size_t threads, const std::string& threadsName)
532
0
            : Reactor::Impl(reactor)
533
0
        {
534
0
            PS_TIMEDBG_START_THIS;
535
536
0
            if (threads > SyncImpl::MaxHandlers())
537
0
                throw std::runtime_error("Too many worker threads requested (max "s + std::to_string(SyncImpl::MaxHandlers()) + ")."s);
538
539
0
            for (size_t i = 0; i < threads; ++i)
540
0
                workers_.emplace_back(std::make_unique<Worker>(reactor, threadsName));
541
0
            PS_LOG_DEBUG_ARGS("threads %d, workers_.size() %d",
542
0
                              threads, workers_.size());
543
0
        }
544
545
        Reactor::Key addHandler(const std::shared_ptr<Handler>& handler,
546
                                bool) override
547
0
        {
548
0
            PS_TIMEDBG_START_THIS;
549
550
0
            std::array<Reactor::Key, SyncImpl::MaxHandlers()> keys;
551
552
0
            for (size_t i = 0; i < workers_.size(); ++i)
553
0
            {
554
0
                auto& wrk = workers_.at(i);
555
556
0
                auto cl     = handler->clone();
557
0
                auto key    = wrk->sync->addHandler(cl, false /* setKey */);
558
0
                auto newKey = encodeKey(key, static_cast<uint32_t>(i));
559
0
                cl->key_    = newKey;
560
561
0
                keys.at(i) = key;
562
0
            }
563
564
0
            auto data = keys.at(0).data() << 32 | KeyMarker;
565
566
0
            return Reactor::Key(data);
567
0
        }
568
569
        void detachFromReactor(const std::shared_ptr<Handler>& handler)
570
            override
571
0
        {
572
0
            for (size_t i = 0; i < workers_.size(); ++i)
573
0
            {
574
0
                auto& wrk = workers_.at(i);
575
576
0
                wrk->sync->detachFromReactor(handler);
577
0
            }
578
0
        }
579
580
        void detachAndRemoveAllHandlers() override
581
0
        {
582
0
            for (size_t i = 0; i < workers_.size(); ++i)
583
0
            {
584
0
                auto& wrk = workers_.at(i);
585
586
0
                wrk->sync->detachAndRemoveAllHandlers();
587
0
            }
588
0
        }
589
590
        std::vector<std::shared_ptr<Handler>>
591
        handlers(const Reactor::Key& key) const override
592
0
        {
593
594
0
            const std::pair<uint32_t, uint32_t> idx_marker = decodeKey(key);
595
0
            if (idx_marker.second != KeyMarker)
596
0
                throw std::runtime_error("Invalid key");
597
598
0
            Reactor::Key originalKey(idx_marker.first);
599
600
0
            std::vector<std::shared_ptr<Handler>> res;
601
0
            res.reserve(workers_.size());
602
0
            for (const auto& wrk : workers_)
603
0
            {
604
0
                res.push_back(wrk->sync->handler(originalKey));
605
0
            }
606
607
0
            return res;
608
0
        }
609
610
        void registerFd(const Reactor::Key& key, Fd fd, Polling::NotifyOn interest,
611
                        Polling::Tag tag,
612
                        Polling::Mode mode = Polling::Mode::Level) override
613
0
        {
614
0
            PS_TIMEDBG_START_THIS;
615
616
0
            dispatchCall(key, &SyncImpl::registerFd, fd, interest, tag, mode);
617
0
        }
618
619
        void registerFdOneShot(const Reactor::Key& key, Fd fd,
620
                               Polling::NotifyOn interest, Polling::Tag tag,
621
                               Polling::Mode mode = Polling::Mode::Level) override
622
0
        {
623
0
            PS_TIMEDBG_START_THIS;
624
625
0
            dispatchCall(key, &SyncImpl::registerFdOneShot, fd, interest, tag, mode);
626
0
        }
627
628
        void modifyFd(const Reactor::Key& key, Fd fd, Polling::NotifyOn interest,
629
                      Polling::Tag tag,
630
                      Polling::Mode mode = Polling::Mode::Level) override
631
0
        {
632
0
            PS_TIMEDBG_START_THIS;
633
634
0
            dispatchCall(key, &SyncImpl::modifyFd, fd, interest, tag, mode);
635
0
        }
636
637
        void removeFd(const Reactor::Key& key, Fd fd) override
638
0
        {
639
0
            PS_TIMEDBG_START_ARGS("this %p, Fd %" PIST_QUOTE(PS_FD_PRNTFCD),
640
0
                                  this, fd);
641
0
            dispatchCall(key, &SyncImpl::removeFd, fd);
642
0
        }
643
644
0
        void runOnce() override { }
645
646
        void run() override
647
0
        {
648
0
            for (auto& wrk : workers_)
649
0
                wrk->run();
650
0
        }
651
652
        void shutdown() override
653
0
        {
654
0
            for (auto& wrk : workers_)
655
0
                wrk->shutdown();
656
0
        }
657
658
    private:
659
        static Reactor::Key encodeKey(const Reactor::Key& originalKey,
660
                                      uint32_t value)
661
0
        {
662
0
            auto data     = originalKey.data();
663
0
            auto newValue = data << 32 | value;
664
0
            return Reactor::Key(newValue);
665
0
        }
666
667
        static std::pair<uint32_t, uint32_t>
668
        decodeKey(const Reactor::Key& encodedKey)
669
0
        {
670
0
            auto data = encodedKey.data();
671
0
            auto hi   = static_cast<uint32_t>(data >> 32);
672
0
            auto lo   = static_cast<uint32_t>(data & 0xFFFFFFFF);
673
0
            return std::make_pair(hi, lo);
674
0
        }
675
676
0
#define CALL_MEMBER_FN(obj, pmf) (obj->*(pmf))
677
678
        template <typename Func, typename... Args>
679
        void dispatchCall(const Reactor::Key& key, Func func, Args&&... args) const
680
0
        {
681
0
            PS_TIMEDBG_START_THIS;
682
0
            PS_LOG_DEBUG_ARGS("workers_.size() %d", workers_.size());
683
684
0
            auto decoded    = decodeKey(key);
685
0
            const auto& wrk = workers_.at(decoded.second);
686
687
0
            Reactor::Key originalKey(decoded.first);
688
689
0
            CALL_MEMBER_FN(wrk->sync.get(), func)
690
0
            (originalKey, std::forward<Args>(args)...);
691
0
        }
Unexecuted instantiation: void Pistache::Aio::AsyncImpl::dispatchCall<void (Pistache::Aio::SyncImpl::*)(Pistache::Aio::Reactor::Key const&, int, Pistache::Polling::NotifyOn, Pistache::Polling::Tag, Pistache::Polling::Mode), int&, Pistache::Polling::NotifyOn&, Pistache::Polling::Tag&, Pistache::Polling::Mode&>(Pistache::Aio::Reactor::Key const&, void (Pistache::Aio::SyncImpl::*)(Pistache::Aio::Reactor::Key const&, int, Pistache::Polling::NotifyOn, Pistache::Polling::Tag, Pistache::Polling::Mode), int&, Pistache::Polling::NotifyOn&, Pistache::Polling::Tag&, Pistache::Polling::Mode&) const
Unexecuted instantiation: void Pistache::Aio::AsyncImpl::dispatchCall<void (Pistache::Aio::SyncImpl::*)(Pistache::Aio::Reactor::Key const&, int), int&>(Pistache::Aio::Reactor::Key const&, void (Pistache::Aio::SyncImpl::*)(Pistache::Aio::Reactor::Key const&, int), int&) const
692
693
#undef CALL_MEMBER_FN
694
695
        struct Worker
696
        {
697
698
            explicit Worker(Reactor* reactor, const std::string& threadsName)
699
0
                : thread()
700
0
                , sync(new SyncImpl(reactor))
701
0
                , threadsName_(threadsName)
702
0
            { }
703
704
            ~Worker()
705
0
            {
706
0
                if (thread.joinable())
707
0
                    thread.join();
708
0
            }
709
710
            void run()
711
0
            {
712
0
                PS_TIMEDBG_START;
713
714
0
                thread = std::thread([this]() {
715
0
                    PS_TIMEDBG_START;
716
717
0
                    if (!threadsName_.empty())
718
0
                    {
719
0
                        PS_LOG_DEBUG("Setting thread name/description");
720
#ifdef _IS_WINDOWS
721
                        const std::string threads_name(threadsName_.substr(0, 15));
722
                        const std::wstring temp(threads_name.begin(),
723
                                          threads_name.end());
724
                        const LPCWSTR wide_threads_name = temp.c_str();
725
726
                        HRESULT hr = E_NOTIMPL;
727
#ifdef __MINGW32__
728
                        TSetThreadDescription set_thread_description_ptr =
729
                            getSetThreadDescriptionPtr();
730
                        if (set_thread_description_ptr)
731
                        {
732
                            hr = set_thread_description_ptr(
733
                                GetCurrentThread(), wide_threads_name);
734
                        }
735
#else
736
                        hr = SetThreadDescription(GetCurrentThread(),
737
                                                  wide_threads_name);
738
#endif
739
                        if ((FAILED(hr)) && (!lLoggedSetThreadDescriptionFail))
740
                        {
741
                            lLoggedSetThreadDescriptionFail = true;
742
                            // Log it just once
743
                            PS_LOG_INFO("SetThreadDescription failed");
744
                        }
745
#else
746
#if defined _IS_BSD && !defined __NetBSD__
747
                        pthread_set_name_np(
748
#else
749
0
                        pthread_setname_np(
750
0
#endif
751
0
#ifndef __APPLE__
752
                            // Apple's macOS version of pthread_setname_np
753
                            // takes only "const char * name" as parm
754
                            // (Nov/2023), and assumes that the thread is the
755
                            // calling thread. Note that pthread_self returns
756
                            // calling thread in Linux, so this amounts to
757
                            // the same thing in the end
758
                            // It appears older FreeBSD (2003 ?) also behaves
759
                            // as per macOS, while newer FreeBSD (2021 ?)
760
                            // behaves as per Linux
761
0
                            pthread_self(),
762
0
#endif
763
#ifdef __NetBSD__
764
                            "%s", // NetBSD has 3 parms for pthread_setname_np
765
                            (void*)/*cast away const for NetBSD*/
766
#endif
767
0
                            threadsName_.substr(0, 15)
768
0
                                .c_str());
769
0
#endif // of ifdef _IS_WINDOWS... else...
770
0
                    }
771
0
                    PS_LOG_DEBUG("Calling sync->run()");
772
0
                    sync->run();
773
0
                });
774
0
            }
775
776
0
            void shutdown() { sync->shutdown(); }
777
778
            std::thread thread;
779
            std::unique_ptr<SyncImpl> sync;
780
            std::string threadsName_;
781
        };
782
783
        std::vector<std::unique_ptr<Worker>> workers_;
784
    };
785
786
    Reactor::Key::Key()
787
0
        : data_(0)
788
0
    { }
789
790
    Reactor::Key::Key(uint64_t data)
791
0
        : data_(data)
792
0
    { }
793
794
0
    Reactor::Reactor() = default;
795
796
    // Reactor::~Reactor() = default;
797
    Reactor::~Reactor()
798
0
    {
799
0
        PS_TIMEDBG_START_THIS;
800
801
0
        detachAndRemoveAllHandlers();
802
0
    }
803
804
    std::shared_ptr<Reactor> Reactor::create()
805
0
    {
806
0
        PS_TIMEDBG_START;
807
0
        return std::make_shared<Reactor>();
808
0
    }
809
810
    void Reactor::init()
811
0
    {
812
0
        SyncContext context;
813
0
        init(context);
814
0
    }
815
816
    void Reactor::init(const ExecutionContext& context)
817
0
    {
818
0
        PS_TIMEDBG_START_THIS;
819
820
0
        Reactor::Impl* new_impl = context.makeImpl(this);
821
0
        impl_.reset(new_impl);
822
0
    }
823
824
    Reactor::Key Reactor::addHandler(const std::shared_ptr<Handler>& handler)
825
0
    {
826
0
        PS_TIMEDBG_START_THIS;
827
0
        return impl()->addHandler(handler, true);
828
0
    }
829
830
    void Reactor::detachFromReactor(const std::shared_ptr<Handler>& handler)
831
0
    {
832
0
        PS_TIMEDBG_START_THIS;
833
0
        return impl()->detachFromReactor(handler);
834
0
    }
835
836
    void Reactor::detachAndRemoveAllHandlers()
837
0
    {
838
0
        PS_TIMEDBG_START_THIS;
839
840
0
        if (impl_) // may be null if Reactor::~Reactor called before we've had
841
                   // a chance to call Reactor::init()
842
0
            impl()->detachAndRemoveAllHandlers();
843
0
    }
844
845
    std::vector<std::shared_ptr<Handler>>
846
    Reactor::handlers(const Reactor::Key& key)
847
0
    {
848
0
        PS_TIMEDBG_START_THIS;
849
0
        return impl()->handlers(key);
850
0
    }
851
852
    void Reactor::registerFd(const Reactor::Key& key, Fd fd,
853
                             Polling::NotifyOn interest, Polling::Tag tag,
854
                             Polling::Mode mode)
855
0
    {
856
0
        PS_TIMEDBG_START_THIS;
857
0
        impl()->registerFd(key, fd, interest, tag, mode);
858
0
    }
859
860
    void Reactor::registerFdOneShot(const Reactor::Key& key, Fd fd,
861
                                    Polling::NotifyOn interest, Polling::Tag tag,
862
                                    Polling::Mode mode)
863
0
    {
864
0
        PS_TIMEDBG_START_THIS;
865
0
        impl()->registerFdOneShot(key, fd, interest, tag, mode);
866
0
    }
867
868
    void Reactor::registerFd(const Reactor::Key& key, Fd fd,
869
                             Polling::NotifyOn interest, Polling::Mode mode)
870
0
    {
871
0
        PS_TIMEDBG_START_THIS;
872
0
        impl()->registerFd(key, fd, interest, Polling::Tag(fd), mode);
873
0
    }
874
875
    void Reactor::registerFdOneShot(const Reactor::Key& key, Fd fd,
876
                                    Polling::NotifyOn interest,
877
                                    Polling::Mode mode)
878
0
    {
879
0
        PS_TIMEDBG_START_THIS;
880
0
        impl()->registerFdOneShot(key, fd, interest,
881
0
                                  Polling::Tag(fd), mode);
882
0
    }
883
884
    void Reactor::modifyFd(const Reactor::Key& key, Fd fd,
885
                           Polling::NotifyOn interest, Polling::Tag tag,
886
                           Polling::Mode mode)
887
0
    {
888
0
        PS_TIMEDBG_START_THIS;
889
0
        impl()->modifyFd(key, fd, interest, tag, mode);
890
0
    }
891
892
    void Reactor::modifyFd(const Reactor::Key& key, Fd fd,
893
                           Polling::NotifyOn interest, Polling::Mode mode)
894
0
    {
895
0
        PS_TIMEDBG_START_THIS;
896
0
        impl()->modifyFd(key, fd, interest, Polling::Tag(fd), mode);
897
0
    }
898
899
    void Reactor::removeFd(const Reactor::Key& key, Fd fd)
900
0
    {
901
0
        PS_TIMEDBG_START_ARGS("Reactor %p, Fd %" PIST_QUOTE(PS_FD_PRNTFCD),
902
0
                              this, fd);
903
904
0
        impl()->removeFd(key, fd);
905
0
    }
906
907
0
    void Reactor::run() { impl()->run(); }
908
909
    void Reactor::shutdown()
910
0
    {
911
0
        PS_TIMEDBG_START_THIS;
912
913
0
        if (impl_)
914
0
            impl()->shutdown();
915
0
    }
916
917
0
    void Reactor::runOnce() { impl()->runOnce(); }
918
919
    Reactor::Impl* Reactor::impl() const
920
0
    {
921
0
        if (!impl_)
922
0
            throw std::runtime_error(
923
0
                "Invalid object state, you should call init() before.");
924
925
0
        return impl_.get();
926
0
    }
927
928
    Reactor::Impl* SyncContext::makeImpl(Reactor* reactor) const
929
0
    {
930
0
        PS_TIMEDBG_START_THIS;
931
0
        return new SyncImpl(reactor);
932
0
    }
933
934
    Reactor::Impl* AsyncContext::makeImpl(Reactor* reactor) const
935
0
    {
936
0
        PS_TIMEDBG_START_THIS;
937
0
        return new AsyncImpl(reactor, threads_, threadsName_);
938
0
    }
939
940
0
    AsyncContext AsyncContext::singleThreaded() { return AsyncContext(1); }
941
942
} // namespace Pistache::Aio