/src/log4cplus/threadpool/ThreadPool.h
Line | Count | Source |
1 | | // -*- C++ -*- |
2 | | // Copyright (c) 2012-2015 Jakob Progsch |
3 | | // |
4 | | // This software is provided 'as-is', without any express or implied |
5 | | // warranty. In no event will the authors be held liable for any damages |
6 | | // arising from the use of this software. |
7 | | // |
8 | | // Permission is granted to anyone to use this software for any purpose, |
9 | | // including commercial applications, and to alter it and redistribute it |
10 | | // freely, subject to the following restrictions: |
11 | | // |
12 | | // 1. The origin of this software must not be misrepresented; you must not |
13 | | // claim that you wrote the original software. If you use this software |
14 | | // in a product, an acknowledgment in the product documentation would be |
15 | | // appreciated but is not required. |
16 | | // |
17 | | // 2. Altered source versions must be plainly marked as such, and must not be |
18 | | // misrepresented as being the original software. |
19 | | // |
20 | | // 3. This notice may not be removed or altered from any source |
21 | | // distribution. |
22 | | // |
23 | | // Modified for log4cplus, copyright (c) 2014-2015 Václav Zeman. |
24 | | |
25 | | #ifndef THREAD_POOL_H_7ea1ee6b_4f17_4c09_b76b_3d44e102400c |
26 | | #define THREAD_POOL_H_7ea1ee6b_4f17_4c09_b76b_3d44e102400c |
27 | | |
28 | | #include <vector> |
29 | | #include <queue> |
30 | | #include <memory> |
31 | | #include <thread> |
32 | | #include <mutex> |
33 | | #include <condition_variable> |
34 | | #include <future> |
35 | | #include <atomic> |
36 | | #include <functional> |
37 | | #include <stdexcept> |
38 | | #include <algorithm> |
39 | | #include <cassert> |
40 | | |
41 | | |
42 | | namespace progschj { |
43 | | |
44 | | class would_block |
45 | | : public std::runtime_error |
46 | | { |
47 | | using std::runtime_error::runtime_error; |
48 | | }; |
49 | | |
50 | | |
51 | | class ThreadPool { |
52 | | public: |
53 | | template <typename F, typename... Args> |
54 | | using return_type = |
55 | | #if defined(__cpp_lib_is_invocable) && __cpp_lib_is_invocable >= 201703L |
56 | | typename std::invoke_result<F&&, Args&&...>::type; |
57 | | #else |
58 | | typename std::result_of<F&& (Args&&...)>::type; |
59 | | #endif |
60 | | |
61 | | explicit ThreadPool(std::size_t threads |
62 | | = (std::max)(2u, std::thread::hardware_concurrency())); |
63 | | template <typename F, typename... Args> |
64 | | auto enqueue_block(F&& f, Args&&... args) -> std::future<return_type<F, Args...>>; |
65 | | template <typename F, typename... Args> |
66 | | auto enqueue(F&& f, Args&&... args) -> std::future<return_type<F, Args...>>; |
67 | | void wait_until_empty(); |
68 | | void wait_until_nothing_in_flight(); |
69 | | void set_queue_size_limit(std::size_t limit); |
70 | | void set_pool_size(std::size_t limit); |
71 | | ~ThreadPool(); |
72 | | |
73 | | private: |
74 | | void start_worker(std::size_t worker_number, |
75 | | std::unique_lock<std::mutex> const &lock); |
76 | | |
77 | | template <typename F, typename... Args> |
78 | | auto enqueue_worker(bool, F&& f, Args&&... args) -> std::future<return_type<F, Args...>>; |
79 | | |
80 | | template <typename T> |
81 | | static std::future<T> make_exception_future (std::exception_ptr ex_ptr); |
82 | | |
83 | | // need to keep track of threads so we can join them |
84 | | std::vector< std::thread > workers; |
85 | | // target pool size |
86 | | std::size_t pool_size; |
87 | | // the task queue |
88 | | std::queue< std::function<void()> > tasks; |
89 | | // queue length limit |
90 | | std::size_t max_queue_size = 100000; |
91 | | // stop signal |
92 | | bool stop = false; |
93 | | |
94 | | // synchronization |
95 | | std::mutex queue_mutex; |
96 | | std::condition_variable condition_producers; |
97 | | std::condition_variable condition_consumers; |
98 | | |
99 | | std::mutex in_flight_mutex; |
100 | | std::condition_variable in_flight_condition; |
101 | | std::atomic<std::size_t> in_flight; |
102 | | |
103 | | struct handle_in_flight_decrement |
104 | | { |
105 | | ThreadPool & tp; |
106 | | |
107 | | handle_in_flight_decrement(ThreadPool & tp_) |
108 | 0 | : tp(tp_) |
109 | 0 | { } |
110 | | |
111 | | ~handle_in_flight_decrement() |
112 | 0 | { |
113 | 0 | std::size_t prev |
114 | 0 | = std::atomic_fetch_sub_explicit(&tp.in_flight, |
115 | 0 | std::size_t(1), |
116 | 0 | std::memory_order_acq_rel); |
117 | 0 | if (prev == 1) |
118 | 0 | { |
119 | 0 | std::unique_lock<std::mutex> guard(tp.in_flight_mutex); |
120 | 0 | tp.in_flight_condition.notify_all(); |
121 | 0 | } |
122 | 0 | } |
123 | | }; |
124 | | }; |
125 | | |
126 | | // the constructor just launches some amount of workers |
127 | | inline ThreadPool::ThreadPool(std::size_t threads) |
128 | 36 | : pool_size(threads) |
129 | 36 | , in_flight(0) |
130 | 36 | { |
131 | 36 | std::unique_lock<std::mutex> lock(this->queue_mutex); |
132 | 180 | for (std::size_t i = 0; i != threads; ++i) |
133 | 144 | start_worker(i, lock); |
134 | 36 | } |
135 | | |
136 | | // add new work item to the pool and block if the queue is full |
137 | | template<class F, class... Args> |
138 | | auto ThreadPool::enqueue_block(F&& f, Args&&... args) -> std::future<return_type<F, Args...>> |
139 | 0 | { |
140 | 0 | return enqueue_worker (true, std::forward<F> (f), std::forward<Args> (args)...); |
141 | 0 | } |
142 | | |
143 | | // add new work item to the pool and return future with would_block exception if it is full |
144 | | template<class F, class... Args> |
145 | | auto ThreadPool::enqueue(F&& f, Args&&... args) -> std::future<return_type<F, Args...>> |
146 | 0 | { |
147 | 0 | return enqueue_worker (false, std::forward<F> (f), std::forward<Args> (args)...); |
148 | 0 | } |
149 | | |
150 | | template <typename F, typename... Args> |
151 | | auto ThreadPool::enqueue_worker(bool block, F&& f, Args&&... args) -> std::future<return_type<F, Args...>> |
152 | 0 | { |
153 | 0 | auto task = std::make_shared< std::packaged_task<return_type<F, Args...>()> >( |
154 | 0 | std::bind(std::forward<F>(f), std::forward<Args>(args)...) |
155 | 0 | ); |
156 | |
|
157 | 0 | std::future<return_type<F, Args...>> res = task->get_future(); |
158 | |
|
159 | 0 | std::unique_lock<std::mutex> lock(queue_mutex); |
160 | |
|
161 | 0 | if (tasks.size () >= max_queue_size) |
162 | 0 | { |
163 | 0 | if (block) |
164 | 0 | { |
165 | | // wait for the queue to empty or be stopped |
166 | 0 | condition_producers.wait(lock, |
167 | 0 | [this] |
168 | 0 | { |
169 | 0 | return tasks.size () < max_queue_size |
170 | 0 | || stop; |
171 | 0 | }); Unexecuted instantiation: global-init.cxx:progschj::ThreadPool::enqueue_worker<log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_0>(bool, log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_0&&)::{lambda()#1}::operator()() constUnexecuted instantiation: global-init.cxx:progschj::ThreadPool::enqueue_worker<log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_1>(bool, log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_1&&)::{lambda()#1}::operator()() const |
172 | 0 | } |
173 | 0 | else |
174 | 0 | { |
175 | 0 | return ThreadPool::make_exception_future<return_type<F, Args...>> ( |
176 | 0 | std::make_exception_ptr (would_block("queue full"))); |
177 | 0 | } |
178 | 0 | } |
179 | | |
180 | | |
181 | | // don't allow enqueueing after stopping the pool |
182 | 0 | if (stop) |
183 | 0 | throw std::runtime_error("enqueue on stopped ThreadPool"); |
184 | | |
185 | 0 | tasks.emplace([task](){ (*task)(); });Unexecuted instantiation: global-init.cxx:progschj::ThreadPool::enqueue_worker<log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_0>(bool, log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_0&&)::{lambda()#2}::operator()() constUnexecuted instantiation: global-init.cxx:progschj::ThreadPool::enqueue_worker<log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_1>(bool, log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_1&&)::{lambda()#2}::operator()() const |
186 | 0 | std::atomic_fetch_add_explicit(&in_flight, |
187 | 0 | std::size_t(1), |
188 | 0 | std::memory_order_relaxed); |
189 | 0 | condition_consumers.notify_one(); |
190 | |
|
191 | 0 | return res; |
192 | 0 | } Unexecuted instantiation: global-init.cxx:std::__1::future<std::__1::invoke_result<log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_0&&>::type> progschj::ThreadPool::enqueue_worker<log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_0>(bool, log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_0&&) Unexecuted instantiation: global-init.cxx:std::__1::future<std::__1::invoke_result<log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_1&&>::type> progschj::ThreadPool::enqueue_worker<log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_1>(bool, log4cplus::enqueueAsyncDoAppend(log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> const&, log4cplus::spi::InternalLoggingEvent const&)::$_1&&) |
193 | | |
194 | | // the destructor joins all threads |
195 | | inline ThreadPool::~ThreadPool() |
196 | 0 | { |
197 | 0 | std::unique_lock<std::mutex> lock(queue_mutex); |
198 | 0 | stop = true; |
199 | 0 | pool_size = 0; |
200 | 0 | condition_consumers.notify_all(); |
201 | 0 | condition_producers.notify_all(); |
202 | 0 | condition_consumers.wait(lock, [this]{ return this->workers.empty(); }); |
203 | 0 | assert(in_flight == 0); |
204 | 0 | } |
205 | | |
206 | | inline void ThreadPool::wait_until_empty() |
207 | 304k | { |
208 | 304k | std::unique_lock<std::mutex> lock(this->queue_mutex); |
209 | 304k | this->condition_producers.wait(lock, |
210 | 304k | [this]{ return this->tasks.empty(); }); |
211 | 304k | } |
212 | | |
213 | | inline void ThreadPool::wait_until_nothing_in_flight() |
214 | 304k | { |
215 | 304k | std::unique_lock<std::mutex> lock(this->in_flight_mutex); |
216 | 304k | this->in_flight_condition.wait(lock, |
217 | 304k | [this]{ return this->in_flight == 0; }); |
218 | 304k | } |
219 | | |
220 | | inline void ThreadPool::set_queue_size_limit(std::size_t limit) |
221 | 0 | { |
222 | 0 | std::unique_lock<std::mutex> lock(this->queue_mutex); |
223 | |
|
224 | 0 | if (stop) |
225 | 0 | return; |
226 | | |
227 | 0 | std::size_t const old_limit = max_queue_size; |
228 | 0 | max_queue_size = (std::max)(limit, std::size_t(1)); |
229 | 0 | if (old_limit < max_queue_size) |
230 | 0 | condition_producers.notify_all(); |
231 | 0 | } |
232 | | |
233 | | inline void ThreadPool::set_pool_size(std::size_t limit) |
234 | 116k | { |
235 | 116k | if (limit < 1) |
236 | 0 | limit = 1; |
237 | | |
238 | 116k | std::unique_lock<std::mutex> lock(this->queue_mutex); |
239 | | |
240 | 116k | if (stop) |
241 | 0 | return; |
242 | | |
243 | 116k | std::size_t const old_size = pool_size; |
244 | 116k | assert(this->workers.size() >= old_size); |
245 | | |
246 | 116k | pool_size = limit; |
247 | 116k | if (pool_size > old_size) |
248 | 0 | { |
249 | | // create new worker threads |
250 | | // it is possible that some of these are still running because |
251 | | // they have not stopped yet after a pool size reduction, such |
252 | | // workers will just keep running |
253 | 0 | for (std::size_t i = old_size; i != pool_size; ++i) |
254 | 0 | start_worker(i, lock); |
255 | 0 | } |
256 | 116k | else if (pool_size < old_size) |
257 | | // notify all worker threads to start downsizing |
258 | 0 | this->condition_consumers.notify_all(); |
259 | 116k | } |
260 | | |
261 | | inline void ThreadPool::start_worker( |
262 | | std::size_t worker_number, std::unique_lock<std::mutex> const &lock) |
263 | 144 | { |
264 | 144 | assert(lock.owns_lock() && lock.mutex() == &this->queue_mutex); |
265 | 144 | assert(worker_number <= this->workers.size()); |
266 | | |
267 | 144 | auto worker_func = |
268 | 144 | [this, worker_number] |
269 | 144 | { |
270 | 144 | for(;;) |
271 | 144 | { |
272 | 144 | std::function<void()> task; |
273 | 144 | bool notify; |
274 | | |
275 | 144 | { |
276 | 144 | std::unique_lock<std::mutex> lock(this->queue_mutex); |
277 | 144 | this->condition_consumers.wait(lock, |
278 | 144 | [this, worker_number]{ |
279 | 144 | return this->stop || !this->tasks.empty() |
280 | 144 | || pool_size < worker_number + 1; }); |
281 | | |
282 | | // deal with downsizing of thread pool or shutdown |
283 | 144 | if ((this->stop && this->tasks.empty()) |
284 | 0 | || (!this->stop && pool_size < worker_number + 1)) |
285 | 0 | { |
286 | | // detach this worker, effectively marking it stopped |
287 | 0 | this->workers[worker_number].detach(); |
288 | | // downsize the workers vector as much as possible |
289 | 0 | while (this->workers.size() > pool_size |
290 | 0 | && !this->workers.back().joinable()) |
291 | 0 | this->workers.pop_back(); |
292 | | // if this is was last worker, notify the destructor |
293 | 0 | if (this->workers.empty()) |
294 | 0 | this->condition_consumers.notify_all(); |
295 | 0 | return; |
296 | 0 | } |
297 | 144 | else if (!this->tasks.empty()) |
298 | 0 | { |
299 | 0 | task = std::move(this->tasks.front()); |
300 | 0 | this->tasks.pop(); |
301 | 0 | notify = this->tasks.size() + 1 == max_queue_size |
302 | 0 | || this->tasks.empty(); |
303 | 0 | } |
304 | 144 | else |
305 | 144 | continue; |
306 | 144 | } |
307 | | |
308 | 0 | handle_in_flight_decrement guard(*this); |
309 | |
|
310 | 0 | if (notify) |
311 | 0 | { |
312 | 0 | std::unique_lock<std::mutex> lock(this->queue_mutex); |
313 | 0 | condition_producers.notify_all(); |
314 | 0 | } |
315 | |
|
316 | 0 | task(); |
317 | 0 | } |
318 | 144 | }; |
319 | | |
320 | 144 | if (worker_number < this->workers.size()) { |
321 | 0 | std::thread & worker = this->workers[worker_number]; |
322 | | // start only if not already running |
323 | 0 | if (!worker.joinable()) { |
324 | 0 | worker = std::thread(worker_func); |
325 | 0 | } |
326 | 0 | } else |
327 | 144 | this->workers.push_back(std::thread(worker_func)); |
328 | 144 | } |
329 | | |
330 | | template <typename T> |
331 | | inline std::future<T> ThreadPool::make_exception_future (std::exception_ptr ex_ptr) |
332 | 0 | { |
333 | 0 | std::promise<T> p; |
334 | 0 | p.set_exception (ex_ptr); |
335 | 0 | return p.get_future (); |
336 | 0 | } |
337 | | |
338 | | } // namespace progschj |
339 | | |
340 | | #endif // THREAD_POOL_H_7ea1ee6b_4f17_4c09_b76b_3d44e102400c |