Coverage Report

Created: 2026-04-01 07:24

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/openexr/src/lib/IlmThread/IlmThreadPool.cpp
Line
Count
Source
1
//
2
// SPDX-License-Identifier: BSD-3-Clause
3
// Copyright (c) Contributors to the OpenEXR Project.
4
//
5
6
//-----------------------------------------------------------------------------
7
//
8
//  class Task, class ThreadPool, class TaskGroup
9
//
10
//-----------------------------------------------------------------------------
11
12
#include "IlmThreadPool.h"
13
#include "Iex.h"
14
#include "IlmThread.h"
15
#include "IlmThreadSemaphore.h"
16
17
#include <atomic>
18
#include <limits>
19
#include <memory>
20
#include <mutex>
21
#include <thread>
22
#include <vector>
23
24
#if (defined(_WIN32) || defined(_WIN64))
25
#    include <windows.h>
26
#else
27
#    include <unistd.h>
28
#endif
29
30
ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_ENTER
31
32
#if ILMTHREAD_THREADING_ENABLED
33
#    define ENABLE_THREADING
34
#endif
35
36
namespace
37
{
38
39
static inline void
40
handleProcessTask (Task* task)
41
1.36k
{
42
1.36k
    if (task)
43
1.36k
    {
44
1.36k
        TaskGroup* taskGroup = task->group ();
45
46
1.36k
        task->execute ();
47
48
        // kill the task prior to notifying the group
49
        // such that any internal reference-based
50
        // semantics will be handled prior to
51
        // the task group destructor letting it out
52
        // of the scope of those references
53
1.36k
        delete task;
54
55
1.36k
        if (taskGroup) taskGroup->finishOneTask ();
56
1.36k
    }
57
1.36k
}
58
59
struct DefaultThreadPoolData
60
{
61
    Semaphore          _taskSemaphore; // threads wait on this for ready tasks
62
    mutable std::mutex _taskMutex;     // mutual exclusion for the tasks list
63
    std::vector<Task*> _tasks;         // the list of tasks to execute
64
65
    mutable std::mutex       _threadMutex; // mutual exclusion for threads list
66
    std::vector<std::thread> _threads;     // the list of all threads
67
68
    std::atomic<int>  _threadCount;
69
    std::atomic<bool> _stopping;
70
71
    inline bool stopped () const
72
16
    {
73
16
        return _stopping.load (std::memory_order_relaxed);
74
16
    }
75
76
1
    inline void stop () { _stopping = true; }
77
78
    inline void resetAtomics ()
79
2
    {
80
2
        _threadCount = 0;
81
2
        _stopping    = false;
82
2
    }
83
};
84
85
} // namespace
86
87
#ifdef ENABLE_THREADING
88
89
struct TaskGroup::Data
90
{
91
    Data ();
92
    ~Data ();
93
    Data (const Data&)            = delete;
94
    Data& operator= (const Data&) = delete;
95
    Data (Data&&)                 = delete;
96
    Data& operator= (Data&&)      = delete;
97
98
    void addTask ();
99
    void removeTask ();
100
101
    void waitForEmpty ();
102
103
    std::atomic<int> numPending;
104
    std::atomic<int> inFlight;
105
    Semaphore        isEmpty; // used to signal that the taskgroup is empty
106
};
107
108
struct ThreadPool::Data
109
{
110
    using ProviderPtr = std::shared_ptr<ThreadPoolProvider>;
111
112
    Data ();
113
    ~Data ();
114
    Data (const Data&)            = delete;
115
    Data& operator= (const Data&) = delete;
116
    Data (Data&&)                 = delete;
117
    Data& operator= (Data&&)      = delete;
118
119
9.73k
    ProviderPtr getProvider () const { return std::atomic_load (&_provider); }
120
121
    void setProvider (ProviderPtr provider)
122
4
    {
123
4
        ProviderPtr curp = std::atomic_exchange (&_provider, provider);
124
4
        if (curp && curp != provider) curp->finish ();
125
4
    }
126
127
    std::shared_ptr<ThreadPoolProvider> _provider;
128
};
129
130
namespace
131
{
132
133
//
134
// class DefaultThreadPoolProvider
135
//
136
class DefaultThreadPoolProvider : public ThreadPoolProvider
137
{
138
public:
139
    DefaultThreadPoolProvider (int count);
140
    DefaultThreadPoolProvider (const DefaultThreadPoolProvider&) = delete;
141
    DefaultThreadPoolProvider&
142
    operator= (const DefaultThreadPoolProvider&)                       = delete;
143
    DefaultThreadPoolProvider (DefaultThreadPoolProvider&&)            = delete;
144
    DefaultThreadPoolProvider& operator= (DefaultThreadPoolProvider&&) = delete;
145
    ~DefaultThreadPoolProvider () override;
146
147
    int  numThreads () const override;
148
    void setNumThreads (int count) override;
149
    void addTask (Task* task) override;
150
151
    void finish () override;
152
153
private:
154
    void lockedFinish ();
155
    void threadLoop (std::shared_ptr<DefaultThreadPoolData> d);
156
157
    std::shared_ptr<DefaultThreadPoolData> _data;
158
};
159
160
// Allocate the shared state, put its atomics into their idle values, and
// start `count` worker threads.
DefaultThreadPoolProvider::DefaultThreadPoolProvider (int count)
    : _data (std::make_shared<DefaultThreadPoolData> ())
{
    DefaultThreadPoolData& shared = *_data;
    shared.resetAtomics ();

    setNumThreads (count);
}
166
167
// No explicit teardown here; the destructor body is empty.
DefaultThreadPoolProvider::~DefaultThreadPoolProvider () = default;
169
170
int
171
DefaultThreadPoolProvider::numThreads () const
172
8.36k
{
173
8.36k
    return _data->_threadCount.load ();
174
8.36k
}
175
176
void
177
DefaultThreadPoolProvider::setNumThreads (int count)
178
1
{
179
    // since we're a private class, the thread pool won't call us if
180
    // we aren't changing size so no need to check that...
181
182
1
    std::lock_guard<std::mutex> lock (_data->_threadMutex);
183
184
1
    size_t curThreads = _data->_threads.size ();
185
1
    size_t nToAdd     = static_cast<size_t> (count);
186
187
1
    if (nToAdd < curThreads)
188
0
    {
189
        // no easy way to only shutdown the n threads at the end of
190
        // the vector (well, really, guaranteeing they are the ones to
191
        // be woken up), so just kill all of the threads
192
0
        lockedFinish ();
193
0
        curThreads = 0;
194
0
    }
195
196
1
    _data->_threads.resize (nToAdd);
197
17
    for (size_t i = curThreads; i < nToAdd; ++i)
198
16
    {
199
16
        _data->_threads[i] =
200
16
            std::thread (&DefaultThreadPoolProvider::threadLoop, this, _data);
201
16
    }
202
1
    _data->_threadCount = static_cast<int> (_data->_threads.size ());
203
1
}
204
205
void
206
DefaultThreadPoolProvider::addTask (Task* task)
207
1.36k
{
208
    // the thread pool will kill us and switch to a null provider
209
    // if the thread count is set to 0, so we can always
210
    // go ahead and lock and assume we have a thread to do the
211
    // processing
212
1.36k
    {
213
1.36k
        std::lock_guard<std::mutex> taskLock (_data->_taskMutex);
214
215
        //
216
        // Push the new task into the FIFO
217
        //
218
1.36k
        _data->_tasks.push_back (task);
219
1.36k
    }
220
221
    //
222
    // Signal that we have a new task to process
223
    //
224
1.36k
    _data->_taskSemaphore.post ();
225
1.36k
}
226
227
void
228
DefaultThreadPoolProvider::finish ()
229
1
{
230
1
    std::lock_guard<std::mutex> lock (_data->_threadMutex);
231
232
1
    lockedFinish ();
233
1
}
234
235
void
236
DefaultThreadPoolProvider::lockedFinish ()
237
1
{
238
1
    _data->stop ();
239
240
    //
241
    // Signal enough times to allow all threads to stop.
242
    //
243
    // NB: we must do this as many times as we have threads.
244
    //
245
    // If there is still work in the queue, or this call happens "too
246
    // quickly", threads will not be waiting on the semaphore, so we
247
    // need to ensure the semaphore is at a count equal to the amount
248
    // of work left plus the number of threads to ensure exit of a
249
    // thread. There can be threads in a few states:
250
    //   - still starting up (successive calls to setNumThreads)
251
    //   - in the middle of processing a task / looping
252
    //   - waiting in the semaphore
253
1
    size_t curT = _data->_threads.size ();
254
17
    for (size_t i = 0; i != curT; ++i)
255
16
        _data->_taskSemaphore.post ();
256
257
    //
258
    // We should not need to check joinability, they should all, by
259
    // definition, be joinable (assuming normal start)
260
    //
261
17
    for (size_t i = 0; i != curT; ++i)
262
16
    {
263
        // This isn't quite right in that the thread may have actually
264
        // be in an exited / signalled state (needing the
265
        // WaitForSingleObject call), and so already have an exit code
266
        // (I think, but the docs are vague), but if we don't do the
267
        // join, the stl thread seems to then throw an exception. The
268
        // join should just return invalid handle and continue, and is
269
        // more of a windows bug... except maybe someone needs to work
270
        // around it...
271
        //#    ifdef TEST_FOR_WIN_THREAD_STATUS
272
        //
273
        //        // per OIIO issue #2038, on exit / dll unload, windows may
274
        //        // kill the thread, double check that it is still active prior
275
        //        // to joining.
276
        //        DWORD tstatus;
277
        //        if (GetExitCodeThread (_threads[i].native_handle (), &tstatus))
278
        //        {
279
        //            if (tstatus != STILL_ACTIVE) { continue; }
280
        //        }
281
        //#    endif
282
283
16
        _data->_threads[i].join ();
284
16
    }
285
286
1
    _data->_threads.clear ();
287
288
1
    _data->resetAtomics ();
289
1
}
290
291
void
292
DefaultThreadPoolProvider::threadLoop (
293
    std::shared_ptr<DefaultThreadPoolData> data)
294
16
{
295
1.38k
    while (true)
296
1.38k
    {
297
        //
298
        // Wait for a task to become available
299
        //
300
301
1.38k
        data->_taskSemaphore.wait ();
302
303
1.38k
        {
304
1.38k
            std::unique_lock<std::mutex> taskLock (data->_taskMutex);
305
306
            //
307
            // If there is a task pending, pop off the next task in the FIFO
308
            //
309
310
1.38k
            if (!data->_tasks.empty ())
311
1.36k
            {
312
1.36k
                Task* task = data->_tasks.back ();
313
1.36k
                data->_tasks.pop_back ();
314
315
                // release the mutex while we process
316
1.36k
                taskLock.unlock ();
317
318
1.36k
                handleProcessTask (task);
319
320
                // do not need to reacquire the lock at all since we
321
                // will just loop around, pull any other task
322
1.36k
            }
323
16
            else if (data->stopped ()) { break; }
324
1.38k
        }
325
1.38k
    }
326
16
}
327
328
} //namespace
329
330
//
331
// struct TaskGroup::Data
332
//
333
334
2.09k
// Both counters start at zero; the isEmpty semaphore is constructed with 1.
TaskGroup::Data::Data ()
    : numPending (0)
    , inFlight (0)
    , isEmpty (1)
{
    // empty
}
336
337
// Nothing beyond member destruction.
TaskGroup::Data::~Data () = default;
339
340
void
341
TaskGroup::Data::waitForEmpty ()
342
2.09k
{
343
    //
344
    // A TaskGroup acts like an "inverted" semaphore: if the count
345
    // is above 0 then waiting on the taskgroup will block.  The
346
    // destructor waits until the taskgroup is empty before returning.
347
    //
348
349
2.09k
    isEmpty.wait ();
350
351
    // pseudo spin to wait for the notifying thread to finish the post
352
    // to avoid a premature deletion of the semaphore
353
2.09k
    int count = 0;
354
561k
    while (inFlight.load () > 0)
355
559k
    {
356
559k
        ++count;
357
559k
        if (count > 100)
358
5.54k
        {
359
5.54k
            std::this_thread::yield ();
360
5.54k
            count = 0;
361
5.54k
        }
362
559k
    }
363
2.09k
}
364
365
void
366
TaskGroup::Data::addTask ()
367
1.36k
{
368
1.36k
    inFlight.fetch_add (1);
369
370
    // if we are the first task off the rank, clear the
371
    // isEmpty semaphore such that the group will actually pause
372
    // until the task finishes
373
1.36k
    if (numPending.fetch_add (1) == 0) { isEmpty.wait (); }
374
1.36k
}
375
376
void
377
TaskGroup::Data::removeTask ()
378
1.36k
{
379
    // if we are the last task, notify the group we're done
380
1.36k
    if (numPending.fetch_sub (1) == 1) { isEmpty.post (); }
381
382
    // in theory, a background thread could actually finish a task
383
    // prior to the next task being added. The fetch_add / fetch_sub
384
    // logic between addTask and removeTask are fine to keep the
385
    // inverted semaphore straight. All addTask must happen prior to
386
    // the TaskGroup destructor.
387
    //
388
    // But to let the taskgroup thread waiting know we're actually
389
    // finished with the last one and finished posting (the semaphore
390
    // might wake up the other thread while in the middle of post) so
391
    // we don't destroy the semaphore while posting to it, keep a
392
    // separate counter that is modified pre / post semaphore
393
1.36k
    inFlight.fetch_sub (1);
394
1.36k
}
395
396
//
397
// struct ThreadPool::Data
398
//
399
400
// The provider pointer starts out null; nothing else to set up.
ThreadPool::Data::Data () = default;
404
405
// Swap in a null provider; setProvider () asks any provider that was still
// installed to finish.
ThreadPool::Data::~Data ()
{
    setProvider (ProviderPtr ());
}
409
410
#endif // ENABLE_THREADING
411
412
//
413
// class Task
414
//
415
416
1.36k
// Remember the owning group and, when threading is enabled and a group was
// supplied, register this task with it.
Task::Task (TaskGroup* g) : _group (g)
{
#ifdef ENABLE_THREADING
    if (g != nullptr) { g->_data->addTask (); }
#endif
}
422
423
// Nothing to release.
Task::~Task () = default;
427
428
// The group passed to the constructor (may be null).
TaskGroup*
Task::group ()
{
    return this->_group;
}
433
434
// With threading enabled the group owns a freshly allocated Data; without
// it there is no private state at all.
#ifdef ENABLE_THREADING
TaskGroup::TaskGroup () : _data (new Data)
{}
#else
TaskGroup::TaskGroup () : _data (nullptr)
{}
#endif
444
445
// Wait for every task in the group to finish, then free the private state.
// Does nothing when threading is disabled.
TaskGroup::~TaskGroup ()
{
#ifdef ENABLE_THREADING
    Data* const state = _data;
    state->waitForEmpty ();
    delete state;
#endif
}
452
453
void
454
TaskGroup::finishOneTask ()
455
1.36k
{
456
1.36k
#ifdef ENABLE_THREADING
457
1.36k
    _data->removeTask ();
458
1.36k
#endif
459
1.36k
}
460
461
//
462
// class ThreadPoolProvider
463
//
464
465
// Nothing to initialise.
ThreadPoolProvider::ThreadPoolProvider () = default;
467
468
// Nothing to release.
ThreadPoolProvider::~ThreadPoolProvider () = default;
470
471
//
472
// class ThreadPool
473
//
474
475
// With threading enabled: allocate the private state and size the pool to
// `nthreads`. Without threading there is no private state and `nthreads`
// is not used.
#ifdef ENABLE_THREADING
ThreadPool::ThreadPool (unsigned nthreads) : _data (new Data)
{
    const int requested = static_cast<int> (nthreads);
    setNumThreads (requested);
}
#else
ThreadPool::ThreadPool (unsigned nthreads) : _data (nullptr)
{}
#endif
487
488
// Tear the pool down. Does nothing when threading is disabled.
ThreadPool::~ThreadPool ()
{
#ifdef ENABLE_THREADING
    // Dropping the provider first makes sure any jobs / threads are
    // finished and shut down before the private state is freed.
    Data* const state = _data;
    state->setProvider (nullptr);
    delete state;
#endif
}
496
497
int
498
ThreadPool::numThreads () const
499
4.18k
{
500
4.18k
#ifdef ENABLE_THREADING
501
4.18k
    Data::ProviderPtr sp = _data->getProvider ();
502
4.18k
    return (sp) ? sp->numThreads () : 0;
503
#else
504
    return 0;
505
#endif
506
4.18k
}
507
508
void
509
ThreadPool::setNumThreads (int count)
510
4.18k
{
511
4.18k
#ifdef ENABLE_THREADING
512
4.18k
    if (count < 0)
513
0
        throw IEX_INTERNAL_NAMESPACE::ArgExc (
514
0
            "Attempt to set the number of threads "
515
0
            "in a thread pool to a negative value.");
516
517
4.18k
    {
518
4.18k
        Data::ProviderPtr sp = _data->getProvider ();
519
4.18k
        if (sp)
520
4.18k
        {
521
4.18k
            int curT = sp->numThreads ();
522
4.18k
            if (curT == count) return;
523
524
0
            if (count != 0)
525
0
            {
526
0
                sp->setNumThreads (count);
527
0
                return;
528
0
            }
529
0
        }
530
4.18k
    }
531
532
    // either a null provider or a case where we should switch from
533
    // a default provider to a null one or vice-versa
534
2
    if (count == 0)
535
1
        _data->setProvider (nullptr);
536
1
    else
537
1
        _data->setProvider (
538
1
            std::make_shared<DefaultThreadPoolProvider> (count));
539
540
#else
541
    // just blindly ignore
542
    (void) count;
543
#endif
544
2
}
545
546
void
547
ThreadPool::setThreadProvider (ThreadPoolProvider* provider)
548
0
{
549
0
#ifdef ENABLE_THREADING
550
    // contract is we take ownership and will free the provider
551
0
    _data->setProvider (Data::ProviderPtr (provider));
552
#else
553
    throw IEX_INTERNAL_NAMESPACE::ArgExc (
554
        "Attempt to set a thread provider on a system with threads"
555
        " disabled / not available");
556
#endif
557
0
}
558
559
void
560
ThreadPool::addTask (Task* task)
561
1.36k
{
562
1.36k
    if (task)
563
1.36k
    {
564
1.36k
#ifdef ENABLE_THREADING
565
1.36k
        Data::ProviderPtr p = _data->getProvider ();
566
1.36k
        if (p)
567
1.36k
        {
568
1.36k
            p->addTask (task);
569
1.36k
            return;
570
1.36k
        }
571
0
#endif
572
573
0
        handleProcessTask (task);
574
0
    }
575
1.36k
}
576
577
// Access the process-wide pool. It is a function-local static constructed
// with a thread count of 0.
ThreadPool&
ThreadPool::globalThreadPool ()
{
    static ThreadPool theGlobalPool (0);
    return theGlobalPool;
}
588
589
void
590
ThreadPool::addGlobalTask (Task* task)
591
1.36k
{
592
1.36k
    globalThreadPool ().addTask (task);
593
1.36k
}
594
595
unsigned
596
ThreadPool::estimateThreadCountForFileIO ()
597
0
{
598
0
#ifdef ENABLE_THREADING
599
0
    unsigned rv = std::thread::hardware_concurrency ();
600
    // hardware concurrency is not required to work
601
0
    if (rv == 0 ||
602
0
        rv > static_cast<unsigned> (std::numeric_limits<int>::max ()))
603
0
    {
604
0
        rv = 1;
605
#    if (defined(_WIN32) || defined(_WIN64))
606
        SYSTEM_INFO si;
607
        GetNativeSystemInfo (&si);
608
609
        rv = si.dwNumberOfProcessors;
610
#    else
611
        // linux, bsd, and mac are fine with this
612
        // other *nix should be too, right?
613
0
        rv = sysconf (_SC_NPROCESSORS_ONLN);
614
0
#    endif
615
0
    }
616
0
    return rv;
617
#else
618
    return 0;
619
#endif
620
0
}
621
622
ILMTHREAD_INTERNAL_NAMESPACE_SOURCE_EXIT