/src/astc-encoder/Source/astcenc_internal_entry.h
Line | Count | Source |
1 | | // SPDX-License-Identifier: Apache-2.0 |
2 | | // ---------------------------------------------------------------------------- |
3 | | // Copyright 2011-2026 Arm Limited |
4 | | // |
5 | | // Licensed under the Apache License, Version 2.0 (the "License"); you may not |
6 | | // use this file except in compliance with the License. You may obtain a copy |
7 | | // of the License at: |
8 | | // |
9 | | // http://www.apache.org/licenses/LICENSE-2.0 |
10 | | // |
11 | | // Unless required by applicable law or agreed to in writing, software |
12 | | // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
13 | | // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
14 | | // License for the specific language governing permissions and limitations |
15 | | // under the License. |
16 | | // ---------------------------------------------------------------------------- |
17 | | |
18 | | /** |
19 | | * @brief Functions and data declarations for the outer context. |
20 | | * |
21 | | * The outer context includes thread-pool management, which is slower to |
22 | | * compile due to increased use of C++ stdlib. The inner context used in the |
23 | | * majority of the codec library does not include this. |
24 | | */ |
25 | | |
26 | | #ifndef ASTCENC_INTERNAL_ENTRY_INCLUDED |
27 | | #define ASTCENC_INTERNAL_ENTRY_INCLUDED |
28 | | |
29 | | #include <atomic> |
30 | | #include <condition_variable> |
31 | | #include <functional> |
32 | | #include <mutex> |
33 | | |
34 | | #include "astcenc_internal.h" |
35 | | |
36 | | /* ============================================================================ |
37 | | Parallel execution control |
38 | | ============================================================================ */ |
39 | | |
40 | | /** |
41 | | * @brief A simple counter-based manager for parallel task execution. |
42 | | * |
43 | | * The task processing execution consists of: |
44 | | * |
45 | | * * A single-threaded init stage. |
46 | | * * A multi-threaded processing stage. |
47 | | * * A condition variable so threads can wait for processing completion. |
48 | | * |
49 | | * The init stage will be executed by the first thread to arrive in the critical section, there is |
50 | | * no main thread in the thread pool. |
51 | | * |
52 | | * The processing stage uses dynamic dispatch to assign task tickets to threads on an on-demand |
53 | | * basis. Threads may each therefore execute different numbers of tasks, depending on their |
54 | | * processing complexity. The task queue and the task tickets are just counters; the caller must map |
55 | | * these integers to an actual processing partition in a specific problem domain. |
56 | | * |
57 | | * The exit wait condition is needed to ensure processing has finished before a worker thread can |
58 | | * progress to the next stage of the pipeline. Specifically a worker may exit the processing stage |
59 | | * because there are no new tasks to assign to it while other worker threads are still processing. |
60 | | * Calling @c wait() will ensure that all other workers have finished before the thread can proceed. |
61 | | * |
62 | | * The basic usage model: |
63 | | * |
64 | | * // --------- From single-threaded code --------- |
65 | | * |
66 | | * // Reset the tracker state |
67 | | * manager->reset() |
68 | | * |
69 | | * // --------- From multi-threaded code --------- |
70 | | * |
71 | | * // Run the stage init; only first thread actually runs the lambda |
72 | | * manager->init(<lambda>) |
73 | | * |
74 | | * do |
75 | | * { |
76 | | * // Request a task assignment |
77 | | * size_t task_count; |
78 | | * size_t base_index = manager->get_task_assignment(<granule>, task_count); |
79 | | * |
80 | | * // Process any tasks we were given (task_count <= granule size) |
81 | | * if (task_count) |
82 | | * { |
83 | | * // Run the user task processing code for N tasks here |
84 | | * ... |
85 | | * |
86 | | * // Flag these tasks as complete |
87 | | * manager->complete_task_assignment(task_count); |
88 | | * } |
89 | | * } while (task_count); |
90 | | * |
91 | | * // Wait for all threads to complete tasks before progressing |
92 | | * manager->wait() |
93 | | * |
94 | | * // Run the stage term; only first thread actually runs the lambda |
95 | | * manager->term(<lambda>) |
96 | | */ |
97 | | class ParallelManager |
98 | | { |
99 | | private: |
100 | | /** @brief Lock guarding the init/term flags and m_done_count; also the mutex paired with m_complete. */ |
101 | | std::mutex m_lock; |
102 | | |
103 | | /** @brief True if the current operation is cancelled; set by cancel() and cleared by reset(). */ |
104 | | std::atomic<bool> m_is_cancelled; |
105 | | |
106 | | /** @brief True if the stage init() step has been executed; accessed under m_lock in init(). */ |
107 | | bool m_init_done; |
108 | | |
109 | | /** @brief True if the stage term() step has been executed; accessed under m_lock in term(). */ |
110 | | bool m_term_done; |
111 | | |
112 | | /** @brief Condition variable that wait() blocks on and complete_task_assignment() notifies. */ |
113 | | std::condition_variable m_complete; |
114 | | |
115 | | /** @brief Number of tasks handed out so far; advanced by a whole granule per request, so it may exceed m_task_count. */ |
116 | | std::atomic<size_t> m_start_count; |
117 | | |
118 | | /** @brief Number of tasks finished; updated and tested while holding m_lock. */ |
119 | | size_t m_done_count; |
120 | | |
121 | | /** @brief Number of tasks that need to be processed; set by init(). */ |
122 | | size_t m_task_count; |
123 | | |
124 | | /** @brief Progress callback (optional); null when no callback is registered. */ |
125 | | astcenc_progress_callback m_callback; |
126 | | |
127 | | /** @brief Lock held while invoking m_callback and updating m_callback_last_value. */ |
128 | | std::mutex m_callback_lock; |
129 | | |
130 | | /** @brief Minimum progress, in percent, since the last report before making a callback. */ |
131 | | float m_callback_min_diff; |
132 | | |
133 | | /** @brief Last progress value, in percent, passed to the callback. */ |
134 | | float m_callback_last_value; |
135 | | |
136 | | public: |
137 | | /** @brief Create a new ParallelManager in the state established by reset(). */ |
138 | | ParallelManager() |
139 | 10.6k | { |
140 | 10.6k | reset(); |
141 | 10.6k | } |
142 | | |
143 | | /** |
144 | | * @brief Reset the tracker for a new processing batch. |
145 | | * |
146 | | * This must be called from single-threaded code before starting the multi-threaded processing |
147 | | * operations; no lock is taken here. All flags and counters are cleared and any callback is dropped. |
148 | | */ |
149 | | void reset() |
150 | 16.1k | { |
151 | 16.1k | m_init_done = false; |
152 | 16.1k | m_term_done = false; |
153 | 16.1k | m_is_cancelled = false; |
154 | 16.1k | m_start_count = 0; |
155 | 16.1k | m_done_count = 0; |
156 | 16.1k | m_task_count = 0; |
157 | 16.1k | m_callback = nullptr; |
158 | 16.1k | m_callback_last_value = 0.0f; |
159 | 16.1k | m_callback_min_diff = 1.0f; |
160 | 16.1k | } |
161 | | |
162 | | /** |
163 | | * @brief Flag the operation as cancelled so that no new tasks are assigned. |
164 | | * |
165 | | * Note, all in-flight tasks in a worker will still complete normally. Only the flag is set here. |
166 | | */ |
167 | | void cancel() |
168 | 0 | { |
169 | 0 | m_is_cancelled = true; |
170 | 0 | } |
171 | | |
172 | | /** |
173 | | * @brief Trigger the pipeline stage init step. |
174 | | * |
175 | | * This can be called from multi-threaded code. The first thread to hit this will process the |
176 | | * initialization. Other threads will block and wait for it to complete. |
177 | | * |
178 | | * @param init_func Callable which executes the stage initialization. It must return the |
179 | | * total number of tasks in the stage. It is invoked while m_lock is held. |
180 | | */ |
181 | | void init(std::function<size_t(void)> init_func) |
182 | 0 | { |
183 | 0 | std::lock_guard<std::mutex> lck(m_lock); |
184 | 0 | if (!m_init_done) |
185 | 0 | { |
186 | 0 | m_task_count = init_func(); |
187 | 0 | m_init_done = true; |
188 | 0 | } |
189 | 0 | } |
190 | | |
191 | | /** |
192 | | * @brief Trigger the pipeline stage init step with a known task count and a progress callback. |
193 | | * |
194 | | * This can be called from multi-threaded code. The first thread to hit this will process the |
195 | | * initialization. Other threads will block and wait for it to complete. |
196 | | * |
197 | | * @param task_count Total number of tasks needing processing. |
198 | | * @param callback Function pointer for progress status callbacks; reporting is skipped if null. |
199 | | */ |
200 | | void init(size_t task_count, astcenc_progress_callback callback) |
201 | 3.29k | { |
202 | 3.29k | std::lock_guard<std::mutex> lck(m_lock); |
203 | 3.29k | if (!m_init_done) |
204 | 3.29k | { |
205 | 3.29k | m_callback = callback; |
206 | 3.29k | m_task_count = task_count; |
207 | 3.29k | m_init_done = true; |
208 | | |
209 | | // Report every 1% or 4096 blocks, whichever is larger, to avoid callback overhead |
210 | 3.29k | float min_diff = (4096.0f / static_cast<float>(task_count)) * 100.0f; |
211 | 3.29k | m_callback_min_diff = astc::max(min_diff, 1.0f); |
212 | 3.29k | } |
213 | 3.29k | } |
214 | | |
215 | | /** |
216 | | * @brief Request a task assignment. |
217 | | * |
218 | | * Assign up to @c granule tasks to the caller. Nothing is assigned once cancelled or all tasks are handed out. |
219 | | * |
220 | | * @param granule Maximum number of tasks that can be assigned. |
221 | | * @param[out] count Actual number of tasks assigned, or zero if no tasks were assigned. |
222 | | * |
223 | | * @return Task index of the first assigned task; assigned tasks increment from this. Zero if none assigned. |
224 | | */ |
225 | | size_t get_task_assignment(size_t granule, size_t& count) |
226 | 6.58k | { |
227 | 6.58k | size_t base = m_start_count.fetch_add(granule, std::memory_order_relaxed); |
228 | 6.58k | if (m_is_cancelled || base >= m_task_count) |
229 | 3.29k | { |
230 | 3.29k | count = 0; |
231 | 3.29k | return 0; |
232 | 3.29k | } |
233 | | |
234 | 3.29k | count = astc::min(m_task_count - base, granule); |
235 | 3.29k | return base; |
236 | 6.58k | } |
237 | | |
238 | | /** |
239 | | * @brief Complete a task assignment. |
240 | | * |
241 | | * Mark @c count tasks as complete. This will notify all threads blocked on @c wait() if this |
242 | | * completes the processing of the stage or the operation is cancelled. A registered callback may be invoked. |
243 | | * |
244 | | * @param count The number of completed tasks. |
245 | | */ |
246 | | void complete_task_assignment(size_t count) |
247 | 3.29k | { |
248 | | // Note: m_done_count is updated under m_lock rather than as a lone atomic; wait() tests it under |
249 | | // the same mutex, and an update made outside the lock would race with that test in other threads |
250 | 3.29k | size_t local_count; |
251 | 3.29k | float local_last_value; |
252 | 3.29k | { |
253 | 3.29k | std::unique_lock<std::mutex> lck(m_lock); |
254 | 3.29k | m_done_count += count; |
255 | 3.29k | local_count = m_done_count; |
256 | 3.29k | local_last_value = m_callback_last_value; |
257 | | |
258 | | // Ensure the progress bar hits 100%; this call is made with both m_lock and m_callback_lock held |
259 | 3.29k | if (m_callback && m_done_count == m_task_count) |
260 | 0 | { |
261 | 0 | std::unique_lock<std::mutex> cblck(m_callback_lock); |
262 | 0 | m_callback(100.0f); |
263 | 0 | m_callback_last_value = 100.0f; |
264 | 0 | } |
265 | | |
266 | | // Notify if nothing left to do; m_lock is released before the waiters are notified |
267 | 3.29k | if (m_is_cancelled || m_done_count == m_task_count) |
268 | 3.29k | { |
269 | 3.29k | lck.unlock(); |
270 | 3.29k | m_complete.notify_all(); |
271 | 3.29k | } |
272 | 3.29k | } |
273 | | |
274 | | // Process progress callback if we have one; m_lock is no longer held from this point on |
275 | 3.29k | if (m_callback) |
276 | 0 | { |
277 | | // Initial lockless test using the values snapshotted above - have we progressed enough to emit? |
278 | 0 | float num = static_cast<float>(local_count); |
279 | 0 | float den = static_cast<float>(m_task_count); |
280 | 0 | float this_value = (num / den) * 100.0f; |
281 | 0 | bool report_test = (this_value - local_last_value) > m_callback_min_diff; |
282 | | |
283 | | // Recheck under m_callback_lock, as another thread might report first. NOTE(review): the snapshot was read under m_lock only - confirm intended |
284 | 0 | if (report_test) |
285 | 0 | { |
286 | 0 | std::unique_lock<std::mutex> cblck(m_callback_lock); |
287 | 0 | bool report_retest = (this_value - m_callback_last_value) > m_callback_min_diff; |
288 | 0 | if (report_retest) |
289 | 0 | { |
290 | 0 | m_callback(this_value); |
291 | 0 | m_callback_last_value = this_value; |
292 | 0 | } |
293 | 0 | } |
294 | 0 | } |
295 | 3.29k | } |
296 | | |
297 | | /** |
298 | | * @brief Block until every task has been flagged complete or the operation has been cancelled. |
299 | | */ |
300 | | void wait() |
301 | 4.46k | { |
302 | 4.46k | std::unique_lock<std::mutex> lck(m_lock); |
303 | 4.46k | m_complete.wait(lck, [this]{ return m_is_cancelled || m_done_count == m_task_count; }); |
304 | 4.46k | } |
305 | | |
306 | | /** |
307 | | * @brief Trigger the pipeline stage term step. |
308 | | * |
309 | | * This can be called from multi-threaded code. The first thread to hit this will process the |
310 | | * work pool termination. Caller must have called @c wait() prior to calling this function to |
311 | | * ensure that processing is complete. |
312 | | * |
313 | | * @param term_func Callable which executes the stage termination. It is invoked while m_lock is held. |
314 | | */ |
315 | | void term(std::function<void(void)> term_func) |
316 | 2.23k | { |
317 | 2.23k | std::lock_guard<std::mutex> lck(m_lock); |
318 | 2.23k | if (!m_term_done) |
319 | 2.23k | { |
320 | 2.23k | term_func(); |
321 | 2.23k | m_term_done = true; |
322 | 2.23k | } |
323 | 2.23k | } |
324 | | }; |
325 | | |
326 | | /** |
327 | | * @brief The astcenc compression context: the inner context state plus its ParallelManager instances. |
328 | | */ |
329 | | struct astcenc_context |
330 | | { |
331 | | /** @brief The context internal state. */ |
332 | | astcenc_contexti context; |
333 | | |
334 | | #if !defined(ASTCENC_DECOMPRESS_ONLY) |
335 | | /** @brief The parallel manager for averages computation; absent if ASTCENC_DECOMPRESS_ONLY is defined. */ |
336 | | ParallelManager manage_avg; |
337 | | |
338 | | /** @brief The parallel manager for compression; absent if ASTCENC_DECOMPRESS_ONLY is defined. */ |
339 | | ParallelManager manage_compress; |
340 | | #endif |
341 | | |
342 | | /** @brief The parallel manager for decompression; declared regardless of ASTCENC_DECOMPRESS_ONLY. */ |
343 | | ParallelManager manage_decompress; |
344 | | }; |
345 | | |
346 | | #endif |