Coverage Report

Created: 2025-11-16 07:29

/src/log4cplus/threadpool/ThreadPool.h
  Line|  Count|Source
     1|       |// -*- C++ -*-
     2|       |// Copyright (c) 2012-2015 Jakob Progsch
     3|       |//
     4|       |// This software is provided 'as-is', without any express or implied
     5|       |// warranty. In no event will the authors be held liable for any damages
     6|       |// arising from the use of this software.
     7|       |//
     8|       |// Permission is granted to anyone to use this software for any purpose,
     9|       |// including commercial applications, and to alter it and redistribute it
    10|       |// freely, subject to the following restrictions:
    11|       |//
    12|       |//    1. The origin of this software must not be misrepresented; you must not
    13|       |//    claim that you wrote the original software. If you use this software
    14|       |//    in a product, an acknowledgment in the product documentation would be
    15|       |//    appreciated but is not required.
    16|       |//
    17|       |//    2. Altered source versions must be plainly marked as such, and must not be
    18|       |//    misrepresented as being the original software.
    19|       |//
    20|       |//    3. This notice may not be removed or altered from any source
    21|       |//    distribution.
    22|       |//
    23|       |// Modified for log4cplus, copyright (c) 2014-2015 Václav Zeman.
    24|       |
    25|       |#ifndef THREAD_POOL_H_7ea1ee6b_4f17_4c09_b76b_3d44e102400c
    26|       |#define THREAD_POOL_H_7ea1ee6b_4f17_4c09_b76b_3d44e102400c
    27|       |
    28|       |#include <vector>
    29|       |#include <queue>
    30|       |#include <memory>
    31|       |#include <thread>
    32|       |#include <mutex>
    33|       |#include <condition_variable>
    34|       |#include <future>
    35|       |#include <atomic>
    36|       |#include <functional>
    37|       |#include <stdexcept>
    38|       |#include <algorithm>
    39|       |#include <cassert>
    40|       |
    41|       |
    42|       |namespace progschj {
    43|       |
    44|       |class would_block
    45|       |    : public std::runtime_error
    46|       |{
    47|       |    using std::runtime_error::runtime_error;
    48|       |};
    49|       |
    50|       |
    51|       |class ThreadPool {
    52|       |public:
    53|       |    template <typename F, typename... Args>
    54|       |    using return_type =
    55|       |#if defined(__cpp_lib_is_invocable) && __cpp_lib_is_invocable >= 201703L
    56|       |            typename std::invoke_result<F&&, Args&&...>::type;
    57|       |#else
    58|       |            typename std::result_of<F&& (Args&&...)>::type;
    59|       |#endif
    60|       |
    61|       |    explicit ThreadPool(std::size_t threads
    62|       |        = (std::max)(2u, std::thread::hardware_concurrency()));
    63|       |    template <typename F, typename... Args>
    64|       |    auto enqueue_block(F&& f, Args&&... args) -> std::future<return_type<F, Args...>>;
    65|       |    template <typename F, typename... Args>
    66|       |    auto enqueue(F&& f, Args&&... args) -> std::future<return_type<F, Args...>>;
    67|       |    void wait_until_empty();
    68|       |    void wait_until_nothing_in_flight();
    69|       |    void set_queue_size_limit(std::size_t limit);
    70|       |    void set_pool_size(std::size_t limit);
    71|       |    ~ThreadPool();
    72|       |
    73|       |private:
    74|       |    void start_worker(std::size_t worker_number,
    75|       |        std::unique_lock<std::mutex> const &lock);
    76|       |
    77|       |    template <typename F, typename... Args>
    78|       |    auto enqueue_worker(bool, F&& f, Args&&... args) -> std::future<return_type<F, Args...>>;
    79|       |
    80|       |    template <typename T>
    81|       |    static std::future<T> make_exception_future (std::exception_ptr ex_ptr);
    82|       |
    83|       |    // need to keep track of threads so we can join them
    84|       |    std::vector< std::thread > workers;
    85|       |    // target pool size
    86|       |    std::size_t pool_size;
    87|       |    // the task queue
    88|       |    std::queue< std::function<void()> > tasks;
    89|       |    // queue length limit
    90|       |    std::size_t max_queue_size = 100000;
    91|       |    // stop signal
    92|       |    bool stop = false;
    93|       |
    94|       |    // synchronization
    95|       |    std::mutex queue_mutex;
    96|       |    std::condition_variable condition_producers;
    97|       |    std::condition_variable condition_consumers;
    98|       |
    99|       |    std::mutex in_flight_mutex;
   100|       |    std::condition_variable in_flight_condition;
   101|       |    std::atomic<std::size_t> in_flight;
   102|       |
   103|       |    struct handle_in_flight_decrement
   104|       |    {
   105|       |        ThreadPool & tp;
   106|       |
   107|       |        handle_in_flight_decrement(ThreadPool & tp_)
   108|      0|            : tp(tp_)
   109|      0|        { }
   110|       |
   111|       |        ~handle_in_flight_decrement()
   112|      0|        {
   113|      0|            std::size_t prev
   114|      0|                = std::atomic_fetch_sub_explicit(&tp.in_flight,
   115|      0|                    std::size_t(1),
   116|      0|                    std::memory_order_acq_rel);
   117|      0|            if (prev == 1)
   118|      0|            {
   119|      0|                std::unique_lock<std::mutex> guard(tp.in_flight_mutex);
   120|      0|                tp.in_flight_condition.notify_all();
   121|      0|            }
   122|      0|        }
   123|       |    };
   124|       |};
   125|       |
   126|       |// the constructor just launches some amount of workers
   127|       |inline ThreadPool::ThreadPool(std::size_t threads)
   128|     36|    : pool_size(threads)
   129|     36|    , in_flight(0)
   130|     36|{
   131|     36|    std::unique_lock<std::mutex> lock(this->queue_mutex);
   132|    180|    for (std::size_t i = 0; i != threads; ++i)
   133|    144|        start_worker(i, lock);
   134|     36|}
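
For orientation, a minimal construction sketch (hypothetical caller code, not part of the measured source; the worker count is illustrative):

    #include "ThreadPool.h"

    int main()
    {
        // Default-constructed pools spawn (std::max)(2u, std::thread::hardware_concurrency()) workers.
        progschj::ThreadPool pool(4);   // explicit worker count for illustration
        // ... enqueue tasks here ...
    }   // ~ThreadPool() sets the stop flag and waits until every worker has exited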
   135|       |
   136|       |// add new work item to the pool and block if the queue is full
   137|       |template<class F, class... Args>
   138|       |auto ThreadPool::enqueue_block(F&& f, Args&&... args) -> std::future<return_type<F, Args...>>
   139|      0|{
   140|      0|    return enqueue_worker (true, std::forward<F> (f), std::forward<Args> (args)...);
   141|      0|}
   142|       |
   143|       |// add new work item to the pool; if it is full, return a future carrying a would_block exception
   144|       |template<class F, class... Args>
   145|       |auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<return_type<F, Args...>>
   146|      0|{
   147|      0|    return enqueue_worker (false, std::forward<F> (f), std::forward<Args> (args)...);
   148|      0|}
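
Both entry points funnel into enqueue_worker; the only difference is the back-pressure policy once the queue holds max_queue_size tasks. A hedged sketch of caller code (pool is assumed to be a live ThreadPool; the lambdas are illustrative):

    // Blocking variant: waits for queue space, then returns a future.
    std::future<int> a = pool.enqueue_block([](int x) { return 2 * x; }, 21);

    // Non-blocking variant: on a full queue it returns a future that is
    // already charged with a would_block exception instead of waiting.
    std::future<int> b = pool.enqueue([] { return 42; });
    try {
        int v = b.get();                      // rethrows would_block if the task was rejected
    } catch (progschj::would_block const &) {
        // the task never ran; retry or drop it
    }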
   149|       |
   150|       |template <typename F, typename... Args>
   151|       |auto ThreadPool::enqueue_worker(bool block, F&& f, Args&&... args) -> std::future<return_type<F, Args...>>
   152|      0|{
   153|      0|    auto task = std::make_shared< std::packaged_task<return_type<F, Args...>()> >(
   154|      0|            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
   155|      0|        );
   156|       |
   157|      0|    std::future<return_type<F, Args...>> res = task->get_future();
   158|       |
   159|      0|    std::unique_lock<std::mutex> lock(queue_mutex);
   160|       |
   161|      0|    if (tasks.size () >= max_queue_size)
   162|      0|    {
   163|      0|        if (block)
   164|      0|        {
   165|       |            // wait for the queue to empty or be stopped
   166|      0|            condition_producers.wait(lock,
   167|      0|                [this]
   168|      0|                {
   169|      0|                    return tasks.size () < max_queue_size
   170|      0|                        || stop;
   171|      0|                });
  ------------------
  | Unexecuted instantiation: global-init.cxx:progschj::ThreadPool::enqueue_worker<log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_0>(bool, log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_0&&)::{lambda()#1}::operator()() const
  | Unexecuted instantiation: global-init.cxx:progschj::ThreadPool::enqueue_worker<log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_1>(bool, log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_1&&)::{lambda()#1}::operator()() const
  ------------------
   172|      0|        }
   173|      0|        else
   174|      0|        {
   175|      0|            return ThreadPool::make_exception_future<return_type<F, Args...>> (
   176|      0|                std::make_exception_ptr (would_block("queue full")));
   177|      0|        }
   178|      0|    }
   179|       |
   180|       |
   181|       |    // don't allow enqueueing after stopping the pool
   182|      0|    if (stop)
   183|      0|        throw std::runtime_error("enqueue on stopped ThreadPool");
   184|       |
   185|      0|    tasks.emplace([task](){ (*task)(); });
  ------------------
  | Unexecuted instantiation: global-init.cxx:progschj::ThreadPool::enqueue_worker<log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_0>(bool, log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_0&&)::{lambda()#2}::operator()() const
  | Unexecuted instantiation: global-init.cxx:progschj::ThreadPool::enqueue_worker<log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_1>(bool, log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_1&&)::{lambda()#2}::operator()() const
  ------------------
   186|      0|    std::atomic_fetch_add_explicit(&in_flight,
   187|      0|        std::size_t(1),
   188|      0|        std::memory_order_relaxed);
   189|      0|    condition_consumers.notify_one();
   190|       |
   191|      0|    return res;
   192|      0|}
  ------------------
  | Unexecuted instantiation: global-init.cxx:std::__1::future<std::__1::invoke_result<log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_0&&>::type> progschj::ThreadPool::enqueue_worker<log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_0>(bool, log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_0&&)
  | Unexecuted instantiation: global-init.cxx:std::__1::future<std::__1::invoke_result<log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_1&&>::type> progschj::ThreadPool::enqueue_worker<log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_1>(bool, log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_1&&)
  ------------------
   193|       |
   194|       |// the destructor joins all threads
   195|       |inline ThreadPool::~ThreadPool()
   196|      0|{
   197|      0|    std::unique_lock<std::mutex> lock(queue_mutex);
   198|      0|    stop = true;
   199|      0|    pool_size = 0;
   200|      0|    condition_consumers.notify_all();
   201|      0|    condition_producers.notify_all();
   202|      0|    condition_consumers.wait(lock, [this]{ return this->workers.empty(); });
   203|      0|    assert(in_flight == 0);
   204|      0|}
   205|       |
   206|       |inline void ThreadPool::wait_until_empty()
   207|   304k|{
   208|   304k|    std::unique_lock<std::mutex> lock(this->queue_mutex);
   209|   304k|    this->condition_producers.wait(lock,
   210|   304k|        [this]{ return this->tasks.empty(); });
   211|   304k|}
   212|       |
   213|       |inline void ThreadPool::wait_until_nothing_in_flight()
   214|   304k|{
   215|   304k|    std::unique_lock<std::mutex> lock(this->in_flight_mutex);
   216|   304k|    this->in_flight_condition.wait(lock,
   217|   304k|        [this]{ return this->in_flight == 0; });
   218|   304k|}
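
The two wait functions observe different conditions: wait_until_empty returns once the queue has drained even though workers may still be executing dequeued tasks, while wait_until_nothing_in_flight also waits for those tasks to finish (in_flight counts each accepted task until it completes). A full flush therefore looks roughly like this (sketch, assuming a live pool):

    pool.wait_until_empty();               // nothing left queued; tasks may still be running
    pool.wait_until_nothing_in_flight();   // every accepted task has finished executing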
   219|       |
   220|       |inline void ThreadPool::set_queue_size_limit(std::size_t limit)
   221|      0|{
   222|      0|    std::unique_lock<std::mutex> lock(this->queue_mutex);
   223|       |
   224|      0|    if (stop)
   225|      0|        return;
   226|       |
   227|      0|    std::size_t const old_limit = max_queue_size;
   228|      0|    max_queue_size = (std::max)(limit, std::size_t(1));
   229|      0|    if (old_limit < max_queue_size)
   230|      0|        condition_producers.notify_all();
   231|      0|}
   232|       |
   233|       |inline void ThreadPool::set_pool_size(std::size_t limit)
   234|   116k|{
   235|   116k|    if (limit < 1)
   236|      0|        limit = 1;
   237|       |
   238|   116k|    std::unique_lock<std::mutex> lock(this->queue_mutex);
   239|       |
   240|   116k|    if (stop)
   241|      0|        return;
   242|       |
   243|   116k|    std::size_t const old_size = pool_size;
   244|   116k|    assert(this->workers.size() >= old_size);
   245|       |
   246|   116k|    pool_size = limit;
   247|   116k|    if (pool_size > old_size)
   248|      0|    {
   249|       |        // create new worker threads
   250|       |        // it is possible that some of these are still running because
   251|       |        // they have not stopped yet after a pool size reduction; such
   252|       |        // workers will just keep running
   253|      0|        for (std::size_t i = old_size; i != pool_size; ++i)
   254|      0|            start_worker(i, lock);
   255|      0|    }
   256|   116k|    else if (pool_size < old_size)
   257|       |        // notify all worker threads to start downsizing
   258|      0|        this->condition_consumers.notify_all();
   259|   116k|}
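
Both setters are no-ops on a stopped pool. Growing the pool starts new workers immediately under the queue lock; shrinking only lowers pool_size and notifies the workers, which detach and remove themselves once they observe the new target. A usage sketch (the values are illustrative):

    pool.set_queue_size_limit(1024);   // clamped to at least 1; raising the limit wakes blocked producers
    pool.set_pool_size(8);             // grows eagerly, shrinks lazily as workers notice the new target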
   260|       |
   261|       |inline void ThreadPool::start_worker(
   262|       |    std::size_t worker_number, std::unique_lock<std::mutex> const &lock)
   263|    144|{
   264|    144|    assert(lock.owns_lock() && lock.mutex() == &this->queue_mutex);
   265|    144|    assert(worker_number <= this->workers.size());
   266|       |
   267|    144|    auto worker_func =
   268|    144|        [this, worker_number]
   269|    144|        {
   270|    144|            for(;;)
   271|    144|            {
   272|    144|                std::function<void()> task;
   273|    144|                bool notify;
   274|       |
   275|    144|                {
   276|    144|                    std::unique_lock<std::mutex> lock(this->queue_mutex);
   277|    144|                    this->condition_consumers.wait(lock,
   278|    144|                        [this, worker_number]{
   279|    144|                            return this->stop || !this->tasks.empty()
   280|    144|                                || pool_size < worker_number + 1; });
   281|       |
   282|       |                    // deal with downsizing of thread pool or shutdown
   283|    144|                    if ((this->stop && this->tasks.empty())
   284|      0|                        || (!this->stop && pool_size < worker_number + 1))
   285|      0|                    {
   286|       |                        // detach this worker, effectively marking it stopped
   287|      0|                        this->workers[worker_number].detach();
   288|       |                        // downsize the workers vector as much as possible
   289|      0|                        while (this->workers.size() > pool_size
   290|      0|                             && !this->workers.back().joinable())
   291|      0|                            this->workers.pop_back();
   292|       |                        // if this was the last worker, notify the destructor
   293|      0|                        if (this->workers.empty())
   294|      0|                            this->condition_consumers.notify_all();
   295|      0|                        return;
   296|      0|                    }
   297|    144|                    else if (!this->tasks.empty())
   298|      0|                    {
   299|      0|                        task = std::move(this->tasks.front());
   300|      0|                        this->tasks.pop();
   301|      0|                        notify = this->tasks.size() + 1 == max_queue_size
   302|      0|                            || this->tasks.empty();
   303|      0|                    }
   304|    144|                    else
   305|    144|                        continue;
   306|    144|                }
   307|       |
   308|      0|                handle_in_flight_decrement guard(*this);
   309|       |
   310|      0|                if (notify)
   311|      0|                {
   312|      0|                    std::unique_lock<std::mutex> lock(this->queue_mutex);
   313|      0|                    condition_producers.notify_all();
   314|      0|                }
   315|       |
   316|      0|                task();
   317|      0|            }
   318|    144|        };
   319|       |
   320|    144|    if (worker_number < this->workers.size()) {
   321|      0|        std::thread & worker = this->workers[worker_number];
   322|       |        // start only if not already running
   323|      0|        if (!worker.joinable()) {
   324|      0|            worker = std::thread(worker_func);
   325|      0|        }
   326|      0|    } else
   327|    144|        this->workers.push_back(std::thread(worker_func));
   328|    144|}
   329|       |
   330|       |template <typename T>
   331|       |inline std::future<T> ThreadPool::make_exception_future (std::exception_ptr ex_ptr)
   332|      0|{
   333|      0|    std::promise<T> p;
   334|      0|    p.set_exception (ex_ptr);
   335|      0|    return p.get_future ();
   336|      0|}
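
make_exception_future is the standard promise-based idiom for producing an already-failed future; enqueue relies on it to report would_block without throwing into the caller. The same pattern stand-alone, as a sketch:

    std::promise<int> p;
    p.set_exception(std::make_exception_ptr(std::runtime_error("failed")));
    std::future<int> f = p.get_future();
    // f.get() now rethrows std::runtime_error on the consuming thread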
   337|       |
   338|       |} // namespace progschj
   339|       |
   340|       |#endif // THREAD_POOL_H_7ea1ee6b_4f17_4c09_b76b_3d44e102400c