/src/vvenc/source/Lib/Utilities/NoMallocThreadPool.h
Line | Count | Source |
1 | | /* ----------------------------------------------------------------------------- |
2 | | The copyright in this software is being made available under the Clear BSD |
3 | | License, included below. No patent rights, trademark rights and/or |
4 | | other Intellectual Property Rights other than the copyrights concerning |
5 | | the Software are granted under this license. |
6 | | |
7 | | The Clear BSD License |
8 | | |
9 | | Copyright (c) 2019-2026, Fraunhofer-Gesellschaft zur Förderung der angewandten Forschung e.V. & The VVenC Authors. |
10 | | All rights reserved. |
11 | | |
12 | | Redistribution and use in source and binary forms, with or without modification, |
13 | | are permitted (subject to the limitations in the disclaimer below) provided that |
14 | | the following conditions are met: |
15 | | |
16 | | * Redistributions of source code must retain the above copyright notice, |
17 | | this list of conditions and the following disclaimer. |
18 | | |
19 | | * Redistributions in binary form must reproduce the above copyright |
20 | | notice, this list of conditions and the following disclaimer in the |
21 | | documentation and/or other materials provided with the distribution. |
22 | | |
23 | | * Neither the name of the copyright holder nor the names of its |
24 | | contributors may be used to endorse or promote products derived from this |
25 | | software without specific prior written permission. |
26 | | |
27 | | NO EXPRESS OR IMPLIED LICENSES TO ANY PARTY'S PATENT RIGHTS ARE GRANTED BY |
28 | | THIS LICENSE. THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND |
29 | | CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
30 | | LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A |
31 | | PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR |
32 | | CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, |
33 | | EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, |
34 | | PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR |
35 | | BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER |
36 | | IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
37 | | ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
38 | | POSSIBILITY OF SUCH DAMAGE. |
39 | | |
40 | | |
41 | | ------------------------------------------------------------------------------------------- */ |
42 | | /** \file NoMallocThreadPool.h |
43 | | \brief thread pool |
44 | | */ |
45 | | |
46 | | #pragma once |
47 | | |
#include <thread>
#include <mutex>
#include <condition_variable>
#include <atomic>
#include <chrono>
#include <array>
#include <algorithm>
#include <cstdlib>
#include <vector>

