Coverage Report

Created: 2026-06-16 07:20

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/vvenc/source/Lib/Utilities/NoMallocThreadPool.h
Line
Count
Source
1
/* -----------------------------------------------------------------------------
2
The copyright in this software is being made available under the Clear BSD
3
License, included below. No patent rights, trademark rights and/or 
4
other Intellectual Property Rights other than the copyrights concerning 
5
the Software are granted under this license.
6
7
The Clear BSD License
8
9
Copyright (c) 2019-2026, Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. & The VVenC Authors.
10
All rights reserved.
11
12
Redistribution and use in source and binary forms, with or without modification,
13
are permitted (subject to the limitations in the disclaimer below) provided that
14
the following conditions are met:
15
16
     * Redistributions of source code must retain the above copyright notice,
17
     this list of conditions and the following disclaimer.
18
19
     * Redistributions in binary form must reproduce the above copyright
20
     notice, this list of conditions and the following disclaimer in the
21
     documentation and/or other materials provided with the distribution.
22
23
     * Neither the name of the copyright holder nor the names of its
24
     contributors may be used to endorse or promote products derived from this
25
     software without specific prior written permission.
26
27
NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
28
THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
29
CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
30
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
31
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
32
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
33
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
34
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
35
BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
36
IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
37
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
38
POSSIBILITY OF SUCH DAMAGE.
39
40
41
------------------------------------------------------------------------------------------- */
42
/** \file     NoMallocThreadPool.h
43
    \brief    thread pool whose tasks are stored in a chunked queue of reusable slots
44
*/
45
46
#pragma once
47
48
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <chrono>
#include <array>
#include <cstdlib>
54
55
#ifdef HAVE_PTHREADS
56
#include <pthread.h>
57
#endif
58
59
#include "CommonLib/CommonDef.h"
60
#if ENABLE_TIME_PROFILING_MT_MODE
61
#include "CommonLib/TimeProfiler.h"
62
#endif
63
64
//! \ingroup Utilities
65
//! \{
66
67
namespace vvenc {
68
69
#ifdef TRACE_ENABLE_ITT
// ITT tracing handles for annotating thread-pool activity (spin-waiting, blocking, adding
// tasks). They are `static` in a header, so every translation unit including this file
// initializes its own copies. Their use is outside this file.
static __itt_domain* itt_domain_thrd = __itt_domain_create( "Threading" );

static __itt_string_handle* itt_handle_TPspinWait = __itt_string_handle_create( "Spin_Wait" );
static __itt_string_handle* itt_handle_TPblocked  = __itt_string_handle_create( "Blocked" );
static __itt_string_handle* itt_handle_TPaddTask  = __itt_string_handle_create( "Add_Task" );

#endif //TRACE_ENABLE_ITT

#if ENABLE_VALGRIND_CODE
typedef std::unique_lock< std::mutex > MutexLock;
#endif

/// Parse the value of the BUSY_WAIT_TIME environment variable.
///
/// \param env  raw variable value, or nullptr when the variable is not set
/// \return     - 1 ms (the default) when \p env is null or does not start with an integer
///               (e.g. "" or "abc"),
///             - 0 ms for negative values (busy-waiting effectively disabled),
///             - otherwise the parsed number of milliseconds.
///             As with atoi, leading whitespace is skipped and trailing characters after
///             the number are ignored ("5ms" -> 5 ms). Unlike atoi, out-of-range input is
///             well defined: strtol saturates to LONG_MAX / LONG_MIN.
inline std::chrono::milliseconds parseBusyWaitTime( const char* env )
{
  const std::chrono::milliseconds defaultTime( 1 );
  if( env == nullptr )
  {
    return defaultTime;
  }

  char* end         = nullptr;
  long  millisecond = std::strtol( env, &end, 10 );
  if( end == env )
  {
    // no digits at all: keep the default instead of silently disabling busy-waiting
    return defaultTime;
  }
  if( millisecond < 0 )
  {
    millisecond = 0;
  }
  return std::chrono::milliseconds( millisecond );
}

// block threads after busy-waiting this long (overridable via the BUSY_WAIT_TIME environment
// variable, in milliseconds)
const static auto BUSY_WAIT_TIME = parseBusyWaitTime( std::getenv( "BUSY_WAIT_TIME" ) );
vvencimpl.cpp:vvenc::$_0::operator()() const
Line
Count
Source
85
254
const static auto BUSY_WAIT_TIME = [] {
86
254
  const char *env = getenv( "BUSY_WAIT_TIME" );
87
254
  if( env )
88
0
    return std::chrono::milliseconds( atoi( env ) );
89
254
  return std::chrono::milliseconds( 1 );
90
254
}();
EncLib.cpp:vvenc::$_0::operator()() const
Line
Count
Source
85
254
const static auto BUSY_WAIT_TIME = [] {
86
254
  const char *env = getenv( "BUSY_WAIT_TIME" );
87
254
  if( env )
88
0
    return std::chrono::milliseconds( atoi( env ) );
89
254
  return std::chrono::milliseconds( 1 );
90
254
}();
NoMallocThreadPool.cpp:vvenc::$_0::operator()() const
Line
Count
Source
85
254
const static auto BUSY_WAIT_TIME = [] {
86
254
  const char *env = getenv( "BUSY_WAIT_TIME" );
87
254
  if( env )
88
0
    return std::chrono::milliseconds( atoi( env ) );
89
254
  return std::chrono::milliseconds( 1 );
90
254
}();
MCTF.cpp:vvenc::$_0::operator()() const
Line
Count
Source
85
254
const static auto BUSY_WAIT_TIME = [] {
86
254
  const char *env = getenv( "BUSY_WAIT_TIME" );
87
254
  if( env )
88
0
    return std::chrono::milliseconds( atoi( env ) );
89
254
  return std::chrono::milliseconds( 1 );
90
254
}();
EncGOP.cpp:vvenc::$_0::operator()() const
Line
Count
Source
85
254
const static auto BUSY_WAIT_TIME = [] {
86
254
  const char *env = getenv( "BUSY_WAIT_TIME" );
87
254
  if( env )
88
0
    return std::chrono::milliseconds( atoi( env ) );
89
254
  return std::chrono::milliseconds( 1 );
90
254
}();
EncPicture.cpp:vvenc::$_0::operator()() const
Line
Count
Source
85
254
const static auto BUSY_WAIT_TIME = [] {
86
254
  const char *env = getenv( "BUSY_WAIT_TIME" );
87
254
  if( env )
88
0
    return std::chrono::milliseconds( atoi( env ) );
89
254
  return std::chrono::milliseconds( 1 );
90
254
}();
EncSlice.cpp:vvenc::$_0::operator()() const
Line
Count
Source
85
254
const static auto BUSY_WAIT_TIME = [] {
86
254
  const char *env = getenv( "BUSY_WAIT_TIME" );
87
254
  if( env )
88
0
    return std::chrono::milliseconds( atoi( env ) );
89
254
  return std::chrono::milliseconds( 1 );
90
254
}();
EncAdaptiveLoopFilter.cpp:vvenc::$_0::operator()() const
Line
Count
Source
85
254
const static auto BUSY_WAIT_TIME = [] {
86
254
  const char *env = getenv( "BUSY_WAIT_TIME" );
87
254
  if( env )
88
0
    return std::chrono::milliseconds( atoi( env ) );
89
254
  return std::chrono::milliseconds( 1 );
90
254
}();
EncCu.cpp:vvenc::$_0::operator()() const
Line
Count
Source
85
254
const static auto BUSY_WAIT_TIME = [] {
86
254
  const char *env = getenv( "BUSY_WAIT_TIME" );
87
254
  if( env )
88
0
    return std::chrono::milliseconds( atoi( env ) );
89
254
  return std::chrono::milliseconds( 1 );
90
254
}();
IntraSearch.cpp:vvenc::$_0::operator()() const
Line
Count
Source
85
254
const static auto BUSY_WAIT_TIME = [] {
86
254
  const char *env = getenv( "BUSY_WAIT_TIME" );
87
254
  if( env )
88
0
    return std::chrono::milliseconds( atoi( env ) );
89
254
  return std::chrono::milliseconds( 1 );
90
254
}();
91
92
93
// Enable this if tasks need to be added from multiple threads: addBarrierTask() then guards
// access to the shared next-fill-slot iterator with a mutex.
#define ADD_TASK_THREAD_SAFE 1
95
96
97
// ---------------------------------------------------------------------------
98
// Synchronization tools
99
// ---------------------------------------------------------------------------
100
101
/// Non-blocking barrier: a single atomic flag that other code polls through isBlocked().
/// A default-constructed Barrier starts out locked; `Barrier( false )` starts unlocked.
/// Neither copyable nor movable, so pointers to it stay meaningful.
struct Barrier
{
  Barrier()  = default;
  ~Barrier() = default;

