/src/openexr/src/lib/IlmThread/IlmThreadPool.cpp
Line | Count | Source |
1 | | // |
2 | | // SPDX-License-Identifier: BSD-3-Clause |
3 | | // Copyright (c) Contributors to the OpenEXR Project. |
4 | | // |
5 | | |
6 | | //----------------------------------------------------------------------------- |
7 | | // |
8 | | // class Task, class ThreadPool, class TaskGroup |
9 | | // |
10 | | //----------------------------------------------------------------------------- |
11 | | |
12 | | #include "IlmThreadPool.h" |
13 | | #include "Iex.h" |
14 | | #include "IlmThread.h" |
15 | | #include "IlmThreadSemaphore.h" |
16 | | |
17 | | #include <atomic> |
18 | | #include <limits> |
19 | | #include <memory> |
20 | | #include <mutex> |
21 | | #include <thread> |
22 | | #include <vector> |
23 | | |
24 | | #if (defined(_WIN32) || defined(_WIN64)) |
25 | | # include <windows.h> |
26 | | #else |
27 | | # include <unistd.h> |
28 | | #endif |
29 | | |
30 | | ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_ENTER |
31 | | |
32 | | #if ILMTHREAD_THREADING_ENABLED |
33 | | # define ENABLE_THREADING |
34 | | #endif |
35 | | |
36 | | namespace |
37 | | { |
38 | | |
39 | | static inline void |
40 | | handleProcessTask (Task* task) |
41 | 1.36k | { |
42 | 1.36k | if (task) |
43 | 1.36k | { |
44 | 1.36k | TaskGroup* taskGroup = task->group (); |
45 | | |
46 | 1.36k | task->execute (); |
47 | | |
48 | | // kill the task prior to notifying the group |
49 | | // such that any internal reference-based |
50 | | // semantics will be handled prior to |
51 | | // the task group destructor letting it out |
52 | | // of the scope of those references |
53 | 1.36k | delete task; |
54 | | |
55 | 1.36k | if (taskGroup) taskGroup->finishOneTask (); |
56 | 1.36k | } |
57 | 1.36k | } |
58 | | |
// Shared state for the default thread pool provider. Held via
// shared_ptr by both the provider and each worker thread so it outlives
// whichever is destroyed first.
struct DefaultThreadPoolData
{
    Semaphore          _taskSemaphore; // threads wait on this for ready tasks
    mutable std::mutex _taskMutex;     // mutual exclusion for the tasks list
    std::vector<Task*> _tasks;         // the list of tasks to execute

    mutable std::mutex       _threadMutex; // mutual exclusion for threads list
    std::vector<std::thread> _threads;     // the list of all threads

    std::atomic<int>  _threadCount; // cached size of _threads, readable
                                    // without taking _threadMutex
    std::atomic<bool> _stopping;    // set once shutdown has been requested

    // True once stop() has been called. A relaxed load suffices here:
    // workers only check this after waking from the semaphore, which
    // provides the necessary synchronization with the stopping thread.
    inline bool stopped () const
    {
        return _stopping.load (std::memory_order_relaxed);
    }

    // Ask worker threads to exit their loop (they still drain any
    // queued tasks first — see threadLoop).
    inline void stop () { _stopping = true; }

    // Reset flags/counters prior to (re)starting a set of threads.
    inline void resetAtomics ()
    {
        _threadCount = 0;
        _stopping    = false;
    }
};
84 | | |
85 | | } // namespace |
86 | | |
87 | | #ifdef ENABLE_THREADING |
88 | | |
// Private state for TaskGroup: an "inverted" semaphore that lets the
// group's destructor block until every task added to the group has run.
struct TaskGroup::Data
{
    Data ();
    ~Data ();
    // Non-copyable / non-movable: the semaphore address must be stable
    // while worker threads may still post to it.
    Data (const Data&) = delete;
    Data& operator= (const Data&) = delete;
    Data (Data&&) = delete;
    Data& operator= (Data&&) = delete;

