/src/pistache/src/common/reactor.cc
Line | Count | Source |
1 | | /* |
2 | | * SPDX-FileCopyrightText: 2016 Mathieu Stefani |
3 | | * |
4 | | * SPDX-License-Identifier: Apache-2.0 |
5 | | */ |
6 | | |
7 | | /* |
8 | | Mathieu Stefani, 15 juin 2016 |
9 | | |
10 | | Implementation of the Reactor |
11 | | */ |
12 | | |
13 | | #include <pistache/reactor.h> |
14 | | |
15 | | #include <array> |
16 | | #include <atomic> |
17 | | #include <memory> |
18 | | #include <mutex> |
19 | | #include <string> |
20 | | #include <unordered_map> |
21 | | #include <vector> |
22 | | |
23 | | #include <pistache/pist_quote.h> |
24 | | #include <pistache/pist_timelog.h> |
25 | | |
26 | | #ifdef _IS_BSD |
27 | | // For pthread_set_name_np |
28 | | #include PST_THREAD_HDR |
29 | | #ifndef __NetBSD__ |
30 | | #include <pthread_np.h> |
31 | | #endif |
32 | | #endif |
33 | | |
34 | | #ifdef _IS_WINDOWS |
35 | | #include <windows.h> // Needed for PST_THREAD_HDR (processthreadsapi.h) |
36 | | #include PST_THREAD_HDR // for SetThreadDescription |
37 | | #endif |
38 | | |
39 | | #ifdef _IS_WINDOWS |
40 | | static std::atomic_bool lLoggedSetThreadDescriptionFail = false; |
41 | | #ifdef __MINGW32__ |
42 | | |
43 | | #include <windows.h> |
44 | | #include <libloaderapi.h> // for GetProcAddress and GetModuleHandleA |
45 | | typedef HRESULT (WINAPI *TSetThreadDescription)(HANDLE, PCWSTR); |
46 | | |
47 | | static std::atomic_bool lSetThreadDescriptionLoaded = false; |
48 | | static std::mutex lSetThreadDescriptionLoadMutex; |
49 | | static TSetThreadDescription lSetThreadDescriptionPtr = nullptr; |
50 | | |
51 | | TSetThreadDescription getSetThreadDescriptionPtr() |
52 | | { |
53 | | if (lSetThreadDescriptionLoaded) |
54 | | return(lSetThreadDescriptionPtr); |
55 | | |
56 | | GUARD_AND_DBG_LOG(lSetThreadDescriptionLoadMutex); |
57 | | if (lSetThreadDescriptionLoaded) |
58 | | return(lSetThreadDescriptionPtr); |
59 | | |
60 | | HMODULE hKernelBase = GetModuleHandleA("KernelBase.dll"); |
61 | | |
62 | | if (!hKernelBase) |
63 | | { |
64 | | PS_LOG_WARNING( |
65 | | "Failed to get KernelBase.dll for SetThreadDescription"); |
66 | | lSetThreadDescriptionLoaded = true; |
67 | | return(nullptr); |
68 | | } |
69 | | |
70 | | FARPROC set_thread_desc_fpptr = |
71 | | GetProcAddress(hKernelBase, "SetThreadDescription"); |
72 | | |
73 | | // We do the cast in two steps, otherwise mingw-gcc complains about |
74 | | // incompatible types |
75 | | void * set_thread_desc_vptr = |
76 | | reinterpret_cast<void *>(set_thread_desc_fpptr); |
77 | | lSetThreadDescriptionPtr = |
78 | | reinterpret_cast<TSetThreadDescription>(set_thread_desc_vptr); |
79 | | |
80 | | lSetThreadDescriptionLoaded = true; |
81 | | if (!lSetThreadDescriptionPtr) |
82 | | { |
83 | | PS_LOG_WARNING( |
84 | | "Failed to get SetThreadDescription from KernelBase.dll"); |
85 | | } |
86 | | return(lSetThreadDescriptionPtr); |
87 | | } |
88 | | #endif // of ifdef __MINGW32__ |
89 | | #endif // of ifdef _IS_WINDOWS |
90 | | |
91 | | using namespace std::string_literals; |
92 | | |
93 | | namespace Pistache::Aio |
94 | | { |
95 | | |
96 | | class Reactor::Impl |
97 | | { |
98 | | public: |
99 | | Impl(Reactor* reactor) |
100 | 0 | : reactor_(reactor) |
101 | 0 | { } |
102 | | |
103 | 0 | virtual ~Impl() = default; |
104 | | |
105 | | virtual Reactor::Key addHandler( |
106 | | const std::shared_ptr<Handler>& handler, bool setKey) |
107 | | = 0; |
108 | | |
109 | | virtual void detachFromReactor( |
110 | | const std::shared_ptr<Handler>& handler) |
111 | | = 0; |
112 | | |
113 | | virtual void detachAndRemoveAllHandlers() = 0; |
114 | | |
115 | | virtual std::vector<std::shared_ptr<Handler>> |
116 | | handlers(const Reactor::Key& key) const = 0; |
117 | | |
118 | | virtual void registerFd(const Reactor::Key& key, Fd fd, |
119 | | Polling::NotifyOn interest, Polling::Tag tag, |
120 | | Polling::Mode mode = Polling::Mode::Level) |
121 | | = 0; |
122 | | |
123 | | virtual void registerFdOneShot(const Reactor::Key& key, Fd fd, |
124 | | Polling::NotifyOn interest, Polling::Tag tag, |
125 | | Polling::Mode mode = Polling::Mode::Level) |
126 | | = 0; |
127 | | |
128 | | virtual void modifyFd(const Reactor::Key& key, Fd fd, |
129 | | Polling::NotifyOn interest, Polling::Tag tag, |
130 | | Polling::Mode mode = Polling::Mode::Level) |
131 | | = 0; |
132 | | |
133 | | virtual void removeFd(const Reactor::Key& key, Fd fd) = 0; |
134 | | |
135 | | virtual void runOnce() = 0; |
136 | | virtual void run() = 0; |
137 | | |
138 | | virtual void shutdown() = 0; |
139 | | |
140 | | Reactor* reactor_; |
141 | | }; |
142 | | |
143 | | /* Synchronous implementation of the reactor that polls in the context |
144 | | * of the same thread |
145 | | */ |
146 | | class SyncImpl : public Reactor::Impl |
147 | | { |
148 | | public: |
149 | | explicit SyncImpl(Reactor* reactor) |
150 | 0 | : Reactor::Impl(reactor) |
151 | 0 | , handlers_() |
152 | 0 | , shutdown_() |
153 | 0 | , shutdownFd() |
154 | 0 | , poller() |
155 | 0 | { |
156 | 0 | shutdownFd.bind(poller); |
157 | 0 | } |
158 | | |
159 | | Reactor::Key addHandler(const std::shared_ptr<Handler>& handler, |
160 | | bool setKey = true) override |
161 | 0 | { |
162 | 0 | handler->registerPoller(poller); |
163 | |
|
164 | 0 | handler->reactor_ = reactor_; |
165 | |
|
166 | 0 | std::mutex& poller_reg_unreg_mutex(poller.reg_unreg_mutex_); |
167 | 0 | GUARD_AND_DBG_LOG(poller_reg_unreg_mutex); |
168 | 0 | auto key = handlers_.add(handler); |
169 | 0 | if (setKey) |
170 | 0 | handler->key_ = key; |
171 | |
|
172 | 0 | return key; |
173 | 0 | } |
174 | | |
175 | | // poller.reg_unreg_mutex_ must be locked before calling |
176 | | void detachFromReactor(const std::shared_ptr<Handler>& handler) |
177 | | override |
178 | 0 | { |
179 | 0 | PS_TIMEDBG_START_THIS; |
180 | |
|
181 | 0 | handler->unregisterPoller(poller); |
182 | 0 | handler->reactor_ = nullptr; |
183 | 0 | } |
184 | | |
185 | | void detachAndRemoveAllHandlers() override |
186 | 0 | { |
187 | 0 | std::mutex& poller_reg_unreg_mutex(poller.reg_unreg_mutex_); |
188 | 0 | GUARD_AND_DBG_LOG(poller_reg_unreg_mutex); |
189 | |
|
190 | 0 | handlers_.forEachHandler([this]( |
191 | 0 | const std::shared_ptr<Handler> handler) { |
192 | 0 | detachFromReactor(handler); |
193 | 0 | }); |
194 | |
|
195 | 0 | handlers_.removeAll(); |
196 | 0 | } |
197 | | |
198 | | std::shared_ptr<Handler> handler(const Reactor::Key& key) const |
199 | 0 | { |
200 | 0 | return handlers_.at(static_cast<size_t>(key.data())); |
201 | 0 | } |
202 | | |
203 | | std::vector<std::shared_ptr<Handler>> |
204 | | handlers(const Reactor::Key& key) const override |
205 | 0 | { |
206 | 0 | std::vector<std::shared_ptr<Handler>> res; |
207 | |
|
208 | 0 | res.push_back(handler(key)); |
209 | 0 | return res; |
210 | 0 | } |
211 | | |
212 | | #ifdef DEBUG |
213 | | static void logNotifyOn(Fd fd, Polling::NotifyOn interest) |
214 | | { |
215 | | std::string str("Fd "); |
216 | | |
217 | | std::stringstream ss; |
218 | | ss << fd; |
219 | | str += ss.str(); |
220 | | |
221 | | if ((static_cast<unsigned int>(interest)) & (static_cast<unsigned int>(Polling::NotifyOn::Read))) |
222 | | str += " read"; |
223 | | if ((static_cast<unsigned int>(interest)) & (static_cast<unsigned int>(Polling::NotifyOn::Write))) |
224 | | str += " write"; |
225 | | if ((static_cast<unsigned int>(interest)) & (static_cast<unsigned int>(Polling::NotifyOn::Hangup))) |
226 | | str += " hangup"; |
227 | | if ((static_cast<unsigned int>(interest)) & (static_cast<unsigned int>(Polling::NotifyOn::Shutdown))) |
228 | | str += " shutdown"; |
229 | | |
230 | | PS_LOG_DEBUG_ARGS("%s", str.c_str()); |
231 | | } |
232 | | |
233 | | #define PS_LOG_DBG_NOTIFY_ON logNotifyOn(fd, interest) |
234 | | #else |
235 | | #define PS_LOG_DBG_NOTIFY_ON |
236 | | #endif |
237 | | |
238 | | void registerFd(const Reactor::Key& key, Fd fd, Polling::NotifyOn interest, |
239 | | Polling::Tag tag, |
240 | | Polling::Mode mode = Polling::Mode::Level) override |
241 | 0 | { |
242 | |
|
243 | 0 | auto pollTag = encodeTag(key, tag); |
244 | 0 | PS_LOG_DBG_NOTIFY_ON; |
245 | 0 | poller.addFd(fd, Flags<Polling::NotifyOn>(interest), pollTag, mode); |
246 | 0 | } |
247 | | |
248 | | void registerFdOneShot(const Reactor::Key& key, Fd fd, |
249 | | Polling::NotifyOn interest, Polling::Tag tag, |
250 | | Polling::Mode mode = Polling::Mode::Level) override |
251 | 0 | { |
252 | 0 | PS_TIMEDBG_START_ARGS("Fd %" PIST_QUOTE(PS_FD_PRNTFCD), fd); |
253 | |
|
254 | 0 | auto pollTag = encodeTag(key, tag); |
255 | 0 | PS_LOG_DBG_NOTIFY_ON; |
256 | 0 | poller.addFdOneShot(fd, Flags<Polling::NotifyOn>(interest), pollTag, mode); |
257 | 0 | } |
258 | | |
259 | | void modifyFd(const Reactor::Key& key, Fd fd, Polling::NotifyOn interest, |
260 | | Polling::Tag tag, |
261 | | Polling::Mode mode = Polling::Mode::Level) override |
262 | 0 | { |
263 | |
|
264 | 0 | auto pollTag = encodeTag(key, tag); |
265 | 0 | poller.rearmFd(fd, Flags<Polling::NotifyOn>(interest), pollTag, mode); |
266 | 0 | } |
267 | | |
268 | | void removeFd(const Reactor::Key& /*key*/, Fd fd) override |
269 | 0 | { |
270 | 0 | PS_TIMEDBG_START_ARGS("Reactor %p, Fd %" PIST_QUOTE(PS_FD_PRNTFCD), |
271 | 0 | this, fd); |
272 | |
|
273 | 0 | poller.removeFd(fd); |
274 | 0 | } |
275 | | |
276 | | void runOnce() override |
277 | 0 | { |
278 | 0 | PS_TIMEDBG_START; |
279 | |
|
280 | 0 | if (handlers_.empty()) |
281 | 0 | throw std::runtime_error("You need to set at least one handler"); |
282 | | |
283 | 0 | for (;;) |
284 | 0 | { |
285 | 0 | PS_TIMEDBG_START; |
286 | 0 | { // encapsulate l_guard(poller.reg_unreg_mutex_) |
287 | | // See comment in class Epoll regarding reg_unreg_mutex_ |
288 | |
|
289 | 0 | std::mutex& |
290 | 0 | poller_reg_unreg_mutex(poller.reg_unreg_mutex_); |
291 | 0 | GUARD_AND_DBG_LOG(poller_reg_unreg_mutex); |
292 | |
|
293 | 0 | std::vector<Polling::Event> events; |
294 | 0 | int ready_fds = poller.poll(events); |
295 | |
|
296 | 0 | switch (ready_fds) |
297 | 0 | { |
298 | 0 | case -1: |
299 | 0 | break; |
300 | 0 | case 0: |
301 | 0 | break; |
302 | 0 | default: |
303 | 0 | { |
304 | 0 | if (shutdown_) |
305 | 0 | return; |
306 | | |
307 | 0 | GUARD_AND_DBG_LOG(shutdown_mutex_); |
308 | 0 | if (shutdown_) |
309 | 0 | return; |
310 | | |
311 | 0 | handleFds(std::move(events)); |
312 | 0 | } |
313 | 0 | } |
314 | 0 | } |
315 | 0 | } |
316 | 0 | } |
317 | | |
318 | | void run() override |
319 | 0 | { |
320 | 0 | PS_TIMEDBG_START; |
321 | | |
322 | | // Note: poller_reg_unreg_mutex is already locked (by |
323 | | // Listener::run()) before calling here, so it is safe to call |
324 | | // handlers_.forEachHandler here |
325 | |
|
326 | 0 | handlers_.forEachHandler([](const std::shared_ptr<Handler> handler) { |
327 | 0 | handler->context_.tid = std::this_thread::get_id(); |
328 | 0 | }); |
329 | |
|
330 | 0 | while (!shutdown_) |
331 | 0 | { |
332 | 0 | PS_TIMEDBG_START; |
333 | 0 | runOnce(); |
334 | 0 | } |
335 | 0 | } |
336 | | |
337 | | void shutdown() override |
338 | 0 | { |
339 | 0 | PS_TIMEDBG_START_THIS; |
340 | |
|
341 | 0 | shutdown_.store(true); |
342 | |
|
343 | 0 | GUARD_AND_DBG_LOG(shutdown_mutex_); |
344 | 0 | shutdownFd.notify(); |
345 | 0 | } |
346 | | |
347 | 0 | static constexpr size_t MaxHandlers() { return HandlerList::MaxHandlers; } |
348 | | |
349 | | private: |
350 | | static Polling::Tag encodeTag(const Reactor::Key& key, Polling::Tag tag) |
351 | 0 | { |
352 | 0 | auto value = tag.value(); |
353 | 0 | return HandlerList::encodeTag(key, value); |
354 | 0 | } |
355 | | |
356 | | static std::pair<size_t, Polling::TagValue> |
357 | | decodeTag(const Polling::Tag& tag) |
358 | 0 | { |
359 | 0 | return HandlerList::decodeTag(tag); |
360 | 0 | } |
361 | | |
362 | | void handleFds(std::vector<Polling::Event> events) const |
363 | 0 | { |
364 | | // Fast-path: if we only have one handler, do not bother scanning the fds to |
365 | | // find the right handlers |
366 | 0 | if (handlers_.size() == 1) |
367 | 0 | handlers_.at(0)->onReady(FdSet(std::move(events))); |
368 | 0 | else |
369 | 0 | { |
370 | 0 | std::unordered_map<std::shared_ptr<Handler>, std::vector<Polling::Event>> |
371 | 0 | fdHandlers; |
372 | |
|
373 | 0 | for (auto& event : events) |
374 | 0 | { |
375 | 0 | size_t index; |
376 | 0 | Polling::TagValue value; |
377 | |
|
378 | 0 | std::tie(index, value) = decodeTag(event.tag); |
379 | 0 | auto handler_ = handlers_.at(index); |
380 | 0 | auto& evs = fdHandlers.at(handler_); |
381 | 0 | evs.push_back(std::move(event)); |
382 | 0 | } |
383 | |
|
384 | 0 | for (auto& data : fdHandlers) |
385 | 0 | { |
386 | 0 | data.first->onReady(FdSet(std::move(data.second))); |
387 | 0 | } |
388 | 0 | } |
389 | 0 | } |
390 | | |
391 | | struct HandlerList |
392 | | { |
393 | | |
394 | | // We are using the highest 8 bits of the fd to encode the index of the |
395 | | // handler, which gives us a maximum of 2**8 - 1 handler, 255 |
396 | | static constexpr size_t HandlerBits = 8; |
397 | | static constexpr size_t HandlerShift = sizeof(uint64_t) - HandlerBits; |
398 | | static constexpr uint64_t DataMask = uint64_t(-1) >> HandlerBits; |
399 | | |
400 | | static constexpr size_t MaxHandlers = (1 << HandlerBits) - 1; |
401 | | |
402 | | HandlerList() |
403 | 0 | : handlers() |
404 | | , index_() |
405 | 0 | { |
406 | 0 | std::fill(std::begin(handlers), std::end(handlers), nullptr); |
407 | 0 | } |
408 | | |
409 | | HandlerList(const HandlerList& other) = delete; |
410 | | HandlerList& operator=(const HandlerList& other) = delete; |
411 | | |
412 | | // poller.reg_unreg_mutex_ must be locked before calling |
413 | | Reactor::Key add(const std::shared_ptr<Handler>& handler) |
414 | 0 | { |
415 | 0 | if (index_ == MaxHandlers) |
416 | 0 | throw std::runtime_error("Maximum handlers reached"); |
417 | | |
418 | 0 | Reactor::Key key(index_); |
419 | 0 | handlers.at(index_++) = handler; |
420 | |
|
421 | 0 | return key; |
422 | 0 | } |
423 | | |
424 | | // poller.reg_unreg_mutex_ must be locked before calling |
425 | | void removeAll() |
426 | 0 | { |
427 | 0 | index_ = 0; |
428 | 0 | handlers.fill(nullptr); |
429 | 0 | } |
430 | | |
431 | | // poller.reg_unreg_mutex_ must be locked before calling |
432 | | std::shared_ptr<Handler> at(size_t index) const |
433 | 0 | { |
434 | 0 | if (index >= index_) |
435 | 0 | throw std::runtime_error("Attempting to retrieve invalid handler"); |
436 | 0 | return handlers.at(index); |
437 | 0 | } |
438 | | |
439 | 0 | bool empty() const { return index_ == 0; } |
440 | | |
441 | 0 | size_t size() const { return index_; } |
442 | | |
443 | | // Note that in the _USE_LIBEVENT case the the tag has type "struct |
444 | | // event *" but in fact may be that pointer with high bits set to |
445 | | // the value of "index". So in the _USE_LIBEVENT case we must be |
446 | | // careful to mask out those high bits to retrieve the actual |
447 | | // pointer, just as, in the non-_USE_LIBEVENT case, we have to mask |
448 | | // those high bits to retrieve the actual file descriptor. |
449 | | static Polling::Tag encodeTag(const Reactor::Key& key, |
450 | | Polling::TagValueConst value) |
451 | 0 | { |
452 | 0 | auto index = key.data(); |
453 | | // The reason why we are using the most significant bits to |
454 | | // encode the index of the handler is that in the fast path, we |
455 | | // won't need to shift the value to retrieve the fd if there is |
456 | | // only one handler as all the bits will already be set to 0. |
457 | 0 | auto encodedValue = |
458 | 0 | (index << HandlerShift) | |
459 | 0 | PS_FD_CAST_TO_UNUM(uint64_t, static_cast<Fd>(value)); |
460 | 0 | Polling::TagValue encodedValueTV = |
461 | 0 | static_cast<Polling::TagValue>(PS_NUM_CAST_TO_FD(encodedValue)); |
462 | 0 | return Polling::Tag(encodedValueTV); |
463 | 0 | } |
464 | | |
465 | | static std::pair<size_t, Polling::TagValue> |
466 | | decodeTag(const Polling::Tag& tag) |
467 | 0 | { |
468 | 0 | auto value = tag.valueU64(); |
469 | 0 | size_t index = value >> HandlerShift; |
470 | 0 | uint64_t maskedValue = value & DataMask; |
471 | 0 | Polling::TagValue maskedValueTV = |
472 | 0 | static_cast<Polling::TagValue>(PS_NUM_CAST_TO_FD(maskedValue)); |
473 | |
|
474 | 0 | return std::make_pair(index, maskedValueTV); |
475 | 0 | } |
476 | | |
477 | | // poller.reg_unreg_mutex_ must be locked before calling |
478 | | template <typename Func> |
479 | | void forEachHandler(Func func) const |
480 | 0 | { |
481 | 0 | for (size_t i = 0; i < index_; ++i) |
482 | 0 | func(handlers.at(i)); |
483 | 0 | } Unexecuted instantiation: void Pistache::Aio::SyncImpl::HandlerList::forEachHandler<Pistache::Aio::SyncImpl::detachAndRemoveAllHandlers()::{lambda(std::__1::shared_ptr<Pistache::Aio::Handler>)#1}>(Pistache::Aio::SyncImpl::detachAndRemoveAllHandlers()::{lambda(std::__1::shared_ptr<Pistache::Aio::Handler>)#1}) constUnexecuted instantiation: void Pistache::Aio::SyncImpl::HandlerList::forEachHandler<Pistache::Aio::SyncImpl::run()::{lambda(std::__1::shared_ptr<Pistache::Aio::Handler>)#1}>(Pistache::Aio::SyncImpl::run()::{lambda(std::__1::shared_ptr<Pistache::Aio::Handler>)#1}) const |
484 | | |
485 | | private: |
486 | | std::array<std::shared_ptr<Handler>, MaxHandlers> handlers; |
487 | | size_t index_; |
488 | | }; |
489 | | |
490 | | HandlerList handlers_; |
491 | | |
492 | | std::mutex shutdown_mutex_; |
493 | | std::atomic<bool> shutdown_; |
494 | | NotifyFd shutdownFd; |
495 | | |
496 | | Polling::Epoll poller; |
497 | | }; |
498 | | |
499 | | /* Asynchronous implementation of the reactor that spawns a number N of threads |
500 | | * and creates a polling fd per thread |
501 | | * |
502 | | * Implementation detail: |
503 | | * |
504 | | * Here is how it works: the implementation simply starts a synchronous variant |
505 | | * of the implementation in its own std::thread. When adding an handler, it |
506 | | * will add a clone() of the handler to every worker (thread), and assign its |
507 | | * own key to the handler. Here is where things start to get interesting. Here |
508 | | * is how the key encoding works for every handler: |
509 | | * |
510 | | * [ handler idx ] [ worker idx ] |
511 | | * ------------------------ ---------------------------- |
512 | | * ^ 32 bits ^ 32 bits |
513 | | * ----------------------------------------------------- |
514 | | * ^ 64 bits |
515 | | * |
516 | | * Since we have up to 64 bits of data for every key, we encode the index of the |
517 | | * handler that has been assigned by the SyncImpl in the upper 32 bits, and |
518 | | * encode the index of the worker thread in the lowest 32 bits. |
519 | | * |
520 | | * When registering a fd for a given key, the AsyncImpl then knows which worker |
521 | | * to use by looking at the lowest 32 bits of the Key's data. The SyncImpl will |
522 | | * then use the highest 32 bits to retrieve the index of the handler. |
523 | | */ |
524 | | |
525 | | class AsyncImpl : public Reactor::Impl |
526 | | { |
527 | | public: |
528 | | static constexpr uint32_t KeyMarker = 0xBADB0B; |
529 | | |
530 | | AsyncImpl(Reactor* reactor, |
531 | | size_t threads, const std::string& threadsName) |
532 | 0 | : Reactor::Impl(reactor) |
533 | 0 | { |
534 | 0 | PS_TIMEDBG_START_THIS; |
535 | |
|
536 | 0 | if (threads > SyncImpl::MaxHandlers()) |
537 | 0 | throw std::runtime_error("Too many worker threads requested (max "s + std::to_string(SyncImpl::MaxHandlers()) + ")."s); |
538 | | |
539 | 0 | for (size_t i = 0; i < threads; ++i) |
540 | 0 | workers_.emplace_back(std::make_unique<Worker>(reactor, threadsName)); |
541 | 0 | PS_LOG_DEBUG_ARGS("threads %d, workers_.size() %d", |
542 | 0 | threads, workers_.size()); |
543 | 0 | } |
544 | | |
545 | | Reactor::Key addHandler(const std::shared_ptr<Handler>& handler, |
546 | | bool) override |
547 | 0 | { |
548 | 0 | PS_TIMEDBG_START_THIS; |
549 | |
|
550 | 0 | std::array<Reactor::Key, SyncImpl::MaxHandlers()> keys; |
551 | |
|
552 | 0 | for (size_t i = 0; i < workers_.size(); ++i) |
553 | 0 | { |
554 | 0 | auto& wrk = workers_.at(i); |
555 | |
|
556 | 0 | auto cl = handler->clone(); |
557 | 0 | auto key = wrk->sync->addHandler(cl, false /* setKey */); |
558 | 0 | auto newKey = encodeKey(key, static_cast<uint32_t>(i)); |
559 | 0 | cl->key_ = newKey; |
560 | |
|
561 | 0 | keys.at(i) = key; |
562 | 0 | } |
563 | |
|
564 | 0 | auto data = keys.at(0).data() << 32 | KeyMarker; |
565 | |
|
566 | 0 | return Reactor::Key(data); |
567 | 0 | } |
568 | | |
569 | | void detachFromReactor(const std::shared_ptr<Handler>& handler) |
570 | | override |
571 | 0 | { |
572 | 0 | for (size_t i = 0; i < workers_.size(); ++i) |
573 | 0 | { |
574 | 0 | auto& wrk = workers_.at(i); |
575 | |
|
576 | 0 | wrk->sync->detachFromReactor(handler); |
577 | 0 | } |
578 | 0 | } |
579 | | |
580 | | void detachAndRemoveAllHandlers() override |
581 | 0 | { |
582 | 0 | for (size_t i = 0; i < workers_.size(); ++i) |
583 | 0 | { |
584 | 0 | auto& wrk = workers_.at(i); |
585 | |
|
586 | 0 | wrk->sync->detachAndRemoveAllHandlers(); |
587 | 0 | } |
588 | 0 | } |
589 | | |
590 | | std::vector<std::shared_ptr<Handler>> |
591 | | handlers(const Reactor::Key& key) const override |
592 | 0 | { |
593 | |
|
594 | 0 | const std::pair<uint32_t, uint32_t> idx_marker = decodeKey(key); |
595 | 0 | if (idx_marker.second != KeyMarker) |
596 | 0 | throw std::runtime_error("Invalid key"); |
597 | | |
598 | 0 | Reactor::Key originalKey(idx_marker.first); |
599 | |
|
600 | 0 | std::vector<std::shared_ptr<Handler>> res; |
601 | 0 | res.reserve(workers_.size()); |
602 | 0 | for (const auto& wrk : workers_) |
603 | 0 | { |
604 | 0 | res.push_back(wrk->sync->handler(originalKey)); |
605 | 0 | } |
606 | |
|
607 | 0 | return res; |
608 | 0 | } |
609 | | |
610 | | void registerFd(const Reactor::Key& key, Fd fd, Polling::NotifyOn interest, |
611 | | Polling::Tag tag, |
612 | | Polling::Mode mode = Polling::Mode::Level) override |
613 | 0 | { |
614 | 0 | PS_TIMEDBG_START_THIS; |
615 | |
|
616 | 0 | dispatchCall(key, &SyncImpl::registerFd, fd, interest, tag, mode); |
617 | 0 | } |
618 | | |
619 | | void registerFdOneShot(const Reactor::Key& key, Fd fd, |
620 | | Polling::NotifyOn interest, Polling::Tag tag, |
621 | | Polling::Mode mode = Polling::Mode::Level) override |
622 | 0 | { |
623 | 0 | PS_TIMEDBG_START_THIS; |
624 | |
|
625 | 0 | dispatchCall(key, &SyncImpl::registerFdOneShot, fd, interest, tag, mode); |
626 | 0 | } |
627 | | |
628 | | void modifyFd(const Reactor::Key& key, Fd fd, Polling::NotifyOn interest, |
629 | | Polling::Tag tag, |
630 | | Polling::Mode mode = Polling::Mode::Level) override |
631 | 0 | { |
632 | 0 | PS_TIMEDBG_START_THIS; |
633 | |
|
634 | 0 | dispatchCall(key, &SyncImpl::modifyFd, fd, interest, tag, mode); |
635 | 0 | } |
636 | | |
637 | | void removeFd(const Reactor::Key& key, Fd fd) override |
638 | 0 | { |
639 | 0 | PS_TIMEDBG_START_ARGS("this %p, Fd %" PIST_QUOTE(PS_FD_PRNTFCD), |
640 | 0 | this, fd); |
641 | 0 | dispatchCall(key, &SyncImpl::removeFd, fd); |
642 | 0 | } |
643 | | |
644 | 0 | void runOnce() override { } |
645 | | |
646 | | void run() override |
647 | 0 | { |
648 | 0 | for (auto& wrk : workers_) |
649 | 0 | wrk->run(); |
650 | 0 | } |
651 | | |
652 | | void shutdown() override |
653 | 0 | { |
654 | 0 | for (auto& wrk : workers_) |
655 | 0 | wrk->shutdown(); |
656 | 0 | } |
657 | | |
658 | | private: |
659 | | static Reactor::Key encodeKey(const Reactor::Key& originalKey, |
660 | | uint32_t value) |
661 | 0 | { |
662 | 0 | auto data = originalKey.data(); |
663 | 0 | auto newValue = data << 32 | value; |
664 | 0 | return Reactor::Key(newValue); |
665 | 0 | } |
666 | | |
667 | | static std::pair<uint32_t, uint32_t> |
668 | | decodeKey(const Reactor::Key& encodedKey) |
669 | 0 | { |
670 | 0 | auto data = encodedKey.data(); |
671 | 0 | auto hi = static_cast<uint32_t>(data >> 32); |
672 | 0 | auto lo = static_cast<uint32_t>(data & 0xFFFFFFFF); |
673 | 0 | return std::make_pair(hi, lo); |
674 | 0 | } |
675 | | |
676 | 0 | #define CALL_MEMBER_FN(obj, pmf) (obj->*(pmf)) |
677 | | |
678 | | template <typename Func, typename... Args> |
679 | | void dispatchCall(const Reactor::Key& key, Func func, Args&&... args) const |
680 | 0 | { |
681 | 0 | PS_TIMEDBG_START_THIS; |
682 | 0 | PS_LOG_DEBUG_ARGS("workers_.size() %d", workers_.size()); |
683 | |
|
684 | 0 | auto decoded = decodeKey(key); |
685 | 0 | const auto& wrk = workers_.at(decoded.second); |
686 | |
|
687 | 0 | Reactor::Key originalKey(decoded.first); |
688 | |
|
689 | 0 | CALL_MEMBER_FN(wrk->sync.get(), func) |
690 | 0 | (originalKey, std::forward<Args>(args)...); |
691 | 0 | } Unexecuted instantiation: void Pistache::Aio::AsyncImpl::dispatchCall<void (Pistache::Aio::SyncImpl::*)(Pistache::Aio::Reactor::Key const&, int, Pistache::Polling::NotifyOn, Pistache::Polling::Tag, Pistache::Polling::Mode), int&, Pistache::Polling::NotifyOn&, Pistache::Polling::Tag&, Pistache::Polling::Mode&>(Pistache::Aio::Reactor::Key const&, void (Pistache::Aio::SyncImpl::*)(Pistache::Aio::Reactor::Key const&, int, Pistache::Polling::NotifyOn, Pistache::Polling::Tag, Pistache::Polling::Mode), int&, Pistache::Polling::NotifyOn&, Pistache::Polling::Tag&, Pistache::Polling::Mode&) const Unexecuted instantiation: void Pistache::Aio::AsyncImpl::dispatchCall<void (Pistache::Aio::SyncImpl::*)(Pistache::Aio::Reactor::Key const&, int), int&>(Pistache::Aio::Reactor::Key const&, void (Pistache::Aio::SyncImpl::*)(Pistache::Aio::Reactor::Key const&, int), int&) const |
692 | | |
693 | | #undef CALL_MEMBER_FN |
694 | | |
695 | | struct Worker |
696 | | { |
697 | | |
698 | | explicit Worker(Reactor* reactor, const std::string& threadsName) |
699 | 0 | : thread() |
700 | 0 | , sync(new SyncImpl(reactor)) |
701 | 0 | , threadsName_(threadsName) |
702 | 0 | { } |
703 | | |
704 | | ~Worker() |
705 | 0 | { |
706 | 0 | if (thread.joinable()) |
707 | 0 | thread.join(); |
708 | 0 | } |
709 | | |
710 | | void run() |
711 | 0 | { |
712 | 0 | PS_TIMEDBG_START; |
713 | |
|
714 | 0 | thread = std::thread([this]() { |
715 | 0 | PS_TIMEDBG_START; |
716 | |
|
717 | 0 | if (!threadsName_.empty()) |
718 | 0 | { |
719 | 0 | PS_LOG_DEBUG("Setting thread name/description"); |
720 | | #ifdef _IS_WINDOWS |
721 | | const std::string threads_name(threadsName_.substr(0, 15)); |
722 | | const std::wstring temp(threads_name.begin(), |
723 | | threads_name.end()); |
724 | | const LPCWSTR wide_threads_name = temp.c_str(); |
725 | | |
726 | | HRESULT hr = E_NOTIMPL; |
727 | | #ifdef __MINGW32__ |
728 | | TSetThreadDescription set_thread_description_ptr = |
729 | | getSetThreadDescriptionPtr(); |
730 | | if (set_thread_description_ptr) |
731 | | { |
732 | | hr = set_thread_description_ptr( |
733 | | GetCurrentThread(), wide_threads_name); |
734 | | } |
735 | | #else |
736 | | hr = SetThreadDescription(GetCurrentThread(), |
737 | | wide_threads_name); |
738 | | #endif |
739 | | if ((FAILED(hr)) && (!lLoggedSetThreadDescriptionFail)) |
740 | | { |
741 | | lLoggedSetThreadDescriptionFail = true; |
742 | | // Log it just once |
743 | | PS_LOG_INFO("SetThreadDescription failed"); |
744 | | } |
745 | | #else |
746 | | #if defined _IS_BSD && !defined __NetBSD__ |
747 | | pthread_set_name_np( |
748 | | #else |
749 | 0 | pthread_setname_np( |
750 | 0 | #endif |
751 | 0 | #ifndef __APPLE__ |
752 | | // Apple's macOS version of pthread_setname_np |
753 | | // takes only "const char * name" as parm |
754 | | // (Nov/2023), and assumes that the thread is the |
755 | | // calling thread. Note that pthread_self returns |
756 | | // calling thread in Linux, so this amounts to |
757 | | // the same thing in the end |
758 | | // It appears older FreeBSD (2003 ?) also behaves |
759 | | // as per macOS, while newer FreeBSD (2021 ?) |
760 | | // behaves as per Linux |
761 | 0 | pthread_self(), |
762 | 0 | #endif |
763 | | #ifdef __NetBSD__ |
764 | | "%s", // NetBSD has 3 parms for pthread_setname_np |
765 | | (void*)/*cast away const for NetBSD*/ |
766 | | #endif |
767 | 0 | threadsName_.substr(0, 15) |
768 | 0 | .c_str()); |
769 | 0 | #endif // of ifdef _IS_WINDOWS... else... |
770 | 0 | } |
771 | 0 | PS_LOG_DEBUG("Calling sync->run()"); |
772 | 0 | sync->run(); |
773 | 0 | }); |
774 | 0 | } |
775 | | |
776 | 0 | void shutdown() { sync->shutdown(); } |
777 | | |
778 | | std::thread thread; |
779 | | std::unique_ptr<SyncImpl> sync; |
780 | | std::string threadsName_; |
781 | | }; |
782 | | |
783 | | std::vector<std::unique_ptr<Worker>> workers_; |
784 | | }; |
785 | | |
786 | | Reactor::Key::Key() |
787 | 0 | : data_(0) |
788 | 0 | { } |
789 | | |
790 | | Reactor::Key::Key(uint64_t data) |
791 | 0 | : data_(data) |
792 | 0 | { } |
793 | | |
794 | 0 | Reactor::Reactor() = default; |
795 | | |
796 | | // Reactor::~Reactor() = default; |
797 | | Reactor::~Reactor() |
798 | 0 | { |
799 | 0 | PS_TIMEDBG_START_THIS; |
800 | |
|
801 | 0 | detachAndRemoveAllHandlers(); |
802 | 0 | } |
803 | | |
804 | | std::shared_ptr<Reactor> Reactor::create() |
805 | 0 | { |
806 | 0 | PS_TIMEDBG_START; |
807 | 0 | return std::make_shared<Reactor>(); |
808 | 0 | } |
809 | | |
810 | | void Reactor::init() |
811 | 0 | { |
812 | 0 | SyncContext context; |
813 | 0 | init(context); |
814 | 0 | } |
815 | | |
816 | | void Reactor::init(const ExecutionContext& context) |
817 | 0 | { |
818 | 0 | PS_TIMEDBG_START_THIS; |
819 | |
|
820 | 0 | Reactor::Impl* new_impl = context.makeImpl(this); |
821 | 0 | impl_.reset(new_impl); |
822 | 0 | } |
823 | | |
824 | | Reactor::Key Reactor::addHandler(const std::shared_ptr<Handler>& handler) |
825 | 0 | { |
826 | 0 | PS_TIMEDBG_START_THIS; |
827 | 0 | return impl()->addHandler(handler, true); |
828 | 0 | } |
829 | | |
830 | | void Reactor::detachFromReactor(const std::shared_ptr<Handler>& handler) |
831 | 0 | { |
832 | 0 | PS_TIMEDBG_START_THIS; |
833 | 0 | return impl()->detachFromReactor(handler); |
834 | 0 | } |
835 | | |
836 | | void Reactor::detachAndRemoveAllHandlers() |
837 | 0 | { |
838 | 0 | PS_TIMEDBG_START_THIS; |
839 | |
|
840 | 0 | if (impl_) // may be null if Reactor::~Reactor called before we've had |
841 | | // a chance to call Reactor::init() |
842 | 0 | impl()->detachAndRemoveAllHandlers(); |
843 | 0 | } |
844 | | |
845 | | std::vector<std::shared_ptr<Handler>> |
846 | | Reactor::handlers(const Reactor::Key& key) |
847 | 0 | { |
848 | 0 | PS_TIMEDBG_START_THIS; |
849 | 0 | return impl()->handlers(key); |
850 | 0 | } |
851 | | |
852 | | void Reactor::registerFd(const Reactor::Key& key, Fd fd, |
853 | | Polling::NotifyOn interest, Polling::Tag tag, |
854 | | Polling::Mode mode) |
855 | 0 | { |
856 | 0 | PS_TIMEDBG_START_THIS; |
857 | 0 | impl()->registerFd(key, fd, interest, tag, mode); |
858 | 0 | } |
859 | | |
860 | | void Reactor::registerFdOneShot(const Reactor::Key& key, Fd fd, |
861 | | Polling::NotifyOn interest, Polling::Tag tag, |
862 | | Polling::Mode mode) |
863 | 0 | { |
864 | 0 | PS_TIMEDBG_START_THIS; |
865 | 0 | impl()->registerFdOneShot(key, fd, interest, tag, mode); |
866 | 0 | } |
867 | | |
868 | | void Reactor::registerFd(const Reactor::Key& key, Fd fd, |
869 | | Polling::NotifyOn interest, Polling::Mode mode) |
870 | 0 | { |
871 | 0 | PS_TIMEDBG_START_THIS; |
872 | 0 | impl()->registerFd(key, fd, interest, Polling::Tag(fd), mode); |
873 | 0 | } |
874 | | |
875 | | void Reactor::registerFdOneShot(const Reactor::Key& key, Fd fd, |
876 | | Polling::NotifyOn interest, |
877 | | Polling::Mode mode) |
878 | 0 | { |
879 | 0 | PS_TIMEDBG_START_THIS; |
880 | 0 | impl()->registerFdOneShot(key, fd, interest, |
881 | 0 | Polling::Tag(fd), mode); |
882 | 0 | } |
883 | | |
884 | | void Reactor::modifyFd(const Reactor::Key& key, Fd fd, |
885 | | Polling::NotifyOn interest, Polling::Tag tag, |
886 | | Polling::Mode mode) |
887 | 0 | { |
888 | 0 | PS_TIMEDBG_START_THIS; |
889 | 0 | impl()->modifyFd(key, fd, interest, tag, mode); |
890 | 0 | } |
891 | | |
892 | | void Reactor::modifyFd(const Reactor::Key& key, Fd fd, |
893 | | Polling::NotifyOn interest, Polling::Mode mode) |
894 | 0 | { |
895 | 0 | PS_TIMEDBG_START_THIS; |
896 | 0 | impl()->modifyFd(key, fd, interest, Polling::Tag(fd), mode); |
897 | 0 | } |
898 | | |
899 | | void Reactor::removeFd(const Reactor::Key& key, Fd fd) |
900 | 0 | { |
901 | 0 | PS_TIMEDBG_START_ARGS("Reactor %p, Fd %" PIST_QUOTE(PS_FD_PRNTFCD), |
902 | 0 | this, fd); |
903 | |
|
904 | 0 | impl()->removeFd(key, fd); |
905 | 0 | } |
906 | | |
907 | 0 | void Reactor::run() { impl()->run(); } |
908 | | |
909 | | void Reactor::shutdown() |
910 | 0 | { |
911 | 0 | PS_TIMEDBG_START_THIS; |
912 | |
|
913 | 0 | if (impl_) |
914 | 0 | impl()->shutdown(); |
915 | 0 | } |
916 | | |
917 | 0 | void Reactor::runOnce() { impl()->runOnce(); } |
918 | | |
919 | | Reactor::Impl* Reactor::impl() const |
920 | 0 | { |
921 | 0 | if (!impl_) |
922 | 0 | throw std::runtime_error( |
923 | 0 | "Invalid object state, you should call init() before."); |
924 | | |
925 | 0 | return impl_.get(); |
926 | 0 | } |
927 | | |
928 | | Reactor::Impl* SyncContext::makeImpl(Reactor* reactor) const |
929 | 0 | { |
930 | 0 | PS_TIMEDBG_START_THIS; |
931 | 0 | return new SyncImpl(reactor); |
932 | 0 | } |
933 | | |
934 | | Reactor::Impl* AsyncContext::makeImpl(Reactor* reactor) const |
935 | 0 | { |
936 | 0 | PS_TIMEDBG_START_THIS; |
937 | 0 | return new AsyncImpl(reactor, threads_, threadsName_); |
938 | 0 | } |
939 | | |
940 | 0 | AsyncContext AsyncContext::singleThreaded() { return AsyncContext(1); } |
941 | | |
942 | | } // namespace Pistache::Aio |