Coverage Report

Created: 2026-04-09 11:41

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/libreoffice/comphelper/source/misc/threadpool.cxx
Line
Count
Source
1
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
/*
3
 * This file is part of the LibreOffice project.
4
 *
5
 * This Source Code Form is subject to the terms of the Mozilla Public
6
 * License, v. 2.0. If a copy of the MPL was not distributed with this
7
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
8
 */
9
10
#include <comphelper/threadpool.hxx>
11
12
#include <com/sun/star/uno/Exception.hpp>
13
#include <config_options.h>
14
#include <o3tl/safeint.hxx>
15
#include <sal/config.h>
16
#include <sal/log.hxx>
17
#include <salhelper/thread.hxx>
18
#include <algorithm>
19
#include <memory>
20
#include <thread>
21
#include <chrono>
22
#include <cstddef>
23
#include <comphelper/debuggerinfo.hxx>
24
#include <utility>
25
26
#if defined HAVE_VALGRIND_HEADERS
27
#include <valgrind/memcheck.h>
28
#endif
29
30
#if defined(_WIN32)
31
#define WIN32_LEAN_AND_MEAN
32
#include <windows.h>
33
#endif
34
35
namespace comphelper {
36
37
/** prevent waiting for a task from inside a task */
38
#if defined DBG_UTIL && (defined LINUX || defined _WIN32)
39
static thread_local bool gbIsWorkerThread;
40
#endif
41
42
// used to group thread-tasks for waiting in waitTillDone()
43
class ThreadTaskTag
44
{
45
    std::mutex maMutex;
46
    sal_Int32 mnTasksWorking;
47
    std::condition_variable maTasksComplete;
48
49
public:
50
    ThreadTaskTag();
51
    bool isDone();
52
    void waitUntilDone();
53
    void onTaskWorkerDone();
54
    void onTaskPushed();
55
};
56
57
58
class ThreadPool::ThreadWorker : public salhelper::Thread
59
{
60
    ThreadPool *mpPool;
61
public:
62
63
    explicit ThreadWorker( ThreadPool *pPool ) :
64
10.6k
        salhelper::Thread("thread-pool"),
65
10.6k
        mpPool( pPool )
66
10.6k
    {
67
10.6k
    }
68
69
    virtual void execute() override
70
10.6k
    {
71
#if defined DBG_UTIL && (defined LINUX || defined _WIN32)
72
        gbIsWorkerThread = true;
73
#endif
74
10.6k
        std::unique_lock< std::mutex > aGuard( mpPool->maMutex );
75
76
38.4k
        while( !mpPool->mbTerminate )
77
27.7k
        {
78
27.7k
            std::unique_ptr<ThreadTask> pTask = mpPool->popWorkLocked( aGuard, true );
79
27.7k
            if( pTask )
80
17.1k
            {
81
17.1k
                std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag);
82
17.1k
                mpPool->incBusyWorker();
83
17.1k
                aGuard.unlock();
84
85
17.1k
                pTask->exec();
86
17.1k
                pTask.reset();
87
88
17.1k
                aGuard.lock();
89
17.1k
                mpPool->decBusyWorker();
90
17.1k
                pTag->onTaskWorkerDone();
91
17.1k
            }
92
27.7k
        }
93
10.6k
    }
94
};
95
96
ThreadPool::ThreadPool(std::size_t nWorkers)
97
22
    : mbTerminate(true)
98
22
    , mnMaxWorkers(nWorkers)
99
22
    , mnBusyWorkers(0)
