Coverage Report

Created: 2026-06-10 07:00

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/work/vvenc/source/Lib/Utilities/NoMallocThreadPool.cpp
Line
Count
Source
1
/* -----------------------------------------------------------------------------
2
The copyright in this software is being made available under the Clear BSD
3
License, included below. No patent rights, trademark rights and/or 
4
other Intellectual Property Rights other than the copyrights concerning 
5
the Software are granted under this license.
6
7
The Clear BSD License
8
9
Copyright (c) 2019-2026, Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. & The VVenC Authors.
10
All rights reserved.
11
12
Redistribution and use in source and binary forms, with or without modification,
13
are permitted (subject to the limitations in the disclaimer below) provided that
14
the following conditions are met:
15
16
     * Redistributions of source code must retain the above copyright notice,
17
     this list of conditions and the following disclaimer.
18
19
     * Redistributions in binary form must reproduce the above copyright
20
     notice, this list of conditions and the following disclaimer in the
21
     documentation and/or other materials provided with the distribution.
22
23
     * Neither the name of the copyright holder nor the names of its
24
     contributors may be used to endorse or promote products derived from this
25
     software without specific prior written permission.
26
27
NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
28
THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
29
CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
30
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
31
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
32
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
33
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
34
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
35
BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
36
IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
37
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
38
POSSIBILITY OF SUCH DAMAGE.
39
40
41
------------------------------------------------------------------------------------------- */
42
43
44
/** \file     NoMallocThreadPool.cpp
45
    \brief    thread pool
46
*/
47
48
#include "NoMallocThreadPool.h"
49
50
#ifdef HAVE_PTHREADS
51
#  include <pthread.h>
52
4.44k
// Minimum stack size (in bytes) requested for worker threads created through pthreads.
// The expansion is parenthesized so that the macro evaluates as one value in any expression
// (e.g. `x / THREAD_MIN_STACK_SIZE`).
#  define THREAD_MIN_STACK_SIZE ( 1024 * 1024 )
53
#endif
54
55
56
//! \ingroup Utilities
57
//! \{
58
59
namespace vvenc {
60
61
#if ENABLE_TIME_PROFILING_MT_MODE
62
// Thread-local time profiler; each worker thread installs its own instance at the start of threadProc().
thread_local std::unique_ptr<TProfiler> ptls;
63
#endif
64
65
/**
    \brief Creates the pool and starts its worker threads.

    \param numThreads      number of worker threads to start; a negative value selects
                           std::thread::hardware_concurrency(). With 0 no thread is started.
    \param threadPoolName  prefix used to name the worker threads; nullptr is treated as an empty name.
    \param encCfg          encoder configuration handed to every worker thread; must not be nullptr
                           when at least one thread is started.
*/
NoMallocThreadPool::NoMallocThreadPool( int numThreads, const char * threadPoolName, const VVEncCfg* encCfg )
  // constructing a std::string from a null pointer is undefined behaviour, so map nullptr to ""
  : m_poolName( threadPoolName != nullptr ? threadPoolName : "" )
{
  if( numThreads < 0 )
  {
    numThreads = std::thread::hardware_concurrency();
  }

  // every worker receives *encCfg, so fail with a clear message instead of dereferencing a null pointer
  CHECK( numThreads > 0 && encCfg == nullptr, "thread pool with worker threads requires an encoder config" );

  for( int i = 0; i < numThreads; ++i )
  {
    m_threads.emplace_back( &NoMallocThreadPool::threadProc, this, i, *encCfg );
  }
}
78
79
// Requests all worker threads to exit and joins them before the pool is destroyed.
NoMallocThreadPool::~NoMallocThreadPool()
{
  // a blocking shutdown sets the exit flag and then waits for the threads
  shutdown( true );
}
85
86
bool NoMallocThreadPool::processTasksOnMainThread()
87
0
{
88
0
  CHECK( m_threads.size() != 0, "should not be used with multiple threads" );
89
90
0
  bool         progress      = false;
91
0
  TaskIterator firstFailedIt = m_tasks.end();
92
0
  for( auto taskIt = findNextTask( 0, m_tasks.begin() ); taskIt.isValid(); taskIt = findNextTask( 0, taskIt ) )
93
0
  {
94
0
    const bool success = processTask( 0, *taskIt );
95
0
    progress |= success;
96
97
0
    if( taskIt == firstFailedIt )
98
0
    {
99
0
      if( success )
100
0
      {
101
        // first failed was successful -> reset
102
0
        firstFailedIt = m_tasks.end();
103
0
      }
104
0
      else if( progress )
105
0
      {
106
        // reset progress, try another round
107
0
        progress = false;
108
0
      }
109
0
      else
110
0
      {
111
        // no progress -> exit
112
0
        break;
113
0
      }
114
0
    }
115
0
    else if( !success && !firstFailedIt.isValid() )
116
0
    {
117
0
      firstFailedIt = taskIt;
118
0
    }
119
0
  }
120
121
  // return true if all done (-> false if some tasks blocked due to barriers)
122
0
  return std::all_of( m_tasks.begin(), m_tasks.end(), []( Slot& t ) { return t.state == FREE; } );
123
0
}
124
125
void NoMallocThreadPool::shutdown( bool block )
126
1.11k
{
127
1.11k
  m_exitThreads = true;
128
1.11k
  if( block )
129
1.11k
  {
130
1.11k
    waitForThreads();
131
1.11k
  }
132
1.11k
}
133
134
void NoMallocThreadPool::waitForThreads()
135
2.22k
{
136
2.22k
  for( auto& t: m_threads )
137
8.88k
  {
138
8.88k
    if( t.joinable() )
139
4.44k
      t.join();
140
8.88k
  }
141
2.22k
}
142
143
// Main loop of a worker thread: repeatedly picks a task with findNextTask() and runs it with
// processTask() until m_exitThreads is set.
//   threadId  index of this worker; passed on to the task callbacks and appended to the thread name
//   encCfg    encoder configuration; only read here when ENABLE_TIME_PROFILING_MT_MODE is enabled
void NoMallocThreadPool::threadProc( int threadId, const VVEncCfg& encCfg )
144
4.44k
{
145
4.44k
#if __linux
146
4.44k
  // name the thread "<poolName><threadId>" (only when a pool name was given)
  if( !m_poolName.empty() )
147
4.44k
  {
148
4.44k
    std::string threadName( m_poolName + std::to_string( threadId ) );
149
4.44k
    pthread_setname_np( pthread_self(), threadName.c_str() );
150
4.44k
  }
151
4.44k
#endif
152
#if ENABLE_TIME_PROFILING_MT_MODE
153
  // create this thread's profiler and register it in the shared list under m_nextFillSlotMutex
  ptls.reset( timeProfilerCreate( encCfg ) );
154
  {
155
    std::unique_lock< std::mutex > lock( m_nextFillSlotMutex );
156
    TProfiler *tp = ptls.get();
157
    profilers.push_back( tp );
158
  }
159
#endif
160
161
4.44k
  auto nextTaskIt = m_tasks.begin();
162
28.7k
  while( !m_exitThreads )
163
28.7k
  {
164
28.7k
    auto taskIt = findNextTask( threadId, nextTaskIt );
165
28.7k
    if( !taskIt.isValid() )
166
16.2k
    {
167
16.2k
      // nothing runnable right now: poll for work until a task shows up or exit is requested.
      // The lock is created unlocked (defer_lock) and released automatically at the end of this scope.
      std::unique_lock<std::mutex> l( m_idleMutex, std::defer_lock );
168
169
16.2k
      ITT_TASKSTART( itt_domain_thrd, itt_handle_TPspinWait );
170
16.2k
      m_waitingThreads.fetch_add( 1, std::memory_order_relaxed );
171
16.2k
      const auto startWait = std::chrono::steady_clock::now();
172
79.6M
      while( !m_exitThreads )
173
79.6M
      {
174
79.6M
        taskIt = findNextTask( threadId, nextTaskIt );
175
79.6M
        if( taskIt.isValid() || m_exitThreads )
176
14.8k
        {
177
14.8k
          break;
178
14.8k
        }
179
180
79.6M
        // Once this thread has polled for longer than BUSY_WAIT_TIME (or immediately if that is zero)
        // and it is not the only waiting thread, it acquires m_idleMutex. If another waiter already holds
        // the mutex this call blocks; the holder keeps polling with the mutex held.
        // NOTE(review): presumably this leaves a single polling thread while the others sleep on the mutex - confirm.
        if( !l.owns_lock()
181
724k
            && m_waitingThreads.load( std::memory_order_relaxed ) > 1
182
667k
            && ( BUSY_WAIT_TIME.count() == 0 || std::chrono::steady_clock::now() - startWait > BUSY_WAIT_TIME )
183
9.45k
            && !m_exitThreads )
184
9.44k
        {
185
9.44k
          ITT_TASKSTART(itt_domain_thrd, itt_handle_TPblocked);
186
9.44k
          l.lock();
187
9.44k
          ITT_TASKEND(itt_domain_thrd, itt_handle_TPblocked);
188
9.44k
        }
189
79.6M
        else
190
79.6M
        {
191
79.6M
          std::this_thread::yield();
192
79.6M
        }
193
79.6M
      }
194
16.2k
      m_waitingThreads.fetch_sub( 1, std::memory_order_relaxed );
195
16.2k
      ITT_TASKEND( itt_domain_thrd, itt_handle_TPspinWait );
196
16.2k
    }
197
28.7k
    // taskIt may still be invalid here if the wait loop was left because of the exit request
    if( m_exitThreads )
198
4.43k
    {
199
4.43k
      return;
200
4.43k
    }
201
202
24.3k
    processTask( threadId, *taskIt );
203
204
24.3k
    // continue the next search at the slot following the one just processed
    nextTaskIt = taskIt;
205
24.3k
    nextTaskIt.incWrap();
206
24.3k
  }
207
4.44k
}
208
209
// Searches the task slots for a task that can be executed now and claims it for the caller.
//   threadId     id of the calling thread, passed on to the task's readyCheck callback
//   startSearch  slot to start at; an invalid iterator means "start at the first slot"
// Visits every slot once, beginning at startSearch. A slot is claimed by switching its state from
// WAITING to RUNNING; if the task is still blocked by a barrier or its readyCheck fails, the slot is
// put back to WAITING and the search goes on.
// Returns an iterator to the claimed slot, or a default-constructed iterator if nothing was runnable.
NoMallocThreadPool::TaskIterator NoMallocThreadPool::findNextTask( int threadId, TaskIterator startSearch )
{
  if( !startSearch.isValid() )
  {
    startSearch = m_tasks.begin();
  }

  // the walk starts and ends at startSearch, so the end test must not stop the very first pass
  bool firstPass = true;
  for( auto pos = startSearch; pos != startSearch || firstPass; pos.incWrap() )
  {
#if ENABLE_VALGRIND_CODE
    MutexLock lock( m_extraMutex );
#endif

    firstPass = false;

    Slot& slot = *pos;

    // cheap relaxed pre-check before attempting the actual claim
    if( slot.state.load( std::memory_order_relaxed ) != WAITING )
    {
      continue;
    }

    auto expectedState = WAITING;
    if( !slot.state.compare_exchange_strong( expectedState, RUNNING ) )
    {
      continue;
    }

    // from here on the slot is claimed (RUNNING) by this thread
    if( !slot.barriers.empty() )
    {
      const bool stillBlocked = std::any_of( slot.barriers.cbegin(), slot.barriers.cend(),
                                             []( const Barrier* barrier ) { return barrier && barrier->isBlocked(); } );
      if( stillBlocked )
      {
        // give the slot back and try the next one
        slot.state.store( WAITING );
        continue;
      }
      // clear barriers, so we don't need to check them on the next try (we assume they won't get locked again)
      slot.barriers.clear();
    }

    const bool notReady = slot.readyCheck && slot.readyCheck( threadId, slot.param ) == false;
    if( notReady )
    {
      // give the slot back and try the next one
      slot.state.store( WAITING );
      continue;
    }

    return pos;
  }

  return {};
}
250
251
bool NoMallocThreadPool::processTask( int threadId, NoMallocThreadPool::Slot& task )
252
24.2k
{
253
24.2k
  const bool success = task.func( threadId, task.param );
254
#if ENABLE_VALGRIND_CODE
255
  MutexLock lock( m_extraMutex );
256
#endif
257
24.2k
  if( !success )
258
19.7k
  {
259
19.7k
    task.state = WAITING;
260
19.7k
    return false;
261
19.7k
  }
262
263
4.53k
  if( task.done != nullptr )
264
0
  {
265
0
    task.done->unlock();
266
0
  }
267
4.53k
  if( task.counter != nullptr )
268
3.46k
  {
269
3.46k
    --(*task.counter);
270
3.46k
  }
271
272
4.53k
  task.state = FREE;
273
274
4.53k
  return true;
275
24.2k
}
276
277
#ifdef HAVE_PTHREADS
278
279
template<class TFunc, class... TArgs>
280
NoMallocThreadPool::PThread::PThread( TFunc&& func, TArgs&&... args )
281
4.44k
{
282
4.44k
  using WrappedCall     = std::function<void()>;
283
4.44k
  std::unique_ptr<WrappedCall> call = std::make_unique<WrappedCall>( std::bind( func, args... ) );
284
285
4.44k
  using PThreadsStartFn = void* (*) ( void* );
286
4.44k
  PThreadsStartFn threadFn = []( void* p ) -> void*
287
4.44k
  {
288
4.44k
    std::unique_ptr<WrappedCall> call( static_cast<WrappedCall*>( p ) );
289
290
4.44k
    ( *call )();
291
292
4.44k
    return nullptr;
293
4.44k
  };
294
295
4.44k
  pthread_attr_t attr;
296
4.44k
  int ret = pthread_attr_init( &attr );
297
4.44k
  CHECK( ret != 0, "pthread_attr_init() failed" );
298
299
4.44k
  try
300
4.44k
  {
301
4.44k
    size_t currStackSize = 0;
302
4.44k
    ret = pthread_attr_getstacksize( &attr, &currStackSize );
303
4.44k
    CHECK( ret != 0, "pthread_attr_getstacksize() failed" );
304
305
4.44k
    if( currStackSize < THREAD_MIN_STACK_SIZE )
306
0
    {
307
0
      ret = pthread_attr_setstacksize( &attr, THREAD_MIN_STACK_SIZE );
308
0
      CHECK( ret != 0, "pthread_attr_setstacksize() failed" );
309
310
0
#  if defined( _DEBUG ) && !defined( __MINGW32__ ) && !defined( __MINGW64__ )
311
0
      ret = pthread_attr_setguardsize( &attr, 1024 * 1024 );   // set stack guard size to 1MB to more reliably deteck stack overflows
312
0
      CHECK( ret != 0, "pthread_attr_setguardsize() failed" );
313
0
#  endif
314
0
    }
315
4.44k
    m_joinable = 0 == pthread_create( &m_id, &attr, threadFn, call.get() );
316
4.44k
    CHECK( !m_joinable, "pthread_create() faild" );
317
318
4.44k
    call.release();   // will now be freed by the thread
319
320
4.44k
    pthread_attr_destroy( &attr );
321
4.44k
  }
322
4.44k
  catch( ... )
323
4.44k
  {
324
0
    pthread_attr_destroy( &attr );
325
0
    throw;
326
0
  }
327
4.44k
}
328
329
// Move assignment: takes over the thread handle and joinable state of other and leaves other
// without a thread (id 0, not joinable).
// Self-assignment is a no-op; without the guard the object would clear its own handle and the
// thread could no longer be joined.
NoMallocThreadPool::PThread& NoMallocThreadPool::PThread::operator=( PThread&& other )
{
  if( this != &other )
  {
    m_id             = other.m_id;
    m_joinable       = other.m_joinable;
    other.m_id       = 0;
    other.m_joinable = false;
  }
  return *this;
}
337
338
void NoMallocThreadPool::PThread::join()
339
4.44k
{
340
4.44k
  if( m_joinable )
341
4.44k
  {
342
4.44k
    m_joinable = false;
343
4.44k
    pthread_join( m_id, nullptr );
344
4.44k
  }
345
4.44k
}
346
347
#endif   // HAVE_PTHREADS
348
349
} // namespace vvenc
350
351
//! \}
352