Coverage Report

Created: 2026-06-16 07:20

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/vvdec/source/Lib/Utilities/ThreadPool.h
Line
Count
Source
1
/* -----------------------------------------------------------------------------
2
The copyright in this software is being made available under the Clear BSD
3
License, included below. No patent rights, trademark rights and/or 
4
other Intellectual Property Rights other than the copyrights concerning 
5
the Software are granted under this license.
6
7
The Clear BSD License
8
9
Copyright (c) 2018-2026, Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. & The VVdeC Authors.
10
All rights reserved.
11
12
Redistribution and use in source and binary forms, with or without modification,
13
are permitted (subject to the limitations in the disclaimer below) provided that
14
the following conditions are met:
15
16
     * Redistributions of source code must retain the above copyright notice,
17
     this list of conditions and the following disclaimer.
18
19
     * Redistributions in binary form must reproduce the above copyright
20
     notice, this list of conditions and the following disclaimer in the
21
     documentation and/or other materials provided with the distribution.
22
23
     * Neither the name of the copyright holder nor the names of its
24
     contributors may be used to endorse or promote products derived from this
25
     software without specific prior written permission.
26
27
NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
28
THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
29
CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
30
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
31
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
32
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
33
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
34
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
35
BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
36
IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
37
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
38
POSSIBILITY OF SUCH DAMAGE.
39
40
41
------------------------------------------------------------------------------------------- */
42
43
#pragma once
44
45
#include <thread>
46
#include <mutex>
47
#include <condition_variable>
48
#include <atomic>
49
#include <exception>
50
#include <array>
51
52
#include "CommonLib/CommonDef.h"
53
54
namespace vvdec
55
{
56
57
// Trace handles created through the __itt_* instrumentation API. They are only compiled when
// TRACE_ENABLE_ITT is defined: one domain ("Threading") and three string handles naming the
// traced thread pool activities.
// NOTE(review): these are `static` objects defined in a header, so every translation unit that
// includes this file gets its own copies.
#ifdef TRACE_ENABLE_ITT
58
static __itt_domain* itt_domain_thrd = __itt_domain_create( "Threading" );
59
60
static __itt_string_handle* itt_handle_TPspinWait = __itt_string_handle_create( "Spin_Wait" );
61
static __itt_string_handle* itt_handle_TPblocked  = __itt_string_handle_create( "Blocked" );
62
static __itt_string_handle* itt_handle_TPaddTask  = __itt_string_handle_create( "Add_Task" );
63
64
// static long itt_TP_blocked = 1;
65
#endif   // TRACE_ENABLE_ITT
66
67
// Compile-time switches of the thread pool.
#define THREAD_POOL_ADD_TASK_THREAD_SAFE 0   // enable this if tasks need to be added from multiple threads
68
#define THREAD_POOL_TASK_NAMES           0   // simplify debugging of thread pool deadlocks
69
70
// TP_TASK_NAME_ARG( x ) expands to `x ,` when task names are enabled and to nothing otherwise, so an
// optional leading task-name parameter/argument can be spliced in front of the regular ones.
#if THREAD_POOL_TASK_NAMES
71
#  define TP_TASK_NAME_ARG( ... ) __VA_ARGS__ ,
72
#else
73
#  define TP_TASK_NAME_ARG( ... )
74
#endif
75
76
77
// ---------------------------------------------------------------------------
78
// Synchronization tools
79
// ---------------------------------------------------------------------------
80
81
struct Barrier
82
{
83
  virtual void unlock()
84
6.77k
  {
85
6.77k
    checkAndRethrowException();
86
87
6.77k
    m_lockState.store( false );
88
6.77k
  }
89
90
  virtual void lock()
91
21.7k
  {
92
21.7k
    checkAndRethrowException();
93
94
21.7k
    m_lockState.store( true );
95
21.7k
  }
96
97
  bool isBlocked() const
98
4.58M
  {
99
4.58M
    checkAndRethrowException();
100
101
4.58M
    return m_lockState;
102
4.58M
  }
103
104
  enum State
105
  {
106
    unlocked,
107
    locked,
108
    error
109
  };
110
111
  State getState() const
112
713
  {
113
713
    if( m_hasException )
114
0
      return error;
115
116
713
    if( m_lockState )
117
713
      return locked;
118
0
    return unlocked;
119
713
  }
120
121
  virtual void setException( std::exception_ptr e )
122
21.7k
  {
123
21.7k
    std::lock_guard<std::mutex> l( s_exceptionLock );
124
21.7k
    if( m_hasException )
125
9.03k
    {
126
9.03k
      CHECK_FATAL( m_exception == nullptr, "no exception currently stored, but flag is set" );
127
      // exception is already set -> no-op
128
9.03k
      return;
129
9.03k
    }
130
12.6k
    m_exception    = e;
131
12.6k
    m_hasException = true;
132
12.6k
  }
133
134
  virtual void clearException()
135
4.99k
  {
136
4.99k
    if( m_hasException )
137
2.75k
    {
138
2.75k
      std::lock_guard<std::mutex> l( s_exceptionLock );
139
2.75k
      m_hasException = false;
140
2.75k
      m_exception    = nullptr;
141
2.75k
    }
142
4.99k
  }
143
144
  const std::exception_ptr getException() const
145
3.47k
  {
146
3.47k
    if( !m_hasException )
147
0
    {
148
0
      return nullptr;
149
0
    }
150
151
3.47k
    std::lock_guard<std::mutex> l( s_exceptionLock );
152
3.47k
    return m_exception;
153
3.47k
  }
154
155
646k
  bool hasException() const { return m_hasException; }
156
157
  inline void checkAndRethrowException() const
158
4.61M
  {
159
4.61M
    if LIKELY( !m_hasException )
160
4.60M
    {
161
4.60M
      return;
162
4.60M
    }
163
164
8.20k
    std::lock_guard<std::mutex> l( s_exceptionLock );
165
8.20k
    if( m_hasException )
166
8.64k
    {
167
8.64k
      CHECK_FATAL( m_exception == nullptr, "no exception currently stored, but flag is set" );
168
8.64k
      std::rethrow_exception( m_exception );
169
8.64k
    }
170
8.20k
  }
171
172
13.0k
  Barrier()  = default;
173
15.1k
  virtual ~Barrier() = default;
174
2.14k
  explicit Barrier( bool locked ) : m_lockState( locked ) {}
175
  CLASS_COPY_MOVE_DELETE( Barrier )
176
177
private:
178
  std::atomic_bool   m_lockState{ true };
179
  std::atomic_bool   m_hasException{ false };
180
  std::exception_ptr m_exception;
181
  static std::mutex  s_exceptionLock;   // we use one shared mutex for all barriers here. It is only involved, when exceptions actually happen, so there should
182
                                        // be no contention during normal operations
183
};
184
185
struct BlockingBarrier: public Barrier
186
{
187
  void unlock() override
188
716
  {
189
716
    std::lock_guard<std::mutex> l( m_lock );
190
716
    Barrier::unlock();
191
716
    m_cond.notify_all();
192
716
  }
193
194
  void lock() override
195
1.42k
  {
196
1.42k
    std::lock_guard<std::mutex> l( m_lock );
197
1.42k
    Barrier::lock();
198
1.42k
  }
199
200
  void wait() const
201
713
  {
202
713
    std::unique_lock<std::mutex> l( m_lock );
203
713
    if( Barrier::isBlocked() )
204
621
    {
205
1.24k
      m_cond.wait( l, [this] { return !Barrier::isBlocked(); } );
206
621
    }
207
713
  }
208
209
  void setException( std::exception_ptr e ) override
210
1.42k
  {
211
1.42k
    std::lock_guard<std::mutex> l( m_lock );
212
1.42k
    Barrier::setException( e );
213
1.42k
    m_cond.notify_all();
214
1.42k
  }
215
216
  void clearException() override
217
2.13k
  {
218
2.13k
    std::lock_guard<std::mutex> l( m_lock );
219
2.13k
    Barrier::clearException();
220
2.13k
  }
221
222
1.42k
  BlockingBarrier()  = default;
223
1.42k
  ~BlockingBarrier() { std::lock_guard<std::mutex> l( m_lock ); }   // ensure all threads have unlocked the mutex, when we start destruction
224
  CLASS_COPY_MOVE_DELETE( BlockingBarrier )
225
226
private:
227
  mutable std::condition_variable m_cond;
228
  mutable std::mutex              m_lock;
229
};
230
231
struct WaitCounter
232
{
233
  int operator++()
234
10.3k
  {
235
10.3k
    std::lock_guard<std::mutex> l( m_lock );
236
10.3k
    m_done.lock();
237
10.3k
    return ++m_count;
238
10.3k
  }
239
240
  int operator--()
241
10.3k
  {
242
10.3k
    std::unique_lock<std::mutex> l( m_lock );
243
10.3k
    const unsigned int new_count = --m_count;
244
10.3k
    if( new_count == 0 )
245
1.33k
    {
246
1.33k
      m_cond.notify_all();   // we can notify before unlocking the barrier, because wait() and wait_nothrow() wait for m_count and not for m_done
247
1.33k
      m_done.unlock();
248
1.33k
    }
249
10.3k
    l.unlock(); // unlock mutex after done-barrier to prevent race between barrier and counter
250
10.3k
    return new_count;
251
10.3k
  }
252
253
  bool isBlocked() const
254
0
  {
255
0
    std::lock_guard<std::mutex> l( m_lock );
256
0
    m_done.checkAndRethrowException();
257
0
    return 0 != m_count;
258
0
  }
259
260
  void wait() const
261
0
  {
262
0
    std::unique_lock<std::mutex> l( m_lock );
263
0
    m_cond.wait( l, [this] { return m_count == 0 || m_done.hasException(); } );
264
0
    m_done.checkAndRethrowException();
265
0
  }
266
267
  void wait_nothrow() const
268
4.26k
  {
269
4.26k
    std::unique_lock<std::mutex> l( m_lock );
270
6.96k
    m_cond.wait( l, [this] { return m_count == 0; } );
271
4.26k
  }
272
273
  void setException( std::exception_ptr e )
274
10.3k
  {
275
10.3k
    std::lock_guard<std::mutex> l( m_lock );
276
10.3k
    m_done.setException( e );
277
10.3k
    m_cond.notify_all();
278
10.3k
  }
279
280
  void clearException()
281
2.14k
  {
282
2.14k
    std::lock_guard<std::mutex> l( m_lock );
283
2.14k
    m_done.clearException();
284
2.14k
  }
285
286
646k
  bool                     hasException() const { return m_done.hasException(); }
287
3.47k
  const std::exception_ptr getException() const { return m_done.getException(); }
288
289
2.14k
  WaitCounter() = default;
290
2.14k
  ~WaitCounter() { std::lock_guard<std::mutex> l( m_lock ); }   // ensure all threads have unlocked the mutex, when we start destruction
291
  CLASS_COPY_MOVE_DELETE( WaitCounter )
292
293
621
  const Barrier* donePtr() const { return &m_done; }
294
295
private:
296
  mutable std::condition_variable m_cond;
297
  mutable std::mutex              m_lock;
298
  unsigned int                    m_count = 0;
299
  Barrier                         m_done{ false };
300
};
301
302
// ---------------------------------------------------------------------------
303
// Thread Pool
304
// ---------------------------------------------------------------------------
305
306
// List of pointers to read-only barriers, as stored per task slot and passed to addBarrierTask().
using CBarrierVec = std::vector<const Barrier*>;
307
308
class ThreadPool
309
{
310
  typedef enum
311
  {
312
    FREE = 0,
313
    PREPARING,
314
    WAITING,
315
    RUNNING
316
  } TaskState;
317
318
  using TaskFunc = bool ( * )( int, void * );
319
320
  struct Slot
321
  {
322
    TaskFunc               func      { nullptr };
323
    TaskFunc               readyCheck{ nullptr };
324
    void*                  param     { nullptr };
325
    WaitCounter*           counter   { nullptr };
326
    Barrier*               done      { nullptr };
327
    CBarrierVec            barriers;
328
    std::atomic<TaskState> state     { FREE };
329
#if THREAD_POOL_TASK_NAMES
330
    std::string            taskName;
331
#endif
332
  };
333
334
  class ChunkedTaskQueue
335
  {
336
    constexpr static int ChunkSize = 128;
337
338
    class Chunk
339
    {
340
      std::array<Slot, ChunkSize> m_slots;
341
      std::atomic<Chunk*>         m_next{ nullptr };
342
      Chunk&                      m_firstChunk;
343
344
935
      Chunk( Chunk* firstPtr ) : m_firstChunk{ *firstPtr } {}
345
346
      friend class ChunkedTaskQueue;
347
    };
348
349
  public:
350
    class Iterator
351
    {
352
      Slot*  m_slot  = nullptr;
353
      Chunk* m_chunk = nullptr;
354
355
    public:
356
752k
      Iterator() = default;
357
30.8k
      Iterator( Slot* slot, Chunk* chunk ) : m_slot( slot ), m_chunk( chunk ) {}
358
359
      Iterator& operator++();
360
361
      // increment iterator and wrap around, if end is reached
362
      Iterator& incWrap();
363
364
0
      bool operator==( const Iterator& rhs ) const { return m_slot == rhs.m_slot; } // don't need to compare m_chunk, because m_slot is a pointer
365
76.7M
      bool operator!=( const Iterator& rhs ) const { return m_slot != rhs.m_slot; } // don't need to compare m_chunk, because m_slot is a pointer
366
367
76.0M
      Slot& operator*() { return *m_slot; }
368
369
1.71M
      bool isValid() const { return m_slot != nullptr && m_chunk != nullptr; }
370
371
      using iterator_category = std::forward_iterator_tag;
372
      using value_type        = Slot;
373
      using pointer           = Slot*;
374
      using reference         = Slot&;
375
      using difference_type   = ptrdiff_t;
376
    };
377
378
935
    ChunkedTaskQueue() = default;
379
    ~ChunkedTaskQueue();
380
    CLASS_COPY_MOVE_DELETE( ChunkedTaskQueue )
381
382
    // grow the queue by adding a chunk and return an iterator to the first new task-slot
383
    Iterator grow();
384
385
30.8k
    Iterator begin() { return Iterator{ &m_firstChunk.m_slots.front(), &m_firstChunk }; }
386
0
    Iterator end()   { return Iterator{ nullptr, nullptr }; }
387
388
  private:
389
    Chunk  m_firstChunk{ &m_firstChunk };
390
    Chunk* m_lastChunk = &m_firstChunk;
391
392
    std::mutex m_resizeMutex;
393
  };
394
395
private:
396
  class PoolPause
397
  {
398
  public:
399
935
    PoolPause( size_t numThreads ) : m_nrThreads( numThreads ){};
400
13.1k
    auto acquireLock() { return std::unique_lock<std::mutex>( m_allThreadsWaitingMutex ); }
401
    void unpauseIfPaused( std::unique_lock<std::mutex> lockOwnership );
402
    template<typename Predicate>
403
    bool pauseIfAllOtherThreadsWaiting( Predicate predicate );
404
935
    ~PoolPause() { unpauseIfPaused( acquireLock() ); }
405
406
    std::atomic_uint m_waitingForLockThreads{ 0 };
407
408
  private:
409
    std::mutex              m_allThreadsWaitingMutex;
410
    std::condition_variable m_allThreadsWaitingCV;
411
    bool                    m_allThreadsWaiting{ false };
412
    size_t                  m_nrThreads{};
413
  };
414
415
public:
416
  ThreadPool( int numThreads = 1, const char *threadPoolName = nullptr );
417
  ~ThreadPool();
418
419
  bool addBarrierTask( TP_TASK_NAME_ARG( std::string&& taskName )
420
                       bool       ( *func )( int, void* ),
421
                       void*         param,
422
                       WaitCounter*  counter                    = nullptr,
423
                       Barrier*      done                       = nullptr,
424
                       CBarrierVec&& barriers                   = {},
425
                       bool       ( *readyCheck )( int, void* ) = nullptr )
426
10.3k
  {
427
10.3k
    if( m_threads.empty() )
428
0
    {
429
      // in the single threaded case try to exectute the task directly
430
0
      if( bypassTaskQueue( (TaskFunc)func, param, counter, done, barriers, (TaskFunc)readyCheck ) )
431
0
      {
432
0
        return true;
433
0
      }
434
0
    }
435
10.3k
    else
436
10.3k
    {
437
10.3k
      checkAndThrowThreadPoolException();
438
10.3k
    }
439
440
10.3k
    while( true )
441
10.3k
    {
442
#if THREAD_POOL_ADD_TASK_THREAD_SAFE
443
      std::unique_lock<std::mutex> l( m_nextFillSlotMutex );
444
#endif
445
10.3k
      CHECKD( !m_nextFillSlot.isValid(), "Next fill slot iterator should always be valid" );
446
10.3k
      const auto startIt = m_nextFillSlot;
447
448
#if THREAD_POOL_ADD_TASK_THREAD_SAFE
449
      l.unlock();
450
#endif
451
452
10.3k
      bool first = true;
453
10.3k
      for( auto it = startIt; it != startIt || first; it.incWrap() )
454
10.3k
      {
455
10.3k
        first = false;
456
457
10.3k
        auto& t = *it;
458
10.3k
        auto expected = FREE;
459
10.3k
        if( t.state.load( std::memory_order_relaxed ) == FREE && t.state.compare_exchange_strong( expected, PREPARING ) )
460
10.3k
        {
461
10.3k
          if( counter )
462
10.3k
          {
463
10.3k
            counter->operator++();
464
10.3k
          }
465
466
10.3k
          t.func       = (TaskFunc)func;
467
10.3k
          t.readyCheck = (TaskFunc)readyCheck;
468
10.3k
          t.param      = param;
469
10.3k
          t.done       = done;
470
10.3k
          t.counter    = counter;
471
10.3k
          t.barriers   = std::move( barriers );
472
#if THREAD_POOL_TASK_NAMES
473
          t.taskName   = std::move( taskName );
474
#endif
475
10.3k
          auto poolPauseLock( m_poolPause.acquireLock() );
476
10.3k
          t.state = WAITING;
477
478
10.3k
          m_poolPause.unpauseIfPaused( std::move( poolPauseLock ) );
479
480
#if THREAD_POOL_ADD_TASK_THREAD_SAFE
481
          l.lock();
482
#endif
483
10.3k
          m_nextFillSlot.incWrap();
484
10.3k
          return true;
485
10.3k
        }
486
10.3k
      }
487
488
#if THREAD_POOL_ADD_TASK_THREAD_SAFE
489
      l.lock();
490
#endif
491
0
      m_nextFillSlot = m_tasks.grow();
492
0
    }
493
0
    return false;
494
10.3k
  }
495
496
  bool processTasksOnMainThread();
497
  void checkAndThrowThreadPoolException();
498
499
  void shutdown( bool block );
500
  void waitForThreads();
501
502
6.05k
  int numThreads() const { return (int)m_threads.size(); }
503
504
private:
505
  using TaskIterator = ChunkedTaskQueue::Iterator;
506
  struct TaskException;
507
508
  // members
509
  std::string              m_poolName;
510
  std::atomic_bool         m_exitThreads{ false };
511
  std::vector<std::thread> m_threads;
512
  ChunkedTaskQueue         m_tasks;
513
  TaskIterator             m_nextFillSlot = m_tasks.begin();
514
#if THREAD_POOL_ADD_TASK_THREAD_SAFE
515
  std::mutex               m_nextFillSlotMutex;
516
#endif
517
  std::mutex               m_idleMutex;
518
  PoolPause                m_poolPause;
519
520
  std::atomic_bool         m_exceptionFlag{ false };
521
  std::exception_ptr       m_threadPoolException;
522
523
524
  // internal functions
525
  void         threadProc     ( int threadId );
526
  static bool  checkTaskReady ( int threadId, CBarrierVec& barriers, TaskFunc readyCheck, void* taskParam );
527
  TaskIterator findNextTask   ( int threadId, TaskIterator startSearch );
528
  static bool  processTask    ( int threadId, Slot& task );
529
  bool         bypassTaskQueue( TaskFunc func, void* param, WaitCounter* counter, Barrier* done, CBarrierVec& barriers, TaskFunc readyCheck );
530
  static void  handleTaskException( const std::exception_ptr e, Barrier* done, WaitCounter* counter, std::atomic<TaskState>* slot_state );
531
#if THREAD_POOL_TASK_NAMES
532
  void         printWaitingTasks();
533
#endif
534
};
535
536
}   // namespace vvdec