    // Called from Task's constructor when a task joins this group.
    void addTask ();
    // Called (via TaskGroup::finishOneTask) when a task completes.
    void removeTask ();

    // Block until all tasks have finished and fully notified.
    void waitForEmpty ();

    std::atomic<int> numPending; // tasks added but not yet finished
    std::atomic<int> inFlight;   // tasks still inside removeTask (guards
                                 // the semaphore's lifetime — see
                                 // removeTask for the rationale)
    Semaphore isEmpty; // used to signal that the taskgroup is empty
};
107 | | |
// Private state for ThreadPool: a provider pointer that can be swapped
// atomically so numThreads/addTask can run concurrently with
// setNumThreads/setThreadProvider.
struct ThreadPool::Data
{
    using ProviderPtr = std::shared_ptr<ThreadPoolProvider>;

    Data ();
    ~Data ();
    Data (const Data&) = delete;
    Data& operator= (const Data&) = delete;
    Data (Data&&) = delete;
    Data& operator= (Data&&) = delete;

    // Atomic snapshot of the current provider (may be null).
    // NOTE(review): uses the free-function atomic shared_ptr API, which
    // is deprecated in C++20 in favor of std::atomic<std::shared_ptr>.
    ProviderPtr getProvider () const { return std::atomic_load (&_provider); }

    // Atomically install a new provider (may be null). The previous
    // provider, if different, is told to finish — shutting down its
    // threads — before our reference to it is dropped.
    void setProvider (ProviderPtr provider)
    {
        ProviderPtr curp = std::atomic_exchange (&_provider, provider);
        if (curp && curp != provider) curp->finish ();
    }

