Coverage Report

Created: 2026-06-10 07:00

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/work/vvdec/source/Lib/Utilities/ThreadPool.cpp
Line
Count
Source
1
/* -----------------------------------------------------------------------------
2
The copyright in this software is being made available under the Clear BSD
3
License, included below. No patent rights, trademark rights and/or 
4
other Intellectual Property Rights other than the copyrights concerning 
5
the Software are granted under this license.
6
7
The Clear BSD License
8
9
Copyright (c) 2018-2026, Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. & The VVdeC Authors.
10
All rights reserved.
11
12
Redistribution and use in source and binary forms, with or without modification,
13
are permitted (subject to the limitations in the disclaimer below) provided that
14
the following conditions are met:
15
16
     * Redistributions of source code must retain the above copyright notice,
17
     this list of conditions and the following disclaimer.
18
19
     * Redistributions in binary form must reproduce the above copyright
20
     notice, this list of conditions and the following disclaimer in the
21
     documentation and/or other materials provided with the distribution.
22
23
     * Neither the name of the copyright holder nor the names of its
24
     contributors may be used to endorse or promote products derived from this
25
     software without specific prior written permission.
26
27
NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
28
THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
29
CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
30
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
31
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
32
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
33
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
34
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
35
BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
36
IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
37
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
38
POSSIBILITY OF SUCH DAMAGE.
39
40
41
------------------------------------------------------------------------------------------- */
42
43
#include "ThreadPool.h"
44
45
#include <chrono>
46
47
#if __linux
48
# include <pthread.h>
49
#endif
50
51
52
namespace vvdec
53
{
54
using namespace std::chrono_literals;
55
56
// Out-of-class definition of Barrier's static mutex (declared in the header).
// NOTE(review): going by its name it guards exception-state handling in Barrier — confirm in Barrier's implementation.
std::mutex Barrier::s_exceptionLock;
57
58
// Spin time after which an idle worker stops busy-waiting and blocks on the idle mutex.
// Can be overridden via the environment variable VVDEC_BUSY_WAIT_TIME_MIN, interpreted as a
// (possibly fractional) number of milliseconds; defaults to 1 ms.
const static auto VVDEC_BUSY_WAIT_TIME_MIN = []() -> std::chrono::microseconds {
  if( const char* envValue = getenv( "VVDEC_BUSY_WAIT_TIME_MIN" ) )
  {
    const double millis = atof( envValue );
    return std::chrono::microseconds( int( millis * 1000 ) );
  }
  return std::chrono::microseconds( 1000 );
}();

// Time after which the last still-polling thread (the one holding the idle mutex) may pause the pool.
// Can be overridden via the environment variable VVDEC_BUSY_WAIT_TIME_MAX, interpreted as an
// integral number of milliseconds; defaults to 5 ms.
const static auto VVDEC_BUSY_WAIT_TIME_MAX = []() -> std::chrono::milliseconds {
  const char* envValue = getenv( "VVDEC_BUSY_WAIT_TIME_MAX" );
  return envValue ? std::chrono::milliseconds( atoi( envValue ) ) : std::chrono::milliseconds( 5 );
}();
73
74
75
// Carries an exception raised while checking or running a queued task, together with the slot of
// that task, so the catching worker can forward the failure to the task's barrier/counter and
// release the slot (see the handleTaskException() calls in the worker loops).
struct ThreadPool::TaskException : public std::exception
{
  explicit TaskException( std::exception_ptr e, ThreadPool::Slot& task )
    : std::exception()
    , m_originalException( e )
    , m_task( task )
  {
  }

  std::exception_ptr m_originalException;   // exception as originally thrown by the task code
  ThreadPool::Slot&  m_task;                // queue slot of the task that threw
};
84
85
class ScopeIncDecCounter
86
{
87
  std::atomic_uint& m_cntr;
88
89
public:
90
6.34k
  explicit ScopeIncDecCounter( std::atomic_uint& counter ): m_cntr( counter ) { m_cntr.fetch_add( 1, std::memory_order_relaxed ); }
91
6.37k
  ~ScopeIncDecCounter()                                                       { m_cntr.fetch_sub( 1, std::memory_order_relaxed ); }
92
  CLASS_COPY_MOVE_DELETE( ScopeIncDecCounter )
93
};
94
95
// ---------------------------------------------------------------------------
96
// Thread Pool
97
// ---------------------------------------------------------------------------
98
99
// Creates the pool and immediately starts its worker threads.
// A negative numThreads selects std::thread::hardware_concurrency() workers; 0 creates no workers.
// threadPoolName is used as prefix for the worker thread names (see threadProc()).
ThreadPool::ThreadPool( int numThreads, const char* threadPoolName )
  : m_poolName( threadPoolName )
  , m_threads( numThreads >= 0 ? numThreads : std::thread::hardware_concurrency() )
  , m_poolPause( m_threads.size() )
{
  // Start one worker per slot; every worker receives its index as thread id.
  int nextThreadId = 0;
  for( auto& worker: m_threads )
  {
    worker = std::thread( &ThreadPool::threadProc, this, nextThreadId );
    ++nextThreadId;
  }
}
110
111
// Signals all workers to exit and joins them before the pool's members are destroyed.
ThreadPool::~ThreadPool()
{
  m_exitThreads.store( true );

  // wakes paused workers and joins every still-joinable thread
  waitForThreads();
}
117
118
bool ThreadPool::processTasksOnMainThread()
119
0
{
120
0
  CHECK_FATAL( m_threads.size() != 0, "should not be used with multiple threads" );
121
122
0
  TaskIterator taskIt;
123
0
  while( true )
124
0
  {
125
0
    try
126
0
    {
127
0
      if( !taskIt.isValid() )
128
0
      {
129
0
        taskIt = findNextTask( 0, m_tasks.begin() );
130
0
      }
131
0
      else
132
0
      {
133
0
        taskIt = findNextTask( 0, taskIt );
134
0
      }
135
136
0
      if( !taskIt.isValid() )
137
0
      {
138
0
        break;
139
0
      }
140
0
      processTask( 0, *taskIt );
141
0
    }
142
0
    catch( TaskException& e )
143
0
    {
144
0
      handleTaskException( e.m_originalException, e.m_task.done, e.m_task.counter, &e.m_task.state );
145
0
    }
146
0
  }
147
148
  // return true if all done (-> false if some tasks blocked due to barriers)
149
0
  return std::all_of( m_tasks.begin(), m_tasks.end(), []( Slot& t ) { return t.state == FREE; } );
150
0
}
151
152
void ThreadPool::shutdown( bool block )
153
356
{
154
356
  m_exitThreads = true;
155
356
  if( block )
156
356
  {
157
356
    waitForThreads();
158
356
  }
159
356
}
160
161
void ThreadPool::waitForThreads()
162
712
{
163
712
  m_poolPause.unpauseIfPaused( m_poolPause.acquireLock() );
164
165
712
  for( auto& t: m_threads )
166
22.7k
  {
167
22.7k
    if( t.joinable() )
168
11.3k
      t.join();
169
22.7k
  }
170
712
}
171
172
void ThreadPool::checkAndThrowThreadPoolException()
173
0
{
174
0
  if( !m_exceptionFlag.load() )
175
0
  {
176
0
    return;
177
0
  }
178
179
0
  msg( WARNING, "ThreadPool is in exception state." );
180
181
0
  std::exception_ptr tmp = m_threadPoolException;
182
0
  m_threadPoolException  = tmp;
183
0
  m_exceptionFlag.store( false );
184
185
0
  std::rethrow_exception( tmp );
186
0
}
187
188
void ThreadPool::threadProc( int threadId )
189
11.3k
{
190
11.3k
#if __linux
191
11.3k
  if( !m_poolName.empty() )
192
11.3k
  {
193
11.3k
    std::string threadName( m_poolName + std::to_string( threadId ) );
194
11.3k
    pthread_setname_np( pthread_self(), threadName.c_str() );
195
11.3k
  }
196
11.3k
#endif
197
198
11.3k
  auto nextTaskIt = m_tasks.begin();
199
11.3k
  while( !m_exitThreads )
200
11.3k
  {
201
11.3k
    try
202
11.3k
    {
203
11.3k
      auto taskIt = findNextTask( threadId, nextTaskIt );
204
11.3k
      if( !taskIt.isValid() )   // immediately try again without any delay
205
10.9k
      {
206
10.9k
        taskIt = findNextTask( threadId, nextTaskIt );
207
10.9k
      }
208
11.3k
      if( !taskIt.isValid() )   // still nothing found, go into idle loop searching for more tasks
209
11.0k
      {
210
11.0k
        ITT_TASKSTART( itt_domain_thrd, itt_handle_TPspinWait );
211
212
11.0k
        std::unique_lock<std::mutex> idleLock( m_idleMutex, std::defer_lock );
213
214
11.0k
        const auto startWait = std::chrono::steady_clock::now();
215
11.0k
        bool       didBlock  = false;   // if the previous iteration did block we don't want to yield in this iteration
216
69.1k
        while( !m_exitThreads )
217
62.4k
        {
218
62.4k
          if( !didBlock )
219
62.0k
          {
220
62.0k
            std::this_thread::yield();
221
62.0k
          }
222
62.4k
          didBlock = false;
223
224
62.4k
          taskIt = findNextTask( threadId, nextTaskIt );
225
62.4k
          if( taskIt.isValid() || m_exitThreads.load( std::memory_order_relaxed ) )
226
4.29k
          {
227
4.29k
            break;
228
4.29k
          }
229
230
58.1k
          if( !idleLock.owns_lock() )
231
46.1k
          {
232
47.3k
            if( VVDEC_BUSY_WAIT_TIME_MIN.count() == 0 || std::chrono::steady_clock::now() - startWait > VVDEC_BUSY_WAIT_TIME_MIN )
233
6.34k
            {
234
6.34k
              ITT_TASKSTART( itt_domain_thrd, itt_handle_TPblocked );
235
6.34k
              ScopeIncDecCounter cntr( m_poolPause.m_waitingForLockThreads );
236
6.34k
              idleLock.lock();
237
6.34k
              didBlock = true;
238
6.34k
              ITT_TASKEND( itt_domain_thrd, itt_handle_TPblocked );
239
6.34k
            }
240
46.1k
          }
241
12.0k
          else if( std::chrono::steady_clock::now() - startWait > VVDEC_BUSY_WAIT_TIME_MAX )
242
4
          {
243
#if THREAD_POOL_TASK_NAMES
244
            printWaitingTasks();
245
#endif
246
4
            didBlock = m_poolPause.pauseIfAllOtherThreadsWaiting(
247
4
              [&]
248
4
              {
249
0
                taskIt = findNextTask( threadId, nextTaskIt );
250
0
                return taskIt.isValid() || m_exitThreads;
251
0
              } );
252
4
            if( taskIt.isValid() )
253
0
            {
254
0
              break;
255
0
            }
256
4
          }
257
58.1k
        }
258
259
11.0k
        ITT_TASKEND( itt_domain_thrd, itt_handle_TPspinWait );
260
11.0k
      }
261
11.3k
      if( m_exitThreads )
262
10.4k
      {
263
10.4k
        return;
264
10.4k
      }
265
266
886
      processTask( threadId, *taskIt );
267
268
886
      nextTaskIt = taskIt;
269
886
      nextTaskIt.incWrap();
270
886
    }
271
11.3k
    catch( TaskException& e )
272
11.3k
    {
273
0
      handleTaskException( e.m_originalException, e.m_task.done, e.m_task.counter, &e.m_task.state );
274
0
    }
275
11.3k
    catch( std::exception& e )
276
11.3k
    {
277
0
      msg( ERROR, "ERROR: Caught unexpected exception from within the thread pool: %s", e.what() );
278
279
0
      if( m_exceptionFlag.exchange( true ) )
280
0
      {
281
0
        msg( ERROR, "ERROR: Another exception has already happend in the thread pool, but we can only store one." );
282
0
        return;
283
0
      }
284
0
      m_threadPoolException = std::current_exception();
285
0
      return;
286
0
    }
287
11.3k
  }
288
11.3k
}
289
290
bool ThreadPool::checkTaskReady( int threadId, CBarrierVec& barriers, ThreadPool::TaskFunc readyCheck, void* taskParam )
291
0
{
292
0
  if( !barriers.empty() )
293
0
  {
294
    // don't break early, because isBlocked() also checks exception state
295
0
    if( std::count_if( barriers.cbegin(), barriers.cend(), []( const Barrier* b ) { return b && b->isBlocked(); } ) )
296
0
    {
297
0
      return false;
298
0
    }
299
0
    barriers.clear();
300
0
  }
301
302
0
  if( readyCheck && readyCheck( threadId, taskParam ) == false )
303
0
  {
304
0
    return false;
305
0
  }
306
307
0
  return true;
308
0
}
309
310
// Searches the task queue once around, starting at startSearch and wrapping at the end.
// It looks for a WAITING task that can be claimed (WAITING -> RUNNING) and is ready to run.
// Returns an iterator to the claimed task, or an invalid iterator if none was found.
// An invalid startSearch means "start at the beginning of the queue".
// Any exception from the readiness check is rethrown wrapped in TaskException together with its slot.
ThreadPool::TaskIterator ThreadPool::findNextTask( int threadId, TaskIterator startSearch )
{
  if( !startSearch.isValid() )
  {
    startSearch = m_tasks.begin();
  }

  auto it = startSearch;
  do
  {
    try
    {
      Slot& slot    = *it;
      auto  waiting = WAITING;
      // Cheap read first; only slots that look WAITING attempt the CAS. Winning the CAS claims the slot.
      if( slot.state == waiting && slot.state.compare_exchange_strong( waiting, RUNNING ) )
      {
        if( checkTaskReady( threadId, slot.barriers, slot.readyCheck, slot.param ) )
        {
          return it;
        }

        // not ready yet: hand the slot back so it can be claimed again later
        slot.state = WAITING;
      }
    }
    catch( ... )
    {
      throw TaskException( std::current_exception(), *it );
    }
    it.incWrap();
  } while( it != startSearch );

  return {};
}
342
343
bool ThreadPool::processTask( int threadId, ThreadPool::Slot& task )
344
0
{
345
0
  try
346
0
  {
347
0
    const bool success = task.func( threadId, task.param );
348
0
    if( !success )
349
0
    {
350
0
      task.state = WAITING;
351
0
      return false;
352
0
    }
353
354
0
    if( task.done != nullptr )
355
0
    {
356
0
      task.done->unlock();
357
0
    }
358
0
    if( task.counter != nullptr )
359
0
    {
360
0
      --(*task.counter);
361
0
    }
362
0
  }
363
0
  catch( ... )
364
0
  {
365
0
    throw TaskException( std::current_exception(), task );
366
0
  }
367
368
0
  task.state = FREE;
369
370
0
  return true;
371
0
}
372
373
// Single-threaded fast path for adding a task: tries to run `func` directly on the calling thread
// instead of queueing it.
// - If the new task is not ready and tasks are queued, the queue is processed first and readiness is
//   re-checked.
// - After `func` succeeded, the queue is processed again when tasks are still waiting, because
//   unlocking `done` may have unblocked them.
//
// Returns true if `func` ran and returned true; in that case `done` (if given) has been unlocked.
// Returns false if the task is still blocked, `func` returned false, or an exception occurred.
// In the exception case, the exception is forwarded to `done` / `counter` via handleTaskException().
// NOTE(review): presumably the caller enqueues the task when false is returned — confirm in the header.
bool ThreadPool::bypassTaskQueue( TaskFunc func, void* param, WaitCounter* counter, Barrier* done, CBarrierVec& barriers, TaskFunc readyCheck )
{
  CHECKD( numThreads() > 0, "the task queue should only be bypassed, when running single-threaded." );
  try
  {
    // if singlethreaded, execute all pending tasks
    bool waiting_tasks = m_nextFillSlot != m_tasks.begin();
    bool is_ready      = checkTaskReady( 0, barriers, readyCheck, param );
    if( !is_ready && waiting_tasks )
    {
      // processTasksOnMainThread() returns true when every queued task is done. Tasks are therefore
      // still waiting exactly when it returns false.
      waiting_tasks = !processTasksOnMainThread();
      is_ready      = checkTaskReady( 0, barriers, readyCheck, param );
    }

    // when no barriers block this task, execute it directly
    if( is_ready && func( 0, param ) )
    {
      if( done != nullptr )
      {
        done->unlock();
      }

      // tasks that were blocked on `done` may be runnable now
      if( waiting_tasks )
      {
        processTasksOnMainThread();
      }
      return true;
    }
  }
  catch( ... )
  {
    handleTaskException( std::current_exception(), done, counter, nullptr );
  }

  // direct execution of the task failed
  return false;
}
413
414
void ThreadPool::handleTaskException( const std::exception_ptr e, Barrier* done, WaitCounter* counter, std::atomic<TaskState>* slot_state )
415
0
{
416
0
  if( done != nullptr )
417
0
  {
418
0
    done->setException( e );
419
0
  }
420
0
  if( counter != nullptr )
421
0
  {
422
0
    counter->setException( e );
423
    // Barrier::unlock() in the decrement operator throws, when the counter reaches zero, so we catch it here
424
0
    try
425
0
    {
426
0
      --( *counter );
427
0
    }
428
0
    catch( ... )
429
0
    {
430
0
    }
431
0
  }
432
433
0
  if( slot_state != nullptr )
434
0
  {
435
0
    *slot_state = FREE;
436
0
  }
437
0
}
438
439
#if THREAD_POOL_TASK_NAMES
// Debug helper: prints the name of every task currently in WAITING state to stderr, followed by their count.
void ThreadPool::printWaitingTasks()
{
  std::cerr << "Waiting tasks:" << std::endl;
  int count = 0;
  for( auto& slot: m_tasks )
  {
    if( slot.state != WAITING )
    {
      continue;
    }
    std::cerr << slot.taskName << std::endl;
    ++count;
  }
  std::cerr << std::endl << count << " total tasks waiting" << std::endl;
}
#endif  // THREAD_POOL_TASK_NAMES
455
456
// ---------------------------------------------------------------------------
457
// Chunked Task Queue
458
// ---------------------------------------------------------------------------
459
460
// Frees the chunks appended by grow(). m_firstChunk itself is a member object and is not deleted here;
// only the chunks chained after it were allocated with `new`.
ThreadPool::ChunkedTaskQueue::~ChunkedTaskQueue()
{
  for( Chunk* chunk = m_firstChunk.m_next; chunk != nullptr; )
  {
    Chunk* const following = chunk->m_next;
    delete chunk;
    chunk = following;
  }
}
470
471
// Appends a new chunk to the end of the chunk list and returns an iterator to its first slot.
// m_resizeMutex serializes concurrent growth; readers may keep traversing the queue meanwhile.
ThreadPool::ChunkedTaskQueue::Iterator ThreadPool::ChunkedTaskQueue::grow()
{
  std::lock_guard<std::mutex> l( m_resizeMutex );

  Chunk* const appended = new Chunk( &m_firstChunk );
  m_lastChunk->m_next   = appended;
  m_lastChunk           = appended;

  return Iterator{ &appended->m_slots.front(), appended };
}
480
481
// Advances to the next slot. From the last slot of a chunk it moves to the first slot of the next chunk.
// After the very last slot of the queue the iterator becomes invalid (null slot and chunk).
ThreadPool::ChunkedTaskQueue::Iterator& ThreadPool::ChunkedTaskQueue::Iterator::operator++()
{
  CHECKD( m_slot == nullptr, "incrementing invalid iterator" );
  CHECKD( m_chunk == nullptr, "incrementing invalid iterator" );

  if( m_slot == &m_chunk->m_slots.back() )
  {
    m_chunk = m_chunk->m_next;
    m_slot  = m_chunk ? &m_chunk->m_slots.front() : nullptr;
  }
  else
  {
    ++m_slot;
  }
  return *this;
}
504
505
// Advances to the next slot like operator++. After the last slot of the last chunk, however, it wraps
// around to the first slot of the first chunk, so the iterator always stays valid.
ThreadPool::ChunkedTaskQueue::Iterator& ThreadPool::ChunkedTaskQueue::Iterator::incWrap()
{
  CHECKD( m_slot == nullptr, "incrementing invalid iterator" );
  CHECKD( m_chunk == nullptr, "incrementing invalid iterator" );

  if( m_slot == &m_chunk->m_slots.back() )
  {
    Chunk* const following = m_chunk->m_next;
    m_chunk                = following ? following : &m_chunk->m_firstChunk;
    m_slot                 = &m_chunk->m_slots.front();
  }
  else
  {
    ++m_slot;
  }
  return *this;
}
528
529
void ThreadPool::PoolPause::unpauseIfPaused( std::unique_lock<std::mutex> lockOwnership )
530
1.06k
{
531
1.06k
  CHECKD( lockOwnership.mutex() != &m_allThreadsWaitingMutex, "wrong mutex passed into ThreadPool::PoolPause::unpauseIfPaused()" );
532
1.06k
  CHECKD( !lockOwnership.owns_lock(), "lock passed into ThreadPool::PoolPause::unpauseIfPaused() does not own lock" );
533
  // All threads may be sleeping. If so, wake up.
534
1.06k
  m_allThreadsWaiting = false;
535
1.06k
  m_allThreadsWaitingCV.notify_all();
536
1.06k
}
537
538
template<typename Predicate>
539
bool ThreadPool::PoolPause::pauseIfAllOtherThreadsWaiting( Predicate predicate )
540
4
{
541
4
  if( m_nrThreads == 0 )
542
0
  {
543
0
    return false;
544
0
  }
545
4
  const auto nrWaiting = m_waitingForLockThreads.load( std::memory_order_relaxed );
546
4
  if( nrWaiting < m_nrThreads - 1 )
547
4
  {
548
4
    return false;
549
4
  }
550
551
  // All threads are waiting. This (current) threads is the one which locked `idleLock`. All
552
  // other threads are waiting in the above condition for `idleLock.lock();`.
553
  // The only way how more work for the threads can come in is if addBarrierTask is called
554
  // or if the thread pool is closed or destroyed.
555
0
  std::unique_lock<std::mutex> lock( m_allThreadsWaitingMutex );
556
0
  m_allThreadsWaiting = true;
557
0
  m_allThreadsWaitingCV.wait( lock, [this, &predicate] { return !m_allThreadsWaiting || predicate(); } );
558
0
  return true;
559
4
}
560
561
}   // namespace vvdec