Coverage Report

Created: 2026-05-30 06:10

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/work/vvenc/source/Lib/Utilities/NoMallocThreadPool.cpp
Line
Count
Source
1
/* -----------------------------------------------------------------------------
2
The copyright in this software is being made available under the Clear BSD
3
License, included below. No patent rights, trademark rights and/or 
4
other Intellectual Property Rights other than the copyrights concerning 
5
the Software are granted under this license.
6
7
The Clear BSD License
8
9
Copyright (c) 2019-2026, Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. & The VVenC Authors.
10
All rights reserved.
11
12
Redistribution and use in source and binary forms, with or without modification,
13
are permitted (subject to the limitations in the disclaimer below) provided that
14
the following conditions are met:
15
16
     * Redistributions of source code must retain the above copyright notice,
17
     this list of conditions and the following disclaimer.
18
19
     * Redistributions in binary form must reproduce the above copyright
20
     notice, this list of conditions and the following disclaimer in the
21
     documentation and/or other materials provided with the distribution.
22
23
     * Neither the name of the copyright holder nor the names of its
24
     contributors may be used to endorse or promote products derived from this
25
     software without specific prior written permission.
26
27
NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY
28
THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
29
CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
30
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
31
PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
32
CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
33
EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
34
PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
35
BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER
36
IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
37
ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
38
POSSIBILITY OF SUCH DAMAGE.
39
40
41
------------------------------------------------------------------------------------------- */
42
43
44
/** \file     NoMallocThreadPool.cpp
45
    \brief    thread pool
46
*/
47
48
#include "NoMallocThreadPool.h"

