Coverage Report

Created: 2026-06-16 07:20

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/vvdec/source/Lib/Utilities/ThreadPool.cpp
Line
Count
Source
1
/* -----------------------------------------------------------------------------
2
The copyright in this software is being made available under the Clear BSD
3
License, included below. No patent rights, trademark rights and/or 
4
other Intellectual Property Rights other than the copyrights concerning 
5
the Software are granted under this license.
6
7
The Clear BSD License
8
9
Copyright (c) 2018-2026, Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. & The VVdeC Authors.
10
All rights reserved.
11
12
Redistribution and use in source and binary forms, with or without modification,
13
are permitted (subject to the limitations in the disclaimer below) provided that
14
the following conditions are met:
15
16
     * Redistributions of source code must retain the above copyright notice,
17
     this list of conditions and the following disclaimer.
18
19
     * Redistributions in binary form must reproduce the above copyright
20
     notice, this list of conditions and the following disclaimer in the
21
     documentation and/or other materials provided with the distribution.
22
23
     * Neither the name of the copyright holder nor the names of its
24
     contributors may be used to endorse or promote products derived from this
25
     software without specific prior written permission.
26
27
NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
28
THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
29
CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
30
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
31
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
32
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
33
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
34
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
35
BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
36
IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
37
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
38
POSSIBILITY OF SUCH DAMAGE.
39
40
41
------------------------------------------------------------------------------------------- */
42
43
#include "ThreadPool.h"
44
45
#include <chrono>
46
47
#if __linux
48
# include <pthread.h>
49
#endif
50
51
52
namespace vvdec
53
{
54
using namespace std::chrono_literals;
55
56
// Out-of-class definition of the static mutex declared in class Barrier (presumably guarding the
// barriers' exception state, judging by its name - confirm against the header).
std::mutex Barrier::s_exceptionLock;
57
58
// Minimum time an idle worker busy-waits (yielding between task searches) before it blocks on the
// pool's idle mutex. Defaults to 1 ms. It can be overridden with the environment variable
// VVDEC_BUSY_WAIT_TIME_MIN, which is read as (possibly fractional) milliseconds and truncated to
// whole microseconds. A value of 0 makes idle workers block right away.
static const auto VVDEC_BUSY_WAIT_TIME_MIN = []() -> std::chrono::microseconds {
  const char* const envValue = getenv( "VVDEC_BUSY_WAIT_TIME_MIN" );
  if( envValue == nullptr )
  {
    return std::chrono::microseconds( 1000 );   // 1 ms
  }
  return std::chrono::microseconds( int( atof( envValue ) * 1000 ) );
}();
65
66
// Idle time after which the worker holding the idle mutex tries to pause the whole pool (this only
// succeeds once all other workers are waiting as well). Defaults to 5 ms. It can be overridden with
// the environment variable VVDEC_BUSY_WAIT_TIME_MAX, given as an integer number of milliseconds.
static const auto VVDEC_BUSY_WAIT_TIME_MAX = []() -> std::chrono::milliseconds {
  const char* const envValue = getenv( "VVDEC_BUSY_WAIT_TIME_MAX" );
  return envValue != nullptr ? std::chrono::milliseconds( atoi( envValue ) ) : std::chrono::milliseconds( 5 );
}();
73
74
75
// Carries an exception raised while checking or running a task, together with the slot of that task.
// This lets the catching code hand both to handleTaskException().
struct ThreadPool::TaskException : std::exception
{
  explicit TaskException( std::exception_ptr e, ThreadPool::Slot& task )
    : m_originalException( e )
    , m_task( task )
  {
  }

  std::exception_ptr m_originalException;   // the exception that was actually thrown
  ThreadPool::Slot&  m_task;                // slot of the task that raised it
};
84
85
class ScopeIncDecCounter
86
{
87
  std::atomic_uint& m_cntr;
88
89
public:
90
23.7k
  explicit ScopeIncDecCounter( std::atomic_uint& counter ): m_cntr( counter ) { m_cntr.fetch_add( 1, std::memory_order_relaxed ); }
91
23.7k
  ~ScopeIncDecCounter()                                                       { m_cntr.fetch_sub( 1, std::memory_order_relaxed ); }
92
  CLASS_COPY_MOVE_DELETE( ScopeIncDecCounter )
93
};
94
95
// ---------------------------------------------------------------------------
96
// Thread Pool
97
// ---------------------------------------------------------------------------
98
99
// Creates the pool and immediately starts its worker threads.
// - A negative numThreads requests std::thread::hardware_concurrency() workers.
// - 0 creates no workers (single-threaded mode).
// - Worker i runs threadProc(i).
// - A non-empty threadPoolName is used as the prefix of the worker thread names on Linux.
ThreadPool::ThreadPool( int numThreads, const char* threadPoolName )
  : m_poolName( threadPoolName )
  , m_threads( numThreads < 0 ? std::thread::hardware_concurrency() : numThreads )
  , m_poolPause( m_threads.size() )
{
  int nextId = 0;
  for( auto& worker: m_threads )
  {
    worker = std::thread( &ThreadPool::threadProc, this, nextId );
    ++nextId;
  }
}
110
111
// Stops the pool. This sets the exit flag and joins all workers, waking a paused pool first.
// It is the same sequence as a blocking shutdown().
ThreadPool::~ThreadPool()
{
  shutdown( true );
}
117
118
bool ThreadPool::processTasksOnMainThread()
119
0
{
120
0
  CHECK_FATAL( m_threads.size() != 0, "should not be used with multiple threads" );
121
122
0
  TaskIterator taskIt;
123
0
  while( true )
124
0
  {
125
0
    try
126
0
    {
127
0
      if( !taskIt.isValid() )
128
0
      {
129
0
        taskIt = findNextTask( 0, m_tasks.begin() );
130
0
      }
131
0
      else
132
0
      {
133
0
        taskIt = findNextTask( 0, taskIt );
134
0
      }
135
136
0
      if( !taskIt.isValid() )
137
0
      {
138
0
        break;
139
0
      }
140
0
      processTask( 0, *taskIt );
141
0
    }
142
0
    catch( TaskException& e )
143
0
    {
144
0
      handleTaskException( e.m_originalException, e.m_task.done, e.m_task.counter, &e.m_task.state );
145
0
    }
146
0
  }
147
148
  // return true if all done (-> false if some tasks blocked due to barriers)
149
0
  return std::all_of( m_tasks.begin(), m_tasks.end(), []( Slot& t ) { return t.state == FREE; } );
150
0
}
151
152
void ThreadPool::shutdown( bool block )
153
935
{
154
935
  m_exitThreads = true;
155
935
  if( block )
156
935
  {
157
935
    waitForThreads();
158
935
  }
159
935
}
160
161
void ThreadPool::waitForThreads()
162
1.87k
{
163
1.87k
  m_poolPause.unpauseIfPaused( m_poolPause.acquireLock() );
164
165
1.87k
  for( auto& t: m_threads )
166
59.8k
  {
167
59.8k
    if( t.joinable() )
168
29.9k
      t.join();
169
59.8k
  }
170
1.87k
}
171
172
void ThreadPool::checkAndThrowThreadPoolException()
173
10.3k
{
174
10.3k
  if( !m_exceptionFlag.load() )
175
10.3k
  {
176
10.3k
    return;
177
10.3k
  }
178
179
0
  msg( WARNING, "ThreadPool is in exception state." );
180
181
0
  std::exception_ptr tmp = m_threadPoolException;
182
0
  m_threadPoolException  = tmp;
183
0
  m_exceptionFlag.store( false );
184
185
0
  std::rethrow_exception( tmp );
186
0
}
187
188
void ThreadPool::threadProc( int threadId )
189
29.9k
{
190
29.9k
#if __linux
191
29.9k
  if( !m_poolName.empty() )
192
29.9k
  {
193
29.9k
    std::string threadName( m_poolName + std::to_string( threadId ) );
194
29.9k
    pthread_setname_np( pthread_self(), threadName.c_str() );
195
29.9k
  }
196
29.9k
#endif
197
198
29.9k
  auto nextTaskIt = m_tasks.begin();
199
44.4k
  while( !m_exitThreads )
200
43.9k
  {
201
43.9k
    try
202
43.9k
    {
203
43.9k
      auto taskIt = findNextTask( threadId, nextTaskIt );
204
43.9k
      if( !taskIt.isValid() )   // immediately try again without any delay
205
38.1k
      {
206
38.1k
        taskIt = findNextTask( threadId, nextTaskIt );
207
38.1k
      }
208
43.9k
      if( !taskIt.isValid() )   // still nothing found, go into idle loop searching for more tasks
209
37.3k
      {
210
37.3k
        ITT_TASKSTART( itt_domain_thrd, itt_handle_TPspinWait );
211
212
37.3k
        std::unique_lock<std::mutex> idleLock( m_idleMutex, std::defer_lock );
213
214
37.3k
        const auto startWait = std::chrono::steady_clock::now();
215
37.3k
        bool       didBlock  = false;   // if the previous iteration did block we don't want to yield in this iteration
216
714k
        while( !m_exitThreads )
217
688k
        {
218
688k
          if( !didBlock )
219
684k
          {
220
684k
            std::this_thread::yield();
221
684k
          }
222
688k
          didBlock = false;
223
224
688k
          taskIt = findNextTask( threadId, nextTaskIt );
225
688k
          if( taskIt.isValid() || m_exitThreads.load( std::memory_order_relaxed ) )
226
11.2k
          {
227
11.2k
            break;
228
11.2k
          }
229
230
676k
          if( !idleLock.owns_lock() )
231
285k
          {
232
285k
            if( VVDEC_BUSY_WAIT_TIME_MIN.count() == 0 || std::chrono::steady_clock::now() - startWait > VVDEC_BUSY_WAIT_TIME_MIN )
233
23.7k
            {
234
23.7k
              ITT_TASKSTART( itt_domain_thrd, itt_handle_TPblocked );
235
23.7k
              ScopeIncDecCounter cntr( m_poolPause.m_waitingForLockThreads );
236
23.7k
              idleLock.lock();
237
23.7k
              didBlock = true;
238
23.7k
              ITT_TASKEND( itt_domain_thrd, itt_handle_TPblocked );
239
23.7k
            }
240
285k
          }
241
391k
          else if( std::chrono::steady_clock::now() - startWait > VVDEC_BUSY_WAIT_TIME_MAX )
242
177k
          {
243
#if THREAD_POOL_TASK_NAMES
244
            printWaitingTasks();
245
#endif
246
177k
            didBlock = m_poolPause.pauseIfAllOtherThreadsWaiting(
247
177k
              [&]
248
177k
              {
249
9
                taskIt = findNextTask( threadId, nextTaskIt );
250
9
                return taskIt.isValid() || m_exitThreads;
251
9
              } );
252
177k
            if( taskIt.isValid() )
253
0
            {
254
0
              break;
255
0
            }
256
177k
          }
257
676k
        }
258
259
37.3k
        ITT_TASKEND( itt_domain_thrd, itt_handle_TPspinWait );
260
37.3k
      }
261
43.9k
      if( m_exitThreads )
262
29.2k
      {
263
29.2k
        return;
264
29.2k
      }
265
266
14.6k
      processTask( threadId, *taskIt );
267
268
14.6k
      nextTaskIt = taskIt;
269
14.6k
      nextTaskIt.incWrap();
270
14.6k
    }
271
43.9k
    catch( TaskException& e )
272
43.9k
    {
273
10.3k
      handleTaskException( e.m_originalException, e.m_task.done, e.m_task.counter, &e.m_task.state );
274
10.3k
    }
275
43.9k
    catch( std::exception& e )
276
43.9k
    {
277
0
      msg( ERROR, "ERROR: Caught unexpected exception from within the thread pool: %s", e.what() );
278
279
0
      if( m_exceptionFlag.exchange( true ) )
280
0
      {
281
0
        msg( ERROR, "ERROR: Another exception has already happend in the thread pool, but we can only store one." );
282
0
        return;
283
0
      }
284
0
      m_threadPoolException = std::current_exception();
285
0
      return;
286
0
    }
287
43.9k
  }
288
29.9k
}
289
290
bool ThreadPool::checkTaskReady( int threadId, CBarrierVec& barriers, ThreadPool::TaskFunc readyCheck, void* taskParam )
291
5.21M
{
292
5.21M
  if( !barriers.empty() )
293
4.57M
  {
294
    // don't break early, because isBlocked() also checks exception state
295
4.57M
    if( std::count_if( barriers.cbegin(), barriers.cend(), []( const Barrier* b ) { return b && b->isBlocked(); } ) )
296
4.56M
    {
297
4.56M
      return false;
298
4.56M
    }
299
13.1k
    barriers.clear();
300
13.1k
  }
301
302
648k
  if( readyCheck && readyCheck( threadId, taskParam ) == false )
303
633k
  {
304
633k
    return false;
305
633k
  }
306
307
15.2k
  return true;
308
648k
}
309
310
/**
 * Searches the task queue once around, starting at startSearch (or at the queue start if
 * startSearch is invalid), for a WAITING task that is ready to run.
 *
 * A candidate is claimed by switching its state from WAITING to RUNNING with a compare-exchange.
 * If it then turns out not to be ready, it is set back to WAITING.
 *
 * @return iterator to the claimed task (now RUNNING), or an invalid iterator if none was found.
 * @throws TaskException wrapping any exception raised while checking a slot.
 */
ThreadPool::TaskIterator ThreadPool::findNextTask( int threadId, TaskIterator startSearch )
{
  if( !startSearch.isValid() )
  {
    startSearch = m_tasks.begin();
  }

  auto it = startSearch;
  do
  {
    try
    {
      Slot& candidate = *it;
      auto  expected  = WAITING;
      // plain read first; the compare-exchange makes sure only one thread claims the slot
      if( candidate.state == expected && candidate.state.compare_exchange_strong( expected, RUNNING ) )
      {
        if( checkTaskReady( threadId, candidate.barriers, candidate.readyCheck, candidate.param ) )
        {
          return it;
        }
        candidate.state = WAITING;   // not ready yet: reschedule
      }
    }
    catch( ... )
    {
      throw TaskException( std::current_exception(), *it );
    }
    it.incWrap();
  } while( it != startSearch );

  return {};
}
342
343
bool ThreadPool::processTask( int threadId, ThreadPool::Slot& task )
344
5.27k
{
345
5.27k
  try
346
5.27k
  {
347
5.27k
    const bool success = task.func( threadId, task.param );
348
5.27k
    if( !success )
349
4.16k
    {
350
4.16k
      task.state = WAITING;
351
4.16k
      return false;
352
4.16k
    }
353
354
1.10k
    if( task.done != nullptr )
355
6
    {
356
6
      task.done->unlock();
357
6
    }
358
1.10k
    if( task.counter != nullptr )
359
24
    {
360
24
      --(*task.counter);
361
24
    }
362
1.10k
  }
363
5.27k
  catch( ... )
364
5.27k
  {
365
1.08k
    throw TaskException( std::current_exception(), task );
366
1.08k
  }
367
368
24
  task.state = FREE;
369
370
24
  return true;
371
5.27k
}
372
373
/**
 * Single-threaded fast path: tries to execute a task directly instead of queueing it.
 *
 * If the task is not ready yet, pending queued tasks are processed first and readiness is checked
 * again. Exceptions are routed to handleTaskException() (with `done` and `counter`), and the
 * function then reports failure.
 *
 * @return true if the task was executed and func returned true. Otherwise false, and the task was
 *         not executed to completion.
 */
bool ThreadPool::bypassTaskQueue( TaskFunc func, void* param, WaitCounter* counter, Barrier* done, CBarrierVec& barriers, TaskFunc readyCheck )
{
  CHECKD( numThreads() > 0, "the task queue should only be bypassed, when running single-threaded." );
  try
  {
    bool pendingTasks = m_nextFillSlot != m_tasks.begin();
    bool ready        = checkTaskReady( 0, barriers, readyCheck, param );
    if( !ready && pendingTasks )
    {
      // NOTE(review): processTasksOnMainThread() returns true when *all* slots are FREE. After this
      // assignment `pendingTasks` is therefore true exactly when nothing is pending. That looks
      // inverted with respect to the name and the re-run below; confirm intent.
      pendingTasks = processTasksOnMainThread();
      ready        = checkTaskReady( 0, barriers, readyCheck, param );
    }

    // no barrier blocks the task anymore: run it right here
    if( ready && func( 0, param ) )
    {
      if( done )
      {
        done->unlock();
      }
      if( pendingTasks )
      {
        processTasksOnMainThread();
      }
      return true;
    }
  }
  catch( ... )
  {
    handleTaskException( std::current_exception(), done, counter, nullptr );
  }

  return false;
}
413
414
void ThreadPool::handleTaskException( const std::exception_ptr e, Barrier* done, WaitCounter* counter, std::atomic<TaskState>* slot_state )
415
10.3k
{
416
10.3k
  if( done != nullptr )
417
1.32k
  {
418
1.32k
    done->setException( e );
419
1.32k
  }
420
10.3k
  if( counter != nullptr )
421
10.3k
  {
422
10.3k
    counter->setException( e );
423
    // Barrier::unlock() in the decrement operator throws, when the counter reaches zero, so we catch it here
424
10.3k
    try
425
10.3k
    {
426
10.3k
      --( *counter );
427
10.3k
    }
428
10.3k
    catch( ... )
429
10.3k
    {
430
1.33k
    }
431
10.3k
  }
432
433
10.3k
  if( slot_state != nullptr )
434
10.3k
  {
435
10.3k
    *slot_state = FREE;
436
10.3k
  }
437
10.3k
}
438
439
#if THREAD_POOL_TASK_NAMES
// Debug helper: prints the names of all tasks currently in WAITING state to stderr, followed by
// their number.
void ThreadPool::printWaitingTasks()
{
  std::cerr << "Waiting tasks:" << std::endl;

  int numWaiting = 0;
  for( auto& slot: m_tasks )
  {
    if( slot.state != WAITING )
    {
      continue;
    }
    std::cerr << slot.taskName << std::endl;
    ++numWaiting;
  }

  std::cerr << std::endl << numWaiting << " total tasks waiting" << std::endl;
}
#endif  // THREAD_POOL_TASK_NAMES
455
456
// ---------------------------------------------------------------------------
457
// Chunked Task Queue
458
// ---------------------------------------------------------------------------
459
460
// Frees the chunks that grow() allocated with `new`. m_firstChunk itself is a member and is not
// deleted.
ThreadPool::ChunkedTaskQueue::~ChunkedTaskQueue()
{
  for( Chunk* chunk = m_firstChunk.m_next; chunk != nullptr; )
  {
    Chunk* const following = chunk->m_next;
    delete chunk;
    chunk = following;
  }
}
470
471
// Appends a new chunk to the end of the chunk list and returns an iterator to its first slot.
// Concurrent growth is serialized by m_resizeMutex. Reading the queue while it grows is fine,
// because existing chunks are left untouched.
ThreadPool::ChunkedTaskQueue::Iterator ThreadPool::ChunkedTaskQueue::grow()
{
  std::lock_guard<std::mutex> growGuard( m_resizeMutex );

  Chunk* const appended = new Chunk( &m_firstChunk );
  m_lastChunk->m_next   = appended;
  m_lastChunk           = appended;

  return Iterator{ &appended->m_slots.front(), appended };
}
480
481
// Advances to the next slot and moves into the following chunk at a chunk boundary. Past the last
// chunk, both m_chunk and m_slot become null.
ThreadPool::ChunkedTaskQueue::Iterator& ThreadPool::ChunkedTaskQueue::Iterator::operator++()
{
  CHECKD( m_slot == nullptr, "incrementing invalid iterator" );
  CHECKD( m_chunk == nullptr, "incrementing invalid iterator" );

  if( m_slot == &m_chunk->m_slots.back() )
  {
    m_chunk = m_chunk->m_next;
    m_slot  = m_chunk ? &m_chunk->m_slots.front() : nullptr;
  }
  else
  {
    ++m_slot;
  }
  return *this;
}
504
505
// Like operator++, but circular: after the last slot of the last chunk it wraps around to the first
// slot of the first chunk, so the iterator never becomes invalid.
ThreadPool::ChunkedTaskQueue::Iterator& ThreadPool::ChunkedTaskQueue::Iterator::incWrap()
{
  CHECKD( m_slot == nullptr, "incrementing invalid iterator" );
  CHECKD( m_chunk == nullptr, "incrementing invalid iterator" );

  if( m_slot != &m_chunk->m_slots.back() )
  {
    ++m_slot;
    return *this;
  }

  if( !m_chunk->m_next )
  {
    m_chunk = &m_chunk->m_firstChunk;   // end of the list: wrap around
  }
  else
  {
    m_chunk = m_chunk->m_next;
  }
  m_slot = &m_chunk->m_slots.front();
  return *this;
}
528
529
void ThreadPool::PoolPause::unpauseIfPaused( std::unique_lock<std::mutex> lockOwnership )
530
13.1k
{
531
13.1k
  CHECKD( lockOwnership.mutex() != &m_allThreadsWaitingMutex, "wrong mutex passed into ThreadPool::PoolPause::unpauseIfPaused()" );
532
13.1k
  CHECKD( !lockOwnership.owns_lock(), "lock passed into ThreadPool::PoolPause::unpauseIfPaused() does not own lock" );
533
  // All threads may be sleeping. If so, wake up.
534
13.1k
  m_allThreadsWaiting = false;
535
13.1k
  m_allThreadsWaitingCV.notify_all();
536
13.1k
}
537
538
template<typename Predicate>
539
bool ThreadPool::PoolPause::pauseIfAllOtherThreadsWaiting( Predicate predicate )
540
177k
{
541
177k
  if( m_nrThreads == 0 )
542
0
  {
543
0
    return false;
544
0
  }
545
177k
  const auto nrWaiting = m_waitingForLockThreads.load( std::memory_order_relaxed );
546
177k
  if( nrWaiting < m_nrThreads - 1 )
547
177k
  {
548
177k
    return false;
549
177k
  }
550
551
  // All threads are waiting. This (current) threads is the one which locked `idleLock`. All
552
  // other threads are waiting in the above condition for `idleLock.lock();`.
553
  // The only way how more work for the threads can come in is if addBarrierTask is called
554
  // or if the thread pool is closed or destroyed.
555
9
  std::unique_lock<std::mutex> lock( m_allThreadsWaitingMutex );
556
9
  m_allThreadsWaiting = true;
557
18
  m_allThreadsWaitingCV.wait( lock, [this, &predicate] { return !m_allThreadsWaiting || predicate(); } );
558
9
  return true;
559
177k
}
560
561
}   // namespace vvdec