Coverage Report

Created: 2026-04-01 07:49

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/vvdec/source/Lib/Utilities/ThreadPool.cpp
Line
Count
Source
1
/* -----------------------------------------------------------------------------
2
The copyright in this software is being made available under the Clear BSD
3
License, included below. No patent rights, trademark rights and/or 
4
other Intellectual Property Rights other than the copyrights concerning 
5
the Software are granted under this license.
6
7
The Clear BSD License
8
9
Copyright (c) 2018-2026, Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. & The VVdeC Authors.
10
All rights reserved.
11
12
Redistribution and use in source and binary forms, with or without modification,
13
are permitted (subject to the limitations in the disclaimer below) provided that
14
the following conditions are met:
15
16
     * Redistributions of source code must retain the above copyright notice,
17
     this list of conditions and the following disclaimer.
18
19
     * Redistributions in binary form must reproduce the above copyright
20
     notice, this list of conditions and the following disclaimer in the
21
     documentation and/or other materials provided with the distribution.
22
23
     * Neither the name of the copyright holder nor the names of its
24
     contributors may be used to endorse or promote products derived from this
25
     software without specific prior written permission.
26
27
NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
28
THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
29
CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
30
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
31
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
32
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
33
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
34
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
35
BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
36
IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
37
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
38
POSSIBILITY OF SUCH DAMAGE.
39
40
41
------------------------------------------------------------------------------------------- */
42
43
#include "ThreadPool.h"
44
45
#include <chrono>
46
47
#if __linux
48
# include <pthread.h>
49
#endif
50
51
52
namespace vvdec
53
{
54
using namespace std::chrono_literals;

// Out-of-line definition of Barrier's static mutex (declared in ThreadPool.h).
// Presumably guards the barriers' shared exception state — confirm against header.
std::mutex Barrier::s_exceptionLock{};
57
58
// block threads after busy-waiting this long
// (overridable through the environment; the value is read as fractional milliseconds)
const static auto VVDEC_BUSY_WAIT_TIME_MIN = []() -> std::chrono::microseconds {
  if( const char* env = getenv( "VVDEC_BUSY_WAIT_TIME_MIN" ) )
  {
    return std::chrono::microseconds( int( atof( env ) * 1000 ) );
  }
  return std::chrono::microseconds( 1000 );   // default: 1 ms
}();
65
66
// block last waiting thread after this time
// (overridable through the environment; the value is read as whole milliseconds)
const static auto VVDEC_BUSY_WAIT_TIME_MAX = []() -> std::chrono::milliseconds {
  if( const char* env = getenv( "VVDEC_BUSY_WAIT_TIME_MAX" ) )
  {
    return std::chrono::milliseconds( atoi( env ) );
  }
  return std::chrono::milliseconds( 5 );   // default: 5 ms
}();
73
74
75
// Internal exception wrapper thrown inside the worker loops: carries the
// original exception together with a reference to the task slot that raised
// it, so the catch site can release that slot's barrier/counter/state.
struct ThreadPool::TaskException : public std::exception
{
  explicit TaskException( std::exception_ptr e, ThreadPool::Slot& task )
    : m_originalException( e ), m_task( task )
  {
  }

  std::exception_ptr m_originalException;   // the exception originally thrown by the task
  ThreadPool::Slot&  m_task;                // the slot whose task failed
};
84
85
class ScopeIncDecCounter
86
{
87
  std::atomic_uint& m_cntr;
88
89
public:
90
0
  explicit ScopeIncDecCounter( std::atomic_uint& counter ): m_cntr( counter ) { m_cntr.fetch_add( 1, std::memory_order_relaxed ); }
91
0
  ~ScopeIncDecCounter()                                                       { m_cntr.fetch_sub( 1, std::memory_order_relaxed ); }
92
  CLASS_COPY_MOVE_DELETE( ScopeIncDecCounter )
93
};
94
95
// ---------------------------------------------------------------------------
96
// Thread Pool
97
// ---------------------------------------------------------------------------
98
99
// Creates the pool and spawns the worker threads.
// A negative numThreads means "one worker per hardware thread".
ThreadPool::ThreadPool( int numThreads, const char* threadPoolName )
  : m_poolName( threadPoolName )
  , m_threads( numThreads < 0 ? std::thread::hardware_concurrency() : numThreads )
  , m_poolPause( m_threads.size() )
{
  int tid = 0;
  for( auto& worker: m_threads )
  {
    worker = std::thread( &ThreadPool::threadProc, this, tid++ );
  }
}
110
111
// Blocking shutdown: signal all workers to exit and join them.
ThreadPool::~ThreadPool()
{
  shutdown( true );
}
117
118
bool ThreadPool::processTasksOnMainThread()
119
0
{
120
0
  CHECK_FATAL( m_threads.size() != 0, "should not be used with multiple threads" );
121
122
0
  TaskIterator taskIt;
123
0
  while( true )
124
0
  {
125
0
    try
126
0
    {
127
0
      if( !taskIt.isValid() )
128
0
      {
129
0
        taskIt = findNextTask( 0, m_tasks.begin() );
130
0
      }
131
0
      else
132
0
      {
133
0
        taskIt = findNextTask( 0, taskIt );
134
0
      }
135
136
0
      if( !taskIt.isValid() )
137
0
      {
138
0
        break;
139
0
      }
140
0
      processTask( 0, *taskIt );
141
0
    }
142
0
    catch( TaskException& e )
143
0
    {
144
0
      handleTaskException( e.m_originalException, e.m_task.done, e.m_task.counter, &e.m_task.state );
145
0
    }
146
0
  }
147
148
  // return true if all done (-> false if some tasks blocked due to barriers)
149
0
  return std::all_of( m_tasks.begin(), m_tasks.end(), []( Slot& t ) { return t.state == FREE; } );
150
0
}
151
152
void ThreadPool::shutdown( bool block )
153
0
{
154
0
  m_exitThreads = true;
155
0
  if( block )
156
0
  {
157
0
    waitForThreads();
158
0
  }
159
0
}
160
161
void ThreadPool::waitForThreads()
162
0
{
163
0
  m_poolPause.unpauseIfPaused( m_poolPause.acquireLock() );
164
165
0
  for( auto& t: m_threads )
166
0
  {
167
0
    if( t.joinable() )
168
0
      t.join();
169
0
  }
170
0
}
171
172
void ThreadPool::checkAndThrowThreadPoolException()
173
0
{
174
0
  if( !m_exceptionFlag.load() )
175
0
  {
176
0
    return;
177
0
  }
178
179
0
  msg( WARNING, "ThreadPool is in exception state." );
180
181
0
  std::exception_ptr tmp = m_threadPoolException;
182
0
  m_threadPoolException  = tmp;
183
0
  m_exceptionFlag.store( false );
184
185
0
  std::rethrow_exception( tmp );
186
0
}
187
188
void ThreadPool::threadProc( int threadId )
189
0
{
190
0
#if __linux
191
0
  if( !m_poolName.empty() )
192
0
  {
193
0
    std::string threadName( m_poolName + std::to_string( threadId ) );
194
0
    pthread_setname_np( pthread_self(), threadName.c_str() );
195
0
  }
196
0
#endif
197
198
0
  auto nextTaskIt = m_tasks.begin();
199
0
  while( !m_exitThreads )
200
0
  {
201
0
    try
202
0
    {
203
0
      auto taskIt = findNextTask( threadId, nextTaskIt );
204
0
      if( !taskIt.isValid() )   // immediately try again without any delay
205
0
      {
206
0
        taskIt = findNextTask( threadId, nextTaskIt );
207
0
      }
208
0
      if( !taskIt.isValid() )   // still nothing found, go into idle loop searching for more tasks
209
0
      {
210
0
        ITT_TASKSTART( itt_domain_thrd, itt_handle_TPspinWait );
211
212
0
        std::unique_lock<std::mutex> idleLock( m_idleMutex, std::defer_lock );
213
214
0
        const auto startWait = std::chrono::steady_clock::now();
215
0
        bool       didBlock  = false;   // if the previous iteration did block we don't want to yield in this iteration
216
0
        while( !m_exitThreads )
217
0
        {
218
0
          if( !didBlock )
219
0
          {
220
0
            std::this_thread::yield();
221
0
          }
222
0
          didBlock = false;
223
224
0
          taskIt = findNextTask( threadId, nextTaskIt );
225
0
          if( taskIt.isValid() || m_exitThreads.load( std::memory_order_relaxed ) )
226
0
          {
227
0
            break;
228
0
          }
229
230
0
          if( !idleLock.owns_lock() )
231
0
          {
232
0
            if( VVDEC_BUSY_WAIT_TIME_MIN.count() == 0 || std::chrono::steady_clock::now() - startWait > VVDEC_BUSY_WAIT_TIME_MIN )
233
0
            {
234
0
              ITT_TASKSTART( itt_domain_thrd, itt_handle_TPblocked );
235
0
              ScopeIncDecCounter cntr( m_poolPause.m_waitingForLockThreads );
236
0
              idleLock.lock();
237
0
              didBlock = true;
238
0
              ITT_TASKEND( itt_domain_thrd, itt_handle_TPblocked );
239
0
            }
240
0
          }
241
0
          else if( std::chrono::steady_clock::now() - startWait > VVDEC_BUSY_WAIT_TIME_MAX )
242
0
          {
243
#if THREAD_POOL_TASK_NAMES
244
            printWaitingTasks();
245
#endif
246
0
            didBlock = m_poolPause.pauseIfAllOtherThreadsWaiting(
247
0
              [&]
248
0
              {
249
0
                taskIt = findNextTask( threadId, nextTaskIt );
250
0
                return taskIt.isValid() || m_exitThreads;
251
0
              } );
252
0
            if( taskIt.isValid() )
253
0
            {
254
0
              break;
255
0
            }
256
0
          }
257
0
        }
258
259
0
        ITT_TASKEND( itt_domain_thrd, itt_handle_TPspinWait );
260
0
      }
261
0
      if( m_exitThreads )
262
0
      {
263
0
        return;
264
0
      }
265
266
0
      processTask( threadId, *taskIt );
267
268
0
      nextTaskIt = taskIt;
269
0
      nextTaskIt.incWrap();
270
0
    }
271
0
    catch( TaskException& e )
272
0
    {
273
0
      handleTaskException( e.m_originalException, e.m_task.done, e.m_task.counter, &e.m_task.state );
274
0
    }
275
0
    catch( std::exception& e )
276
0
    {
277
0
      msg( ERROR, "ERROR: Caught unexpected exception from within the thread pool: %s", e.what() );
278
279
0
      if( m_exceptionFlag.exchange( true ) )
280
0
      {
281
0
        msg( ERROR, "ERROR: Another exception has already happend in the thread pool, but we can only store one." );
282
0
        return;
283
0
      }
284
0
      m_threadPoolException = std::current_exception();
285
0
      return;
286
0
    }
287
0
  }
288
0
}
289
290
bool ThreadPool::checkTaskReady( int threadId, CBarrierVec& barriers, ThreadPool::TaskFunc readyCheck, void* taskParam )
291
0
{
292
0
  if( !barriers.empty() )
293
0
  {
294
    // don't break early, because isBlocked() also checks exception state
295
0
    if( std::count_if( barriers.cbegin(), barriers.cend(), []( const Barrier* b ) { return b && b->isBlocked(); } ) )
296
0
    {
297
0
      return false;
298
0
    }
299
0
    barriers.clear();
300
0
  }
301
302
0
  if( readyCheck && readyCheck( threadId, taskParam ) == false )
303
0
  {
304
0
    return false;
305
0
  }
306
307
0
  return true;
308
0
}
309
310
// Scans the task queue for a runnable task, starting at startSearch and
// wrapping around once over all slots. A WAITING slot is claimed via CAS
// (WAITING -> RUNNING); if the claimed task turns out not to be ready yet
// (blocked barriers or failing readyCheck), it is put back to WAITING and the
// scan continues. Returns an invalid iterator when nothing runnable is found.
// Any exception raised while probing a slot (e.g. a barrier in exception
// state) is rethrown as a TaskException carrying the affected slot.
ThreadPool::TaskIterator ThreadPool::findNextTask( int threadId, TaskIterator startSearch )
{
  if( !startSearch.isValid() )
  {
    startSearch = m_tasks.begin();
  }
  // `first` lets the loop run its initial iteration although it == startSearch
  bool first = true;
  for( auto it = startSearch; it != startSearch || first; it.incWrap() )
  {
    first = false;
    try
    {
      Slot& task     = *it;
      auto  expected = WAITING;
      // cheap plain read before the CAS avoids atomic RMW traffic on busy slots
      if( task.state == expected && task.state.compare_exchange_strong( expected, RUNNING ) )
      {
        if( checkTaskReady( threadId, task.barriers, task.readyCheck, task.param ) )
        {
          return it;
        }

        // reschedule
        task.state = WAITING;
      }
    }
    catch( ... )
    {
      throw TaskException( std::current_exception(), *it );
    }
  }
  return {};
}
342
343
// Runs the given (already claimed, state == RUNNING) task on this thread.
// Returns true when the task completed and its slot was freed; false when the
// task function asked to be rescheduled. On completion the task's barrier is
// unlocked and its wait counter decremented — in that order. Any exception
// escaping the task is wrapped in a TaskException so the caller can clean up.
bool ThreadPool::processTask( int threadId, ThreadPool::Slot& task )
{
  try
  {
    const bool success = task.func( threadId, task.param );
    if( !success )
    {
      // the task requested another run: hand the slot back to the scheduler
      task.state = WAITING;
      return false;
    }

    // release the completion barrier first ...
    if( task.done != nullptr )
    {
      task.done->unlock();
    }
    // ... then the wait counter, which may unblock threads waiting on it
    if( task.counter != nullptr )
    {
      --(*task.counter);
    }
  }
  catch( ... )
  {
    throw TaskException( std::current_exception(), task );
  }

  // only mark the slot reusable after all notifications went through
  task.state = FREE;

  return true;
}
372
373
// Executes a task directly on the calling thread instead of queueing it —
// only valid when the pool runs without worker threads. Pending queued tasks
// are drained first if they might unblock this task's barriers.
// Returns true when the task ran to completion, false otherwise (still
// blocked, task asked for rescheduling, or it threw — exceptions are routed
// into handleTaskException()).
bool ThreadPool::bypassTaskQueue( TaskFunc func, void* param, WaitCounter* counter, Barrier* done, CBarrierVec& barriers, TaskFunc readyCheck )
{
  CHECKD( numThreads() > 0, "the task queue should only be bypassed, when running single-threaded." );
  try
  {
    // if singlethreaded, execute all pending tasks
    bool waiting_tasks = m_nextFillSlot != m_tasks.begin();
    // NOTE: removed redundant C-style `(TaskFunc)` casts — readyCheck already
    // has type TaskFunc.
    bool is_ready      = checkTaskReady( 0, barriers, readyCheck, param );
    if( !is_ready && waiting_tasks )
    {
      waiting_tasks = processTasksOnMainThread();
      is_ready      = checkTaskReady( 0, barriers, readyCheck, param );
    }

    // when no barriers block this task, execute it directly
    if( is_ready )
    {
      if( func( 0, param ) )
      {
        if( done != nullptr )
        {
          done->unlock();
        }

        // the direct task may have unblocked queued tasks — drain them
        if( waiting_tasks )
        {
          processTasksOnMainThread();
        }
        return true;
      }
    }
  }
  catch( ... )
  {
    handleTaskException( std::current_exception(), done, counter, nullptr );
  }

  // direct execution of the task failed
  return false;
}
413
414
// Central cleanup after a task threw: propagates the exception into the
// task's completion barrier and wait counter (so waiters wake up and can
// rethrow it) and releases the task's queue slot.
// slot_state may be nullptr for tasks without an owning slot (see
// bypassTaskQueue()).
void ThreadPool::handleTaskException( const std::exception_ptr e, Barrier* done, WaitCounter* counter, std::atomic<TaskState>* slot_state )
{
  if( done != nullptr )
  {
    done->setException( e );
  }
  if( counter != nullptr )
  {
    counter->setException( e );
    // Barrier::unlock() in the decrement operator throws, when the counter reaches zero, so we catch it here
    try
    {
      --( *counter );
    }
    catch( ... )
    {
    }
  }

  // finally hand the slot back to the pool
  if( slot_state != nullptr )
  {
    *slot_state = FREE;
  }
}
438
439
#if THREAD_POOL_TASK_NAMES
// Debug helper: dumps the names of all tasks still in WAITING state to stderr.
void ThreadPool::printWaitingTasks()
{
  std::cerr << "Waiting tasks:" << std::endl;
  int numWaiting = 0;
  for( auto& task: m_tasks )
  {
    if( task.state != WAITING )
    {
      continue;
    }
    ++numWaiting;
    std::cerr << task.taskName << std::endl;
  }
  std::cerr << std::endl << numWaiting << " total tasks waiting" << std::endl;
}
#endif  // THREAD_POOL_TASK_NAMES
455
456
// ---------------------------------------------------------------------------
457
// Chunked Task Queue
458
// ---------------------------------------------------------------------------
459
460
// Frees all dynamically allocated chunks. m_firstChunk is embedded in the
// queue object itself, so only its successors are deleted here.
ThreadPool::ChunkedTaskQueue::~ChunkedTaskQueue()
{
  for( Chunk* chunk = m_firstChunk.m_next; chunk != nullptr; )
  {
    Chunk* successor = chunk->m_next;
    delete chunk;
    chunk = successor;
  }
}
470
471
// Appends a new chunk to the queue and returns an iterator to its first slot.
ThreadPool::ChunkedTaskQueue::Iterator ThreadPool::ChunkedTaskQueue::grow()
{
  // prevent concurrent growth of the queue. Read access while growing is no problem
  std::lock_guard<std::mutex> resizeLock( m_resizeMutex );

  Chunk* const newChunk = new Chunk( &m_firstChunk );
  m_lastChunk->m_next   = newChunk;
  m_lastChunk           = newChunk;

  return Iterator{ &newChunk->m_slots.front(), newChunk };
}
480
481
// Advances to the next slot, stepping into the following chunk when the
// current one is exhausted. Past the last chunk the iterator becomes invalid
// (m_slot == nullptr).
ThreadPool::ChunkedTaskQueue::Iterator& ThreadPool::ChunkedTaskQueue::Iterator::operator++()
{
  CHECKD( m_slot == nullptr, "incrementing invalid iterator" );
  CHECKD( m_chunk == nullptr, "incrementing invalid iterator" );

  if( m_slot == &m_chunk->m_slots.back() )
  {
    m_chunk = m_chunk->m_next;
    m_slot  = m_chunk ? &m_chunk->m_slots.front() : nullptr;
  }
  else
  {
    ++m_slot;
  }
  return *this;
}
504
505
// Like operator++, but wraps around from the last slot of the last chunk back
// to the first chunk instead of becoming invalid.
ThreadPool::ChunkedTaskQueue::Iterator& ThreadPool::ChunkedTaskQueue::Iterator::incWrap()
{
  CHECKD( m_slot == nullptr, "incrementing invalid iterator" );
  CHECKD( m_chunk == nullptr, "incrementing invalid iterator" );

  if( m_slot == &m_chunk->m_slots.back() )
  {
    m_chunk = m_chunk->m_next ? m_chunk->m_next : &m_chunk->m_firstChunk;
    m_slot  = &m_chunk->m_slots.front();
  }
  else
  {
    ++m_slot;
  }
  return *this;
}
528
529
// Wakes up a paused pool. The caller must hand in a unique_lock that already
// owns m_allThreadsWaitingMutex (as obtained via acquireLock(), see
// waitForThreads()); both preconditions are asserted below. Clearing the flag
// while holding that mutex keeps the write synchronized with waiters in
// pauseIfAllOtherThreadsWaiting().
void ThreadPool::PoolPause::unpauseIfPaused( std::unique_lock<std::mutex> lockOwnership )
{
  CHECKD( lockOwnership.mutex() != &m_allThreadsWaitingMutex, "wrong mutex passed into ThreadPool::PoolPause::unpauseIfPaused()" );
  CHECKD( !lockOwnership.owns_lock(), "lock passed into ThreadPool::PoolPause::unpauseIfPaused() does not own lock" );
  // All threads may be sleeping. If so, wake up.
  m_allThreadsWaiting = false;
  m_allThreadsWaitingCV.notify_all();
}
537
538
// Puts the calling worker to sleep when every other worker is already waiting
// for the idle lock, i.e. there is currently no runnable work at all.
// `predicate` is re-evaluated on each wakeup; the thread resumes once it
// returns true or the pool is unpaused. Returns true iff the thread paused.
template<typename Predicate>
bool ThreadPool::PoolPause::pauseIfAllOtherThreadsWaiting( Predicate predicate )
{
  if( m_nrThreads == 0 )
  {
    return false;
  }
  // relaxed load is sufficient: the counter is only a heuristic snapshot
  const auto nrWaiting = m_waitingForLockThreads.load( std::memory_order_relaxed );
  if( nrWaiting < m_nrThreads - 1 )
  {
    return false;
  }

  // All threads are waiting. This (current) thread is the one that locked `idleLock`. All
  // other threads are waiting in the above condition for `idleLock.lock();`.
  // The only way how more work for the threads can come in is if addBarrierTask is called
  // or if the thread pool is closed or destroyed.
  std::unique_lock<std::mutex> lock( m_allThreadsWaitingMutex );
  m_allThreadsWaiting = true;
  m_allThreadsWaitingCV.wait( lock, [this, &predicate] { return !m_allThreadsWaiting || predicate(); } );
  return true;
}
560
561
}   // namespace vvdec