100
22
{
101
22
}
102
103
ThreadPool::~ThreadPool()
104
0
{
105
    // note: calling shutdown from global variable dtor blocks forever on Win7
106
    // note2: there isn't enough MSVCRT left on exit to call assert() properly
107
    // so these asserts just print something to stderr but exit status is
108
    // still 0, but hopefully they will be more helpful on non-WNT platforms
109
0
    assert(mbTerminate);
110
0
    assert(maTasks.empty());
111
0
    assert(mnBusyWorkers == 0);
112
0
}
113
114
namespace {
115
116
std::shared_ptr< ThreadPool >& GetStaticThreadPool()
117
169k
{
118
169k
    static std::shared_ptr< ThreadPool > POOL =
119
169k
    []()
120
169k
    {
121
22
        const std::size_t nThreads = ThreadPool::getPreferredConcurrency();
122
22
        return std::make_shared< ThreadPool >( nThreads );
123
22
    }();
124
169k
    return POOL;
125
169k
}
126
127
}
128
129
ThreadPool& ThreadPool::getSharedOptimalPool()
130
169k
{
131
169k
    return *GetStaticThreadPool();
132
169k
}
133
134
std::size_t ThreadPool::getPreferredConcurrency()
135
22
{
136
22
    static std::size_t ThreadCount = []()
137
22
    {
138
22
        const std::size_t nHardThreads = o3tl::clamp_to_unsigned<std::size_t>(
139
22
            std::max(std::thread::hardware_concurrency(), 1U));
140
22
        std::size_t nThreads = nHardThreads;
141
22
        const char *pEnv = getenv("MAX_CONCURRENCY");
142
22
        if (pEnv != nullptr)
143
0
        {
144
            // Override with user/admin preference.
145
0
            nThreads = o3tl::clamp_to_unsigned<std::size_t>(rtl_str_toInt32(pEnv, 10));
146
0
        }
147
148
22
        nThreads = std::min(nHardThreads, nThreads);
149
22
        return std::max<std::size_t>(nThreads, 1);
150
22
    }();
151
152
22
    return ThreadCount;
153
22
}
154
155
// Used to order shutdown, and to ensure there are no lingering
156
// threads after LibreOfficeKit pre-init.
157
void ThreadPool::shutdown()
158
0
{
159
//    if (mbTerminate)
160
//        return;
161
162
0
    std::unique_lock< std::mutex > aGuard( maMutex );
163
0
    shutdownLocked(aGuard);
164
0
}
165
166
void ThreadPool::shutdownLocked(std::unique_lock<std::mutex>& aGuard)
167
169k
{
168
169k
    if( maWorkers.empty() )
169
165k
    { // no threads at all -> execute the work in-line
170
165k
        std::unique_ptr<ThreadTask> pTask;
171
165k
        while ( ( pTask = popWorkLocked(aGuard, false) ) )
172
0
        {
173
0
            std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag);
174
0
            pTask->exec();
175
0
            pTag->onTaskWorkerDone();
176
0
        }
177
165k
    }
178
3.88k
    else
179
3.88k
    {
180
3.88k
        while( !maTasks.empty() )
181
0
        {
182
0
            maTasksChanged.wait( aGuard );
183
            // In the (unlikely but possible?) case pushTask() gets called meanwhile,
184
            // its notify_one() call is meant to wake a up a thread and process the task.
185
            // But if this code gets woken up instead, it could lead to a deadlock.
186
            // Pass on the notification.
187
0
            maTasksChanged.notify_one();
188
0
        }
189
3.88k
    }
190
169k
    assert( maTasks.empty() );
191
192
169k
    mbTerminate = true;
193
194
169k
    maTasksChanged.notify_all();
195
196
169k
    decltype(maWorkers) aWorkers;
197
169k
    std::swap(maWorkers, aWorkers);
198
169k
    aGuard.unlock();
199
200
180k
    while (!aWorkers.empty())
201
10.6k
    {
202
10.6k
        rtl::Reference<ThreadWorker> xWorker = aWorkers.back();
203
10.6k
        aWorkers.pop_back();
204
10.6k
        assert(std::find(aWorkers.begin(), aWorkers.end(), xWorker)
205
10.6k
                == aWorkers.end());
206
10.6k
        {
207
10.6k
            xWorker->join();
208
10.6k
            xWorker.clear();
209
10.6k
        }
210
10.6k
    }
