Coverage Report

Created: 2026-04-01 07:49

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/vvdec/source/Lib/Utilities/ThreadPool.h
Line
Count
Source
1
/* -----------------------------------------------------------------------------
2
The copyright in this software is being made available under the Clear BSD
3
License, included below. No patent rights, trademark rights and/or 
4
other Intellectual Property Rights other than the copyrights concerning 
5
the Software are granted under this license.
6
7
The Clear BSD License
8
9
Copyright (c) 2018-2026, Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. & The VVdeC Authors.
10
All rights reserved.
11
12
Redistribution and use in source and binary forms, with or without modification,
13
are permitted (subject to the limitations in the disclaimer below) provided that
14
the following conditions are met:
15
16
     * Redistributions of source code must retain the above copyright notice,
17
     this list of conditions and the following disclaimer.
18
19
     * Redistributions in binary form must reproduce the above copyright
20
     notice, this list of conditions and the following disclaimer in the
21
     documentation and/or other materials provided with the distribution.
22
23
     * Neither the name of the copyright holder nor the names of its
24
     contributors may be used to endorse or promote products derived from this
25
     software without specific prior written permission.
26
27
NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
28
THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
29
CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
30
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
31
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
32
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
33
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
34
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
35
BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
36
IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
37
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
38
POSSIBILITY OF SUCH DAMAGE.
39
40
41
------------------------------------------------------------------------------------------- */
42
43
#pragma once
44
45
#include <thread>
46
#include <mutex>
47
#include <condition_variable>
48
#include <atomic>
49
#include <exception>
50
#include <array>
51
52
#include "CommonLib/CommonDef.h"
53
54
namespace vvdec
55
{
56
57
#ifdef TRACE_ENABLE_ITT
// ITT instrumentation: one domain for all threading events and one string handle per traced activity.
// Only compiled in when TRACE_ENABLE_ITT is defined.
static __itt_domain* itt_domain_thrd = __itt_domain_create( "Threading" );

static __itt_string_handle* itt_handle_TPspinWait = __itt_string_handle_create( "Spin_Wait" );
static __itt_string_handle* itt_handle_TPblocked  = __itt_string_handle_create( "Blocked" );
static __itt_string_handle* itt_handle_TPaddTask  = __itt_string_handle_create( "Add_Task" );

// static long itt_TP_blocked = 1;
#endif   // TRACE_ENABLE_ITT

// Compile-time switches of the thread pool (both disabled by default):
#define THREAD_POOL_ADD_TASK_THREAD_SAFE 0   // set to 1 if tasks need to be added from multiple threads
#define THREAD_POOL_TASK_NAMES           0   // set to 1 to attach a name to every task (simplifies debugging of thread pool deadlocks)

// TP_TASK_NAME_ARG( x ) expands to "x ," when task names are enabled and to nothing otherwise, so an
// optional leading task-name argument can be spelled in front of the remaining arguments.
#if !THREAD_POOL_TASK_NAMES
#  define TP_TASK_NAME_ARG( ... )
#else
#  define TP_TASK_NAME_ARG( ... ) __VA_ARGS__ ,
#endif
75
76
77
// ---------------------------------------------------------------------------
78
// Synchronization tools
79
// ---------------------------------------------------------------------------
80
81
struct Barrier
82
{
83
  virtual void unlock()
84
0
  {
85
0
    checkAndRethrowException();
86
87
0
    m_lockState.store( false );
88
0
  }
89
90
  virtual void lock()
91
0
  {
92
0
    checkAndRethrowException();
93
94
0
    m_lockState.store( true );
95
0
  }
96
97
  bool isBlocked() const
98
0
  {
99
0
    checkAndRethrowException();
100
101
0
    return m_lockState;
102
0
  }
103
104
  enum State
105
  {
106
    unlocked,
107
    locked,
108
    error
109
  };
110
111
  State getState() const
112
0
  {
113
0
    if( m_hasException )
114
0
      return error;
115
116
0
    if( m_lockState )
117
0
      return locked;
118
0
    return unlocked;
119
0
  }
120
121
  virtual void setException( std::exception_ptr e )
122
0
  {
123
0
    std::lock_guard<std::mutex> l( s_exceptionLock );
124
0
    if( m_hasException )
125
0
    {
126
0
      CHECK_FATAL( m_exception == nullptr, "no exception currently stored, but flag is set" );
127
      // exception is already set -> no-op
128
0
      return;
129
0
    }
130
0
    m_exception    = e;
131
0
    m_hasException = true;
132
0
  }
133
134
  virtual void clearException()
135
0
  {
136
0
    if( m_hasException )
137
0
    {
138
0
      std::lock_guard<std::mutex> l( s_exceptionLock );
139
0
      m_hasException = false;
140
0
      m_exception    = nullptr;
141
0
    }
142
0
  }
143
144
  const std::exception_ptr getException() const
145
0
  {
146
0
    if( !m_hasException )
147
0
    {
148
0
      return nullptr;
149
0
    }
150
151
0
    std::lock_guard<std::mutex> l( s_exceptionLock );
152
0
    return m_exception;
153
0
  }
154
155
0
  bool hasException() const { return m_hasException; }
156
157
  inline void checkAndRethrowException() const
158
0
  {
159
0
    if( !m_hasException )
160
0
    {
161
0
      return;
162
0
    }
163
164
0
    std::lock_guard<std::mutex> l( s_exceptionLock );
165
0
    if( m_hasException )
166
0
    {
167
0
      CHECK_FATAL( m_exception == nullptr, "no exception currently stored, but flag is set" );
168
0
      std::rethrow_exception( m_exception );
169
0
    }
170
0
  }
171
172
0
  Barrier()  = default;
173
0
  virtual ~Barrier() = default;
174
0
  explicit Barrier( bool locked ) : m_lockState( locked ) {}
175
  CLASS_COPY_MOVE_DELETE( Barrier )
176
177
private:
178
  std::atomic_bool   m_lockState{ true };
179
  std::atomic_bool   m_hasException{ false };
180
  std::exception_ptr m_exception;
181
  static std::mutex  s_exceptionLock;   // we use one shared mutex for all barriers here. It is only involved, when exceptions actually happen, so there should
182
                                        // be no contention during normal operations
183
};
184
185
struct BlockingBarrier: public Barrier
186
{
187
  void unlock() override
188
0
  {
189
0
    std::lock_guard<std::mutex> l( m_lock );
190
0
    Barrier::unlock();
191
0
    m_cond.notify_all();
192
0
  }
193
194
  void lock() override
195
0
  {
196
0
    std::lock_guard<std::mutex> l( m_lock );
197
0
    Barrier::lock();
198
0
  }
199
200
  void wait() const
201
0
  {
202
0
    std::unique_lock<std::mutex> l( m_lock );
203
0
    if( Barrier::isBlocked() )
204
0
    {
205
0
      m_cond.wait( l, [this] { return !Barrier::isBlocked(); } );
206
0
    }
207
0
  }
208
209
  void setException( std::exception_ptr e ) override
210
0
  {
211
0
    std::lock_guard<std::mutex> l( m_lock );
212
0
    Barrier::setException( e );
213
0
    m_cond.notify_all();
214
0
  }
215
216
  void clearException() override
217
0
  {
218
0
    std::lock_guard<std::mutex> l( m_lock );
219
0
    Barrier::clearException();
220
0
  }
221
222
0
  BlockingBarrier()  = default;
223
0
  ~BlockingBarrier() { std::lock_guard<std::mutex> l( m_lock ); }   // ensure all threads have unlocked the mutex, when we start destruction
224
  CLASS_COPY_MOVE_DELETE( BlockingBarrier )
225
226
private:
227
  mutable std::condition_variable m_cond;
228
  mutable std::mutex              m_lock;
229
};
230
231
struct WaitCounter
232
{
233
  int operator++()
234
0
  {
235
0
    std::lock_guard<std::mutex> l( m_lock );
236
0
    m_done.lock();
237
0
    return ++m_count;
238
0
  }
239
240
  int operator--()
241
0
  {
242
0
    std::unique_lock<std::mutex> l( m_lock );
243
0
    const unsigned int new_count = --m_count;
244
0
    if( new_count == 0 )
245
0
    {
246
0
      m_done.unlock();
247
0
      m_cond.notify_all();
248
0
    }
249
0
    l.unlock(); // unlock mutex after done-barrier to prevent race between barrier and counter
250
0
    return new_count;
251
0
  }
252
253
  bool isBlocked() const
254
0
  {
255
0
    std::lock_guard<std::mutex> l( m_lock );
256
0
    m_done.checkAndRethrowException();
257
0
    return 0 != m_count;
258
0
  }
259
260
  void wait() const
261
0
  {
262
0
    std::unique_lock<std::mutex> l( m_lock );
263
0
    m_cond.wait( l, [this] { return m_count == 0 || m_done.hasException(); } );
264
0
    m_done.checkAndRethrowException();
265
0
  }
266
267
  void wait_nothrow() const
268
0
  {
269
0
    std::unique_lock<std::mutex> l( m_lock );
270
0
    m_cond.wait( l, [this] { return m_count == 0; } );
271
0
  }
272
273
  void setException( std::exception_ptr e )
274
0
  {
275
0
    std::lock_guard<std::mutex> l( m_lock );
276
0
    m_done.setException( e );
277
0
    m_cond.notify_all();
278
0
  }
279
280
  void clearException()
281
0
  {
282
0
    std::lock_guard<std::mutex> l( m_lock );
283
0
    m_done.clearException();
284
0
  }
285
286
0
  bool                     hasException() const { return m_done.hasException(); }
287
0
  const std::exception_ptr getException() const { return m_done.getException(); }
288
289
0
  WaitCounter() = default;
290
0
  ~WaitCounter() { std::lock_guard<std::mutex> l( m_lock ); }   // ensure all threads have unlocked the mutex, when we start destruction
291
  CLASS_COPY_MOVE_DELETE( WaitCounter )
292
293
0
  const Barrier* donePtr() const { return &m_done; }
294
295
private:
296
  mutable std::condition_variable m_cond;
297
  mutable std::mutex              m_lock;
298
  unsigned int                    m_count = 0;
299
  Barrier                         m_done{ false };
300
};
301
302
// ---------------------------------------------------------------------------
303
// Thread Pool
304
// ---------------------------------------------------------------------------
305
306
// Vector of pointers to const Barrier objects. Used for the barrier list that is passed to
// ThreadPool::addBarrierTask() and stored in each task slot.
using CBarrierVec = std::vector<const Barrier*>;
307
308
class ThreadPool
309
{
310
  typedef enum
311
  {
312
    FREE = 0,
313
    PREPARING,
314
    WAITING,
315
    RUNNING
316
  } TaskState;
317
318
  using TaskFunc = bool ( * )( int, void * );
319
320
  struct Slot
321
  {
322
    TaskFunc               func      { nullptr };
323
    TaskFunc               readyCheck{ nullptr };
324
    void*                  param     { nullptr };
325
    WaitCounter*           counter   { nullptr };
326
    Barrier*               done      { nullptr };
327
    CBarrierVec            barriers;
328
    std::atomic<TaskState> state     { FREE };
329
#if THREAD_POOL_TASK_NAMES
330
    std::string            taskName;
331
#endif
332
  };
333
334
  class ChunkedTaskQueue
335
  {
336
    constexpr static int ChunkSize = 128;
337
338
    class Chunk
339
    {
340
      std::array<Slot, ChunkSize> m_slots;
341
      std::atomic<Chunk*>         m_next{ nullptr };
342
      Chunk&                      m_firstChunk;
343
344
0
      Chunk( Chunk* firstPtr ) : m_firstChunk{ *firstPtr } {}
345
346
      friend class ChunkedTaskQueue;
347
    };
348
349
  public:
350
    class Iterator
351
    {
352
      Slot*  m_slot  = nullptr;
353
      Chunk* m_chunk = nullptr;
354
355
    public:
356
0
      Iterator() = default;
357
0
      Iterator( Slot* slot, Chunk* chunk ) : m_slot( slot ), m_chunk( chunk ) {}
358
359
      Iterator& operator++();
360
361
      // increment iterator and wrap around, if end is reached
362
      Iterator& incWrap();
363
364
0
      bool operator==( const Iterator& rhs ) const { return m_slot == rhs.m_slot; } // don't need to compare m_chunk, because m_slot is a pointer
365
0
      bool operator!=( const Iterator& rhs ) const { return m_slot != rhs.m_slot; } // don't need to compare m_chunk, because m_slot is a pointer
366
367
0
      Slot& operator*() { return *m_slot; }
368
369
0
      bool isValid() const { return m_slot != nullptr && m_chunk != nullptr; }
370
371
      using iterator_category = std::forward_iterator_tag;
372
      using value_type        = Slot;
373
      using pointer           = Slot*;
374
      using reference         = Slot&;
375
      using difference_type   = ptrdiff_t;
376
    };
377
378
0
    ChunkedTaskQueue() = default;
379
    ~ChunkedTaskQueue();
380
    CLASS_COPY_MOVE_DELETE( ChunkedTaskQueue )
381
382
    // grow the queue by adding a chunk and return an iterator to the first new task-slot
383
    Iterator grow();
384
385
0
    Iterator begin() { return Iterator{ &m_firstChunk.m_slots.front(), &m_firstChunk }; }
386
0
    Iterator end()   { return Iterator{ nullptr, nullptr }; }
387
388
  private:
389
    Chunk  m_firstChunk{ &m_firstChunk };
390
    Chunk* m_lastChunk = &m_firstChunk;
391
392
    std::mutex m_resizeMutex;
393
  };
394
395
private:
396
  class PoolPause
397
  {
398
  public:
399
0
    PoolPause( size_t numThreads ) : m_nrThreads( numThreads ){};
400
0
    auto acquireLock() { return std::unique_lock<std::mutex>( m_allThreadsWaitingMutex ); }
401
    void unpauseIfPaused( std::unique_lock<std::mutex> lockOwnership );
402
    template<typename Predicate>
403
    bool pauseIfAllOtherThreadsWaiting( Predicate predicate );
404
0
    ~PoolPause() { unpauseIfPaused( acquireLock() ); }
405
406
    std::atomic_uint m_waitingForLockThreads{ 0 };
407
408
  private:
409
    std::mutex              m_allThreadsWaitingMutex;
410
    std::condition_variable m_allThreadsWaitingCV;
411
    bool                    m_allThreadsWaiting{ false };
412
    size_t                  m_nrThreads{};
413
  };
414
415
public:
416
  ThreadPool( int numThreads = 1, const char *threadPoolName = nullptr );
417
  ~ThreadPool();
418
419
  template<class TParam>
420
  bool addBarrierTask( TP_TASK_NAME_ARG( std::string&& taskName )
421
                       bool       ( *func )( int, TParam* ),
422
                       TParam*       param,
423
                       WaitCounter*  counter                      = nullptr,
424
                       Barrier*      done                         = nullptr,
425
                       CBarrierVec&& barriers                     = {},
426
                       bool       ( *readyCheck )( int, TParam* ) = nullptr )
427
0
  {
428
0
    if( m_threads.empty() )
429
0
    {
430
      // in the single threaded case try to exectute the task directly
431
0
      if( bypassTaskQueue( (TaskFunc)func, param, counter, done, barriers, (TaskFunc)readyCheck ) )
432
0
      {
433
0
        return true;
434
0
      }
435
0
    }
436
0
    else
437
0
    {
438
0
      checkAndThrowThreadPoolException();
439
0
    }
440
441
0
    while( true )
442
0
    {
443
#if THREAD_POOL_ADD_TASK_THREAD_SAFE
444
      std::unique_lock<std::mutex> l( m_nextFillSlotMutex );
445
#endif
446
0
      CHECKD( !m_nextFillSlot.isValid(), "Next fill slot iterator should always be valid" );
447
0
      const auto startIt = m_nextFillSlot;
448
449
#if THREAD_POOL_ADD_TASK_THREAD_SAFE
450
      l.unlock();
451
#endif
452
453
0
      bool first = true;
454
0
      for( auto it = startIt; it != startIt || first; it.incWrap() )
455
0
      {
456
0
        first = false;
457
458
0
        auto& t = *it;
459
0
        auto expected = FREE;
460
0
        if( t.state.load( std::memory_order_relaxed ) == FREE && t.state.compare_exchange_strong( expected, PREPARING ) )
461
0
        {
462
0
          if( counter )
463
0
          {
464
0
            counter->operator++();
465
0
          }
466
467
0
          t.func       = (TaskFunc)func;
468
0
          t.readyCheck = (TaskFunc)readyCheck;
469
0
          t.param      = param;
470
0
          t.done       = done;
471
0
          t.counter    = counter;
472
0
          t.barriers   = std::move( barriers );
473
#if THREAD_POOL_TASK_NAMES
474
          t.taskName   = std::move( taskName );
475
#endif
476
0
          auto poolPauseLock( m_poolPause.acquireLock() );
477
0
          t.state = WAITING;
478
479
0
          m_poolPause.unpauseIfPaused( std::move( poolPauseLock ) );
480
481
#if THREAD_POOL_ADD_TASK_THREAD_SAFE
482
          l.lock();
483
#endif
484
0
          m_nextFillSlot.incWrap();
485
0
          return true;
486
0
        }
487
0
      }
488
489
#if THREAD_POOL_ADD_TASK_THREAD_SAFE
490
      l.lock();
491
#endif
492
0
      m_nextFillSlot = m_tasks.grow();
493
0
    }
494
0
    return false;
495
0
  }
Unexecuted instantiation: vvdecimpl.cpp:bool vvdec::ThreadPool::addBarrierTask<vvdec::VVDecImpl::xAddGrain(vvdecFrame*)::GrainTaskData>(bool (*)(int, vvdec::VVDecImpl::xAddGrain(vvdecFrame*)::GrainTaskData*), vvdec::VVDecImpl::xAddGrain(vvdecFrame*)::GrainTaskData*, vvdec::WaitCounter*, vvdec::Barrier*, std::__1::vector<vvdec::Barrier const*, std::__1::allocator<vvdec::Barrier const*> >&&, bool (*)(int, vvdec::VVDecImpl::xAddGrain(vvdecFrame*)::GrainTaskData*))
Unexecuted instantiation: bool vvdec::ThreadPool::addBarrierTask<vvdec::Slice>(bool (*)(int, vvdec::Slice*), vvdec::Slice*, vvdec::WaitCounter*, vvdec::Barrier*, std::__1::vector<vvdec::Barrier const*, std::__1::allocator<vvdec::Barrier const*> >&&, bool (*)(int, vvdec::Slice*))
Unexecuted instantiation: bool vvdec::ThreadPool::addBarrierTask<vvdec::Picture>(bool (*)(int, vvdec::Picture*), vvdec::Picture*, vvdec::WaitCounter*, vvdec::Barrier*, std::__1::vector<vvdec::Barrier const*, std::__1::allocator<vvdec::Barrier const*> >&&, bool (*)(int, vvdec::Picture*))
Unexecuted instantiation: bool vvdec::ThreadPool::addBarrierTask<vvdec::SubPicExtTask>(bool (*)(int, vvdec::SubPicExtTask*), vvdec::SubPicExtTask*, vvdec::WaitCounter*, vvdec::Barrier*, std::__1::vector<vvdec::Barrier const*, std::__1::allocator<vvdec::Barrier const*> >&&, bool (*)(int, vvdec::SubPicExtTask*))
Unexecuted instantiation: bool vvdec::ThreadPool::addBarrierTask<vvdec::CtuTaskParam>(bool (*)(int, vvdec::CtuTaskParam*), vvdec::CtuTaskParam*, vvdec::WaitCounter*, vvdec::Barrier*, std::__1::vector<vvdec::Barrier const*, std::__1::allocator<vvdec::Barrier const*> >&&, bool (*)(int, vvdec::CtuTaskParam*))
Unexecuted instantiation: bool vvdec::ThreadPool::addBarrierTask<vvdec::FinishPicTaskParam>(bool (*)(int, vvdec::FinishPicTaskParam*), vvdec::FinishPicTaskParam*, vvdec::WaitCounter*, vvdec::Barrier*, std::__1::vector<vvdec::Barrier const*, std::__1::allocator<vvdec::Barrier const*> >&&, bool (*)(int, vvdec::FinishPicTaskParam*))
496
497
  bool processTasksOnMainThread();
498
  void checkAndThrowThreadPoolException();
499
500
  void shutdown( bool block );
501
  void waitForThreads();
502
503
0
  int numThreads() const { return (int)m_threads.size(); }
504
505
private:
506
  using TaskIterator = ChunkedTaskQueue::Iterator;
507
  struct TaskException;
508
509
  // members
510
  std::string              m_poolName;
511
  std::atomic_bool         m_exitThreads{ false };
512
  std::vector<std::thread> m_threads;
513
  ChunkedTaskQueue         m_tasks;
514
  TaskIterator             m_nextFillSlot = m_tasks.begin();
515
#if THREAD_POOL_ADD_TASK_THREAD_SAFE
516
  std::mutex               m_nextFillSlotMutex;
517
#endif
518
  std::mutex               m_idleMutex;
519
  PoolPause                m_poolPause;
520
521
  std::atomic_bool         m_exceptionFlag{ false };
522
  std::exception_ptr       m_threadPoolException;
523
524
525
  // internal functions
526
  void         threadProc     ( int threadId );
527
  static bool  checkTaskReady ( int threadId, CBarrierVec& barriers, TaskFunc readyCheck, void* taskParam );
528
  TaskIterator findNextTask   ( int threadId, TaskIterator startSearch );
529
  static bool  processTask    ( int threadId, Slot& task );
530
  bool         bypassTaskQueue( TaskFunc func, void* param, WaitCounter* counter, Barrier* done, CBarrierVec& barriers, TaskFunc readyCheck );
531
  static void  handleTaskException( const std::exception_ptr e, Barrier* done, WaitCounter* counter, std::atomic<TaskState>* slot_state );
532
#if THREAD_POOL_TASK_NAMES
533
  void         printWaitingTasks();
534
#endif
535
};
536
537
}   // namespace vvdec