Coverage Report

Created: 2026-04-29 07:00

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/openexr/src/lib/IlmThread/IlmThreadProcessGroup.h
Line
Count
Source
1
//
2
// SPDX-License-Identifier: BSD-3-Clause
3
// Copyright (c) Contributors to the OpenEXR Project.
4
//
5
6
#ifndef INCLUDED_ILM_THREAD_PROCESS_GROUP_H
7
#define INCLUDED_ILM_THREAD_PROCESS_GROUP_H
8
9
//-----------------------------------------------------------------------------
10
//
11
// Class ProcessGroup is a templated inline helper for constraining
12
// task contexts to a number of threads. It maintains a list of
13
// contexts and then can hand them out one at a time, waiting for a
14
// previous thread request to finish before handing out more,
15
// preventing over-subscription / allocation of contexts.
16
//
17
//-----------------------------------------------------------------------------
18
19
#include "IlmThreadConfig.h"
20
#include "IlmThreadExport.h"
21
#include "IlmThreadNamespace.h"
22
#include "IlmThreadSemaphore.h"
23
24
#include "Iex.h"
25
26
#include <atomic>
27
#include <string>
28
#include <type_traits>
29
#include <vector>
30
31
ILMTHREAD_INTERNAL_NAMESPACE_HEADER_ENTER
32
33
template <typename P,
34
    std::enable_if_t <
35
        std::is_default_constructible <P>::value &&
36
        std::is_same <decltype (P {}.next), P *>::value, bool> = true>