211
169k
}
212
213
void ThreadPool::pushTask( std::unique_ptr<ThreadTask> pTask )
214
17.1k
{
215
17.1k
    std::scoped_lock< std::mutex > aGuard( maMutex );
216
217
17.1k
    mbTerminate = false;
218
219
    // Worked on tasks are already removed from maTasks, so include the count of busy workers.
220
17.1k
    if (maWorkers.size() < mnMaxWorkers && maWorkers.size() <= maTasks.size() + mnBusyWorkers)
221
10.6k
    {
222
10.6k
        maWorkers.push_back( new ThreadWorker( this ) );
223
10.6k
        maWorkers.back()->launch();
224
10.6k
    }
225
226
17.1k
    pTask->mpTag->onTaskPushed();
227
17.1k
    maTasks.insert( maTasks.begin(), std::move(pTask) );
228
229
17.1k
    maTasksChanged.notify_one();
230
17.1k
}
231
232
std::unique_ptr<ThreadTask> ThreadPool::popWorkLocked( std::unique_lock< std::mutex > & rGuard, bool bWait )
233
193k
{
234
193k
    do
235
200k
    {
236
200k
        if( !maTasks.empty() )
237
17.1k
        {
238
17.1k
            std::unique_ptr<ThreadTask> pTask = std::move(maTasks.back());
239
17.1k
            maTasks.pop_back();
240
17.1k
            return pTask;
241
17.1k
        }
242
183k
        else if (!bWait || mbTerminate)
243
165k
            return nullptr;
244
245
17.1k
        maTasksChanged.wait( rGuard );
246
247
17.1k
    } while (!mbTerminate);
248
249
10.6k
    return nullptr;
250
193k
}
251
252
void ThreadPool::incBusyWorker()
253
17.1k
{
254
17.1k
    ++mnBusyWorkers;
255
17.1k
}
256
257
void ThreadPool::decBusyWorker()
258
17.1k
{
259
17.1k
    assert(mnBusyWorkers >= 1);
260
17.1k
    --mnBusyWorkers;
261
17.1k
}
262
263
void ThreadPool::waitUntilDone(const std::shared_ptr<ThreadTaskTag>& rTag, bool bJoin)
264
4.14k
{
265
#if defined DBG_UTIL && (defined LINUX || defined _WIN32)
266
    assert(!gbIsWorkerThread && "cannot wait for tasks from inside a task");
267
#endif
268
4.14k
    {
269
4.14k
        std::unique_lock< std::mutex > aGuard( maMutex );
270
271
4.14k
        if( maWorkers.empty() )
272
60
        { // no threads at all -> execute the work in-line
273
60
            while (!rTag->isDone())
274
0
            {
275
0
                std::unique_ptr<ThreadTask> pTask = popWorkLocked(aGuard, false);
276
0
                if (!pTask)
277
0
                    break;
278
0
                std::shared_ptr<ThreadTaskTag> pTag(pTask->mpTag);
279
0
                pTask->exec();
280
0
                pTag->onTaskWorkerDone();
281
0
            }
282
60
        }
283
4.14k
    }
284
285
4.14k
    rTag->waitUntilDone();
286
287
4.14k
    if (bJoin)
288
3.88k
        joinThreadsIfIdle();
289
4.14k
}
290
291
bool ThreadPool::joinThreadsIfIdle()
292
169k
{
293
169k
    std::unique_lock< std::mutex > aGuard( maMutex );
294
169k
    if (isIdle()) // check if there are still tasks from another tag
295
169k
    {
296
169k
        shutdownLocked(aGuard);
297
169k
        return true;
298
169k
    }
299
0
    return false;
300
169k
}
301
302
std::shared_ptr<ThreadTaskTag> ThreadPool::createThreadTaskTag()
303
4.14k
{
304
4.14k
    return std::make_shared<ThreadTaskTag>();
305
4.14k
}
306
307
bool ThreadPool::isTaskTagDone(const std::shared_ptr<ThreadTaskTag>& pTag)
308
0
{
309
0
    return pTag->isDone();
310
0
}
311
312
ThreadTask::ThreadTask(std::shared_ptr<ThreadTaskTag> xTag)
313
17.1k
    : mpTag(std::move(xTag))
