Coverage Report

Created: 2026-06-15 06:25

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/work/vvenc/source/Lib/Utilities/NoMallocThreadPool.cpp
Line
Count
Source
1
/* -----------------------------------------------------------------------------
2
The copyright in this software is being made available under the Clear BSD
3
License, included below. No patent rights, trademark rights and/or 
4
other Intellectual Property Rights other than the copyrights concerning 
5
the Software are granted under this license.
6
7
The Clear BSD License
8
9
Copyright (c) 2019-2026, Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. & The VVenC Authors.
10
All rights reserved.
11
12
Redistribution and use in source and binary forms, with or without modification,
13
are permitted (subject to the limitations in the disclaimer below) provided that
14
the following conditions are met:
15
16
     * Redistributions of source code must retain the above copyright notice,
17
     this list of conditions and the following disclaimer.
18
19
     * Redistributions in binary form must reproduce the above copyright
20
     notice, this list of conditions and the following disclaimer in the
21
     documentation and/or other materials provided with the distribution.
22
23
     * Neither the name of the copyright holder nor the names of its
24
     contributors may be used to endorse or promote products derived from this
25
     software without specific prior written permission.
26
27
NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
28
THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
29
CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
30
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
31
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
32
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
33
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
34
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
35
BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
36
IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
37
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
38
POSSIBILITY OF SUCH DAMAGE.
39
40
41
------------------------------------------------------------------------------------------- */
42
43
44
/** \file     NoMallocThreadPool.cpp
45
    \brief    thread pool
46
*/
47
48
#include "NoMallocThreadPool.h"
49
50
#ifdef HAVE_PTHREADS
51
#  include <pthread.h>
52
4.34k
// Minimum stack size (in bytes) requested for worker threads created via pthreads.
// The expression is parenthesised so the macro always expands as a single value,
// independent of the operators surrounding it at the point of use.
#  define THREAD_MIN_STACK_SIZE ( 1024 * 1024 )
53
#endif
54
55
56
//! \ingroup Utilities
57
//! \{
58
59
namespace vvenc {
60
61
#if ENABLE_TIME_PROFILING_MT_MODE
62
// Time profiler owned by the current thread (one instance per thread, since it is thread_local).
// threadProc() fills it with the result of timeProfilerCreate() and stores the raw pointer
// in the pool's `profilers` list.
thread_local std::unique_ptr<TProfiler> ptls;
63
#endif
64
65
// Creates the pool and starts its worker threads.
//
// numThreads     - number of workers to start; a negative value selects
//                  std::thread::hardware_concurrency() workers
// threadPoolName - stored as the pool name (used by threadProc() to label the threads)
// encCfg         - encoder configuration; it is dereferenced for every worker that is
//                  started, so it has to be valid whenever at least one worker is created
NoMallocThreadPool::NoMallocThreadPool( int numThreads, const char * threadPoolName, const VVEncCfg* encCfg )
  : m_poolName( threadPoolName )
{
  const int workerCount = numThreads < 0 ? static_cast<int>( std::thread::hardware_concurrency() )
                                         : numThreads;

  int workerId = 0;
  while( workerId < workerCount )
  {
    // every worker runs threadProc() on this pool with its own index
    m_threads.emplace_back( &NoMallocThreadPool::threadProc, this, workerId, *encCfg );
    ++workerId;
  }
}
78
79
// Requests all workers to exit and joins them before the pool goes away.
NoMallocThreadPool::~NoMallocThreadPool()
{
  // a blocking shutdown raises the exit flag and then waits for every worker
  shutdown( true );
}
85
86
bool NoMallocThreadPool::processTasksOnMainThread()
87
0
{
88
0
  CHECK( m_threads.size() != 0, "should not be used with multiple threads" );
89
90
0
  bool         progress      = false;
91
0
  TaskIterator firstFailedIt = m_tasks.end();
92
0
  for( auto taskIt = findNextTask( 0, m_tasks.begin() ); taskIt.isValid(); taskIt = findNextTask( 0, taskIt ) )
93
0
  {
94
0
    const bool success = processTask( 0, *taskIt );
95
0
    progress |= success;
96
97
0
    if( taskIt == firstFailedIt )
98
0
    {
99
0
      if( success )
100
0
      {
101
        // first failed was successful -> reset
102
0
        firstFailedIt = m_tasks.end();
103
0
      }
104
0
      else if( progress )
105
0
      {
106
        // reset progress, try another round
107
0
        progress = false;
108
0
      }
109
0
      else
110
0
      {
111
        // no progress -> exit
112
0
        break;
113
0
      }
114
0
    }
115
0
    else if( !success && !firstFailedIt.isValid() )
116
0
    {
117
0
      firstFailedIt = taskIt;
118
0
    }
119
0
  }
120
121
  // return true if all done (-> false if some tasks blocked due to barriers)
122
0
  return std::all_of( m_tasks.begin(), m_tasks.end(), []( Slot& t ) { return t.state == FREE; } );
123
0
}
124
125
void NoMallocThreadPool::shutdown( bool block )
126
1.08k
{
127
1.08k
  m_exitThreads = true;
128
1.08k
  if( block )
129
1.08k
  {
130
1.08k
    waitForThreads();
131
1.08k
  }
132
1.08k
}
133
134
void NoMallocThreadPool::waitForThreads()
135
2.17k
{
136
2.17k
  for( auto& t: m_threads )
137
8.68k
  {
138
8.68k
    if( t.joinable() )
139
4.34k
      t.join();
140
8.68k
  }
141
2.17k
}
142
143
void NoMallocThreadPool::threadProc( int threadId, const VVEncCfg& encCfg )
144
4.34k
{
145
4.34k
#if __linux
146
4.34k
  if( !m_poolName.empty() )
147
4.34k
  {
148
4.34k
    std::string threadName( m_poolName + std::to_string( threadId ) );
149
4.34k
    pthread_setname_np( pthread_self(), threadName.c_str() );
150
4.34k
  }
151
4.34k
#endif
152
#if ENABLE_TIME_PROFILING_MT_MODE
153
  ptls.reset( timeProfilerCreate( encCfg ) );
154
  {
155
    std::unique_lock< std::mutex > lock( m_nextFillSlotMutex );
156
    TProfiler *tp = ptls.get();
157
    profilers.push_back( tp );
158
  }
159
#endif
160
161
4.34k
  auto nextTaskIt = m_tasks.begin();
162
27.8k
  while( !m_exitThreads )
163
27.8k
  {
164
27.8k
    auto taskIt = findNextTask( threadId, nextTaskIt );
165
27.8k
    if( !taskIt.isValid() )
166
15.7k
    {
167
15.7k
      std::unique_lock<std::mutex> l( m_idleMutex, std::defer_lock );
168
169
15.7k
      ITT_TASKSTART( itt_domain_thrd, itt_handle_TPspinWait );
170
15.7k
      m_waitingThreads.fetch_add( 1, std::memory_order_relaxed );
171
15.7k
      const auto startWait = std::chrono::steady_clock::now();
172
93.8M
      while( !m_exitThreads )
173
93.8M
      {
174
93.8M
        taskIt = findNextTask( threadId, nextTaskIt );
175
93.8M
        if( taskIt.isValid() || m_exitThreads )
176
15.1k
        {
177
15.1k
          break;
178
15.1k
        }
179
180
93.8M
        if( !l.owns_lock()
181
1.32M
            && m_waitingThreads.load( std::memory_order_relaxed ) > 1
182
1.20M
            && ( BUSY_WAIT_TIME.count() == 0 || std::chrono::steady_clock::now() - startWait > BUSY_WAIT_TIME )
183
8.63k
            && !m_exitThreads )
184
8.63k
        {
185
8.63k
          ITT_TASKSTART(itt_domain_thrd, itt_handle_TPblocked);
186
8.63k
          l.lock();
187
8.63k
          ITT_TASKEND(itt_domain_thrd, itt_handle_TPblocked);
188
8.63k
        }
189
93.8M
        else
190
93.8M
        {
191
93.8M
          std::this_thread::yield();
192
93.8M
        }
193
93.8M
      }
194
15.7k
      m_waitingThreads.fetch_sub( 1, std::memory_order_relaxed );
195
15.7k
      ITT_TASKEND( itt_domain_thrd, itt_handle_TPspinWait );
196
15.7k
    }
197
27.8k
    if( m_exitThreads )
198
4.33k
    {
199
4.33k
      return;
200
4.33k
    }
201
202
23.5k
    processTask( threadId, *taskIt );
203
204
23.5k
    nextTaskIt = taskIt;
205
23.5k
    nextTaskIt.incWrap();
206
23.5k
  }
207
4.34k
}
208
209
// Searches the task slots for a task that can be executed and claims it.
//
// threadId    - id of the calling thread, forwarded to the task's readyCheck
// startSearch - slot at which the search begins; an invalid iterator restarts at the first slot
//
// The slots are visited once, wrapping around, starting at startSearch. A slot is claimed by
// switching its state from WAITING to RUNNING. If one of its barriers is still blocked or its
// readyCheck reports "not ready", the slot is put back to WAITING and the search goes on.
//
// Returns an iterator to the claimed slot (left in state RUNNING), or an invalid iterator if
// no task could be claimed during this pass.
NoMallocThreadPool::TaskIterator NoMallocThreadPool::findNextTask( int threadId, TaskIterator startSearch )
{
  if( !startSearch.isValid() )
  {
    startSearch = m_tasks.begin();
  }

  bool firstVisit = true;
  for( auto slotIt = startSearch; firstVisit || slotIt != startSearch; slotIt.incWrap() )
  {
#if ENABLE_VALGRIND_CODE
    MutexLock lock( m_extraMutex );
#endif

    firstVisit = false;

    Slot& slot = *slotIt;

    // cheap relaxed pre-check before attempting the actual claim
    if( slot.state.load( std::memory_order_relaxed ) != WAITING )
    {
      continue;
    }

    auto wanted = WAITING;
    if( !slot.state.compare_exchange_strong( wanted, RUNNING ) )
    {
      // another thread was faster
      continue;
    }

    if( !slot.barriers.empty() )
    {
      const bool stillBlocked = std::any_of( slot.barriers.cbegin(), slot.barriers.cend(),
                                             []( const Barrier* barrier ) { return barrier != nullptr && barrier->isBlocked(); } );
      if( stillBlocked )
      {
        // give the slot back, it is retried on a later pass
        slot.state.store( WAITING );
        continue;
      }
      // all barriers are open: drop them so they are not checked again on the next try
      // (assumes they will not get locked again)
      slot.barriers.clear();
    }

    if( slot.readyCheck && !slot.readyCheck( threadId, slot.param ) )
    {
      // task reports it is not ready yet -> give the slot back
      slot.state.store( WAITING );
      continue;
    }

    return slotIt;
  }

  return {};
}
250
251
bool NoMallocThreadPool::processTask( int threadId, NoMallocThreadPool::Slot& task )
252
23.5k
{
253
23.5k
  const bool success = task.func( threadId, task.param );
254
#if ENABLE_VALGRIND_CODE
255
  MutexLock lock( m_extraMutex );
256
#endif
257
23.5k
  if( !success )
258
19.0k
  {
259
19.0k
    task.state = WAITING;
260
19.0k
    return false;
261
19.0k
  }
262
263
4.41k
  if( task.done != nullptr )
264
0
  {
265
0
    task.done->unlock();
266
0
  }
267
4.41k
  if( task.counter != nullptr )
268
3.33k
  {
269
3.33k
    --(*task.counter);
270
3.33k
  }
271
272
4.41k
  task.state = FREE;
273
274
4.41k
  return true;
275
23.5k
}
276
277
#ifdef HAVE_PTHREADS
278
279
template<class TFunc, class... TArgs>
280
NoMallocThreadPool::PThread::PThread( TFunc&& func, TArgs&&... args )
281
4.34k
{
282
4.34k
  using WrappedCall     = std::function<void()>;
283
4.34k
  std::unique_ptr<WrappedCall> call = std::make_unique<WrappedCall>( std::bind( func, args... ) );
284
285
4.34k
  using PThreadsStartFn = void* (*) ( void* );
286
4.34k
  PThreadsStartFn threadFn = []( void* p ) -> void*
287
4.34k
  {
288
4.34k
    std::unique_ptr<WrappedCall> call( static_cast<WrappedCall*>( p ) );
289
290
4.34k
    ( *call )();
291
292
4.34k
    return nullptr;
293
4.34k
  };
294
295
4.34k
  pthread_attr_t attr;
296
4.34k
  int ret = pthread_attr_init( &attr );
297
4.34k
  CHECK( ret != 0, "pthread_attr_init() failed" );
298
299
4.34k
  try
300
4.34k
  {
301
4.34k
    size_t currStackSize = 0;
302
4.34k
    ret = pthread_attr_getstacksize( &attr, &currStackSize );
303
4.34k
    CHECK( ret != 0, "pthread_attr_getstacksize() failed" );
304
305
4.34k
    if( currStackSize < THREAD_MIN_STACK_SIZE )
306
0
    {
307
0
      ret = pthread_attr_setstacksize( &attr, THREAD_MIN_STACK_SIZE );
308
0
      CHECK( ret != 0, "pthread_attr_setstacksize() failed" );
309
310
0
#  if defined( _DEBUG ) && !defined( __MINGW32__ ) && !defined( __MINGW64__ )
311
0
      ret = pthread_attr_setguardsize( &attr, 1024 * 1024 );   // set stack guard size to 1MB to more reliably deteck stack overflows
312
0
      CHECK( ret != 0, "pthread_attr_setguardsize() failed" );
313
0
#  endif
314
0
    }
315
4.34k
    m_joinable = 0 == pthread_create( &m_id, &attr, threadFn, call.get() );
316
4.34k
    CHECK( !m_joinable, "pthread_create() faild" );
317
318
4.34k
    call.release();   // will now be freed by the thread
319
320
4.34k
    pthread_attr_destroy( &attr );
321
4.34k
  }
322
4.34k
  catch( ... )
323
4.34k
  {
324
0
    pthread_attr_destroy( &attr );
325
0
    throw;
326
0
  }
327
4.34k
}
328
329
// Move assignment: takes over the thread handle and the joinable flag of `other` and
// leaves `other` empty (id 0, not joinable).
//
// Assigning an object to itself is a no-op. Without the guard the handle would be
// overwritten with 0 and the joinable flag cleared, so the thread could never be joined.
//
// NOTE(review): a handle previously held by *this is overwritten without being joined;
// callers appear to be expected to assign only to objects that do not own a running thread.
NoMallocThreadPool::PThread& NoMallocThreadPool::PThread::operator=( PThread&& other )
{
  if( this != &other )
  {
    m_id             = other.m_id;
    m_joinable       = other.m_joinable;
    other.m_id       = 0;
    other.m_joinable = false;
  }
  return *this;
}
337
338
void NoMallocThreadPool::PThread::join()
339
4.34k
{
340
4.34k
  if( m_joinable )
341
4.34k
  {
342
4.34k
    m_joinable = false;
343
4.34k
    pthread_join( m_id, nullptr );
344
4.34k
  }
345
4.34k
}
346
347
#endif   // HAVE_PTHREADS
348
349
} // namespace vvenc
350
351
//! \}
352