#include <utility>
49
50
#ifdef HAVE_PTHREADS
#  include <pthread.h>
// Minimum stack size (in bytes) for worker threads created through PThread. The expansion is parenthesized so the
// macro behaves like a single value in any expression (e.g. divisions or mixed-precedence arithmetic).
#  define THREAD_MIN_STACK_SIZE ( 1024 * 1024 )
#endif
54
55
56
//! \ingroup Utilities
57
//! \{
58
59
namespace vvenc {
60
61
#if ENABLE_TIME_PROFILING_MT_MODE
// Per-thread time profiler; every worker thread installs its own instance at the start of threadProc().
thread_local std::unique_ptr<TProfiler> ptls{};
#endif
64
65
/**
 * Creates the pool and starts its worker threads right away.
 *
 * \param numThreads      number of worker threads to start; a negative value selects std::thread::hardware_concurrency()
 * \param threadPoolName  prefix used to name the worker threads; nullptr is treated like an empty (no) name
 * \param encCfg          encoder configuration every worker's threadProc() is started with; must not be nullptr when
 *                        at least one worker is started
 */
NoMallocThreadPool::NoMallocThreadPool( int numThreads, const char * threadPoolName, const VVEncCfg* encCfg )
  // constructing a std::string from a null pointer is undefined behavior, so map nullptr to an empty pool name
  : m_poolName( threadPoolName ? threadPoolName : "" )
{
  if( numThreads < 0 )
  {
    numThreads = std::thread::hardware_concurrency();
  }

  // each worker is started with *encCfg, so a null configuration would be dereferenced below
  CHECK( numThreads > 0 && encCfg == nullptr, "NoMallocThreadPool: encoder configuration required to start worker threads" );

  try
  {
    // reserve up front so adding workers never reallocates (and moves) the already created thread handles
    m_threads.reserve( static_cast<size_t>( numThreads ) );
    for( int i = 0; i < numThreads; ++i )
    {
      m_threads.emplace_back( &NoMallocThreadPool::threadProc, this, i, *encCfg );
    }
  }
  catch( ... )
  {
    // Workers that already started are running on `this`, but the destructor never runs for a throwing constructor:
    // stop and join them here before propagating (destroying a joinable std::thread would call std::terminate).
    m_exitThreads = true;
    waitForThreads();
    throw;
  }
}
78
79
/**
 * Destroys the pool: asks every worker to exit and blocks until all of them have been joined.
 */
NoMallocThreadPool::~NoMallocThreadPool()
{
  // a blocking shutdown sets the exit flag and joins all workers
  shutdown( true );
}
85
86
bool NoMallocThreadPool::processTasksOnMainThread()
87
0
{
88
0
  CHECK( m_threads.size() != 0, "should not be used with multiple threads" );
89
90
0
  bool         progress      = false;
91
0
  TaskIterator firstFailedIt = m_tasks.end();
92
0
  for( auto taskIt = findNextTask( 0, m_tasks.begin() ); taskIt.isValid(); taskIt = findNextTask( 0, taskIt ) )
93
0
  {
94
0
    const bool success = processTask( 0, *taskIt );
95
0
    progress |= success;
96
97
0
    if( taskIt == firstFailedIt )
98
0
    {
99
0
      if( success )
100
0
      {
101
        // first failed was successful -> reset
102
0
        firstFailedIt = m_tasks.end();
103
0
      }
104
0
      else if( progress )
105
0
      {
106
        // reset progress, try another round
107
0
        progress = false;
108
0
      }
109
0
      else
110
0
      {
111
        // no progress -> exit
112
0
        break;
113
0
      }
114
0
    }
115
0
    else if( !success && !firstFailedIt.isValid() )
116
0
    {
117
0
      firstFailedIt = taskIt;
118
0
    }
119
0
  }
120
121
  // return true if all done (-> false if some tasks blocked due to barriers)
122
0
  return std::all_of( m_tasks.begin(), m_tasks.end(), []( Slot& t ) { return t.state == FREE; } );
123
0
}
124
125
void NoMallocThreadPool::shutdown( bool block )
126
1.29k
{
127
1.29k
  m_exitThreads = true;
128
1.29k
  if( block )
129
1.29k
  {
130
1.29k
    waitForThreads();
131
1.29k
  }
132
1.29k
}
133
134
void NoMallocThreadPool::waitForThreads()
135
2.59k
{
136
2.59k
  for( auto& t: m_threads )
137
10.3k
  {
138
10.3k
    if( t.joinable() )
139
5.19k
      t.join();
140
10.3k
  }
141
2.59k
}
142
143
void NoMallocThreadPool::threadProc( int threadId, const VVEncCfg& encCfg )
144
5.19k
{
145
5.19k
#if __linux
146
5.19k
  if( !m_poolName.empty() )
147
5.19k
  {
148
5.19k
    std::string threadName( m_poolName + std::to_string( threadId ) );
149
5.19k
    pthread_setname_np( pthread_self(), threadName.c_str() );
150
5.19k
  }
151
5.19k
#endif
152
#if ENABLE_TIME_PROFILING_MT_MODE
153
  ptls.reset( timeProfilerCreate( encCfg ) );
154
  {
155
    std::unique_lock< std::mutex > lock( m_nextFillSlotMutex );
156
    TProfiler *tp = ptls.get();
157
    profilers.push_back( tp );
158
  }
159
#endif
160
161
5.19k
  auto nextTaskIt = m_tasks.begin();
162
33.5k
  while( !m_exitThreads )
163
33.5k
  {
164
33.5k
    auto taskIt = findNextTask( threadId, nextTaskIt );
165
33.5k
    if( !taskIt.isValid() )
166
19.1k
    {
167
19.1k
      std::unique_lock<std::mutex> l( m_idleMutex, std::defer_lock );
168
169
19.1k
      ITT_TASKSTART( itt_domain_thrd, itt_handle_TPspinWait );
170
19.1k
      m_waitingThreads.fetch_add( 1, std::memory_order_relaxed );
171
19.1k
      const auto startWait = std::chrono::steady_clock::now();
172
116M
      while( !m_exitThreads )
173
116M
      {
174
116M
        taskIt = findNextTask( threadId, nextTaskIt );
175
116M
        if( taskIt.isValid() || m_exitThreads )
176
18.2k
        {
177
18.2k
          break;
178
18.2k
        }
179
180
116M
        if( !l.owns_lock()
181
1.38M
            && m_waitingThreads.load( std::memory_order_relaxed ) > 1
182
1.19M
            && ( BUSY_WAIT_TIME.count() == 0 || std::chrono::steady_clock::now() - startWait > BUSY_WAIT_TIME )
183
10.8k
            && !m_exitThreads )
184
10.8k
        {
185
10.8k
          ITT_TASKSTART(itt_domain_thrd, itt_handle_TPblocked);
186
10.8k
          l.lock();
187
10.8k
          ITT_TASKEND(itt_domain_thrd, itt_handle_TPblocked);
188
10.8k
        }
189
116M
        else
190
116M
        {
191
116M
          std::this_thread::yield();
192
116M
        }
193
116M
      }
194
19.1k
      m_waitingThreads.fetch_sub( 1, std::memory_order_relaxed );
195
19.1k
      ITT_TASKEND( itt_domain_thrd, itt_handle_TPspinWait );
196
19.1k
    }
197
33.5k
    if( m_exitThreads )
198
5.19k
    {
199
5.19k
      return;
200
5.19k
    }
201
202
28.3k
    processTask( threadId, *taskIt );
203
204
28.3k
    nextTaskIt = taskIt;
205
28.3k
    nextTaskIt.incWrap();
206
28.3k
  }
207
5.19k
}
208
209
/**
 * Searches the task list for a runnable task and claims it (state WAITING -> RUNNING).
 *
 * The scan starts at startSearch (inclusive) and wraps around once over the whole list. A claimed task whose barriers
 * are still blocked, or whose readyCheck callback returns false, is put back to WAITING and the scan goes on.
 *
 * \param threadId     id of the calling thread, forwarded to the readyCheck callback
 * \param startSearch  first slot to inspect; an invalid iterator means "start at the beginning of the list"
 * \return iterator to the claimed task, or an invalid iterator if no runnable task was found
 */
NoMallocThreadPool::TaskIterator NoMallocThreadPool::findNextTask( int threadId, TaskIterator startSearch )
{
  if( !startSearch.isValid() )
  {
    startSearch = m_tasks.begin();
  }

  TaskIterator it = startSearch;
  do
  {
    {
#if ENABLE_VALGRIND_CODE
      MutexLock lock( m_extraMutex );
#endif
      Slot& slot     = *it;
      auto  expected = WAITING;

      // cheap relaxed pre-check, then claim the slot atomically
      const bool claimed = slot.state.load( std::memory_order_relaxed ) == WAITING
                           && slot.state.compare_exchange_strong( expected, RUNNING );
      if( claimed )
      {
        bool runnable = true;

        if( !slot.barriers.empty() )
        {
          const bool blocked = std::any_of( slot.barriers.cbegin(), slot.barriers.cend(),
                                            []( const Barrier* b ) { return b && b->isBlocked(); } );
          if( blocked )
          {
            runnable = false;
          }
          else
          {
            // barriers are assumed to stay unlocked once passed, so they are not checked again on later tries
            slot.barriers.clear();
          }
        }

        if( runnable && slot.readyCheck && slot.readyCheck( threadId, slot.param ) == false )
        {
          runnable = false;
        }

        if( runnable )
        {
          return it;
        }

        // not executable yet: give the slot back so a later search can pick it up again
        slot.state.store( WAITING );
      }
    }
    it.incWrap();
  } while( it != startSearch );

  return {};
}
250
251
bool NoMallocThreadPool::processTask( int threadId, NoMallocThreadPool::Slot& task )
252
28.3k
{
253
28.3k
  const bool success = task.func( threadId, task.param );
254
#if ENABLE_VALGRIND_CODE
255
  MutexLock lock( m_extraMutex );
256
#endif
257
28.3k
  if( !success )
258
23.0k
  {
259
23.0k
    task.state = WAITING;
260
23.0k
    return false;
261
23.0k
  }
262
263
5.34k
  if( task.done != nullptr )
264
0
  {
265
0
    task.done->unlock();
266
0
  }
267
5.34k
  if( task.counter != nullptr )
268
4.05k
  {
269
4.05k
    --(*task.counter);
270
4.05k
  }
271
272
5.34k
  task.state = FREE;
273
274
5.34k
  return true;
275
28.3k
}
276
277
#ifdef HAVE_PTHREADS
278
279
template<class TFunc, class... TArgs>
280
NoMallocThreadPool::PThread::PThread( TFunc&& func, TArgs&&... args )
281
5.19k
{
282
5.19k
  using WrappedCall     = std::function<void()>;
283
5.19k
  std::unique_ptr<WrappedCall> call = std::make_unique<WrappedCall>( std::bind( func, args... ) );
284
285
5.19k
  using PThreadsStartFn = void* (*) ( void* );
286
5.19k
  PThreadsStartFn threadFn = []( void* p ) -> void*
287
5.19k
  {
288
5.19k
    std::unique_ptr<WrappedCall> call( static_cast<WrappedCall*>( p ) );
289
290
5.19k
    ( *call )();
291
292
5.19k
    return nullptr;
293
5.19k
  };
294
295
5.19k
  pthread_attr_t attr;
296
5.19k
  int ret = pthread_attr_init( &attr );
297
5.19k
  CHECK( ret != 0, "pthread_attr_init() failed" );
298
299
5.19k
  try
300
5.19k
  {
301
5.19k
    size_t currStackSize = 0;
302
5.19k
    ret = pthread_attr_getstacksize( &attr, &currStackSize );
303
5.19k
    CHECK( ret != 0, "pthread_attr_getstacksize() failed" );
304
305
5.19k
    if( currStackSize < THREAD_MIN_STACK_SIZE )
306
0
    {
307
0
      ret = pthread_attr_setstacksize( &attr, THREAD_MIN_STACK_SIZE );
308
0
      CHECK( ret != 0, "pthread_attr_setstacksize() failed" );
309
310
0
#  if defined( _DEBUG ) && !defined( __MINGW32__ ) && !defined( __MINGW64__ )
311
0
      ret = pthread_attr_setguardsize( &attr, 1024 * 1024 );   // set stack guard size to 1MB to more reliably deteck stack overflows
312
0
      CHECK( ret != 0, "pthread_attr_setguardsize() failed" );
313
0
#  endif
314
0
    }
315
5.19k
    m_joinable = 0 == pthread_create( &m_id, &attr, threadFn, call.get() );
316
5.19k
    CHECK( !m_joinable, "pthread_create() faild" );
317
318
5.19k
    call.release();   // will now be freed by the thread
319
320
5.19k
    pthread_attr_destroy( &attr );
321
5.19k
  }
322
5.19k
  catch( ... )
323
5.19k
  {
324
0
    pthread_attr_destroy( &attr );
325
0
    throw;
326
0
  }
327
5.19k
}
328
329
/**
 * Move assignment: takes over the thread handle of `other`, which is left non-joinable.
 */
NoMallocThreadPool::PThread& NoMallocThreadPool::PThread::operator=( PThread&& other )
{
  // Self-move must be a no-op: otherwise the code below would clear this object's own handle and the running thread
  // could never be joined.
  if( this == &other )
  {
    return *this;
  }

  // NOTE(review): a still-joinable *this is overwritten here and its thread is never joined (std::thread would call
  // std::terminate instead); callers are expected to assign only to non-joinable PThread objects.
  m_id             = other.m_id;
  m_joinable       = other.m_joinable;
  other.m_id       = 0;
  other.m_joinable = false;
  return *this;
}
337
338
void NoMallocThreadPool::PThread::join()
339
5.19k
{
340
5.19k
  if( m_joinable )
341
5.19k
  {
342
5.19k
    m_joinable = false;
343
5.19k
    pthread_join( m_id, nullptr );
344
5.19k
  }
345
5.19k
}
346
347
#endif   // HAVE_PTHREADS
348
349
} // namespace vvenc
350
351
//! \}
352