    std::shared_ptr<ThreadPoolProvider> _provider;
};
129 | | |
130 | | namespace |
131 | | { |
132 | | |
133 | | // |
134 | | // class DefaultThreadPoolProvider |
135 | | // |
// Built-in ThreadPoolProvider: a fixed set of std::threads pulling
// tasks from a mutex-protected LIFO, coordinated by a semaphore.
class DefaultThreadPoolProvider : public ThreadPoolProvider
{
public:
    DefaultThreadPoolProvider (int count);
    DefaultThreadPoolProvider (const DefaultThreadPoolProvider&) = delete;
    DefaultThreadPoolProvider&
    operator= (const DefaultThreadPoolProvider&) = delete;
    DefaultThreadPoolProvider (DefaultThreadPoolProvider&&) = delete;
    DefaultThreadPoolProvider& operator= (DefaultThreadPoolProvider&&) = delete;
    ~DefaultThreadPoolProvider () override;

    int numThreads () const override;
    void setNumThreads (int count) override;
    void addTask (Task* task) override;

    // Shut down and join all worker threads.
    void finish () override;

private:
    // finish() implementation; caller must hold _data->_threadMutex.
    void lockedFinish ();
    // Worker entry point; holds its own shared_ptr to the pool data so
    // the data outlives the provider if threads exit late.
    void threadLoop (std::shared_ptr<DefaultThreadPoolData> d);

    std::shared_ptr<DefaultThreadPoolData> _data;
};
159 | | |
// Create the shared pool state and spin up `count` worker threads.
// resetAtomics must run before setNumThreads so the workers start with
// a clear stop flag.
DefaultThreadPoolProvider::DefaultThreadPoolProvider (int count)
    : _data (std::make_shared<DefaultThreadPoolData> ())
{
    _data->resetAtomics ();
    setNumThreads (count);
}
166 | | |
// Threads are shut down via finish() (called by ThreadPool::Data before
// the provider is released), so nothing to do here; workers keep the
// shared data alive via their own shared_ptr.
DefaultThreadPoolProvider::~DefaultThreadPoolProvider ()
{}
169 | | |
// Lock-free read of the thread count (kept in an atomic precisely so
// this query does not need _threadMutex).
int
DefaultThreadPoolProvider::numThreads () const
{
    return _data->_threadCount.load ();
}
175 | | |
// Resize the worker set to `count` threads. Shrinking tears down all
// threads and starts `count` fresh ones (see comment below).
void
DefaultThreadPoolProvider::setNumThreads (int count)
{
    // since we're a private class, the thread pool won't call us if
    // we aren't changing size so no need to check that...

    std::lock_guard<std::mutex> lock (_data->_threadMutex);

    size_t curThreads = _data->_threads.size ();
    size_t nToAdd     = static_cast<size_t> (count);

    if (nToAdd < curThreads)
    {
        // no easy way to only shutdown the n threads at the end of
        // the vector (well, really, guaranteeing they are the ones to
        // be woken up), so just kill all of the threads
        lockedFinish ();
        curThreads = 0;
    }

    // start only the threads we are missing; each worker gets its own
    // shared_ptr to the pool data so the data outlives the provider
    _data->_threads.resize (nToAdd);
    for (size_t i = curThreads; i < nToAdd; ++i)
    {
        _data->_threads[i] =
            std::thread (&DefaultThreadPoolProvider::threadLoop, this, _data);
    }
    // publish the new count for the lock-free numThreads() query
    _data->_threadCount = static_cast<int> (_data->_threads.size ());
}
204 | | |
// Queue a task for the worker threads. Takes ownership in the sense
// that some worker will eventually delete it via handleProcessTask.
void
DefaultThreadPoolProvider::addTask (Task* task)
{
    // the thread pool will kill us and switch to a null provider
    // if the thread count is set to 0, so we can always
    // go ahead and lock and assume we have a thread to do the
    // processing
    {
        std::lock_guard<std::mutex> taskLock (_data->_taskMutex);

        //
        // Push the new task into the FIFO
        //
        _data->_tasks.push_back (task);
    }

    //
    // Signal that we have a new task to process
    // (deliberately outside the lock so the woken worker does not
    // immediately block on _taskMutex)
    //
    _data->_taskSemaphore.post ();
}
226 | | |
// Public shutdown entry point: serialize against setNumThreads via the
// thread mutex, then run the real teardown.
void
DefaultThreadPoolProvider::finish ()
{
    std::lock_guard<std::mutex> lock (_data->_threadMutex);

    lockedFinish ();
}
234 | | |
// Tear down all worker threads. Caller must hold _data->_threadMutex.
// Order matters: set the stop flag, wake every worker, join, then
// reset state so the provider could be restarted.
void
DefaultThreadPoolProvider::lockedFinish ()
{
    _data->stop ();

    //
    // Signal enough times to allow all threads to stop.
    //
    // NB: we must do this as many times as we have threads.
    //
    // If there is still work in the queue, or this call happens "too
    // quickly", threads will not be waiting on the semaphore, so we
    // need to ensure the semaphore is at a count equal to the amount
    // of work left plus the number of threads to ensure exit of a
    // thread. There can be threads in a few states:
    //   - still starting up (successive calls to setNumThreads)
    //   - in the middle of processing a task / looping
    //   - waiting in the semaphore
    size_t curT = _data->_threads.size ();
    for (size_t i = 0; i != curT; ++i)
        _data->_taskSemaphore.post ();

    //
    // We should not need to check joinability, they should all, by
    // definition, be joinable (assuming normal start)
    //
    for (size_t i = 0; i != curT; ++i)
    {
        // This isn't quite right in that the thread may have actually
        // be in an exited / signalled state (needing the
        // WaitForSingleObject call), and so already have an exit code
        // (I think, but the docs are vague), but if we don't do the
        // join, the stl thread seems to then throw an exception. The
        // join should just return invalid handle and continue, and is
        // more of a windows bug... except maybe someone needs to work
        // around it...
        //# ifdef TEST_FOR_WIN_THREAD_STATUS
        //
        //        // per OIIO issue #2038, on exit / dll unload, windows may
        //        // kill the thread, double check that it is still active prior
        //        // to joining.
        //        DWORD tstatus;
        //        if (GetExitCodeThread (_threads[i].native_handle (), &tstatus))
        //        {
        //            if (tstatus != STILL_ACTIVE) { continue; }
        //        }
        //# endif

        _data->_threads[i].join ();
    }

    _data->_threads.clear ();

    // clear _threadCount / _stopping so a subsequent setNumThreads can
    // start fresh workers
    _data->resetAtomics ();
}
290 | | |
// Worker thread entry point. Note `data` is passed by value (its own
// shared_ptr) so the pool state stays alive even if the provider is
// destroyed while workers are still winding down.
void
DefaultThreadPoolProvider::threadLoop (
    std::shared_ptr<DefaultThreadPoolData> data)
{
    while (true)
    {
        //
        // Wait for a task to become available (or for a shutdown post
        // from lockedFinish)
        //

        data->_taskSemaphore.wait ();

        {
            std::unique_lock<std::mutex> taskLock (data->_taskMutex);

            //
            // If there is a task pending, pop off the next task in the
            // FIFO. Tasks are drained even when stopping: the stop
            // check below only fires once the queue is empty.
            //

            if (!data->_tasks.empty ())
            {
                Task* task = data->_tasks.back ();
                data->_tasks.pop_back ();

                // release the mutex while we process
                taskLock.unlock ();

                handleProcessTask (task);

                // do not need to reacquire the lock at all since we
                // will just loop around, pull any other task
            }
            else if (data->stopped ()) { break; }
        }
    }
}
327 | | |
328 | | } //namespace |
329 | | |
330 | | // |
331 | | // struct TaskGroup::Data |
332 | | // |
333 | | |
// isEmpty starts at 1 so waitForEmpty() does not block on a group that
// never had a task; the first addTask() consumes that token.
TaskGroup::Data::Data () : numPending (0), inFlight (0), isEmpty (1)
{}
336 | | |
// Nothing to release; waitForEmpty() has already ensured no thread can
// still touch the semaphore by the time this runs (see TaskGroup dtor).
TaskGroup::Data::~Data ()
{}
339 | | |
// Block until every task in the group has finished AND has fully
// returned from removeTask, so the semaphore can be safely destroyed.
void
TaskGroup::Data::waitForEmpty ()
{
    //
    // A TaskGroup acts like an "inverted" semaphore: if the count
    // is above 0 then waiting on the taskgroup will block. The
    // destructor waits until the taskgroup is empty before returning.
    //

    isEmpty.wait ();

    // pseudo spin to wait for the notifying thread to finish the post
    // to avoid a premature deletion of the semaphore
    int count = 0;
    while (inFlight.load () > 0)
    {
        ++count;
        if (count > 100)
        {
            // be polite: yield periodically instead of burning the core
            std::this_thread::yield ();
            count = 0;
        }
    }
}
364 | | |
// Register one more task with the group. Called from Task's ctor, i.e.
// always before the task can possibly run or finish.
void
TaskGroup::Data::addTask ()
{
    inFlight.fetch_add (1);

    // if we are the first task off the rank, clear the
    // isEmpty semaphore such that the group will actually pause
    // until the task finishes
    if (numPending.fetch_add (1) == 0) { isEmpty.wait (); }
}
375 | | |
// Mark one task as finished; posts isEmpty when it was the last one.
// Runs on the worker thread, after the task object has been deleted.
void
TaskGroup::Data::removeTask ()
{
    // if we are the last task, notify the group we're done
    if (numPending.fetch_sub (1) == 1) { isEmpty.post (); }

    // in theory, a background thread could actually finish a task
    // prior to the next task being added. The fetch_add / fetch_sub
    // logic between addTask and removeTask are fine to keep the
    // inverted semaphore straight. All addTask must happen prior to
    // the TaskGroup destructor.
    //
    // But to let the taskgroup thread waiting know we're actually
    // finished with the last one and finished posting (the semaphore
    // might wake up the other thread while in the middle of post) so
    // we don't destroy the semaphore while posting to it, keep a
    // separate counter that is modified pre / post semaphore
    inFlight.fetch_sub (1);
}
395 | | |
396 | | // |
397 | | // struct ThreadPool::Data |
398 | | // |
399 | | |
// Starts with a null provider; ThreadPool's ctor installs one via
// setNumThreads.
ThreadPool::Data::Data ()
{
    // empty
}
404 | | |
// Swapping in a null provider makes the old provider finish (joining
// its threads) before this object is gone.
ThreadPool::Data::~Data ()
{
    setProvider (nullptr);
}
409 | | |
410 | | #endif // ENABLE_THREADING |
411 | | |
412 | | // |
413 | | // class Task |
414 | | // |
415 | | |
// Attach this task to its (optional) group; the group will then block
// in its destructor until this task has run.
Task::Task (TaskGroup* g) : _group (g)
{
#ifdef ENABLE_THREADING
    if (g) g->_data->addTask ();
#endif
}
422 | | |
// Group notification happens in handleProcessTask AFTER deletion, not
// here — see the comment there for why the order matters.
Task::~Task ()
{
    // empty
}
427 | | |
// The group this task was constructed with (may be null).
TaskGroup*
Task::group ()
{
    return _group;
}
433 | | |
// Without threading support the group carries no state at all.
TaskGroup::TaskGroup ()
    :
