Coverage Report

Created: 2026-06-15 06:25

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/work/vvdec/source/Lib/Utilities/ThreadPool.cpp
Line
Count
Source
1
/* -----------------------------------------------------------------------------
2
The copyright in this software is being made available under the Clear BSD
3
License, included below. No patent rights, trademark rights and/or 
4
other Intellectual Property Rights other than the copyrights concerning 
5
the Software are granted under this license.
6
7
The Clear BSD License
8
9
Copyright (c) 2018-2026, Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. & The VVdeC Authors.
10
All rights reserved.
11
12
Redistribution and use in source and binary forms, with or without modification,
13
are permitted (subject to the limitations in the disclaimer below) provided that
14
the following conditions are met:
15
16
     * Redistributions of source code must retain the above copyright notice,
17
     this list of conditions and the following disclaimer.
18
19
     * Redistributions in binary form must reproduce the above copyright
20
     notice, this list of conditions and the following disclaimer in the
21
     documentation and/or other materials provided with the distribution.
22
23
     * Neither the name of the copyright holder nor the names of its
24
     contributors may be used to endorse or promote products derived from this
25
     software without specific prior written permission.
26
27
NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
28
THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
29
CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
30
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
31
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
32
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
33
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
34
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
35
BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
36
IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
37
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
38
POSSIBILITY OF SUCH DAMAGE.
39
40
41
------------------------------------------------------------------------------------------- */
42
43
#include "ThreadPool.h"
44
45
#include <chrono>
46
47
#if __linux
48
# include <pthread.h>
49
#endif
50
51
52
namespace vvdec
53
{
54
using namespace std::chrono_literals;
55
56
std::mutex Barrier::s_exceptionLock{};
57
58
// block threads after busy-waiting this long
59
11
// Time a worker keeps busy-waiting for new tasks before it starts to block on the idle mutex.
// Evaluated once during static initialisation: when the environment variable
// VVDEC_BUSY_WAIT_TIME_MIN is set, its value is parsed as a (possibly fractional) number of
// milliseconds; otherwise the default of 1 ms is used.
const static auto VVDEC_BUSY_WAIT_TIME_MIN = []
{
  const char* const envValue = getenv( "VVDEC_BUSY_WAIT_TIME_MIN" );
  if( envValue == nullptr )
  {
    return std::chrono::microseconds( std::chrono::milliseconds( 1 ) );
  }

  // scale milliseconds to microseconds before truncating to an integer
  return std::chrono::microseconds( int( atof( envValue ) * 1000 ) );
}();
65
66
// block last waiting thread after this time
67
11
// Time after which the thread holding the idle lock tries to pause the whole pool.
// Evaluated once during static initialisation: when the environment variable
// VVDEC_BUSY_WAIT_TIME_MAX is set, its value is parsed as a whole number of milliseconds;
// otherwise the default of 5 ms is used.
const static auto VVDEC_BUSY_WAIT_TIME_MAX = []
{
  const char* const envValue = getenv( "VVDEC_BUSY_WAIT_TIME_MAX" );
  if( envValue == nullptr )
  {
    return std::chrono::milliseconds( 5 );
  }

  return std::chrono::milliseconds( atoi( envValue ) );
}();
73
74
75
// Exception type used inside the thread pool to transport a failure together with the task
// slot it belongs to. It is thrown by findNextTask() and processTask() and caught in the
// worker / main-thread loops, which forward it to handleTaskException().
struct ThreadPool::TaskException : public std::exception
{
  explicit TaskException( std::exception_ptr e, ThreadPool::Slot& task )
    : m_originalException( e ), m_task( task )
  {
  }

  std::exception_ptr m_originalException;   // the exception that was in flight when this wrapper was created
  ThreadPool::Slot&  m_task;                // slot of the task that raised it
};
84
85
class ScopeIncDecCounter
86
{
87
  std::atomic_uint& m_cntr;
88
89
public:
90
2.61k
  explicit ScopeIncDecCounter( std::atomic_uint& counter ): m_cntr( counter ) { m_cntr.fetch_add( 1, std::memory_order_relaxed ); }
91
2.62k
  ~ScopeIncDecCounter()                                                       { m_cntr.fetch_sub( 1, std::memory_order_relaxed ); }
92
  CLASS_COPY_MOVE_DELETE( ScopeIncDecCounter )
93
};
94
95
// ---------------------------------------------------------------------------
96
// Thread Pool
97
// ---------------------------------------------------------------------------
98
99
// Creates the pool and immediately starts its worker threads.
//
// numThreads      number of worker threads; a negative value selects
//                 std::thread::hardware_concurrency() threads.
// threadPoolName  prefix used to name the worker threads (see threadProc()). May be null,
//                 in which case the pool has no name and the threads are not renamed.
ThreadPool::ThreadPool( int numThreads, const char* threadPoolName )
  // constructing a std::string from a null pointer is undefined behaviour, so map null to ""
  : m_poolName( threadPoolName != nullptr ? threadPoolName : "" )
  , m_threads( numThreads < 0 ? std::thread::hardware_concurrency() : numThreads )
  , m_poolPause( m_threads.size() )
{
  // every worker gets a consecutive id, starting at 0
  int tid = 0;
  for( auto& t: m_threads )
  {
    t = std::thread( &ThreadPool::threadProc, this, tid++ );
  }
}
110
111
// Requests all worker threads to exit and blocks until every one of them has been joined.
ThreadPool::~ThreadPool()
{
  m_exitThreads.store( true );
  waitForThreads();
}
117
118
bool ThreadPool::processTasksOnMainThread()
119
0
{
120
0
  CHECK_FATAL( m_threads.size() != 0, "should not be used with multiple threads" );
121
122
0
  TaskIterator taskIt;
123
0
  while( true )
124
0
  {
125
0
    try
126
0
    {
127
0
      if( !taskIt.isValid() )
128
0
      {
129
0
        taskIt = findNextTask( 0, m_tasks.begin() );
130
0
      }
131
0
      else
132
0
      {
133
0
        taskIt = findNextTask( 0, taskIt );
134
0
      }
135
136
0
      if( !taskIt.isValid() )
137
0
      {
138
0
        break;
139
0
      }
140
0
      processTask( 0, *taskIt );
141
0
    }
142
0
    catch( TaskException& e )
143
0
    {
144
0
      handleTaskException( e.m_originalException, e.m_task.done, e.m_task.counter, &e.m_task.state );
145
0
    }
146
0
  }
147
148
  // return true if all done (-> false if some tasks blocked due to barriers)
149
0
  return std::all_of( m_tasks.begin(), m_tasks.end(), []( Slot& t ) { return t.state == FREE; } );
150
0
}
151
152
void ThreadPool::shutdown( bool block )
153
356
{
154
356
  m_exitThreads = true;
155
356
  if( block )
156
356
  {
157
356
    waitForThreads();
158
356
  }
159
356
}
160
161
void ThreadPool::waitForThreads()
162
712
{
163
712
  m_poolPause.unpauseIfPaused( m_poolPause.acquireLock() );
164
165
712
  for( auto& t: m_threads )
166
22.7k
  {
167
22.7k
    if( t.joinable() )
168
11.3k
      t.join();
169
22.7k
  }
170
712
}
171
172
void ThreadPool::checkAndThrowThreadPoolException()
173
0
{
174
0
  if( !m_exceptionFlag.load() )
175
0
  {
176
0
    return;
177
0
  }
178
179
0
  msg( WARNING, "ThreadPool is in exception state." );
180
181
0
  std::exception_ptr tmp = m_threadPoolException;
182
0
  m_threadPoolException  = tmp;
183
0
  m_exceptionFlag.store( false );
184
185
0
  std::rethrow_exception( tmp );
186
0
}
187
188
void ThreadPool::threadProc( int threadId )
189
11.3k
{
190
11.3k
#if __linux
191
11.3k
  if( !m_poolName.empty() )
192
11.3k
  {
193
11.3k
    std::string threadName( m_poolName + std::to_string( threadId ) );
194
11.3k
    pthread_setname_np( pthread_self(), threadName.c_str() );
195
11.3k
  }
196
11.3k
#endif
197
198
11.3k
  auto nextTaskIt = m_tasks.begin();
199
11.3k
  while( !m_exitThreads )
200
10.1k
  {
201
10.1k
    try
202
10.1k
    {
203
10.1k
      auto taskIt = findNextTask( threadId, nextTaskIt );
204
10.1k
      if( !taskIt.isValid() )   // immediately try again without any delay
205
10.1k
      {
206
10.1k
        taskIt = findNextTask( threadId, nextTaskIt );
207
10.1k
      }
208
10.1k
      if( !taskIt.isValid() )   // still nothing found, go into idle loop searching for more tasks
209
10.1k
      {
210
10.1k
        ITT_TASKSTART( itt_domain_thrd, itt_handle_TPspinWait );
211
212
10.1k
        std::unique_lock<std::mutex> idleLock( m_idleMutex, std::defer_lock );
213
214
10.1k
        const auto startWait = std::chrono::steady_clock::now();
215
10.1k
        bool       didBlock  = false;   // if the previous iteration did block we don't want to yield in this iteration
216
46.9k
        while( !m_exitThreads )
217
42.2k
        {
218
42.2k
          if( !didBlock )
219
41.9k
          {
220
41.9k
            std::this_thread::yield();
221
41.9k
          }
222
42.2k
          didBlock = false;
223
224
42.2k
          taskIt = findNextTask( threadId, nextTaskIt );
225
42.2k
          if( taskIt.isValid() || m_exitThreads.load( std::memory_order_relaxed ) )
226
5.49k
          {
227
5.49k
            break;
228
5.49k
          }
229
230
36.7k
          if( !idleLock.owns_lock() )
231
28.2k
          {
232
28.2k
            if( VVDEC_BUSY_WAIT_TIME_MIN.count() == 0 || std::chrono::steady_clock::now() - startWait > VVDEC_BUSY_WAIT_TIME_MIN )
233
2.61k
            {
234
2.61k
              ITT_TASKSTART( itt_domain_thrd, itt_handle_TPblocked );
235
2.61k
              ScopeIncDecCounter cntr( m_poolPause.m_waitingForLockThreads );
236
2.61k
              idleLock.lock();
237
2.61k
              didBlock = true;
238
2.61k
              ITT_TASKEND( itt_domain_thrd, itt_handle_TPblocked );
239
2.61k
            }
240
28.2k
          }
241
8.59k
          else if( std::chrono::steady_clock::now() - startWait > VVDEC_BUSY_WAIT_TIME_MAX )
242
6
          {
243
#if THREAD_POOL_TASK_NAMES
244
            printWaitingTasks();
245
#endif
246
6
            didBlock = m_poolPause.pauseIfAllOtherThreadsWaiting(
247
6
              [&]
248
6
              {
249
0
                taskIt = findNextTask( threadId, nextTaskIt );
250
0
                return taskIt.isValid() || m_exitThreads;
251
0
              } );
252
6
            if( taskIt.isValid() )
253
0
            {
254
0
              break;
255
0
            }
256
6
          }
257
36.7k
        }
258
259
10.1k
        ITT_TASKEND( itt_domain_thrd, itt_handle_TPspinWait );
260
10.1k
      }
261
10.1k
      if( m_exitThreads )
262
10.0k
      {
263
10.0k
        return;
264
10.0k
      }
265
266
120
      processTask( threadId, *taskIt );
267
268
120
      nextTaskIt = taskIt;
269
120
      nextTaskIt.incWrap();
270
120
    }
271
10.1k
    catch( TaskException& e )
272
10.1k
    {
273
0
      handleTaskException( e.m_originalException, e.m_task.done, e.m_task.counter, &e.m_task.state );
274
0
    }
275
10.1k
    catch( std::exception& e )
276
10.1k
    {
277
0
      msg( ERROR, "ERROR: Caught unexpected exception from within the thread pool: %s", e.what() );
278
279
0
      if( m_exceptionFlag.exchange( true ) )
280
0
      {
281
0
        msg( ERROR, "ERROR: Another exception has already happend in the thread pool, but we can only store one." );
282
0
        return;
283
0
      }
284
0
      m_threadPoolException = std::current_exception();
285
0
      return;
286
0
    }
287
10.1k
  }
288
11.3k
}
289
290
bool ThreadPool::checkTaskReady( int threadId, CBarrierVec& barriers, ThreadPool::TaskFunc readyCheck, void* taskParam )
291
0
{
292
0
  if( !barriers.empty() )
293
0
  {
294
    // don't break early, because isBlocked() also checks exception state
295
0
    if( std::count_if( barriers.cbegin(), barriers.cend(), []( const Barrier* b ) { return b && b->isBlocked(); } ) )
296
0
    {
297
0
      return false;
298
0
    }
299
0
    barriers.clear();
300
0
  }
301
302
0
  if( readyCheck && readyCheck( threadId, taskParam ) == false )
303
0
  {
304
0
    return false;
305
0
  }
306
307
0
  return true;
308
0
}
309
310
// Searches the task queue for a task that can be executed now and claims it.
//
// threadId     id of the calling thread, forwarded to the task's ready check
// startSearch  position at which the search starts; an invalid iterator means "start at the
//              beginning of the queue"
//
// The queue is walked once, wrapping around at its end. A WAITING slot is claimed by
// atomically switching it to RUNNING; if the task turns out not to be ready, it is put
// back to WAITING. Returns an iterator to the claimed slot, or an invalid iterator when no
// task is ready. Any exception raised while examining a slot is rethrown as a TaskException
// referring to that slot.
ThreadPool::TaskIterator ThreadPool::findNextTask( int threadId, TaskIterator startSearch )
{
  if( !startSearch.isValid() )
  {
    startSearch = m_tasks.begin();
  }

  auto it = startSearch;
  do
  {
    try
    {
      Slot& task     = *it;
      auto  expected = WAITING;
      // cheap read first, only then try to claim the slot atomically
      if( task.state == expected && task.state.compare_exchange_strong( expected, RUNNING ) )
      {
        if( checkTaskReady( threadId, task.barriers, task.readyCheck, task.param ) )
        {
          return it;
        }

        // not ready yet: release the slot again, so it gets rescheduled
        task.state.store( WAITING );
      }
    }
    catch( ... )
    {
      throw TaskException( std::current_exception(), *it );
    }

    it.incWrap();
  } while( it != startSearch );   // stop after one full round

  return {};
}
342
343
bool ThreadPool::processTask( int threadId, ThreadPool::Slot& task )
344
0
{
345
0
  try
346
0
  {
347
0
    const bool success = task.func( threadId, task.param );
348
0
    if( !success )
349
0
    {
350
0
      task.state = WAITING;
351
0
      return false;
352
0
    }
353
354
0
    if( task.done != nullptr )
355
0
    {
356
0
      task.done->unlock();
357
0
    }
358
0
    if( task.counter != nullptr )
359
0
    {
360
0
      --(*task.counter);
361
0
    }
362
0
  }
363
0
  catch( ... )
364
0
  {
365
0
    throw TaskException( std::current_exception(), task );
366
0
  }
367
368
0
  task.state = FREE;
369
370
0
  return true;
371
0
}
372
373
// Tries to execute a task directly on the calling thread instead of queueing it.
// Only meant for the case that the pool runs without worker threads.
//
// func        the task function
// param       parameter passed to func and readyCheck
// counter     wait counter of the task; only used to report an exception
// done        barrier that is unlocked once the task has succeeded (may be null)
// barriers    barriers the task depends on
// readyCheck  optional readiness test for the task
//
// Returns true when the task was executed successfully, false when it was not ready, when
// func reported failure, or when an exception occurred (which is passed on to
// handleTaskException()).
bool ThreadPool::bypassTaskQueue( TaskFunc func, void* param, WaitCounter* counter, Barrier* done, CBarrierVec& barriers, TaskFunc readyCheck )
{
  CHECKD( numThreads() > 0, "the task queue should only be bypassed, when running single-threaded." );
  try
  {
    // if singlethreaded, execute all pending tasks
    bool waiting_tasks = m_nextFillSlot != m_tasks.begin();
    bool is_ready      = checkTaskReady( 0, barriers, readyCheck, param );
    if( waiting_tasks && !is_ready )
    {
      // NOTE(review): processTasksOnMainThread() returns true when *all* tasks are done, so
      // after this assignment the flag no longer means "tasks are waiting" -- confirm intent.
      waiting_tasks = processTasksOnMainThread();
      is_ready      = checkTaskReady( 0, barriers, readyCheck, param );
    }

    // when no barriers block this task, execute it directly
    if( is_ready && func( 0, param ) )
    {
      if( done != nullptr )
      {
        done->unlock();
      }

      if( waiting_tasks )
      {
        processTasksOnMainThread();
      }
      return true;
    }
  }
  catch( ... )
  {
    handleTaskException( std::current_exception(), done, counter, nullptr );
  }

  // direct execution of the task failed
  return false;
}
413
414
void ThreadPool::handleTaskException( const std::exception_ptr e, Barrier* done, WaitCounter* counter, std::atomic<TaskState>* slot_state )
415
0
{
416
0
  if( done != nullptr )
417
0
  {
418
0
    done->setException( e );
419
0
  }
420
0
  if( counter != nullptr )
421
0
  {
422
0
    counter->setException( e );
423
    // Barrier::unlock() in the decrement operator throws, when the counter reaches zero, so we catch it here
424
0
    try
425
0
    {
426
0
      --( *counter );
427
0
    }
428
0
    catch( ... )
429
0
    {
430
0
    }
431
0
  }
432
433
0
  if( slot_state != nullptr )
434
0
  {
435
0
    *slot_state = FREE;
436
0
  }
437
0
}
438
439
#if THREAD_POOL_TASK_NAMES
440
void ThreadPool::printWaitingTasks()
441
{
442
  std::cerr << "Waiting tasks:" << std::endl;
443
  int count = 0;
444
  for( auto& t: m_tasks )
445
  {
446
    if( t.state == WAITING )
447
    {
448
      ++count;
449
      std::cerr << t.taskName << std::endl;
450
    }
451
  }
452
  std::cerr << std::endl << count << " total tasks waiting" << std::endl;
453
}
454
#endif  // THREAD_POOL_TASK_NAMES
455
456
// ---------------------------------------------------------------------------
457
// Chunked Task Queue
458
// ---------------------------------------------------------------------------
459
460
// Deletes all chunks that were appended behind the first chunk. The first chunk itself is a
// member of the queue and therefore not deleted here.
ThreadPool::ChunkedTaskQueue::~ChunkedTaskQueue()
{
  for( Chunk* chunk = m_firstChunk.m_next; chunk != nullptr; )
  {
    // fetch the successor before the current chunk goes away
    Chunk* following = chunk->m_next;
    delete chunk;
    chunk = following;
  }
}
470
471
// Appends a newly allocated chunk to the end of the queue.
// Returns an iterator to the first slot of the new chunk.
ThreadPool::ChunkedTaskQueue::Iterator ThreadPool::ChunkedTaskQueue::grow()
{
  // prevent concurrent growth of the queue. Read access while growing is no problem
  std::lock_guard<std::mutex> l( m_resizeMutex );

  Chunk* const appended = new Chunk( &m_firstChunk );
  m_lastChunk->m_next   = appended;   // link behind the current last chunk ...
  m_lastChunk           = appended;   // ... and make it the new last chunk

  return Iterator{ &appended->m_slots.front(), appended };
}
480
481
// Advances the iterator to the next slot without wrapping around: stepping past the last
// slot of the last chunk leaves the iterator with null slot and chunk pointers.
ThreadPool::ChunkedTaskQueue::Iterator& ThreadPool::ChunkedTaskQueue::Iterator::operator++()
{
  CHECKD( m_slot == nullptr, "incrementing invalid iterator" );
  CHECKD( m_chunk == nullptr, "incrementing invalid iterator" );

  const bool atEndOfChunk = m_slot == &m_chunk->m_slots.back();
  if( !atEndOfChunk )
  {
    ++m_slot;
    return *this;
  }

  // continue in the following chunk, if there is one
  m_chunk = m_chunk->m_next;
  if( m_chunk )
  {
    m_slot = &m_chunk->m_slots.front();
  }
  else
  {
    m_slot = nullptr;
  }
  return *this;
}
504
505
// Advances the iterator to the next slot, wrapping around to the first slot of the first
// chunk after the last slot of the last chunk. The iterator therefore always stays valid.
ThreadPool::ChunkedTaskQueue::Iterator& ThreadPool::ChunkedTaskQueue::Iterator::incWrap()
{
  CHECKD( m_slot == nullptr, "incrementing invalid iterator" );
  CHECKD( m_chunk == nullptr, "incrementing invalid iterator" );

  if( m_slot != &m_chunk->m_slots.back() )
  {
    ++m_slot;
    return *this;
  }

  // end of the current chunk: go on with the next chunk, or start over at the first one
  Chunk* const following = m_chunk->m_next;
  if( following )
  {
    m_chunk = following;
  }
  else
  {
    m_chunk = &m_chunk->m_firstChunk;
  }
  m_slot = &m_chunk->m_slots.front();
  return *this;
}
528
529
void ThreadPool::PoolPause::unpauseIfPaused( std::unique_lock<std::mutex> lockOwnership )
530
1.06k
{
531
1.06k
  CHECKD( lockOwnership.mutex() != &m_allThreadsWaitingMutex, "wrong mutex passed into ThreadPool::PoolPause::unpauseIfPaused()" );
532
1.06k
  CHECKD( !lockOwnership.owns_lock(), "lock passed into ThreadPool::PoolPause::unpauseIfPaused() does not own lock" );
533
  // All threads may be sleeping. If so, wake up.
534
1.06k
  m_allThreadsWaiting = false;
535
1.06k
  m_allThreadsWaitingCV.notify_all();
536
1.06k
}
537
538
template<typename Predicate>
539
bool ThreadPool::PoolPause::pauseIfAllOtherThreadsWaiting( Predicate predicate )
540
6
{
541
6
  if( m_nrThreads == 0 )
542
0
  {
543
0
    return false;
544
0
  }
545
6
  const auto nrWaiting = m_waitingForLockThreads.load( std::memory_order_relaxed );
546
6
  if( nrWaiting < m_nrThreads - 1 )
547
6
  {
548
6
    return false;
549
6
  }
550
551
  // All threads are waiting. This (current) threads is the one which locked `idleLock`. All
552
  // other threads are waiting in the above condition for `idleLock.lock();`.
553
  // The only way how more work for the threads can come in is if addBarrierTask is called
554
  // or if the thread pool is closed or destroyed.
555
0
  std::unique_lock<std::mutex> lock( m_allThreadsWaitingMutex );
556
0
  m_allThreadsWaiting = true;
557
0
  m_allThreadsWaitingCV.wait( lock, [this, &predicate] { return !m_allThreadsWaiting || predicate(); } );
558
0
  return true;
559
6
}
560
561
}   // namespace vvdec