#ifdef HAVE_PTHREADS
#include <pthread.h>
#endif
58 | | |
59 | | #include "CommonLib/CommonDef.h" |
60 | | #if ENABLE_TIME_PROFILING_MT_MODE |
61 | | #include "CommonLib/TimeProfiler.h" |
62 | | #endif |
63 | | |
64 | | //! \ingroup Utilities |
65 | | //! \{ |
66 | | |
67 | | namespace vvenc { |
68 | | |
69 | | #ifdef TRACE_ENABLE_ITT |
70 | | static __itt_domain* itt_domain_thrd = __itt_domain_create( "Threading" ); |
71 | | |
72 | | static __itt_string_handle* itt_handle_TPspinWait = __itt_string_handle_create( "Spin_Wait" ); |
73 | | static __itt_string_handle* itt_handle_TPblocked = __itt_string_handle_create( "Blocked" ); |
74 | | static __itt_string_handle* itt_handle_TPaddTask = __itt_string_handle_create( "Add_Task" ); |
75 | | |
76 | | //static long itt_TP_blocked = 1; |
77 | | |
78 | | #endif //TRACE_ENABLE_ITT |
79 | | |
80 | | #if ENABLE_VALGRIND_CODE |
81 | | typedef std::unique_lock< std::mutex > MutexLock; |
82 | | #endif |
83 | | |
84 | | // block threads after busy-waiting this long |
85 | 2.56k | const static auto BUSY_WAIT_TIME = [] { |
86 | 2.56k | const char *env = getenv( "BUSY_WAIT_TIME" ); |
87 | 2.56k | if( env ) |
88 | 0 | return std::chrono::milliseconds( atoi( env ) ); |
89 | 2.56k | return std::chrono::milliseconds( 1 ); |
90 | 2.56k | }(); vvencimpl.cpp:vvenc::$_0::operator()() const Line | Count | Source | 85 | 256 | const static auto BUSY_WAIT_TIME = [] { | 86 | 256 | const char *env = getenv( "BUSY_WAIT_TIME" ); | 87 | 256 | if( env ) | 88 | 0 | return std::chrono::milliseconds( atoi( env ) ); | 89 | 256 | return std::chrono::milliseconds( 1 ); | 90 | 256 | }(); |
EncLib.cpp:vvenc::$_0::operator()() const Line | Count | Source | 85 | 256 | const static auto BUSY_WAIT_TIME = [] { | 86 | 256 | const char *env = getenv( "BUSY_WAIT_TIME" ); | 87 | 256 | if( env ) | 88 | 0 | return std::chrono::milliseconds( atoi( env ) ); | 89 | 256 | return std::chrono::milliseconds( 1 ); | 90 | 256 | }(); |
NoMallocThreadPool.cpp:vvenc::$_0::operator()() const Line | Count | Source | 85 | 256 | const static auto BUSY_WAIT_TIME = [] { | 86 | 256 | const char *env = getenv( "BUSY_WAIT_TIME" ); | 87 | 256 | if( env ) | 88 | 0 | return std::chrono::milliseconds( atoi( env ) ); | 89 | 256 | return std::chrono::milliseconds( 1 ); | 90 | 256 | }(); |
MCTF.cpp:vvenc::$_0::operator()() const Line | Count | Source | 85 | 256 | const static auto BUSY_WAIT_TIME = [] { | 86 | 256 | const char *env = getenv( "BUSY_WAIT_TIME" ); | 87 | 256 | if( env ) | 88 | 0 | return std::chrono::milliseconds( atoi( env ) ); | 89 | 256 | return std::chrono::milliseconds( 1 ); | 90 | 256 | }(); |
EncGOP.cpp:vvenc::$_0::operator()() const Line | Count | Source | 85 | 256 | const static auto BUSY_WAIT_TIME = [] { | 86 | 256 | const char *env = getenv( "BUSY_WAIT_TIME" ); | 87 | 256 | if( env ) | 88 | 0 | return std::chrono::milliseconds( atoi( env ) ); | 89 | 256 | return std::chrono::milliseconds( 1 ); | 90 | 256 | }(); |
EncPicture.cpp:vvenc::$_0::operator()() const Line | Count | Source | 85 | 256 | const static auto BUSY_WAIT_TIME = [] { | 86 | 256 | const char *env = getenv( "BUSY_WAIT_TIME" ); | 87 | 256 | if( env ) | 88 | 0 | return std::chrono::milliseconds( atoi( env ) ); | 89 | 256 | return std::chrono::milliseconds( 1 ); | 90 | 256 | }(); |
EncSlice.cpp:vvenc::$_0::operator()() const Line | Count | Source | 85 | 256 | const static auto BUSY_WAIT_TIME = [] { | 86 | 256 | const char *env = getenv( "BUSY_WAIT_TIME" ); | 87 | 256 | if( env ) | 88 | 0 | return std::chrono::milliseconds( atoi( env ) ); | 89 | 256 | return std::chrono::milliseconds( 1 ); | 90 | 256 | }(); |
EncAdaptiveLoopFilter.cpp:vvenc::$_0::operator()() const Line | Count | Source | 85 | 256 | const static auto BUSY_WAIT_TIME = [] { | 86 | 256 | const char *env = getenv( "BUSY_WAIT_TIME" ); | 87 | 256 | if( env ) | 88 | 0 | return std::chrono::milliseconds( atoi( env ) ); | 89 | 256 | return std::chrono::milliseconds( 1 ); | 90 | 256 | }(); |
EncCu.cpp:vvenc::$_0::operator()() const Line | Count | Source | 85 | 256 | const static auto BUSY_WAIT_TIME = [] { | 86 | 256 | const char *env = getenv( "BUSY_WAIT_TIME" ); | 87 | 256 | if( env ) | 88 | 0 | return std::chrono::milliseconds( atoi( env ) ); | 89 | 256 | return std::chrono::milliseconds( 1 ); | 90 | 256 | }(); |
IntraSearch.cpp:vvenc::$_0::operator()() const Line | Count | Source | 85 | 256 | const static auto BUSY_WAIT_TIME = [] { | 86 | 256 | const char *env = getenv( "BUSY_WAIT_TIME" ); | 87 | 256 | if( env ) | 88 | 0 | return std::chrono::milliseconds( atoi( env ) ); | 89 | 256 | return std::chrono::milliseconds( 1 ); | 90 | 256 | }(); |
|
91 | | |
92 | | |
93 | | // enable this if tasks need to be added from mutliple threads |
94 | | #define ADD_TASK_THREAD_SAFE 1 |
95 | | |
96 | | |
97 | | // --------------------------------------------------------------------------- |
98 | | // Synchronization tools |
99 | | // --------------------------------------------------------------------------- |
100 | | |
// Non-blocking barrier: a single atomic flag that marks a dependency as
// "still blocked" (true) or "released" (false). Waiters are expected to
// poll isBlocked(); no thread is ever put to sleep here.
struct Barrier
{
  Barrier() = default;
  ~Barrier() = default;
  // construct with an explicit initial state (default-constructed barriers start blocked)
  explicit Barrier( bool locked ) : m_blocked( locked ) {}

  Barrier( const Barrier & )            = delete;
  Barrier( Barrier && )                 = delete;
  Barrier& operator=( const Barrier & ) = delete;
  Barrier& operator=( Barrier && )      = delete;

  // release the barrier
  void unlock() { m_blocked.store( false ); }

  // (re-)arm the barrier
  void lock() { m_blocked.store( true ); }

  // true while the barrier is armed
  bool isBlocked() const { return m_blocked; }

private:
  std::atomic_bool m_blocked{ true };
};
131 | | |
132 | | struct BlockingBarrier |
133 | | { |
134 | | void unlock() |
135 | 0 | { |
136 | 0 | std::unique_lock<std::mutex> l( m_lock ); |
137 | 0 | m_intBarrier.unlock(); |
138 | 0 | if( !m_intBarrier.isBlocked() ) |
139 | 0 | { |
140 | 0 | m_cond.notify_all(); |
141 | 0 | } |
142 | 0 | } |
143 | | |
144 | | void lock() |
145 | 0 | { |
146 | 0 | std::unique_lock<std::mutex> l( m_lock ); |
147 | 0 | m_intBarrier.lock(); |
148 | 0 | } |
149 | | |
150 | | bool isBlocked() const |
151 | 0 | { |
152 | 0 | return m_intBarrier.isBlocked(); |
153 | 0 | } |
154 | | |
155 | | void wait() const |
156 | 0 | { |
157 | 0 | BlockingBarrier* nonconst = const_cast<BlockingBarrier*>(this); |
158 | 0 |
|
159 | 0 | std::unique_lock<std::mutex> l( nonconst->m_lock ); |
160 | 0 | nonconst->m_cond.wait( l, [this] { return !m_intBarrier.isBlocked(); } ); |
161 | 0 | } |
162 | | |
163 | | BlockingBarrier() = default; |
164 | 0 | ~BlockingBarrier() { std::unique_lock<std::mutex> l( m_lock ); } // ensure all threads have unlocked the mutex, when we start destruction |
165 | | |
166 | | BlockingBarrier( const BlockingBarrier& ) = delete; |
167 | | BlockingBarrier( BlockingBarrier&& ) = delete; |
168 | | |
169 | | BlockingBarrier& operator=( const BlockingBarrier& ) = delete; |
170 | | BlockingBarrier& operator=( BlockingBarrier&& ) = delete; |
171 | | |
172 | | // cast to const ref Barrier, so we can use it for thread pool tasks: |
173 | 0 | operator const Barrier&() const { return m_intBarrier; } |
174 | | |
175 | | private: |
176 | | Barrier m_intBarrier; |
177 | | std::condition_variable m_cond; |
178 | | std::mutex m_lock; |
179 | | }; |
180 | | |
181 | | struct WaitCounter |
182 | | { |
183 | | int operator++() |
184 | 0 | { |
185 | 0 | std::unique_lock<std::mutex> l( m_lock ); |
186 | 0 | done.lock(); |
187 | 0 | return ++m_count; |
188 | 0 | } |
189 | | |
190 | | int operator--() |
191 | 0 | { |
192 | 0 | std::unique_lock<std::mutex> l( m_lock ); |
193 | 0 | const unsigned int new_count = --m_count; |
194 | 0 | if( new_count == 0 ) |
195 | 0 | { |
196 | 0 | m_cond.notify_all(); |
197 | 0 | done.unlock(); |
198 | 0 | } |
199 | 0 | l.unlock(); // unlock mutex after done-barrier to prevent race between barrier and counter |
200 | 0 | return new_count; |
201 | 0 | } |
202 | | |
203 | | bool isBlocked() const |
204 | 0 | { |
205 | 0 | return 0 != m_count; |
206 | 0 | } |
207 | | |
208 | | void wait() const |
209 | 0 | { |
210 | 0 | WaitCounter* nonconst = const_cast<WaitCounter*>(this); |
211 | |
|
212 | 0 | std::unique_lock<std::mutex> l( nonconst->m_lock ); |
213 | 0 | nonconst->m_cond.wait( l, [this] { return m_count == 0; } ); |
214 | 0 | } |
215 | | |
216 | 0 | WaitCounter() = default; |
217 | 0 | ~WaitCounter() { std::unique_lock<std::mutex> l( m_lock ); } // ensure all threads have unlocked the mutex, when we start destruction |
218 | | |
219 | | WaitCounter( const WaitCounter & ) = delete; |
220 | | WaitCounter( WaitCounter && ) = delete; |
221 | | |
222 | | WaitCounter &operator=( const WaitCounter & ) = delete; |
223 | | WaitCounter &operator=( WaitCounter && ) = delete; |
224 | | |
225 | | Barrier done{ false }; |
226 | | |
227 | | private: |
228 | | std::condition_variable m_cond; |
229 | | std::mutex m_lock; |
230 | | unsigned int m_count = 0; |
231 | | }; |
232 | | |
233 | | |
234 | | |
235 | | // --------------------------------------------------------------------------- |
236 | | // Thread Pool |
237 | | // --------------------------------------------------------------------------- |
238 | | |
239 | | using CBarrierVec = std::vector<const Barrier*>; |
240 | | |
241 | | class NoMallocThreadPool |
242 | | { |
243 | | typedef enum |
244 | | { |
245 | | FREE = 0, |
246 | | PREPARING, |
247 | | WAITING, |
248 | | RUNNING |
249 | | } TaskState; |
250 | | |
251 | | using TaskFunc = bool ( * )( int, void * ); |
252 | | |
253 | | struct Slot |
254 | | { |
255 | | TaskFunc func { nullptr }; |
256 | | TaskFunc readyCheck{ nullptr }; |
257 | | void* param { nullptr }; |
258 | | WaitCounter* counter { nullptr }; |
259 | | Barrier* done { nullptr }; |
260 | | CBarrierVec barriers; |
261 | | std::atomic<TaskState> state { FREE }; |
262 | | }; |
263 | | |
264 | | |
265 | | class ChunkedTaskQueue |
266 | | { |
267 | | constexpr static int ChunkSize = 128; |
268 | | |
269 | | class Chunk |
270 | | { |
271 | | std::array<Slot, ChunkSize> m_slots; |
272 | | std::atomic<Chunk*> m_next{ nullptr }; |
273 | | Chunk& m_firstChunk; |
274 | | |
275 | 0 | Chunk( Chunk* firstPtr ) : m_firstChunk{ *firstPtr } {} |
276 | | |
277 | | friend class ChunkedTaskQueue; |
278 | | }; |
279 | | |
280 | | public: |
281 | | class Iterator |
282 | | { |
283 | | Slot* m_slot = nullptr; |
284 | | Chunk* m_chunk = nullptr; |
285 | | |
286 | | public: |
287 | 0 | Iterator() = default; |
288 | 0 | Iterator( Slot* slot, Chunk* chunk ) : m_slot( slot ), m_chunk( chunk ) {} |
289 | | |
290 | | Iterator& operator++() |
291 | 0 | { |
292 | 0 | CHECKD( m_slot == nullptr, "incrementing invalid iterator" ); |
293 | 0 | CHECKD( m_chunk == nullptr, "incrementing invalid iterator" ); |
294 | |
|
295 | 0 | if( m_slot != &m_chunk->m_slots.back() ) |
296 | 0 | { |
297 | 0 | ++m_slot; |
298 | 0 | } |
299 | 0 | else |
300 | 0 | { |
301 | 0 | m_chunk = m_chunk->m_next; |
302 | 0 | if( m_chunk ) |
303 | 0 | { |
304 | 0 | m_slot = &m_chunk->m_slots.front(); |
305 | 0 | } |
306 | 0 | else |
307 | 0 | { |
308 | 0 | m_slot = nullptr; |
309 | 0 | } |
310 | 0 | } |
311 | 0 | return *this; |
312 | 0 | } |
313 | | |
314 | | // increment iterator and wrap around, if end is reached |
315 | | Iterator& incWrap() |
316 | 0 | { |
317 | 0 | CHECKD( m_slot == nullptr, "incrementing invalid iterator" ); |
318 | 0 | CHECKD( m_chunk == nullptr, "incrementing invalid iterator" ); |
319 | |
|
320 | 0 | if( m_slot != &m_chunk->m_slots.back() ) |
321 | 0 | { |
322 | 0 | ++m_slot; |
323 | 0 | } |
324 | 0 | else |
325 | 0 | { |
326 | 0 | if( (Chunk*)m_chunk->m_next ) |
327 | 0 | { |
328 | 0 | m_chunk = m_chunk->m_next; |
329 | 0 | } |
330 | 0 | else |
331 | 0 | { |
332 | 0 | m_chunk = &m_chunk->m_firstChunk; |
333 | 0 | } |
334 | 0 | m_slot = &m_chunk->m_slots.front(); |
335 | 0 | } |
336 | 0 | return *this; |
337 | 0 | } |
338 | | |
339 | 0 | bool operator==( const Iterator& rhs ) const { return m_slot == rhs.m_slot; } // don't need to compare m_chunk, because m_slot is a pointer |
340 | 0 | bool operator!=( const Iterator& rhs ) const { return m_slot != rhs.m_slot; } // don't need to compare m_chunk, because m_slot is a pointer |
341 | | |
342 | 0 | Slot& operator*() { return *m_slot; } |
343 | | |
344 | 0 | bool isValid() const { return m_slot != nullptr && m_chunk != nullptr; } |
345 | | |
346 | | using iterator_category = std::forward_iterator_tag; |
347 | | using value_type = Slot; |
348 | | using pointer = Slot*; |
349 | | using reference = Slot&; |
350 | | using difference_type = ptrdiff_t; |
351 | | }; |
352 | | |
353 | 0 | ChunkedTaskQueue() = default; |
354 | | ~ChunkedTaskQueue() |
355 | 0 | { |
356 | 0 | Chunk* next = m_firstChunk.m_next; |
357 | 0 | while( next ) |
358 | 0 | { |
359 | 0 | Chunk* curr = next; |
360 | 0 | next = curr->m_next; |
361 | 0 | delete curr; |
362 | 0 | } |
363 | 0 | } |
364 | | |
365 | | ChunkedTaskQueue( const ChunkedTaskQueue& ) = delete; |
366 | | ChunkedTaskQueue( ChunkedTaskQueue&& ) = delete; |
367 | | |
368 | | // grow the queue by adding a chunk and return an iterator to the first new task-slot |
369 | | Iterator grow() |
370 | 0 | { |
371 | 0 | std::unique_lock<std::mutex> l( m_resizeMutex ); // prevent concurrent growth of the queue. Read access while growing is no problem |
372 | |
|
373 | 0 | m_lastChunk->m_next = new Chunk( &m_firstChunk ); |
374 | 0 | m_lastChunk = m_lastChunk->m_next; |
375 | |
|
376 | 0 | return Iterator{ &m_lastChunk->m_slots.front(), m_lastChunk }; |
377 | 0 | } |
378 | | |
379 | 0 | Iterator begin() { return Iterator{ &m_firstChunk.m_slots.front(), &m_firstChunk }; } |
380 | 0 | Iterator end() { return Iterator{ nullptr, nullptr }; } |
381 | | |
382 | | private: |
383 | | Chunk m_firstChunk{ &m_firstChunk }; |
384 | | Chunk* m_lastChunk = &m_firstChunk; |
385 | | |
386 | | std::mutex m_resizeMutex; |
387 | | }; |
388 | | |
389 | | |
390 | | public: |
391 | | NoMallocThreadPool( int numThreads = 1, const char *threadPoolName = nullptr, const VVEncCfg* encCfg = nullptr ); |
392 | | ~NoMallocThreadPool(); |
393 | | |
394 | | template<class TParam> |
395 | | bool addBarrierTask( bool ( *func )( int, TParam* ), |
396 | | TParam* param, |
397 | | WaitCounter* counter = nullptr, |
398 | | Barrier* done = nullptr, |
399 | | const CBarrierVec&& barriers = {}, |
400 | | bool ( *readyCheck )( int, TParam* ) = nullptr ) |
401 | 0 | { |
402 | 0 | if( m_threads.empty() ) |
403 | 0 | { |
404 | | // if singlethreaded, execute all pending tasks |
405 | 0 | if( m_nextFillSlot != m_tasks.begin() ) |
406 | 0 | { |
407 | 0 | processTasksOnMainThread(); |
408 | 0 | } |
409 | | |
410 | | // when no barriers block this task, execute it directly |
411 | 0 | if( std::none_of( barriers.begin(), barriers.end(), []( const Barrier* b ) { return b && b->isBlocked(); } )Unexecuted instantiation: MCTF.cpp:vvenc::NoMallocThreadPool::addBarrierTask<vvenc::MCTF::motionEstimationLuma(vvenc::Array2D<vvenc::MotionVector>&, vvenc::PelStorage const&, vvenc::PelStorage const&, int, vvenc::Array2D<vvenc::MotionVector> const*, int, bool) const::EstParams>(bool (*)(int, vvenc::MCTF::motionEstimationLuma(vvenc::Array2D<vvenc::MotionVector>&, vvenc::PelStorage const&, vvenc::PelStorage const&, int, vvenc::Array2D<vvenc::MotionVector> const*, int, bool) const::EstParams*), vvenc::MCTF::motionEstimationLuma(vvenc::Array2D<vvenc::MotionVector>&, vvenc::PelStorage const&, vvenc::PelStorage const&, int, vvenc::Array2D<vvenc::MotionVector> const*, int, bool) const::EstParams*, vvenc::WaitCounter*, vvenc::Barrier*, std::__1::vector<vvenc::Barrier const*, std::__1::allocator<vvenc::Barrier const*> > const&&, bool (*)(int, vvenc::MCTF::motionEstimationLuma(vvenc::Array2D<vvenc::MotionVector>&, vvenc::PelStorage const&, vvenc::PelStorage const&, int, vvenc::Array2D<vvenc::MotionVector> const*, int, bool) const::EstParams*))::{lambda(vvenc::Barrier const*)#1}::operator()(vvenc::Barrier const*) constUnexecuted instantiation: MCTF.cpp:vvenc::NoMallocThreadPool::addBarrierTask<vvenc::MCTF::bilateralFilter(vvenc::PelStorage const&, std::__1::deque<vvenc::TemporalFilterSourcePicInfo, std::__1::allocator<vvenc::TemporalFilterSourcePicInfo> >&, vvenc::PelStorage&, double) const::FltParams>(bool (*)(int, vvenc::MCTF::bilateralFilter(vvenc::PelStorage const&, std::__1::deque<vvenc::TemporalFilterSourcePicInfo, std::__1::allocator<vvenc::TemporalFilterSourcePicInfo> >&, vvenc::PelStorage&, double) const::FltParams*), vvenc::MCTF::bilateralFilter(vvenc::PelStorage const&, std::__1::deque<vvenc::TemporalFilterSourcePicInfo, std::__1::allocator<vvenc::TemporalFilterSourcePicInfo> >&, vvenc::PelStorage&, double) const::FltParams*, vvenc::WaitCounter*, 
vvenc::Barrier*, std::__1::vector<vvenc::Barrier const*, std::__1::allocator<vvenc::Barrier const*> > const&&, bool (*)(int, vvenc::MCTF::bilateralFilter(vvenc::PelStorage const&, std::__1::deque<vvenc::TemporalFilterSourcePicInfo, std::__1::allocator<vvenc::TemporalFilterSourcePicInfo> >&, vvenc::PelStorage&, double) const::FltParams*))::{lambda(vvenc::Barrier const*)#1}::operator()(vvenc::Barrier const*) constUnexecuted instantiation: vvenc::NoMallocThreadPool::addBarrierTask<vvenc::FinishTaskParam>(bool (*)(int, vvenc::FinishTaskParam*), vvenc::FinishTaskParam*, vvenc::WaitCounter*, vvenc::Barrier*, std::__1::vector<vvenc::Barrier const*, std::__1::allocator<vvenc::Barrier const*> > const&&, bool (*)(int, vvenc::FinishTaskParam*))::{lambda(vvenc::Barrier const*)#1}::operator()(vvenc::Barrier const*) constUnexecuted instantiation: vvenc::NoMallocThreadPool::addBarrierTask<vvenc::CtuEncParam>(bool (*)(int, vvenc::CtuEncParam*), vvenc::CtuEncParam*, vvenc::WaitCounter*, vvenc::Barrier*, std::__1::vector<vvenc::Barrier const*, std::__1::allocator<vvenc::Barrier const*> > const&&, bool (*)(int, vvenc::CtuEncParam*))::{lambda(vvenc::Barrier const*)#1}::operator()(vvenc::Barrier const*) const |
412 | 0 | && ( !readyCheck || readyCheck( 0, param ) ) ) |
413 | 0 | { |
414 | 0 | if( func( 0, param ) ) |
415 | 0 | { |
416 | 0 | if( done != nullptr ) |
417 | 0 | { |
418 | 0 | done->unlock(); |
419 | 0 | } |
420 | 0 | return true; |
421 | 0 | } |
422 | 0 | } |
423 | 0 | } |
424 | | |
425 | 0 | while( true ) |
426 | 0 | { |
427 | 0 | #if ADD_TASK_THREAD_SAFE |
428 | 0 | std::unique_lock<std::mutex> l(m_nextFillSlotMutex); |
429 | 0 | #endif |
430 | 0 | CHECKD( !m_nextFillSlot.isValid(), "Next fill slot iterator should always be valid" ); |
431 | 0 | const auto startIt = m_nextFillSlot; |
432 | |
|
433 | 0 | #if ADD_TASK_THREAD_SAFE |
434 | 0 | l.unlock(); |
435 | 0 | #endif |
436 | |
|
437 | 0 | bool first = true; |
438 | 0 | for( auto it = startIt; it != startIt || first; it.incWrap() ) |
439 | 0 | { |
440 | | #if ENABLE_VALGRIND_CODE |
441 | | MutexLock lock( m_extraMutex ); |
442 | | #endif |
443 | |
|
444 | 0 | first = false; |
445 | |
|
446 | 0 | auto& t = *it; |
447 | 0 | auto expected = FREE; |
448 | 0 | if( t.state.load( std::memory_order_relaxed ) == FREE && t.state.compare_exchange_strong( expected, PREPARING ) ) |
449 | 0 | { |
450 | 0 | if( counter ) |
451 | 0 | { |
452 | 0 | counter->operator++(); |
453 | 0 | } |
454 | |
|
455 | 0 | t.func = (TaskFunc)func; |
456 | 0 | t.readyCheck = (TaskFunc)readyCheck; |
457 | 0 | t.param = param; |
458 | 0 | t.done = done; |
459 | 0 | t.counter = counter; |
460 | 0 | t.barriers = std::move( barriers ); |
461 | 0 | t.state = WAITING; |
462 | |
|
463 | 0 | #if ADD_TASK_THREAD_SAFE |
464 | 0 | l.lock(); |
465 | 0 | #endif |
466 | 0 | m_nextFillSlot.incWrap(); |
467 | 0 | return true; |
468 | 0 | } |
469 | 0 | } |
470 | | |
471 | 0 | #if ADD_TASK_THREAD_SAFE |
472 | 0 | l.lock(); |
473 | 0 | #endif |
474 | 0 | m_nextFillSlot = m_tasks.grow(); |
475 | 0 | } |
476 | 0 | return false; |
477 | 0 | } Unexecuted instantiation: MCTF.cpp:bool vvenc::NoMallocThreadPool::addBarrierTask<vvenc::MCTF::motionEstimationLuma(vvenc::Array2D<vvenc::MotionVector>&, vvenc::PelStorage const&, vvenc::PelStorage const&, int, vvenc::Array2D<vvenc::MotionVector> const*, int, bool) const::EstParams>(bool (*)(int, vvenc::MCTF::motionEstimationLuma(vvenc::Array2D<vvenc::MotionVector>&, vvenc::PelStorage const&, vvenc::PelStorage const&, int, vvenc::Array2D<vvenc::MotionVector> const*, int, bool) const::EstParams*), vvenc::MCTF::motionEstimationLuma(vvenc::Array2D<vvenc::MotionVector>&, vvenc::PelStorage const&, vvenc::PelStorage const&, int, vvenc::Array2D<vvenc::MotionVector> const*, int, bool) const::EstParams*, vvenc::WaitCounter*, vvenc::Barrier*, std::__1::vector<vvenc::Barrier const*, std::__1::allocator<vvenc::Barrier const*> > const&&, bool (*)(int, vvenc::MCTF::motionEstimationLuma(vvenc::Array2D<vvenc::MotionVector>&, vvenc::PelStorage const&, vvenc::PelStorage const&, int, vvenc::Array2D<vvenc::MotionVector> const*, int, bool) const::EstParams*)) Unexecuted instantiation: MCTF.cpp:bool vvenc::NoMallocThreadPool::addBarrierTask<vvenc::MCTF::bilateralFilter(vvenc::PelStorage const&, std::__1::deque<vvenc::TemporalFilterSourcePicInfo, std::__1::allocator<vvenc::TemporalFilterSourcePicInfo> >&, vvenc::PelStorage&, double) const::FltParams>(bool (*)(int, vvenc::MCTF::bilateralFilter(vvenc::PelStorage const&, std::__1::deque<vvenc::TemporalFilterSourcePicInfo, std::__1::allocator<vvenc::TemporalFilterSourcePicInfo> >&, vvenc::PelStorage&, double) const::FltParams*), vvenc::MCTF::bilateralFilter(vvenc::PelStorage const&, std::__1::deque<vvenc::TemporalFilterSourcePicInfo, std::__1::allocator<vvenc::TemporalFilterSourcePicInfo> >&, vvenc::PelStorage&, double) const::FltParams*, vvenc::WaitCounter*, vvenc::Barrier*, std::__1::vector<vvenc::Barrier const*, std::__1::allocator<vvenc::Barrier const*> > const&&, bool (*)(int, vvenc::MCTF::bilateralFilter(vvenc::PelStorage 
const&, std::__1::deque<vvenc::TemporalFilterSourcePicInfo, std::__1::allocator<vvenc::TemporalFilterSourcePicInfo> >&, vvenc::PelStorage&, double) const::FltParams*)) Unexecuted instantiation: bool vvenc::NoMallocThreadPool::addBarrierTask<vvenc::FinishTaskParam>(bool (*)(int, vvenc::FinishTaskParam*), vvenc::FinishTaskParam*, vvenc::WaitCounter*, vvenc::Barrier*, std::__1::vector<vvenc::Barrier const*, std::__1::allocator<vvenc::Barrier const*> > const&&, bool (*)(int, vvenc::FinishTaskParam*)) Unexecuted instantiation: bool vvenc::NoMallocThreadPool::addBarrierTask<vvenc::CtuEncParam>(bool (*)(int, vvenc::CtuEncParam*), vvenc::CtuEncParam*, vvenc::WaitCounter*, vvenc::Barrier*, std::__1::vector<vvenc::Barrier const*, std::__1::allocator<vvenc::Barrier const*> > const&&, bool (*)(int, vvenc::CtuEncParam*)) |
478 | | |
479 | | bool processTasksOnMainThread(); |
480 | | |
481 | | void shutdown( bool block ); |
482 | | void waitForThreads(); |
483 | | |
484 | 0 | int numThreads() const { return (int)m_threads.size(); } |
485 | | #if ENABLE_TIME_PROFILING_MT_MODE |
486 | | const std::vector< TProfiler* >& getProfilers() { return profilers; } |
487 | | #endif |
488 | | |
489 | | private: |
490 | | #ifdef HAVE_PTHREADS |
491 | | struct PThread |
492 | | { |
493 | | PThread() = default; |
494 | | ~PThread() = default; |
495 | | |
496 | | PThread( const PThread& ) = delete; |
497 | | PThread& operator=( const PThread& ) = delete; |
498 | | |
499 | 0 | PThread( PThread&& other ) { *this = std::move( other ); }; |
500 | | PThread& operator=( PThread&& other ); |
501 | | |
502 | | template<class TFunc, class... TArgs> |
503 | | PThread( TFunc&& func, TArgs&&... args ); |
504 | | |
505 | 0 | bool joinable() { return m_joinable; } |
506 | | void join(); |
507 | | |
508 | | private: |
509 | | pthread_t m_id = 0; |
510 | | bool m_joinable = false; |
511 | | }; |
512 | | #endif // HAVE_PTHREADS |
513 | | |
514 | | #if HAVE_PTHREADS |
515 | | using ThreadImpl = PThread; |
516 | | #else |
517 | | using ThreadImpl = std::thread; |
518 | | #endif |
519 | | |
520 | | using TaskIterator = ChunkedTaskQueue::Iterator; |
521 | | |
522 | | // members |
523 | | std::string m_poolName; |
524 | | std::atomic_bool m_exitThreads{ false }; |
525 | | std::vector<ThreadImpl> m_threads; |
526 | | ChunkedTaskQueue m_tasks; |
527 | | TaskIterator m_nextFillSlot = m_tasks.begin(); |
528 | | #if ADD_TASK_THREAD_SAFE |
529 | | std::mutex m_nextFillSlotMutex; |
530 | | #endif |
531 | | std::mutex m_idleMutex; |
532 | | std::atomic_uint m_waitingThreads{ 0 }; |
533 | | #if ENABLE_VALGRIND_CODE |
534 | | std::mutex m_extraMutex; |
535 | | #endif |
536 | | #if ENABLE_TIME_PROFILING_MT_MODE |
537 | | std::vector< TProfiler* > profilers; |
538 | | #endif |
539 | | |
540 | | // internal functions |
541 | | void threadProc ( int threadId, const VVEncCfg& encCfg ); |
542 | | TaskIterator findNextTask( int threadId, TaskIterator startSearch ); |
543 | | bool processTask ( int threadId, Slot& task ); |
544 | | }; |
545 | | |
546 | | } // namespace vvenc |
547 | | |
548 | | //! \} |
549 | | |