/src/vvenc/source/Lib/Utilities/NoMallocThreadPool.cpp
Line | Count | Source |
1 | | /* ----------------------------------------------------------------------------- |
2 | | The copyright in this software is being made available under the Clear BSD |
3 | | License, included below. No patent rights, trademark rights and/or |
4 | | other Intellectual Property Rights other than the copyrights concerning |
5 | | the Software are granted under this license. |
6 | | |
7 | | The Clear BSD License |
8 | | |
9 | | Copyright (c) 2019-2026, Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. & The VVenC Authors. |
10 | | All rights reserved. |
11 | | |
12 | | Redistribution and use in source and binary forms, with or without modification, |
13 | | are permitted (subject to the limitations in the disclaimer below) provided that |
14 | | the following conditions are met: |
15 | | |
16 | | * Redistributions of source code must retain the above copyright notice, |
17 | | this list of conditions and the following disclaimer. |
18 | | |
19 | | * Redistributions in binary form must reproduce the above copyright |
20 | | notice, this list of conditions and the following disclaimer in the |
21 | | documentation and/or other materials provided with the distribution. |
22 | | |
23 | | * Neither the name of the copyright holder nor the names of its |
24 | | contributors may be used to endorse or promote products derived from this |
25 | | software without specific prior written permission. |
26 | | |
27 | | NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY |
28 | | THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND |
29 | | CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
30 | | LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A |
31 | | PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR |
32 | | CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, |
33 | | EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
34 | | PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR |
35 | | BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER |
36 | | IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
37 | | ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
38 | | POSSIBILITY OF SUCH DAMAGE. |
39 | | |
40 | | |
41 | | ------------------------------------------------------------------------------------------- */ |
42 | | |
43 | | |
44 | | /** \file NoMallocThreadPool.cpp |
45 | | \brief thread pool |
46 | | */ |
47 | | |
48 | | #include "NoMallocThreadPool.h" |
49 | | |
50 | | #ifdef HAVE_PTHREADS |
51 | | # include <pthread.h> |
52 | 0 | # define THREAD_MIN_STACK_SIZE 1024 * 1024 |
53 | | #endif |
54 | | |
55 | | |
56 | | //! \ingroup Utilities |
57 | | //! \{ |
58 | | |
59 | | namespace vvenc { |
60 | | |
61 | | #if ENABLE_TIME_PROFILING_MT_MODE |
62 | | thread_local std::unique_ptr<TProfiler> ptls; |
63 | | #endif |
64 | | |
// Construct a thread pool with the given number of worker threads.
// A negative numThreads means "auto-detect": use the hardware thread count.
// NOTE(review): std::thread::hardware_concurrency() may return 0 when the
// count is not detectable, which silently yields a pool with no workers —
// processTasksOnMainThread() appears to be the intended zero-thread path;
// confirm callers rely on that.
// NOTE(review): encCfg is dereferenced unconditionally below — assumes the
// caller never passes nullptr; verify at call sites.
NoMallocThreadPool::NoMallocThreadPool( int numThreads, const char * threadPoolName, const VVEncCfg* encCfg )
  : m_poolName( threadPoolName )
{
  if( numThreads < 0 )
  {
    numThreads = std::thread::hardware_concurrency();
  }

  // Spawn the workers; each runs threadProc() until m_exitThreads is set.
  for( int i = 0; i < numThreads; ++i )
  {
    m_threads.emplace_back( &NoMallocThreadPool::threadProc, this, i, *encCfg );
  }
}
78 | | |
79 | | NoMallocThreadPool::~NoMallocThreadPool() |
80 | 0 | { |
81 | 0 | m_exitThreads = true; |
82 | |
|
83 | 0 | waitForThreads(); |
84 | 0 | } |
85 | | |
// Drain the task queue on the calling thread. Only valid for a pool that was
// created with zero worker threads (enforced by the CHECK below).
//
// Strategy: repeatedly scan for runnable tasks and execute them. Because a
// task may fail (its func returns false) and become runnable again only after
// some other task makes progress, we remember the first failing task
// (firstFailedIt) as a cycle marker: when the scan wraps back to it, we either
// reset the marker (it finally succeeded), retry another round (some other
// task made progress in between), or give up (a full round with no progress
// means the remaining tasks are blocked, e.g. on barriers).
//
// Returns true iff every slot ended up FREE, i.e. all tasks completed.
bool NoMallocThreadPool::processTasksOnMainThread()
{
  CHECK( m_threads.size() != 0, "should not be used with multiple threads" );

  bool progress = false;
  TaskIterator firstFailedIt = m_tasks.end();
  for( auto taskIt = findNextTask( 0, m_tasks.begin() ); taskIt.isValid(); taskIt = findNextTask( 0, taskIt ) )
  {
    const bool success = processTask( 0, *taskIt );
    progress |= success;

    if( taskIt == firstFailedIt )
    {
      // we have wrapped around to the cycle marker
      if( success )
      {
        // first failed was successful -> reset
        firstFailedIt = m_tasks.end();
      }
      else if( progress )
      {
        // reset progress, try another round
        progress = false;
      }
      else
      {
        // no progress -> exit
        break;
      }
    }
    else if( !success && !firstFailedIt.isValid() )
    {
      // remember the first failure of this round as the cycle marker
      firstFailedIt = taskIt;
    }
  }

  // return true if all done (-> false if some tasks blocked due to barriers)
  return std::all_of( m_tasks.begin(), m_tasks.end(), []( Slot& t ) { return t.state == FREE; } );
}
124 | | |
125 | | void NoMallocThreadPool::shutdown( bool block ) |
126 | 0 | { |
127 | 0 | m_exitThreads = true; |
128 | 0 | if( block ) |
129 | 0 | { |
130 | 0 | waitForThreads(); |
131 | 0 | } |
132 | 0 | } |
133 | | |
134 | | void NoMallocThreadPool::waitForThreads() |
135 | 0 | { |
136 | 0 | for( auto& t: m_threads ) |
137 | 0 | { |
138 | 0 | if( t.joinable() ) |
139 | 0 | t.join(); |
140 | 0 | } |
141 | 0 | } |
142 | | |
// Worker-thread main loop. Each worker scans the task ring for runnable
// tasks, executes them, and otherwise idles in a bounded busy-wait.
//
// Idle strategy: after BUSY_WAIT_TIME of spinning (or immediately if
// BUSY_WAIT_TIME is zero), and only while at least one OTHER thread is also
// waiting, the thread parks itself by acquiring m_idleMutex; a single thread
// keeps spinning/yielding so new tasks are picked up promptly.
// NOTE(review): `#if __linux` relies on the compiler predefining __linux as
// 1 — `#if defined( __linux__ )` would be the more portable spelling.
void NoMallocThreadPool::threadProc( int threadId, const VVEncCfg& encCfg )
{
#if __linux
  // Name the thread (pool name + id) to ease debugging/profiling.
  if( !m_poolName.empty() )
  {
    std::string threadName( m_poolName + std::to_string( threadId ) );
    pthread_setname_np( pthread_self(), threadName.c_str() );
  }
#endif
#if ENABLE_TIME_PROFILING_MT_MODE
  // Register this thread's profiler instance under lock.
  ptls.reset( timeProfilerCreate( encCfg ) );
  {
    std::unique_lock< std::mutex > lock( m_nextFillSlotMutex );
    TProfiler *tp = ptls.get();
    profilers.push_back( tp );
  }
#endif

  // Resume scanning where the previous task was found, to spread threads
  // across the ring instead of all contending at the front.
  auto nextTaskIt = m_tasks.begin();
  while( !m_exitThreads )
  {
    auto taskIt = findNextTask( threadId, nextTaskIt );
    if( !taskIt.isValid() )
    {
      // No runnable task: enter the idle wait.
      std::unique_lock<std::mutex> l( m_idleMutex, std::defer_lock );

      ITT_TASKSTART( itt_domain_thrd, itt_handle_TPspinWait );
      m_waitingThreads.fetch_add( 1, std::memory_order_relaxed );
      const auto startWait = std::chrono::steady_clock::now();
      while( !m_exitThreads )
      {
        taskIt = findNextTask( threadId, nextTaskIt );
        if( taskIt.isValid() || m_exitThreads )
        {
          break;
        }

        // Park on the idle mutex only when another thread is also waiting
        // (so one spinner always remains) and the busy-wait budget is spent.
        if( !l.owns_lock()
            && m_waitingThreads.load( std::memory_order_relaxed ) > 1
            && ( BUSY_WAIT_TIME.count() == 0 || std::chrono::steady_clock::now() - startWait > BUSY_WAIT_TIME )
            && !m_exitThreads )
        {
          ITT_TASKSTART(itt_domain_thrd, itt_handle_TPblocked);
          l.lock();
          ITT_TASKEND(itt_domain_thrd, itt_handle_TPblocked);
        }
        else
        {
          std::this_thread::yield();
        }
      }
      m_waitingThreads.fetch_sub( 1, std::memory_order_relaxed );
      ITT_TASKEND( itt_domain_thrd, itt_handle_TPspinWait );
    }
    if( m_exitThreads )
    {
      return;
    }

    processTask( threadId, *taskIt );

    // Continue the scan just past the task we ran (wrapping at the end).
    nextTaskIt = taskIt;
    nextTaskIt.incWrap();
  }
}
208 | | |
// Scan the task ring (one full wrap-around starting at startSearch) for a
// task that is ready to run, and claim it for this thread.
//
// Claiming uses a relaxed pre-check followed by a compare-exchange
// WAITING -> RUNNING, so at most one thread wins each slot. After claiming,
// readiness is re-checked (barriers, readyCheck callback); if the task turns
// out not to be runnable it is put back to WAITING and the scan continues.
//
// Returns an invalid (default-constructed) iterator when no runnable task
// was found in a full pass over the ring.
NoMallocThreadPool::TaskIterator NoMallocThreadPool::findNextTask( int threadId, TaskIterator startSearch )
{
  if( !startSearch.isValid() )
  {
    startSearch = m_tasks.begin();
  }
  bool first = true;
  // `first` lets the loop run once even though it == startSearch initially.
  for( auto it = startSearch; it != startSearch || first; it.incWrap() )
  {
#if ENABLE_VALGRIND_CODE
    MutexLock lock( m_extraMutex );
#endif

    first = false;

    Slot& t = *it;
    auto expected = WAITING;
    // Cheap relaxed load first to avoid the CAS on clearly-unavailable slots.
    if( t.state.load( std::memory_order_relaxed ) == WAITING && t.state.compare_exchange_strong( expected, RUNNING ) )
    {
      if( !t.barriers.empty() )
      {
        if( std::any_of( t.barriers.cbegin(), t.barriers.cend(), []( const Barrier* b ) { return b && b->isBlocked(); } ) )
        {
          // reschedule
          t.state.store( WAITING );
          continue;
        }
        t.barriers.clear(); // clear barriers, so we don't need to check them on the next try (we assume they won't get locked again)
      }
      if( t.readyCheck && t.readyCheck( threadId, t.param ) == false )
      {
        // reschedule
        t.state.store( WAITING );
        continue;
      }

      // Slot successfully claimed (state == RUNNING) — hand it to the caller.
      return it;
    }
  }
  return {};
}
250 | | |
// Execute a claimed task (its slot must be in RUNNING state).
//
// If the task function returns false, the task is put back to WAITING so it
// can be retried later, and false is returned. On success the completion
// hooks fire — `done` barrier unlocked, `counter` decremented — and the slot
// is released to FREE last, so the slot is not reused before the hooks ran.
// Returns true iff the task function succeeded.
bool NoMallocThreadPool::processTask( int threadId, NoMallocThreadPool::Slot& task )
{
  const bool success = task.func( threadId, task.param );
#if ENABLE_VALGRIND_CODE
  MutexLock lock( m_extraMutex );
#endif
  if( !success )
  {
    // Task could not complete yet -> reschedule for another attempt.
    task.state = WAITING;
    return false;
  }

  if( task.done != nullptr )
  {
    task.done->unlock();
  }
  if( task.counter != nullptr )
  {
    --(*task.counter);
  }

  // Release the slot only after the completion hooks above have run.
  task.state = FREE;

  return true;
}
276 | | |
277 | | #ifdef HAVE_PTHREADS |
278 | | |
279 | | template<class TFunc, class... TArgs> |
280 | | NoMallocThreadPool::PThread::PThread( TFunc&& func, TArgs&&... args ) |
281 | 0 | { |
282 | 0 | using WrappedCall = std::function<void()>; |
283 | 0 | std::unique_ptr<WrappedCall> call = std::make_unique<WrappedCall>( std::bind( func, args... ) ); |
284 | |
|
285 | 0 | using PThreadsStartFn = void* (*) ( void* ); |
286 | 0 | PThreadsStartFn threadFn = []( void* p ) -> void* |
287 | 0 | { |
288 | 0 | std::unique_ptr<WrappedCall> call( static_cast<WrappedCall*>( p ) ); |
289 | |
|
290 | 0 | ( *call )(); |
291 | |
|
292 | 0 | return nullptr; |
293 | 0 | }; |
294 | |
|
295 | 0 | pthread_attr_t attr; |
296 | 0 | int ret = pthread_attr_init( &attr ); |
297 | 0 | CHECK( ret != 0, "pthread_attr_init() failed" ); |
298 | |
|
299 | 0 | try |
300 | 0 | { |
301 | 0 | size_t currStackSize = 0; |
302 | 0 | ret = pthread_attr_getstacksize( &attr, &currStackSize ); |
303 | 0 | CHECK( ret != 0, "pthread_attr_getstacksize() failed" ); |
304 | |
|
305 | 0 | if( currStackSize < THREAD_MIN_STACK_SIZE ) |
306 | 0 | { |
307 | 0 | ret = pthread_attr_setstacksize( &attr, THREAD_MIN_STACK_SIZE ); |
308 | 0 | CHECK( ret != 0, "pthread_attr_setstacksize() failed" ); |
309 | |
|
310 | 0 | # if defined( _DEBUG ) && !defined( __MINGW32__ ) && !defined( __MINGW64__ ) |
311 | 0 | ret = pthread_attr_setguardsize( &attr, 1024 * 1024 ); // set stack guard size to 1MB to more reliably deteck stack overflows |
312 | 0 | CHECK( ret != 0, "pthread_attr_setguardsize() failed" ); |
313 | 0 | # endif |
314 | 0 | } |
315 | 0 | m_joinable = 0 == pthread_create( &m_id, &attr, threadFn, call.get() ); |
316 | 0 | CHECK( !m_joinable, "pthread_create() faild" ); |
317 | |
|
318 | 0 | call.release(); // will now be freed by the thread |
319 | |
|
320 | 0 | pthread_attr_destroy( &attr ); |
321 | 0 | } |
322 | 0 | catch( ... ) |
323 | 0 | { |
324 | 0 | pthread_attr_destroy( &attr ); |
325 | 0 | throw; |
326 | 0 | } |
327 | 0 | } |
328 | | |
329 | | NoMallocThreadPool::PThread& NoMallocThreadPool::PThread::operator=( PThread&& other ) |
330 | 0 | { |
331 | 0 | m_id = other.m_id; |
332 | 0 | m_joinable = other.m_joinable; |
333 | 0 | other.m_id = 0; |
334 | 0 | other.m_joinable = false; |
335 | 0 | return *this; |
336 | 0 | } |
337 | | |
338 | | void NoMallocThreadPool::PThread::join() |
339 | 0 | { |
340 | 0 | if( m_joinable ) |
341 | 0 | { |
342 | 0 | m_joinable = false; |
343 | 0 | pthread_join( m_id, nullptr ); |
344 | 0 | } |
345 | 0 | } |
346 | | |
347 | | #endif // HAVE_PTHREADS |
348 | | |
349 | | } // namespace vvenc |
350 | | |
351 | | //! \} |
352 | | |