#ifdef ENABLE_THREADING
    _data (new Data)
#else
    _data (nullptr)
#endif
{
    // empty
}
444 | | |
// Blocks until all tasks added to this group have completed (the
// "inverted semaphore" behavior) before releasing the shared state.
TaskGroup::~TaskGroup ()
{
#ifdef ENABLE_THREADING
    _data->waitForEmpty ();
    delete _data;
#endif
}
452 | | |
// Called (from handleProcessTask) each time one of this group's tasks
// completes; wakes the group's destructor when the count hits zero.
void
TaskGroup::finishOneTask ()
{
#ifdef ENABLE_THREADING
    _data->removeTask ();
#endif
}
460 | | |
461 | | // |
462 | | // class ThreadPoolProvider |
463 | | // |
464 | | |
// Out-of-line so the vtable/key function lives in this translation unit.
ThreadPoolProvider::ThreadPoolProvider ()
{}
467 | | |
// Out-of-line so the vtable/key function lives in this translation unit.
ThreadPoolProvider::~ThreadPoolProvider ()
{}
470 | | |
471 | | // |
472 | | // class ThreadPool |
473 | | // |
474 | | |
// Construct a pool with `nthreads` workers; 0 means tasks run inline on
// the caller (null provider — see setNumThreads).
ThreadPool::ThreadPool (unsigned nthreads)
    :
