/src/libreoffice/comphelper/source/misc/threadpool.cxx
Line | Count | Source |
1 | | /* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ |
2 | | /* |
3 | | * This file is part of the LibreOffice project. |
4 | | * |
5 | | * This Source Code Form is subject to the terms of the Mozilla Public |
6 | | * License, v. 2.0. If a copy of the MPL was not distributed with this |
7 | | * file, You can obtain one at http://mozilla.org/MPL/2.0/. |
8 | | */ |
9 | | |
10 | | #include <comphelper/threadpool.hxx> |
11 | | |
12 | | #include <com/sun/star/uno/Exception.hpp> |
13 | | #include <config_options.h> |
14 | | #include <o3tl/safeint.hxx> |
15 | | #include <sal/config.h> |
16 | | #include <sal/log.hxx> |
17 | | #include <salhelper/thread.hxx> |
18 | | #include <algorithm> |
19 | | #include <memory> |
20 | | #include <thread> |
21 | | #include <chrono> |
22 | | #include <cstddef> |
23 | | #include <comphelper/debuggerinfo.hxx> |
24 | | #include <utility> |
25 | | |
26 | | #if defined HAVE_VALGRIND_HEADERS |
27 | | #include <valgrind/memcheck.h> |
28 | | #endif |
29 | | |
30 | | #if defined(_WIN32) |
31 | | #define WIN32_LEAN_AND_MEAN |
32 | | #include <windows.h> |
33 | | #endif |
34 | | |
35 | | namespace comphelper { |
36 | | |
37 | | /** prevent waiting for a task from inside a task */ |
38 | | #if defined DBG_UTIL && (defined LINUX || defined _WIN32) |
39 | | static thread_local bool gbIsWorkerThread; |
40 | | #endif |
41 | | |
42 | | // used to group thread-tasks for waiting in waitTillDone() |
43 | | class ThreadTaskTag |
44 | | { |
45 | | std::mutex maMutex; |
46 | | sal_Int32 mnTasksWorking; |
47 | | std::condition_variable maTasksComplete; |
48 | | |
49 | | public: |
50 | | ThreadTaskTag(); |
51 | | bool isDone(); |
52 | | void waitUntilDone(); |
53 | | void onTaskWorkerDone(); |
54 | | void onTaskPushed(); |
55 | | }; |
56 | | |
57 | | |
58 | | class ThreadPool::ThreadWorker : public salhelper::Thread |
59 | | { |
60 | | ThreadPool *mpPool; |
61 | | public: |
62 | | |
63 | | explicit ThreadWorker( ThreadPool *pPool ) : |
64 | 10.6k | salhelper::Thread("thread-pool"), |
65 | 10.6k | mpPool( pPool ) |
66 | 10.6k | { |
67 | 10.6k | } |
68 | | |
69 | | virtual void execute() override |
70 | 10.6k | { |
71 | | #if defined DBG_UTIL && (defined LINUX || defined _WIN32) |
72 | | gbIsWorkerThread = true; |
73 | | #endif |
74 | 10.6k | std::unique_lock< std::mutex > aGuard( mpPool->maMutex ); |
75 | | |
76 | 38.4k | while( !mpPool->mbTerminate ) |
77 | 27.7k | { |
78 | 27.7k | std::unique_ptr<ThreadTask> pTask = mpPool->popWorkLocked( aGuard, true ); |
79 | 27.7k | if( pTask ) |
80 | 17.1k | { |
81 | 17.1k | std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag); |
82 | 17.1k | mpPool->incBusyWorker(); |
83 | 17.1k | aGuard.unlock(); |
84 | | |
85 | 17.1k | pTask->exec(); |
86 | 17.1k | pTask.reset(); |
87 | | |
88 | 17.1k | aGuard.lock(); |
89 | 17.1k | mpPool->decBusyWorker(); |
90 | 17.1k | pTag->onTaskWorkerDone(); |
91 | 17.1k | } |
92 | 27.7k | } |
93 | 10.6k | } |
94 | | }; |
95 | | |
96 | | ThreadPool::ThreadPool(std::size_t nWorkers) |
97 | 22 | : mbTerminate(true) |
98 | 22 | , mnMaxWorkers(nWorkers) |
99 | 22 | , mnBusyWorkers(0) |
100 | 22 | { |
101 | 22 | } |
102 | | |
103 | | ThreadPool::~ThreadPool() |
104 | 0 | { |
105 | | // note: calling shutdown from global variable dtor blocks forever on Win7 |
106 | | // note2: there isn't enough MSVCRT left on exit to call assert() properly |
107 | | // so these asserts just print something to stderr but exit status is |
108 | | // still 0, but hopefully they will be more helpful on non-WNT platforms |
109 | 0 | assert(mbTerminate); |
110 | 0 | assert(maTasks.empty()); |
111 | 0 | assert(mnBusyWorkers == 0); |
112 | 0 | } |
113 | | |
114 | | namespace { |
115 | | |
116 | | std::shared_ptr< ThreadPool >& GetStaticThreadPool() |
117 | 169k | { |
118 | 169k | static std::shared_ptr< ThreadPool > POOL = |
119 | 169k | []() |
120 | 169k | { |
121 | 22 | const std::size_t nThreads = ThreadPool::getPreferredConcurrency(); |
122 | 22 | return std::make_shared< ThreadPool >( nThreads ); |
123 | 22 | }(); |
124 | 169k | return POOL; |
125 | 169k | } |
126 | | |
127 | | } |
128 | | |
129 | | ThreadPool& ThreadPool::getSharedOptimalPool() |
130 | 169k | { |
131 | 169k | return *GetStaticThreadPool(); |
132 | 169k | } |
133 | | |
134 | | std::size_t ThreadPool::getPreferredConcurrency() |
135 | 22 | { |
136 | 22 | static std::size_t ThreadCount = []() |
137 | 22 | { |
138 | 22 | const std::size_t nHardThreads = o3tl::clamp_to_unsigned<std::size_t>( |
139 | 22 | std::max(std::thread::hardware_concurrency(), 1U)); |
140 | 22 | std::size_t nThreads = nHardThreads; |
141 | 22 | const char *pEnv = getenv("MAX_CONCURRENCY"); |
142 | 22 | if (pEnv != nullptr) |
143 | 0 | { |
144 | | // Override with user/admin preference. |
145 | 0 | nThreads = o3tl::clamp_to_unsigned<std::size_t>(rtl_str_toInt32(pEnv, 10)); |
146 | 0 | } |
147 | | |
148 | 22 | nThreads = std::min(nHardThreads, nThreads); |
149 | 22 | return std::max<std::size_t>(nThreads, 1); |
150 | 22 | }(); |
151 | | |
152 | 22 | return ThreadCount; |
153 | 22 | } |
154 | | |
155 | | // Used to order shutdown, and to ensure there are no lingering |
156 | | // threads after LibreOfficeKit pre-init. |
157 | | void ThreadPool::shutdown() |
158 | 0 | { |
159 | | // if (mbTerminate) |
160 | | // return; |
161 | |
|
162 | 0 | std::unique_lock< std::mutex > aGuard( maMutex ); |
163 | 0 | shutdownLocked(aGuard); |
164 | 0 | } |
165 | | |
166 | | void ThreadPool::shutdownLocked(std::unique_lock<std::mutex>& aGuard) |
167 | 169k | { |
168 | 169k | if( maWorkers.empty() ) |
169 | 165k | { // no threads at all -> execute the work in-line |
170 | 165k | std::unique_ptr<ThreadTask> pTask; |
171 | 165k | while ( ( pTask = popWorkLocked(aGuard, false) ) ) |
172 | 0 | { |
173 | 0 | std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag); |
174 | 0 | pTask->exec(); |
175 | 0 | pTag->onTaskWorkerDone(); |
176 | 0 | } |
177 | 165k | } |
178 | 3.88k | else |
179 | 3.88k | { |
180 | 3.88k | while( !maTasks.empty() ) |
181 | 0 | { |
182 | 0 | maTasksChanged.wait( aGuard ); |
183 | | // In the (unlikely but possible?) case pushTask() gets called meanwhile, |
184 | | // its notify_one() call is meant to wake a up a thread and process the task. |
185 | | // But if this code gets woken up instead, it could lead to a deadlock. |
186 | | // Pass on the notification. |
187 | 0 | maTasksChanged.notify_one(); |
188 | 0 | } |
189 | 3.88k | } |
190 | 169k | assert( maTasks.empty() ); |
191 | | |
192 | 169k | mbTerminate = true; |
193 | | |
194 | 169k | maTasksChanged.notify_all(); |
195 | | |
196 | 169k | decltype(maWorkers) aWorkers; |
197 | 169k | std::swap(maWorkers, aWorkers); |
198 | 169k | aGuard.unlock(); |
199 | | |
200 | 180k | while (!aWorkers.empty()) |
201 | 10.6k | { |
202 | 10.6k | rtl::Reference<ThreadWorker> xWorker = aWorkers.back(); |
203 | 10.6k | aWorkers.pop_back(); |
204 | 10.6k | assert(std::find(aWorkers.begin(), aWorkers.end(), xWorker) |
205 | 10.6k | == aWorkers.end()); |
206 | 10.6k | { |
207 | 10.6k | xWorker->join(); |
208 | 10.6k | xWorker.clear(); |
209 | 10.6k | } |
210 | 10.6k | } |
211 | 169k | } |
212 | | |
213 | | void ThreadPool::pushTask( std::unique_ptr<ThreadTask> pTask ) |
214 | 17.1k | { |
215 | 17.1k | std::scoped_lock< std::mutex > aGuard( maMutex ); |
216 | | |
217 | 17.1k | mbTerminate = false; |
218 | | |
219 | | // Worked on tasks are already removed from maTasks, so include the count of busy workers. |
220 | 17.1k | if (maWorkers.size() < mnMaxWorkers && maWorkers.size() <= maTasks.size() + mnBusyWorkers) |
221 | 10.6k | { |
222 | 10.6k | maWorkers.push_back( new ThreadWorker( this ) ); |
223 | 10.6k | maWorkers.back()->launch(); |
224 | 10.6k | } |
225 | | |
226 | 17.1k | pTask->mpTag->onTaskPushed(); |
227 | 17.1k | maTasks.insert( maTasks.begin(), std::move(pTask) ); |
228 | | |
229 | 17.1k | maTasksChanged.notify_one(); |
230 | 17.1k | } |
231 | | |
232 | | std::unique_ptr<ThreadTask> ThreadPool::popWorkLocked( std::unique_lock< std::mutex > & rGuard, bool bWait ) |
233 | 193k | { |
234 | 193k | do |
235 | 200k | { |
236 | 200k | if( !maTasks.empty() ) |
237 | 17.1k | { |
238 | 17.1k | std::unique_ptr<ThreadTask> pTask = std::move(maTasks.back()); |
239 | 17.1k | maTasks.pop_back(); |
240 | 17.1k | return pTask; |
241 | 17.1k | } |
242 | 183k | else if (!bWait || mbTerminate) |
243 | 165k | return nullptr; |
244 | | |
245 | 17.1k | maTasksChanged.wait( rGuard ); |
246 | | |
247 | 17.1k | } while (!mbTerminate); |
248 | | |
249 | 10.6k | return nullptr; |
250 | 193k | } |
251 | | |
252 | | void ThreadPool::incBusyWorker() |
253 | 17.1k | { |
254 | 17.1k | ++mnBusyWorkers; |
255 | 17.1k | } |
256 | | |
257 | | void ThreadPool::decBusyWorker() |
258 | 17.1k | { |
259 | 17.1k | assert(mnBusyWorkers >= 1); |
260 | 17.1k | --mnBusyWorkers; |
261 | 17.1k | } |
262 | | |
263 | | void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag, bool bJoin) |
264 | 4.14k | { |
265 | | #if defined DBG_UTIL && (defined LINUX || defined _WIN32) |
266 | | assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task"); |
267 | | #endif |
268 | 4.14k | { |
269 | 4.14k | std::unique_lock< std::mutex > aGuard( maMutex ); |
270 | | |
271 | 4.14k | if( maWorkers.empty() ) |
272 | 60 | { // no threads at all -> execute the work in-line |
273 | 60 | while (!rTag->isDone()) |
274 | 0 | { |
275 | 0 | std::unique_ptr<ThreadTask> pTask = popWorkLocked(aGuard, false); |
276 | 0 | if (!pTask) |
277 | 0 | break; |
278 | 0 | std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag); |
279 | 0 | pTask->exec(); |
280 | 0 | pTag->onTaskWorkerDone(); |
281 | 0 | } |
282 | 60 | } |
283 | 4.14k | } |
284 | | |
285 | 4.14k | rTag->waitUntilDone(); |
286 | | |
287 | 4.14k | if (bJoin) |
288 | 3.88k | joinThreadsIfIdle(); |
289 | 4.14k | } |
290 | | |
291 | | bool ThreadPool::joinThreadsIfIdle() |
292 | 169k | { |
293 | 169k | std::unique_lock< std::mutex > aGuard( maMutex ); |
294 | 169k | if (isIdle()) // check if there are still tasks from another tag |
295 | 169k | { |
296 | 169k | shutdownLocked(aGuard); |
297 | 169k | return true; |
298 | 169k | } |
299 | 0 | return false; |
300 | 169k | } |
301 | | |
302 | | std::shared_ptr<ThreadTaskTag> ThreadPool::createThreadTaskTag() |
303 | 4.14k | { |
304 | 4.14k | return std::make_shared<ThreadTaskTag>(); |
305 | 4.14k | } |
306 | | |
307 | | bool ThreadPool::isTaskTagDone(const std::shared_ptr<ThreadTaskTag>& pTag) |
308 | 0 | { |
309 | 0 | return pTag->isDone(); |
310 | 0 | } |
311 | | |
312 | | ThreadTask::ThreadTask(std::shared_ptr<ThreadTaskTag> xTag) |
313 | 17.1k | : mpTag(std::move(xTag)) |
314 | 17.1k | { |
315 | 17.1k | } |
316 | | |
317 | | void ThreadTask::exec() |
318 | 17.0k | { |
319 | 17.0k | try { |
320 | 17.0k | doWork(); |
321 | 17.0k | } |
322 | 17.0k | catch (const std::exception &e) |
323 | 17.0k | { |
324 | 0 | SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.what()); |
325 | 0 | } |
326 | 17.0k | catch (const css::uno::Exception &e) |
327 | 17.0k | { |
328 | 0 | SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e); |
329 | 0 | } |
330 | 17.0k | catch (...) |
331 | 17.0k | { |
332 | 0 | SAL_WARN("comphelper", "unknown exception in thread worker while calling doWork()"); |
333 | 0 | } |
334 | 17.0k | } |
335 | | |
336 | 4.14k | ThreadTaskTag::ThreadTaskTag() : mnTasksWorking(0) |
337 | 4.14k | { |
338 | 4.14k | } |
339 | | |
340 | | void ThreadTaskTag::onTaskPushed() |
341 | 17.1k | { |
342 | 17.1k | std::scoped_lock< std::mutex > aGuard( maMutex ); |
343 | 17.1k | mnTasksWorking++; |
344 | 17.1k | assert( mnTasksWorking < 65536 ); // sanity checking |
345 | 17.1k | } |
346 | | |
347 | | void ThreadTaskTag::onTaskWorkerDone() |
348 | 17.1k | { |
349 | 17.1k | std::scoped_lock< std::mutex > aGuard( maMutex ); |
350 | 17.1k | mnTasksWorking--; |
351 | 17.1k | assert(mnTasksWorking >= 0); |
352 | 17.1k | if (mnTasksWorking == 0) |
353 | 4.08k | maTasksComplete.notify_all(); |
354 | 17.1k | } |
355 | | |
356 | | bool ThreadTaskTag::isDone() |
357 | 60 | { |
358 | 60 | std::scoped_lock< std::mutex > aGuard( maMutex ); |
359 | 60 | return mnTasksWorking == 0; |
360 | 60 | } |
361 | | |
362 | | void ThreadTaskTag::waitUntilDone() |
363 | 4.14k | { |
364 | 4.14k | std::unique_lock< std::mutex > aGuard( maMutex ); |
365 | 4.42k | while( mnTasksWorking > 0 ) |
366 | 280 | { |
367 | | #if defined DBG_UTIL && !defined NDEBUG |
368 | | // 10 minute timeout in debug mode, unless the code is built with |
369 | | // sanitizers or debugged in valgrind or gdb, in which case the threads |
370 | | // should not time out in the middle of a debugging session |
371 | | int maxTimeout = 10 * 60; |
372 | | #if !ENABLE_RUNTIME_OPTIMIZATIONS |
373 | | maxTimeout = 30 * 60; |
374 | | #endif |
375 | | #if defined HAVE_VALGRIND_HEADERS |
376 | | if( RUNNING_ON_VALGRIND ) |
377 | | maxTimeout = 30 * 60; |
378 | | #endif |
379 | | if( isDebuggerAttached()) |
380 | | maxTimeout = 300 * 60; |
381 | | std::cv_status result = maTasksComplete.wait_for( |
382 | | aGuard, std::chrono::seconds( maxTimeout )); |
383 | | assert(result != std::cv_status::timeout); |
384 | | #else |
385 | | // 10 minute timeout in production so the app eventually throws some kind of error |
386 | 280 | if (maTasksComplete.wait_for( |
387 | 280 | aGuard, std::chrono::seconds( 10 * 60 )) == std::cv_status::timeout) |
388 | 0 | throw std::runtime_error("timeout waiting for threadpool tasks"); |
389 | 280 | #endif |
390 | 280 | } |
391 | 4.14k | } |
392 | | |
393 | | } // namespace comphelper |
394 | | |
395 | | /* vim:set shiftwidth=4 softtabstop=4 expandtab: */ |