Coverage Report

Created: 2026-05-30 06:10

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/work/vvenc/source/Lib/Utilities/NoMallocThreadPool.h
Line
Count
Source
1
/* -----------------------------------------------------------------------------
2
The copyright in this software is being made available under the Clear BSD
3
License, included below. No patent rights, trademark rights and/or 
4
other Intellectual Property Rights other than the copyrights concerning 
5
the Software are granted under this license.
6
7
The Clear BSD License
8
9
Copyright (c) 2019-2026, Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. & The VVenC Authors.
10
All rights reserved.
11
12
Redistribution and use in source and binary forms, with or without modification,
13
are permitted (subject to the limitations in the disclaimer below) provided that
14
the following conditions are met:
15
16
     * Redistributions of source code must retain the above copyright notice,
17
     this list of conditions and the following disclaimer.
18
19
     * Redistributions in binary form must reproduce the above copyright
20
     notice, this list of conditions and the following disclaimer in the
21
     documentation and/or other materials provided with the distribution.
22
23
     * Neither the name of the copyright holder nor the names of its
24
     contributors may be used to endorse or promote products derived from this
25
     software without specific prior written permission.
26
27
NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
28
THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
29
CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
30
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
31
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
32
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
33
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
34
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
35
BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
36
IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
37
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
38
POSSIBILITY OF SUCH DAMAGE.
39
40
41
------------------------------------------------------------------------------------------- */
42
/** \file     NoMallocThreadPool.h
43
    \brief    thread pool
44
*/
45
46
#pragma once
47
48
#include <thread>
49
#include <mutex>
50
#include <condition_variable>
51
#include <atomic>
52
#include <chrono>
53
#include <array>
54
55
#ifdef HAVE_PTHREADS
56
#include <pthread.h>
57
#endif
58
59
#include "CommonLib/CommonDef.h"
60
#if ENABLE_TIME_PROFILING_MT_MODE
61
#include "CommonLib/TimeProfiler.h"
62
#endif
63
64
//! \ingroup Utilities
65
//! \{
66
67
namespace vvenc {
68
69
#ifdef TRACE_ENABLE_ITT
70
static __itt_domain* itt_domain_thrd = __itt_domain_create( "Threading" );
71
72
static __itt_string_handle* itt_handle_TPspinWait = __itt_string_handle_create( "Spin_Wait" );
73
static __itt_string_handle* itt_handle_TPblocked  = __itt_string_handle_create( "Blocked" );
74
static __itt_string_handle* itt_handle_TPaddTask  = __itt_string_handle_create( "Add_Task" );
75
76
//static long itt_TP_blocked = 1;
77
78
#endif //TRACE_ENABLE_ITT
79
80
#if ENABLE_VALGRIND_CODE
81
typedef std::unique_lock< std::mutex > MutexLock;
82
#endif
83
84
// block threads after busy-waiting this long
85
120
// Initialised by an immediately-invoked lambda: if the environment variable BUSY_WAIT_TIME is set,
// its value is parsed with atoi() and used as a number of milliseconds; otherwise the value is 1 ms.
// NOTE(review): atoi() does no validation, so a non-numeric value gives 0 ms and a negative value gives
// a negative duration -- confirm the users of this constant tolerate that.
// NOTE(review): declared `static` in a header, so every translation unit that includes it gets its own
// copy and performs its own getenv() lookup.
const static auto BUSY_WAIT_TIME = [] {
86
120
  const char *env = getenv( "BUSY_WAIT_TIME" );
87
120
  if( env )
88
0
    return std::chrono::milliseconds( atoi( env ) );
89
120
  return std::chrono::milliseconds( 1 );
90
120
}();
vvencimpl.cpp:vvenc::$_0::operator()() const
Line
Count
Source
85
12
const static auto BUSY_WAIT_TIME = [] {
86
12
  const char *env = getenv( "BUSY_WAIT_TIME" );
87
12
  if( env )
88
0
    return std::chrono::milliseconds( atoi( env ) );
89
12
  return std::chrono::milliseconds( 1 );
90
12
}();
EncLib.cpp:vvenc::$_0::operator()() const
Line
Count
Source
85
12
const static auto BUSY_WAIT_TIME = [] {
86
12
  const char *env = getenv( "BUSY_WAIT_TIME" );
87
12
  if( env )
88
0
    return std::chrono::milliseconds( atoi( env ) );
89
12
  return std::chrono::milliseconds( 1 );
90
12
}();
NoMallocThreadPool.cpp:vvenc::$_0::operator()() const
Line
Count
Source
85
12
const static auto BUSY_WAIT_TIME = [] {
86
12
  const char *env = getenv( "BUSY_WAIT_TIME" );
87
12
  if( env )
88
0
    return std::chrono::milliseconds( atoi( env ) );
89
12
  return std::chrono::milliseconds( 1 );
90
12
}();
MCTF.cpp:vvenc::$_0::operator()() const
Line
Count
Source
85
12
const static auto BUSY_WAIT_TIME = [] {
86
12
  const char *env = getenv( "BUSY_WAIT_TIME" );
87
12
  if( env )
88
0
    return std::chrono::milliseconds( atoi( env ) );
89
12
  return std::chrono::milliseconds( 1 );
90
12
}();
EncGOP.cpp:vvenc::$_0::operator()() const
Line
Count
Source
85
12
const static auto BUSY_WAIT_TIME = [] {
86
12
  const char *env = getenv( "BUSY_WAIT_TIME" );
87
12
  if( env )
88
0
    return std::chrono::milliseconds( atoi( env ) );
89
12
  return std::chrono::milliseconds( 1 );
90
12
}();
EncPicture.cpp:vvenc::$_0::operator()() const
Line
Count
Source
85
12
const static auto BUSY_WAIT_TIME = [] {
86
12
  const char *env = getenv( "BUSY_WAIT_TIME" );
87
12
  if( env )
88
0
    return std::chrono::milliseconds( atoi( env ) );
89
12
  return std::chrono::milliseconds( 1 );
90
12
}();
EncSlice.cpp:vvenc::$_0::operator()() const
Line
Count
Source
85
12
const static auto BUSY_WAIT_TIME = [] {
86
12
  const char *env = getenv( "BUSY_WAIT_TIME" );
87
12
  if( env )
88
0
    return std::chrono::milliseconds( atoi( env ) );
89
12
  return std::chrono::milliseconds( 1 );
90
12
}();
EncAdaptiveLoopFilter.cpp:vvenc::$_0::operator()() const
Line
Count
Source
85
12
const static auto BUSY_WAIT_TIME = [] {
86
12
  const char *env = getenv( "BUSY_WAIT_TIME" );
87
12
  if( env )
88
0
    return std::chrono::milliseconds( atoi( env ) );
89
12
  return std::chrono::milliseconds( 1 );
90
12
}();
EncCu.cpp:vvenc::$_0::operator()() const
Line
Count
Source
85
12
const static auto BUSY_WAIT_TIME = [] {
86
12
  const char *env = getenv( "BUSY_WAIT_TIME" );
87
12
  if( env )
88
0
    return std::chrono::milliseconds( atoi( env ) );
89
12
  return std::chrono::milliseconds( 1 );
90
12
}();
IntraSearch.cpp:vvenc::$_0::operator()() const
Line
Count
Source
85
12
const static auto BUSY_WAIT_TIME = [] {
86
12
  const char *env = getenv( "BUSY_WAIT_TIME" );
87
12
  if( env )
88
0
    return std::chrono::milliseconds( atoi( env ) );
89
12
  return std::chrono::milliseconds( 1 );
90
12
}();
91
92
93
// enable this if tasks need to be added from multiple threads
94
#define ADD_TASK_THREAD_SAFE 1
95
96
97
// ---------------------------------------------------------------------------
98
// Synchronization tools
99
// ---------------------------------------------------------------------------
100
101
// Non-blocking barrier: a single atomic boolean flag that can be set, cleared and polled.
// A default-constructed Barrier starts in the locked (blocked) state. Copying and moving are disabled.
struct Barrier
102
{
103
  // Clear the flag, i.e. mark the barrier as no longer blocked.
  void unlock()
104
1.29k
  {
105
1.29k
    m_lockState.store( false );
106
1.29k
  }
107
108
  // Set the flag, i.e. mark the barrier as blocked.
  void lock()
109
4.05k
  {
110
4.05k
    m_lockState.store( true );
111
4.05k
  }
112
113
  // Returns the current value of the flag (true while blocked). Never waits.
  bool isBlocked() const
114
57.5M
  {
115
57.5M
    return m_lockState;
116
57.5M
  }
117
118
  Barrier()  = default;
119
  ~Barrier() = default;
120
5.19k
  // Construct with an explicit initial state (true = blocked).
  explicit Barrier( bool locked ) : m_lockState( locked ) {}
121
122
  Barrier( const Barrier & ) = delete;
123
  Barrier( Barrier && )      = delete;
124
125
  Barrier& operator=( const Barrier & ) = delete;
126
  Barrier& operator=( Barrier && )      = delete;
127
128
private:
129
  // true = blocked; this is the initial value used by the default constructor
  std::atomic_bool m_lockState{ true };
130
};
131
132
// Barrier that threads can additionally block on: wraps a Barrier together with a mutex and a
// condition variable. Copying and moving are disabled.
struct BlockingBarrier
133
{
134
  // Clear the internal barrier while holding the mutex and, if it then reads as unblocked,
  // wake every thread waiting in wait().
  void unlock()
135
0
  {
136
0
    std::unique_lock<std::mutex> l( m_lock );
137
0
    m_intBarrier.unlock();
138
0
    if( !m_intBarrier.isBlocked() )
139
0
    {
140
0
      m_cond.notify_all();
141
0
    }
142
0
  }
143
144
  // Set the internal barrier to blocked while holding the mutex.
  void lock()
145
0
  {
146
0
    std::unique_lock<std::mutex> l( m_lock );
147
0
    m_intBarrier.lock();
148
0
  }
149
150
  // Poll the internal barrier; does not take the mutex and never waits.
  bool isBlocked() const
151
0
  {
152
0
    return m_intBarrier.isBlocked();
153
0
  }
154
155
  // Block the calling thread until the internal barrier is no longer blocked.
  // Returns immediately if it is already unblocked (the predicate is checked before waiting).
  void wait() const
156
0
  {
157
0
    // the mutex and condition variable need non-const access, but this member function is const
    BlockingBarrier* nonconst = const_cast<BlockingBarrier*>(this);
158
0
159
0
    std::unique_lock<std::mutex> l( nonconst->m_lock );
160
0
    nonconst->m_cond.wait( l, [this] { return !m_intBarrier.isBlocked(); } );
161
0
  }
162
163
  BlockingBarrier()  = default;
164
0
  ~BlockingBarrier() { std::unique_lock<std::mutex> l( m_lock ); } // ensure all threads have unlocked the mutex, when we start destruction
165
166
  BlockingBarrier( const BlockingBarrier& ) = delete;
167
  BlockingBarrier( BlockingBarrier&& )      = delete;
168
169
  BlockingBarrier& operator=( const BlockingBarrier& ) = delete;
170
  BlockingBarrier& operator=( BlockingBarrier&& ) = delete;
171
172
  // cast to const ref Barrier, so we can use it for thread pool tasks:
173
0
  operator const Barrier&() const { return m_intBarrier; }
174
175
private:
176
  Barrier                 m_intBarrier;   // the actual blocked/unblocked flag
177
  std::condition_variable m_cond;         // signalled by unlock(), waited on by wait()
178
  std::mutex              m_lock;         // guards state changes and is the mutex used with m_cond
179
};
180
181
// Mutex-protected counter with an attached public Barrier `done`: every increment locks `done`,
// and the decrement that brings the count back to zero unlocks it and wakes all threads in wait().
// Copying and moving are disabled.
struct WaitCounter
182
{
183
  // Increment the count under the mutex and mark `done` as blocked. Returns the new count.
  int operator++()
184
4.05k
  {
185
4.05k
    std::unique_lock<std::mutex> l( m_lock );
186
4.05k
    done.lock();
187
4.05k
    return ++m_count;
188
4.05k
  }
189
190
  // Decrement the count under the mutex. When it reaches zero, notify all waiters and unlock `done`.
  // Returns the new count.
  // NOTE(review): m_count is unsigned, so a decrement at zero would wrap around -- presumably every
  // call is paired with an earlier operator++; verify against callers.
  int operator--()
191
4.05k
  {
192
4.05k
    std::unique_lock<std::mutex> l( m_lock );
193
4.05k
    const unsigned int new_count = --m_count;
194
4.05k
    if( new_count == 0 )
195
1.29k
    {
196
1.29k
      m_cond.notify_all();
197
1.29k
      done.unlock();
198
1.29k
    }
199
4.05k
    l.unlock(); // unlock mutex after done-barrier to prevent race between barrier and counter
200
4.05k
    return new_count;
201
4.05k
  }
202
203
  // True while the count is non-zero.
  // NOTE(review): reads m_count without taking m_lock, unlike every other member that touches it --
  // confirm this unsynchronised read is intended.
  bool isBlocked() const
204
0
  {
205
0
    return 0 != m_count;
206
0
  }
207
208
  // Block the calling thread until the count is zero; returns immediately if it already is.
  void wait() const
209
0
  {
210
0
    // the mutex and condition variable need non-const access, but this member function is const
    WaitCounter* nonconst = const_cast<WaitCounter*>(this);
211
212
0
    std::unique_lock<std::mutex> l( nonconst->m_lock );
213
0
    nonconst->m_cond.wait( l, [this] { return m_count == 0; } );
214
0
  }
215
216
5.19k
  WaitCounter() = default;
217
5.19k
  ~WaitCounter() { std::unique_lock<std::mutex> l( m_lock ); }   // ensure all threads have unlocked the mutex, when we start destruction
218
219
  WaitCounter( const WaitCounter & ) = delete;
220
  WaitCounter( WaitCounter && )      = delete;
221
222
  WaitCounter &operator=( const WaitCounter & ) = delete;
223
  WaitCounter &operator=( WaitCounter && )      = delete;
224
225
  // Pollable mirror of the counter state: constructed unblocked, blocked by operator++,
  // unblocked again by the operator-- that reaches zero.
  Barrier done{ false };
226
227
private:
228
  std::condition_variable m_cond;
229
  std::mutex              m_lock;
230
  unsigned int            m_count = 0;
231
};
232
233
234
235
// ---------------------------------------------------------------------------
236
// Thread Pool
237
// ---------------------------------------------------------------------------
238
239
using CBarrierVec = std::vector<const Barrier*>;
240
241
// Thread pool whose pending tasks live in a queue built from fixed-size chunks of task slots.
// A task is a plain function pointer plus a void* parameter, optionally tied to a WaitCounter,
// a `done` Barrier, a list of barriers it depends on, and a ready-check function.
// Only addBarrierTask() and numThreads() are defined in this header; the remaining member
// functions are declared here and defined elsewhere.
class NoMallocThreadPool
242
{
243
  // Life-cycle state of a task slot. addBarrierTask() claims a FREE slot by switching it to
  // PREPARING and publishes it as WAITING once all fields are filled in.
  // RUNNING is not set anywhere in this header -- presumably used by the worker code; verify there.
  typedef enum
244
  {
245
    FREE = 0,
246
    PREPARING,
247
    WAITING,
248
    RUNNING
249
  } TaskState;
250
251
  // Signature shared by task functions and ready-check functions.
  using TaskFunc = bool ( * )( int, void * );
252
253
  // One queue entry; the fields mirror the arguments of addBarrierTask().
  struct Slot
254
  {
255
    TaskFunc               func      { nullptr };
256
    TaskFunc               readyCheck{ nullptr };
257
    void*                  param     { nullptr };
258
    WaitCounter*           counter   { nullptr };
259
    Barrier*               done      { nullptr };
260
    CBarrierVec            barriers;
261
    std::atomic<TaskState> state     { FREE };
262
  };
263
264
265
  // Singly linked list of chunks, each holding ChunkSize slots. The first chunk is a member of the
  // queue; further chunks are heap-allocated by grow() and released in the destructor.
  class ChunkedTaskQueue
266
  {
267
    constexpr static int ChunkSize = 128;
268
269
    // A block of slots plus the link to the next chunk and a reference back to the first chunk
    // (the latter is what lets an iterator wrap around).
    class Chunk
270
    {
271
      std::array<Slot, ChunkSize> m_slots;
272
      std::atomic<Chunk*>         m_next{ nullptr };
273
      Chunk&                      m_firstChunk;
274
275
1.29k
      Chunk( Chunk* firstPtr ) : m_firstChunk{ *firstPtr } {}
276
277
      friend class ChunkedTaskQueue;
278
    };
279
280
  public:
281
    // Forward iterator over the slots of all chunks. A default-constructed iterator holds two
    // null pointers and is not valid.
    class Iterator
282
    {
283
      Slot*  m_slot  = nullptr;
284
      Chunk* m_chunk = nullptr;
285
286
    public:
287
116M
      Iterator() = default;
288
6.49k
      Iterator( Slot* slot, Chunk* chunk ) : m_slot( slot ), m_chunk( chunk ) {}
289
290
      // Advance to the next slot; after the last slot of the last chunk both pointers become null,
      // which compares equal to end().
      Iterator& operator++()
291
0
      {
292
0
        CHECKD( m_slot == nullptr, "incrementing invalid iterator" );
293
0
        CHECKD( m_chunk == nullptr, "incrementing invalid iterator" );
294
295
0
        if( m_slot != &m_chunk->m_slots.back() )
296
0
        {
297
0
          ++m_slot;
298
0
        }
299
0
        else
300
0
        {
301
0
          m_chunk = m_chunk->m_next;
302
0
          if( m_chunk )
303
0
          {
304
0
            m_slot  = &m_chunk->m_slots.front();
305
0
          }
306
0
          else
307
0
          {
308
0
            m_slot  = nullptr;
309
0
          }
310
0
        }
311
0
        return *this;
312
0
      }
313
314
      // increment iterator and wrap around, if end is reached
315
      Iterator& incWrap()
316
14.9G
      {
317
14.9G
        CHECKD( m_slot == nullptr, "incrementing invalid iterator" );
318
14.9G
        CHECKD( m_chunk == nullptr, "incrementing invalid iterator" );
319
320
14.9G
        if( m_slot != &m_chunk->m_slots.back() )
321
14.7G
        {
322
14.7G
          ++m_slot;
323
14.7G
        }
324
116M
        else
325
116M
        {
326
116M
          // at the last slot of a chunk: continue with the next chunk, or go back to the first one
          if( (Chunk*)m_chunk->m_next )
327
0
          {
328
0
            m_chunk = m_chunk->m_next;
329
0
          }
330
116M
          else
331
116M
          {
332
116M
            m_chunk = &m_chunk->m_firstChunk;
333
116M
          }
334
116M
          m_slot = &m_chunk->m_slots.front();
335
116M
        }
336
14.9G
        return *this;
337
14.9G
      }
338
339
0
      bool operator==( const Iterator& rhs ) const { return m_slot == rhs.m_slot; } // don't need to compare m_chunk, because m_slot is a pointer
340
15.0G
      bool operator!=( const Iterator& rhs ) const { return m_slot != rhs.m_slot; } // don't need to compare m_chunk, because m_slot is a pointer
341
342
14.9G
      Slot& operator*() { return *m_slot; }
343
344
233M
      // true when both the slot and the chunk pointer are non-null
      bool isValid() const { return m_slot != nullptr && m_chunk != nullptr; }
345
346
      using iterator_category = std::forward_iterator_tag;
347
      using value_type        = Slot;
348
      using pointer           = Slot*;
349
      using reference         = Slot&;
350
      using difference_type   = ptrdiff_t;
351
    };
352
353
1.29k
    ChunkedTaskQueue() = default;
354
    // Delete every chunk that follows the first one; the first chunk is a member and is not deleted.
    ~ChunkedTaskQueue()
355
1.29k
    {
356
1.29k
      Chunk* next = m_firstChunk.m_next;
357
1.29k
      while( next )
358
0
      {
359
0
        Chunk* curr = next;
360
0
        // read the link before the chunk that holds it is destroyed
        next = curr->m_next;
361
0
        delete curr;
362
0
      }
363
1.29k
    }
364
365
    ChunkedTaskQueue( const ChunkedTaskQueue& ) = delete;
366
    ChunkedTaskQueue( ChunkedTaskQueue&& )      = delete;
367
368
    // grow the queue by adding a chunk and return an iterator to the first new task-slot
369
    Iterator grow()
370
0
    {
371
0
      std::unique_lock<std::mutex> l( m_resizeMutex );  // prevent concurrent growth of the queue. Read access while growing is no problem
372
373
0
      m_lastChunk->m_next = new Chunk( &m_firstChunk );
374
0
      m_lastChunk         = m_lastChunk->m_next;
375
376
0
      return Iterator{ &m_lastChunk->m_slots.front(), m_lastChunk };
377
0
    }
378
379
6.49k
    // iterator to the first slot of the first chunk
    Iterator begin() { return Iterator{ &m_firstChunk.m_slots.front(), &m_firstChunk }; }
380
0
    // null iterator; equals what operator++ produces after the last slot
    Iterator end()   { return Iterator{ nullptr, nullptr }; }
381
382
  private:
383
    // the first chunk passes its own address as the "first chunk" back-reference
    Chunk  m_firstChunk{ &m_firstChunk };
384
    Chunk* m_lastChunk = &m_firstChunk;
385
386
    std::mutex m_resizeMutex;
387
  };
388
389
390
public:
391
  NoMallocThreadPool( int numThreads = 1, const char *threadPoolName = nullptr, const VVEncCfg* encCfg = nullptr );
392
  ~NoMallocThreadPool();
393
394
  // Add a task to the pool.
  //   func       - task function, called as func( threadId, param )
  //   param      - opaque pointer handed to func and readyCheck
  //   counter    - optional; incremented once the task has claimed a queue slot
  //   done       - optional barrier stored with the task; on the direct-execution path below it is
  //                unlocked after func returns true
  //   barriers   - barriers the task depends on; the direct-execution path requires that none of
  //                the non-null entries is blocked
  //   readyCheck - optional extra predicate; the direct-execution path requires it to return true
  // Returns true. The trailing `return false` cannot be reached, because the while( true ) loop is
  // only left through `return true`.
  // NOTE(review): `barriers` is declared `const CBarrierVec&&`, so std::move( barriers ) below yields
  // a const rvalue and the assignment to t.barriers copies the vector instead of moving it.
  bool addBarrierTask( bool             ( *func )( int, void* ),
395
                       void*               param,
396
                       WaitCounter*        counter                    = nullptr,
397
                       Barrier*            done                       = nullptr,
398
                       const CBarrierVec&& barriers                   = {},
399
                       bool             ( *readyCheck )( int, void* ) = nullptr )
400
5.34k
  {
401
5.34k
    if( m_threads.empty() )
402
0
    {
403
      // if singlethreaded, execute all pending tasks
404
0
      if( m_nextFillSlot != m_tasks.begin() )
405
0
      {
406
0
        processTasksOnMainThread();
407
0
      }
408
409
      // when no barriers block this task, execute it directly
410
0
      if( std::none_of( barriers.begin(), barriers.end(), []( const Barrier* b ) { return b && b->isBlocked(); } )
411
0
          && ( !readyCheck || readyCheck( 0, param ) ) )
412
0
      {
413
0
        // if func returns false, control falls through and the task is queued below
        if( func( 0, param ) )
414
0
        {
415
0
          if( done != nullptr )
416
0
          {
417
0
            done->unlock();
418
0
          }
419
0
          return true;
420
0
        }
421
0
      }
422
0
    }
423
424
5.34k
    // Each pass scans the queue once for a free slot; if none is found the queue is grown and
    // the scan is repeated starting at the first new slot.
    while( true )
425
5.34k
    {
426
5.34k
#if ADD_TASK_THREAD_SAFE
427
5.34k
      std::unique_lock<std::mutex> l(m_nextFillSlotMutex);
428
5.34k
#endif
429
5.34k
      CHECKD( !m_nextFillSlot.isValid(), "Next fill slot iterator should always be valid" );
430
5.34k
      // take a private copy of the start position; the scan itself runs without the mutex
      const auto startIt = m_nextFillSlot;
431
432
5.34k
#if ADD_TASK_THREAD_SAFE
433
5.34k
      l.unlock();
434
5.34k
#endif
435
436
5.34k
      // `first` lets the loop body run for startIt itself; the loop ends after one full wrap-around
      bool first = true;
437
5.34k
      for( auto it = startIt; it != startIt || first; it.incWrap() )
438
5.34k
      {
439
#if ENABLE_VALGRIND_CODE
440
        MutexLock lock( m_extraMutex );
441
#endif
442
443
5.34k
        first = false;
444
445
5.34k
        auto& t = *it;
446
5.34k
        auto expected = FREE;
447
5.34k
        // relaxed pre-check first, then claim the slot with a compare-exchange FREE -> PREPARING
        if( t.state.load( std::memory_order_relaxed ) == FREE && t.state.compare_exchange_strong( expected, PREPARING ) )
448
5.34k
        {
449
5.34k
          if( counter )
450
4.05k
          {
451
4.05k
            counter->operator++();
452
4.05k
          }
453
454
5.34k
          t.func       = func;
455
5.34k
          t.readyCheck = readyCheck;
456
5.34k
          t.param      = param;
457
5.34k
          t.done       = done;
458
5.34k
          t.counter    = counter;
459
5.34k
          t.barriers   = std::move( barriers );
460
5.34k
          // written last: the slot is marked WAITING only after all other fields are filled in
          t.state      = WAITING;
461
462
5.34k
#if ADD_TASK_THREAD_SAFE
463
5.34k
          l.lock();
464
5.34k
#endif
465
5.34k
          // advances the shared fill position by one slot from its current value
          m_nextFillSlot.incWrap();
466
5.34k
          return true;
467
5.34k
        }
468
5.34k
      }
469
470
0
#if ADD_TASK_THREAD_SAFE
471
0
      l.lock();
472
0
#endif
473
0
      // no free slot found in a full pass: append a chunk and continue from its first slot
      m_nextFillSlot = m_tasks.grow();
474
0
    }
475
0
    return false;
476
5.34k
  }
477
478
  bool processTasksOnMainThread();
479
480
  void shutdown( bool block );
481
  void waitForThreads();
482
483
5.19k
  // number of entries in m_threads
  int numThreads() const { return (int)m_threads.size(); }
484
#if ENABLE_TIME_PROFILING_MT_MODE
485
  const std::vector< TProfiler* >& getProfilers() { return profilers; }
486
#endif
487
488
private:
489
#ifdef HAVE_PTHREADS
490
  // Move-only thread handle holding a pthread_t and a joinable flag.
  // The templated constructor, move assignment and join() are defined elsewhere.
  struct PThread
491
  {
492
    PThread()                            = default;
493
    ~PThread()                           = default;
494
495
    PThread( const PThread& )            = delete;
496
    PThread& operator=( const PThread& ) = delete;
497
498
3.89k
    // move construction is implemented in terms of move assignment
    PThread( PThread&& other ) { *this = std::move( other ); };
499
    PThread& operator=( PThread&& other );
500
501
    template<class TFunc, class... TArgs>
502
    PThread( TFunc&& func, TArgs&&... args );
503
504
10.3k
    bool joinable() { return m_joinable; }
505
    void join();
506
507
  private:
508
    pthread_t m_id       = 0;
509
    bool      m_joinable = false;
510
  };
511
#endif   // HAVE_PTHREADS
512
513
  // NOTE(review): the struct above is guarded with #ifdef, the alias below with #if; the two differ
  // when HAVE_PTHREADS is defined as 0 -- confirm how the build system defines it.
#if HAVE_PTHREADS
514
  using ThreadImpl = PThread;
515
#else
516
  using ThreadImpl = std::thread;
517
#endif
518
519
  using TaskIterator = ChunkedTaskQueue::Iterator;
520
521
  // members
522
  std::string              m_poolName;
523
  std::atomic_bool         m_exitThreads{ false };
524
  std::vector<ThreadImpl>  m_threads;
525
  ChunkedTaskQueue         m_tasks;
526
  // position at which addBarrierTask() starts looking for a free slot; declared after m_tasks,
  // whose begin() initialises it
  TaskIterator             m_nextFillSlot = m_tasks.begin();
527
#if ADD_TASK_THREAD_SAFE
528
  // guards m_nextFillSlot in addBarrierTask()
  std::mutex               m_nextFillSlotMutex;
529
#endif
530
  std::mutex               m_idleMutex;
531
  std::atomic_uint         m_waitingThreads{ 0 };
532
#if ENABLE_VALGRIND_CODE
533
  // taken for every slot visited in addBarrierTask() when ENABLE_VALGRIND_CODE is set
  std::mutex               m_extraMutex;
534
#endif
535
#if ENABLE_TIME_PROFILING_MT_MODE
536
  std::vector< TProfiler* > profilers;
537
#endif
538
539
  // internal functions
540
  void         threadProc  ( int threadId, const VVEncCfg& encCfg );
541
  TaskIterator findNextTask( int threadId, TaskIterator startSearch );
542
  bool         processTask ( int threadId, Slot& task );
543
};
544
545
} // namespace vvenc
546
547
//! \}
548