314
17.1k
{
315
17.1k
}
316
317
void ThreadTask::exec()
318
17.0k
{
319
17.0k
    try {
320
17.0k
        doWork();
321
17.0k
    }
322
17.0k
    catch (const std::exception &e)
323
17.0k
    {
324
0
        SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e.what());
325
0
    }
326
17.0k
    catch (const css::uno::Exception &e)
327
17.0k
    {
328
0
        SAL_WARN("comphelper", "exception in thread worker while calling doWork(): " << e);
329
0
    }
330
17.0k
    catch (...)
331
17.0k
    {
332
0
        SAL_WARN("comphelper", "unknown exception in thread worker while calling doWork()");
333
0
    }
334
17.0k
}
335
336
4.14k
ThreadTaskTag::ThreadTaskTag() : mnTasksWorking(0)
337
4.14k
{
338
4.14k
}
339
340
void ThreadTaskTag::onTaskPushed()
341
17.1k
{
342
17.1k
    std::scoped_lock< std::mutex > aGuard( maMutex );
343
17.1k
    mnTasksWorking++;
344
17.1k
    assert( mnTasksWorking < 65536 ); // sanity checking
345
17.1k
}
346
347
void ThreadTaskTag::onTaskWorkerDone()
348
17.1k
{
349
17.1k
    std::scoped_lock< std::mutex > aGuard( maMutex );
350
17.1k
    mnTasksWorking--;
351
17.1k
    assert(mnTasksWorking >= 0);
352
17.1k
    if (mnTasksWorking == 0)
353
4.08k
        maTasksComplete.notify_all();
354
17.1k
}
355
356
bool ThreadTaskTag::isDone()
357
60
{
358
60
    std::scoped_lock< std::mutex > aGuard( maMutex );
359
60
    return mnTasksWorking == 0;
360
60
}
361
362
void ThreadTaskTag::waitUntilDone()
363
4.14k
{
364
4.14k
    std::unique_lock< std::mutex > aGuard( maMutex );
365
4.42k
    while( mnTasksWorking > 0 )
366
280
    {
367
#if defined DBG_UTIL && !defined NDEBUG
368
        // 10 minute timeout in debug mode, unless the code is built with
369
        // sanitizers or debugged in valgrind or gdb, in which case the threads
370
        // should not time out in the middle of a debugging session
371
        int maxTimeout = 10 * 60;
372
#if !ENABLE_RUNTIME_OPTIMIZATIONS
373
        maxTimeout = 30 * 60;
374
#endif
375
#if defined HAVE_VALGRIND_HEADERS
376
        if( RUNNING_ON_VALGRIND )
377
            maxTimeout = 30 * 60;
378
#endif
379
        if( isDebuggerAttached())
380
            maxTimeout = 300 * 60;
381
        std::cv_status result = maTasksComplete.wait_for(
382
            aGuard, std::chrono::seconds( maxTimeout ));
383
        assert(result != std::cv_status::timeout);
384
#else
385
        // 10 minute timeout in production so the app eventually throws some kind of error
386
280
        if (maTasksComplete.wait_for(
387
280
                aGuard, std::chrono::seconds( 10 * 60 )) == std::cv_status::timeout)
388
0
            throw std::runtime_error("timeout waiting for threadpool tasks");
389
280
#endif
390
280
    }
391
4.14k
}
392
393
} // namespace comphelper
394
395
/* vim:set shiftwidth=4 softtabstop=4 expandtab: */