37
class ProcessGroup
38
{
39
public:
40
    using Process = P;
41
42
    ProcessGroup (unsigned int numThreads)
43
0
        : _sem (numThreads)
44
0
        , _avail_head (nullptr)
45
0
        , _first_failure (nullptr)
46
0
    {
47
0
        _fixed_pool.resize (numThreads);
48
0
        for ( unsigned int i = 0; i < numThreads; ++i )
49
0
        {
50
0
            if (i == (numThreads - 1))
51
0
                _fixed_pool[i].next = nullptr;
52
0
            else
53
0
                _fixed_pool[i].next = &(_fixed_pool[i+1]);
54
0
        }
55
0
        _avail_head = &(_fixed_pool[0]);
56
0
    }
Unexecuted instantiation: ImfScanLineInputFile.cpp:IlmThread_3_3::ProcessGroup<Imf_3_3::(anonymous namespace)::ScanLineProcess, true>::ProcessGroup(unsigned int)
Unexecuted instantiation: ImfTiledInputFile.cpp:IlmThread_3_3::ProcessGroup<Imf_3_3::(anonymous namespace)::TileProcess, true>::ProcessGroup(unsigned int)
Unexecuted instantiation: ImfDeepScanLineInputFile.cpp:IlmThread_3_3::ProcessGroup<Imf_3_3::(anonymous namespace)::ScanLineProcess, true>::ProcessGroup(unsigned int)
Unexecuted instantiation: ImfDeepTiledInputFile.cpp:IlmThread_3_3::ProcessGroup<Imf_3_3::(anonymous namespace)::TileProcess, true>::ProcessGroup(unsigned int)
57
58
    ProcessGroup (const ProcessGroup&) = delete;
59
    ProcessGroup& operator= (const ProcessGroup&) = delete;
60
    ProcessGroup (ProcessGroup&&) = default;
61
    ProcessGroup& operator= (ProcessGroup&&) = delete;
62
    ~ProcessGroup()
    {
        // Free a failure message that was recorded but never consumed;
        // deleting a null pointer is a no-op.
        delete _first_failure.load ();
    }
Unexecuted instantiation: ImfScanLineInputFile.cpp:IlmThread_3_3::ProcessGroup<Imf_3_3::(anonymous namespace)::ScanLineProcess, true>::~ProcessGroup()
Unexecuted instantiation: ImfTiledInputFile.cpp:IlmThread_3_3::ProcessGroup<Imf_3_3::(anonymous namespace)::TileProcess, true>::~ProcessGroup()
Unexecuted instantiation: ImfDeepScanLineInputFile.cpp:IlmThread_3_3::ProcessGroup<Imf_3_3::(anonymous namespace)::ScanLineProcess, true>::~ProcessGroup()
Unexecuted instantiation: ImfDeepTiledInputFile.cpp:IlmThread_3_3::ProcessGroup<Imf_3_3::(anonymous namespace)::TileProcess, true>::~ProcessGroup()
67
68
    // Returns a process context to the free list and posts the
    // semaphore so a waiting pop() can proceed.
    void push (Process *p)
    {
        Process* expected = _avail_head.load (std::memory_order_relaxed);

        // Link p in front of the current head, then try to install p as
        // the new head. A failed exchange refreshes 'expected' with the
        // current head, so re-link and retry.
        p->next = expected;
        while (!_avail_head.compare_exchange_weak (
                   expected, p,
                   std::memory_order_release,
                   std::memory_order_relaxed))
        {
            p->next = expected;
        }

        // notify someone else there's one available
        _sem.post ();
    }
Unexecuted instantiation: ImfScanLineInputFile.cpp:IlmThread_3_3::ProcessGroup<Imf_3_3::(anonymous namespace)::ScanLineProcess, true>::push(Imf_3_3::(anonymous namespace)::ScanLineProcess*)
Unexecuted instantiation: ImfTiledInputFile.cpp:IlmThread_3_3::ProcessGroup<Imf_3_3::(anonymous namespace)::TileProcess, true>::push(Imf_3_3::(anonymous namespace)::TileProcess*)
Unexecuted instantiation: ImfDeepScanLineInputFile.cpp:IlmThread_3_3::ProcessGroup<Imf_3_3::(anonymous namespace)::ScanLineProcess, true>::push(Imf_3_3::(anonymous namespace)::ScanLineProcess*)
Unexecuted instantiation: ImfDeepTiledInputFile.cpp:IlmThread_3_3::ProcessGroup<Imf_3_3::(anonymous namespace)::TileProcess, true>::push(Imf_3_3::(anonymous namespace)::TileProcess*)
83
84
    // called by the thread dispatching work units, may block
85
    Process* pop ()
86
0
    {
87
0
        Process* ret = nullptr;
88
89
        // we do not have to worry about ABA problems as
90
        // we have a static pool of items we own, we're just
91
        // putting them here and popping them off.
92
93
        // used for honoring the numThreads, as pop
94
        // should only be called by the one thread
95
        // waiting to submit thread calls
96
0
        _sem.wait ();
97
98
0
        ret = _avail_head.load (std::memory_order_acquire);
99
100
0
        Process* newhead;
101
0
        do
102
0
        {
103
0
            if (!ret)
104
0
                std::cerr << "GACK: serious failure case???" << std::endl;
105
106
0
            newhead = ret->next;
107
0
        } while ( !_avail_head.compare_exchange_weak(
108
0
                      ret, newhead, std::memory_order_acquire));
109
110
0
        return ret;
111
0
    }
Unexecuted instantiation: ImfScanLineInputFile.cpp:IlmThread_3_3::ProcessGroup<Imf_3_3::(anonymous namespace)::ScanLineProcess, true>::pop()
Unexecuted instantiation: ImfTiledInputFile.cpp:IlmThread_3_3::ProcessGroup<Imf_3_3::(anonymous namespace)::TileProcess, true>::pop()
Unexecuted instantiation: ImfDeepScanLineInputFile.cpp:IlmThread_3_3::ProcessGroup<Imf_3_3::(anonymous namespace)::ScanLineProcess, true>::pop()
Unexecuted instantiation: ImfDeepTiledInputFile.cpp:IlmThread_3_3::ProcessGroup<Imf_3_3::(anonymous namespace)::TileProcess, true>::pop()
112
113
    void record_failure (const char *e)
114
0
    {
115
        // should we construct a list of failures if there are
116
        // more than one? seems less confusing to just report
117
        // the first we happened to record
118
119
0
        std::string *cur = _first_failure.load ();
120
0
        if (!cur)
121
0
        {
122
0
            std::string *msg = new std::string (e);
123
0
            if (! _first_failure.compare_exchange_strong (cur, msg))
124
0
                delete msg;
125
0
        }
126
0
    }
Unexecuted instantiation: ImfScanLineInputFile.cpp:IlmThread_3_3::ProcessGroup<Imf_3_3::(anonymous namespace)::ScanLineProcess, true>::record_failure(char const*)
Unexecuted instantiation: ImfTiledInputFile.cpp:IlmThread_3_3::ProcessGroup<Imf_3_3::(anonymous namespace)::TileProcess, true>::record_failure(char const*)
Unexecuted instantiation: ImfDeepScanLineInputFile.cpp:IlmThread_3_3::ProcessGroup<Imf_3_3::(anonymous namespace)::ScanLineProcess, true>::record_failure(char const*)
Unexecuted instantiation: ImfDeepTiledInputFile.cpp:IlmThread_3_3::ProcessGroup<Imf_3_3::(anonymous namespace)::TileProcess, true>::record_failure(char const*)
127
128
    void throw_on_failure ()
129
0
    {
130
0
        std::string *cur = _first_failure.load ();
131
0
        _first_failure.store (nullptr);
132
133
0
        if (cur)
134
0
        {
135
0
            std::string msg (*cur);
136
0
            delete cur;
137
138
0
            throw IEX_NAMESPACE::IoExc (msg);
139
0
        }
140
0
    }
Unexecuted instantiation: ImfScanLineInputFile.cpp:IlmThread_3_3::ProcessGroup<Imf_3_3::(anonymous namespace)::ScanLineProcess, true>::throw_on_failure()
Unexecuted instantiation: ImfTiledInputFile.cpp:IlmThread_3_3::ProcessGroup<Imf_3_3::(anonymous namespace)::TileProcess, true>::throw_on_failure()
Unexecuted instantiation: ImfDeepScanLineInputFile.cpp:IlmThread_3_3::ProcessGroup<Imf_3_3::(anonymous namespace)::ScanLineProcess, true>::throw_on_failure()
Unexecuted instantiation: ImfDeepTiledInputFile.cpp:IlmThread_3_3::ProcessGroup<Imf_3_3::(anonymous namespace)::TileProcess, true>::throw_on_failure()
141
private:
142
    Semaphore _sem;
143
144
    std::vector<Process>   _fixed_pool;
145
146
    std::atomic<Process *> _avail_head;
147
148
    std::atomic<std::string *> _first_failure;
149
};
150
151
152
ILMTHREAD_INTERNAL_NAMESPACE_HEADER_EXIT
153
154
#endif // INCLUDED_ILM_THREAD_PROCESS_GROUP_H