  /// \param locked  initial state of the barrier
  explicit Barrier( bool locked ) : m_lockState( locked ) {}

  Barrier( const Barrier & ) = delete;
  Barrier( Barrier && )      = delete;

  Barrier& operator=( const Barrier & ) = delete;
  Barrier& operator=( Barrier && )      = delete;

  /// Open the barrier.
  void unlock()
  {
    m_lockState = false;
  }

  /// Close the barrier.
  void lock()
  {
    m_lockState = true;
  }

  /// \return true while the barrier is locked
  bool isBlocked() const
  {
    const bool locked = m_lockState.load();
    return locked;
  }

private:
  std::atomic_bool m_lockState{ true };
};
131
132
struct BlockingBarrier
133
{
134
  void unlock()
135
0
  {
136
0
    std::unique_lock<std::mutex> l( m_lock );
137
0
    m_intBarrier.unlock();
138
0
    if( !m_intBarrier.isBlocked() )
139
0
    {
140
0
      m_cond.notify_all();
141
0
    }
142
0
  }
143
144
  void lock()
145
0
  {
146
0
    std::unique_lock<std::mutex> l( m_lock );
147
0
    m_intBarrier.lock();
148
0
  }
149
150
  bool isBlocked() const
151
0
  {
152
0
    return m_intBarrier.isBlocked();
153
0
  }
154
155
  void wait() const
156
0
  {
157
0
    BlockingBarrier* nonconst = const_cast<BlockingBarrier*>(this);
158
0
159
0
    std::unique_lock<std::mutex> l( nonconst->m_lock );
160
0
    nonconst->m_cond.wait( l, [this] { return !m_intBarrier.isBlocked(); } );
161
0
  }
162
163
  BlockingBarrier()  = default;
164
0
  ~BlockingBarrier() { std::unique_lock<std::mutex> l( m_lock ); } // ensure all threads have unlocked the mutex, when we start destruction
165
166
  BlockingBarrier( const BlockingBarrier& ) = delete;
167
  BlockingBarrier( BlockingBarrier&& )      = delete;
168
169
  BlockingBarrier& operator=( const BlockingBarrier& ) = delete;
170
  BlockingBarrier& operator=( BlockingBarrier&& ) = delete;
171
172
  // cast to const ref Barrier, so we can use it for thread pool tasks:
173
0
  operator const Barrier&() const { return m_intBarrier; }
174
175
private:
176
  Barrier                 m_intBarrier;
177
  std::condition_variable m_cond;
178
  std::mutex              m_lock;
179
};
180
181
struct WaitCounter
182
{
183
  int operator++()
184
0
  {
185
0
    std::unique_lock<std::mutex> l( m_lock );
186
0
    done.lock();
187
0
    return ++m_count;
188
0
  }
189
190
  int operator--()
191
0
  {
192
0
    std::unique_lock<std::mutex> l( m_lock );
193
0
    const unsigned int new_count = --m_count;
194
0
    if( new_count == 0 )
195
0
    {
196
0
      m_cond.notify_all();
197
0
      done.unlock();
198
0
    }
199
0
    l.unlock(); // unlock mutex after done-barrier to prevent race between barrier and counter
200
0
    return new_count;
201
0
  }
202
203
  bool isBlocked() const
204
0
  {
205
0
    return 0 != m_count;
206
0
  }
207
208
  void wait() const
209
0
  {
210
0
    WaitCounter* nonconst = const_cast<WaitCounter*>(this);
211
212
0
    std::unique_lock<std::mutex> l( nonconst->m_lock );
213
0
    nonconst->m_cond.wait( l, [this] { return m_count == 0; } );
214
0
  }
215
216
0
  WaitCounter() = default;
217
0
  ~WaitCounter() { std::unique_lock<std::mutex> l( m_lock ); }   // ensure all threads have unlocked the mutex, when we start destruction
218
219
  WaitCounter( const WaitCounter & ) = delete;
220
  WaitCounter( WaitCounter && )      = delete;
221
222
  WaitCounter &operator=( const WaitCounter & ) = delete;
223
  WaitCounter &operator=( WaitCounter && )      = delete;
224
225
  Barrier done{ false };
226
227
private:
228
  std::condition_variable m_cond;
229
  std::mutex              m_lock;
230
  unsigned int            m_count = 0;
231
};
232
233
234
235
// ---------------------------------------------------------------------------
236
// Thread Pool
237
// ---------------------------------------------------------------------------
238
239
using CBarrierVec = std::vector<const Barrier*>;
240
241
class NoMallocThreadPool
242
{
243
  typedef enum
244
  {
245
    FREE = 0,
246
    PREPARING,
247
    WAITING,
248
    RUNNING
249
  } TaskState;
250
251
  using TaskFunc = bool ( * )( int, void * );
252
253
  struct Slot
254
  {
255
    TaskFunc               func      { nullptr };
256
    TaskFunc               readyCheck{ nullptr };
257
    void*                  param     { nullptr };
258
    WaitCounter*           counter   { nullptr };
259
    Barrier*               done      { nullptr };
260
    CBarrierVec            barriers;
261
    std::atomic<TaskState> state     { FREE };
262
  };
263
264
265
  class ChunkedTaskQueue
266
  {
267
    constexpr static int ChunkSize = 128;
268
269
    class Chunk
270
    {
271
      std::array<Slot, ChunkSize> m_slots;
272
      std::atomic<Chunk*>         m_next{ nullptr };
273
      Chunk&                      m_firstChunk;
274
275
0
      Chunk( Chunk* firstPtr ) : m_firstChunk{ *firstPtr } {}
276
277
      friend class ChunkedTaskQueue;
278
    };
279
280
  public:
281
    class Iterator
282
    {
283
      Slot*  m_slot  = nullptr;
284
      Chunk* m_chunk = nullptr;
285
286
    public:
287
0
      Iterator() = default;
288
0
      Iterator( Slot* slot, Chunk* chunk ) : m_slot( slot ), m_chunk( chunk ) {}
289
290
      Iterator& operator++()
291
0
      {
292
0
        CHECKD( m_slot == nullptr, "incrementing invalid iterator" );
293
0
        CHECKD( m_chunk == nullptr, "incrementing invalid iterator" );
294
295
0
        if( m_slot != &m_chunk->m_slots.back() )
296
0
        {
297
0
          ++m_slot;
298
0
        }
299
0
        else
300
0
        {
301
0
          m_chunk = m_chunk->m_next;
302
0
          if( m_chunk )
303
0
          {
304
0
            m_slot  = &m_chunk->m_slots.front();
305
0
          }
306
0
          else
307
0
          {
308
0
            m_slot  = nullptr;
309
0
          }
310
0
        }
311
0
        return *this;
312
0
      }
313
314
      // increment iterator and wrap around, if end is reached
315
      Iterator& incWrap()
316
0
      {
317
0
        CHECKD( m_slot == nullptr, "incrementing invalid iterator" );
318
0
        CHECKD( m_chunk == nullptr, "incrementing invalid iterator" );
319
320
0
        if( m_slot != &m_chunk->m_slots.back() )
321
0
        {
322
0
          ++m_slot;
323
0
        }
324
0
        else
325
0
        {
326
0
          if( (Chunk*)m_chunk->m_next )
327
0
          {
328
0
            m_chunk = m_chunk->m_next;
329
0
          }
330
0
          else
331
0
          {
332
0
            m_chunk = &m_chunk->m_firstChunk;
333
0
          }
334
0
          m_slot = &m_chunk->m_slots.front();
335
0
        }
336
0
        return *this;
337
0
      }
338
339
0
      bool operator==( const Iterator& rhs ) const { return m_slot == rhs.m_slot; } // don't need to compare m_chunk, because m_slot is a pointer
340
0
      bool operator!=( const Iterator& rhs ) const { return m_slot != rhs.m_slot; } // don't need to compare m_chunk, because m_slot is a pointer
341
342
0
      Slot& operator*() { return *m_slot; }
343
344
0
      bool isValid() const { return m_slot != nullptr && m_chunk != nullptr; }
345
346
      using iterator_category = std::forward_iterator_tag;
347
      using value_type        = Slot;
348
      using pointer           = Slot*;
349
      using reference         = Slot&;
350
      using difference_type   = ptrdiff_t;
351
    };
352
353
0
    ChunkedTaskQueue() = default;
354
    ~ChunkedTaskQueue()
355
0
    {
356
0
      Chunk* next = m_firstChunk.m_next;
357
0
      while( next )
358
0
      {
359
0
        Chunk* curr = next;
360
0
        next = curr->m_next;
361
0
        delete curr;
362
0
      }
363
0
    }
364
365
    ChunkedTaskQueue( const ChunkedTaskQueue& ) = delete;
366
    ChunkedTaskQueue( ChunkedTaskQueue&& )      = delete;
367
368
    // grow the queue by adding a chunk and return an iterator to the first new task-slot
369
    Iterator grow()
370
0
    {
371
0
      std::unique_lock<std::mutex> l( m_resizeMutex );  // prevent concurrent growth of the queue. Read access while growing is no problem
372
373
0
      m_lastChunk->m_next = new Chunk( &m_firstChunk );
374
0
      m_lastChunk         = m_lastChunk->m_next;
375
376
0
      return Iterator{ &m_lastChunk->m_slots.front(), m_lastChunk };
377
0
    }
378
379
0
    Iterator begin() { return Iterator{ &m_firstChunk.m_slots.front(), &m_firstChunk }; }
380
0
    Iterator end()   { return Iterator{ nullptr, nullptr }; }
381
382
  private:
383
    Chunk  m_firstChunk{ &m_firstChunk };
384
    Chunk* m_lastChunk = &m_firstChunk;
385
386
    std::mutex m_resizeMutex;
387
  };
388
389
390
public:
391
  NoMallocThreadPool( int numThreads = 1, const char *threadPoolName = nullptr, const VVEncCfg* encCfg = nullptr );
392
  ~NoMallocThreadPool();
393
394
  bool addBarrierTask( bool             ( *func )( int, void* ),
395
                       void*               param,
396
                       WaitCounter*        counter                    = nullptr,
397
                       Barrier*            done                       = nullptr,
398
                       const CBarrierVec&& barriers                   = {},
399
                       bool             ( *readyCheck )( int, void* ) = nullptr )
400
0
  {
401
0
    if( m_threads.empty() )
402
0
    {
403
      // if singlethreaded, execute all pending tasks
404
0
      if( m_nextFillSlot != m_tasks.begin() )
405
0
      {
406
0
        processTasksOnMainThread();
407
0
      }
408
409
      // when no barriers block this task, execute it directly
410
0
      if( std::none_of( barriers.begin(), barriers.end(), []( const Barrier* b ) { return b && b->isBlocked(); } )
411
0
          && ( !readyCheck || readyCheck( 0, param ) ) )
412
0
      {
413
0
        if( func( 0, param ) )
414
0
        {
415
0
          if( done != nullptr )
416
0
          {
417
0
            done->unlock();
418
0
          }
419
0
          return true;
420
0
        }
421
0
      }
422
0
    }
423
424
0
    while( true )
425
0
    {
426
0
#if ADD_TASK_THREAD_SAFE
427
0
      std::unique_lock<std::mutex> l(m_nextFillSlotMutex);
428
0
#endif
429
0
      CHECKD( !m_nextFillSlot.isValid(), "Next fill slot iterator should always be valid" );
430
0
      const auto startIt = m_nextFillSlot;
431
432
0
#if ADD_TASK_THREAD_SAFE
433
0
      l.unlock();
434
0
#endif
435
436
0
      bool first = true;
437
0
      for( auto it = startIt; it != startIt || first; it.incWrap() )
438
0
      {
439
#if ENABLE_VALGRIND_CODE
440
        MutexLock lock( m_extraMutex );
441
#endif
442
443
0
        first = false;
444
445
0
        auto& t = *it;
446
0
        auto expected = FREE;
447
0
        if( t.state.load( std::memory_order_relaxed ) == FREE && t.state.compare_exchange_strong( expected, PREPARING ) )
448
0
        {
449
0
          if( counter )
450
0
          {
451
0
            counter->operator++();
452
0
          }
453
454
0
          t.func       = func;
455
0
          t.readyCheck = readyCheck;
456
0
          t.param      = param;
457
0
          t.done       = done;
458
0
          t.counter    = counter;
459
0
          t.barriers   = std::move( barriers );
460
0
          t.state      = WAITING;
461
462
0
#if ADD_TASK_THREAD_SAFE
463
0
          l.lock();
464
0
#endif
465
0
          m_nextFillSlot.incWrap();
466
0
          return true;
467
0
        }
468
0
      }
469
470
0
#if ADD_TASK_THREAD_SAFE
471
0
      l.lock();
472
0
#endif
473
0
      m_nextFillSlot = m_tasks.grow();
474
0
    }
475
0
    return false;
476
0
  }
477
478
  bool processTasksOnMainThread();
479
480
  void shutdown( bool block );
481
  void waitForThreads();
482
483
0
  int numThreads() const { return (int)m_threads.size(); }
484
#if ENABLE_TIME_PROFILING_MT_MODE
485
  const std::vector< TProfiler* >& getProfilers() { return profilers; }
486
#endif
487
488
private:
489
#ifdef HAVE_PTHREADS
490
  struct PThread
491
  {
492
    PThread()                            = default;
493
    ~PThread()                           = default;
494
495
    PThread( const PThread& )            = delete;
496
    PThread& operator=( const PThread& ) = delete;
497
498
0
    PThread( PThread&& other ) { *this = std::move( other ); };
499
    PThread& operator=( PThread&& other );
500
501
    template<class TFunc, class... TArgs>
502
    PThread( TFunc&& func, TArgs&&... args );
503
504
0
    bool joinable() { return m_joinable; }
505
    void join();
506
507
  private:
508
    pthread_t m_id       = 0;
509
    bool      m_joinable = false;
510
  };
511
#endif   // HAVE_PTHREADS
512
513
#if HAVE_PTHREADS
514
  using ThreadImpl = PThread;
515
#else
516
  using ThreadImpl = std::thread;
517
#endif
518
519
  using TaskIterator = ChunkedTaskQueue::Iterator;
520
521
  // members
522
  std::string              m_poolName;
523
  std::atomic_bool         m_exitThreads{ false };
524
  std::vector<ThreadImpl>  m_threads;
525
  ChunkedTaskQueue         m_tasks;
526
  TaskIterator             m_nextFillSlot = m_tasks.begin();
527
#if ADD_TASK_THREAD_SAFE
528
  std::mutex               m_nextFillSlotMutex;
529
#endif
530
  std::mutex               m_idleMutex;
531
  std::atomic_uint         m_waitingThreads{ 0 };
532
#if ENABLE_VALGRIND_CODE
533
  std::mutex               m_extraMutex;
534
#endif
535
#if ENABLE_TIME_PROFILING_MT_MODE
536
  std::vector< TProfiler* > profilers;
537
#endif
538
539
  // internal functions
540
  void         threadProc  ( int threadId, const VVEncCfg& encCfg );
541
  TaskIterator findNextTask( int threadId, TaskIterator startSearch );
542
  bool         processTask ( int threadId, Slot& task );
543
};
544
545
} // namespace vvenc
546
547
//! \}
548