#ifdef ENABLE_THREADING
    _data (new Data)
#else
    _data (nullptr)
#endif
{
#ifdef ENABLE_THREADING
    setNumThreads (static_cast<int> (nthreads));
#endif
}
487 | | |
ThreadPool::~ThreadPool ()
{
#ifdef ENABLE_THREADING
    // ensures any jobs / threads are finished & shutdown
    _data->setProvider (nullptr);
    delete _data;
#endif
}
496 | | |
497 | | int |
498 | | ThreadPool::numThreads () const |
499 | 4.18k | { |
500 | 4.18k | #ifdef ENABLE_THREADING |
501 | 4.18k | Data::ProviderPtr sp = _data->getProvider (); |
502 | 4.18k | return (sp) ? sp->numThreads () : 0; |
503 | | #else |
504 | | return 0; |
505 | | #endif |
506 | 4.18k | } |
507 | | |
// Change the worker count. Throws Iex::ArgExc for negative counts.
// A count of 0 removes the provider entirely (tasks then run inline);
// going from 0 to N installs a fresh DefaultThreadPoolProvider.
void
ThreadPool::setNumThreads (int count)
{
#ifdef ENABLE_THREADING
    if (count < 0)
        throw IEX_INTERNAL_NAMESPACE::ArgExc (
            "Attempt to set the number of threads "
            "in a thread pool to a negative value.");

    {
        Data::ProviderPtr sp = _data->getProvider ();
        if (sp)
        {
            int curT = sp->numThreads ();
            if (curT == count) return;

            // resize the existing provider in place when possible
            if (count != 0)
            {
                sp->setNumThreads (count);
                return;
            }
        }
    }

    // either a null provider or a case where we should switch from
    // a default provider to a null one or vice-versa
    if (count == 0)
        _data->setProvider (nullptr);
    else
        _data->setProvider (
            std::make_shared<DefaultThreadPoolProvider> (count));

#else
    // just blindly ignore
    (void) count;
#endif
}
545 | | |
// Install a custom provider. The pool TAKES OWNERSHIP of `provider`
// (wraps it in a shared_ptr and will free it); throws when threading is
// compiled out.
void
ThreadPool::setThreadProvider (ThreadPoolProvider* provider)
{
#ifdef ENABLE_THREADING
    // contract is we take ownership and will free the provider
    _data->setProvider (Data::ProviderPtr (provider));
#else
    throw IEX_INTERNAL_NAMESPACE::ArgExc (
        "Attempt to set a thread provider on a system with threads"
        " disabled / not available");
#endif
}
558 | | |
559 | | void |
560 | | ThreadPool::addTask (Task* task) |
561 | 1.36k | { |
562 | 1.36k | if (task) |
563 | 1.36k | { |
564 | 1.36k | #ifdef ENABLE_THREADING |
565 | 1.36k | Data::ProviderPtr p = _data->getProvider (); |
566 | 1.36k | if (p) |
567 | 1.36k | { |
568 | 1.36k | p->addTask (task); |
569 | 1.36k | return; |
570 | 1.36k | } |
571 | 0 | #endif |
572 | | |
573 | 0 | handleProcessTask (task); |
574 | 0 | } |
575 | 1.36k | } |
576 | | |
ThreadPool&
ThreadPool::globalThreadPool ()
{
    //
    // The global thread pool; constructed on first use (thread-safe
    // since C++11 magic statics) with 0 threads, i.e. tasks run inline
    // until someone raises the count.
    //

    static ThreadPool gThreadPool (0);

    return gThreadPool;
}
588 | | |
// Convenience: submit a task to the process-wide pool.
void
ThreadPool::addGlobalTask (Task* task)
{
    globalThreadPool ().addTask (task);
}
594 | | |
595 | | unsigned |
596 | | ThreadPool::estimateThreadCountForFileIO () |
597 | 0 | { |
598 | 0 | #ifdef ENABLE_THREADING |
599 | 0 | unsigned rv = std::thread::hardware_concurrency (); |
600 | | // hardware concurrency is not required to work |
601 | 0 | if (rv == 0 || |
602 | 0 | rv > static_cast<unsigned> (std::numeric_limits<int>::max ())) |
603 | 0 | { |
604 | 0 | rv = 1; |
605 | | # if (defined(_WIN32) || defined(_WIN64)) |
606 | | SYSTEM_INFO si; |
607 | | GetNativeSystemInfo (&si); |
608 | | |
609 | | rv = si.dwNumberOfProcessors; |
610 | | # else |
611 | | // linux, bsd, and mac are fine with this |
612 | | // other *nix should be too, right? |
613 | 0 | rv = sysconf (_SC_NPROCESSORS_ONLN); |
614 | 0 | # endif |
615 | 0 | } |
616 | 0 | return rv; |
617 | | #else |
618 | | return 0; |
619 | | #endif |
620 | 0 | } |
621 | | |
622 | | ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_EXIT |