Coverage Report

Created: 2026-06-10 07:06

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/astc-encoder/Source/astcenc_internal_entry.h
Line
Count
Source
1
// SPDX-License-Identifier: Apache-2.0
2
// ----------------------------------------------------------------------------
3
// Copyright 2011-2026 Arm Limited
4
//
5
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
6
// use this file except in compliance with the License. You may obtain a copy
7
// of the License at:
8
//
9
//     http://www.apache.org/licenses/LICENSE-2.0
10
//
11
// Unless required by applicable law or agreed to in writing, software
12
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14
// License for the specific language governing permissions and limitations
15
// under the License.
16
// ----------------------------------------------------------------------------
17
18
/**
19
 * @brief Functions and data declarations for the outer context.
20
 *
21
 * The outer context includes thread-pool management, which is slower to
22
 * compile due to increased use of C++ stdlib. The inner context used in the
23
 * majority of the codec library does not include this.
24
 */
25
26
#ifndef ASTCENC_INTERNAL_ENTRY_INCLUDED
27
#define ASTCENC_INTERNAL_ENTRY_INCLUDED
28
29
#include <atomic>
30
#include <condition_variable>
31
#include <functional>
32
#include <mutex>
33
34
#include "astcenc_internal.h"
35
36
/* ============================================================================
37
  Parallel execution control
38
============================================================================ */
39
40
/**
41
 * @brief A simple counter-based manager for parallel task execution.
42
 *
43
 * The task processing execution consists of:
44
 *
45
 *     * A single-threaded init stage.
46
 *     * A multi-threaded processing stage.
47
 *     * A condition variable so threads can wait for processing completion.
48
 *
49
 * The init stage will be executed by the first thread to arrive in the critical section; there is
50
 * no main thread in the thread pool.
51
 *
52
 * The processing stage uses dynamic dispatch to assign task tickets to threads on an on-demand
53
 * basis. Threads may each therefore execute different numbers of tasks, depending on their
54
 * processing complexity. The task queue and the task tickets are just counters; the caller must map
55
 * these integers to an actual processing partition in a specific problem domain.
56
 *
57
 * The exit wait condition is needed to ensure processing has finished before a worker thread can
58
 * progress to the next stage of the pipeline. Specifically a worker may exit the processing stage
59
 * because there are no new tasks to assign to it while other worker threads are still processing.
60
 * Calling @c wait() will ensure that all other workers have finished before the thread can proceed.
61
 *
62
 * The basic usage model:
63
 *
64
 *     // --------- From single-threaded code ---------
65
 *
66
 *     // Reset the tracker state
67
 *     manager->reset()
68
 *
69
 *     // --------- From multi-threaded code ---------
70
 *
71
 *     // Run the stage init; only first thread actually runs the lambda
72
 *     manager->init(<lambda>)
73
 *
74
 *     size_t task_count;
74
 *     do
75
 *     {
76
 *         // Request a task assignment
77
 *         size_t base_index = manager->get_task_assignment(<granule>, task_count);
79
 *
80
 *         // Process any tasks we were given (task_count <= granule size)
81
 *         if (task_count)
82
 *         {
83
 *             // Run the user task processing code for N tasks here
84
 *             ...
85
 *
86
 *             // Flag these tasks as complete
87
 *             manager->complete_task_assignment(task_count);
88
 *         }
89
 *     } while (task_count);
90
 *
91
 *     // Wait for all threads to complete tasks before progressing
92
 *     manager->wait()
93
 *
94
 *     // Run the stage term; only first thread actually runs the lambda
95
 *     manager->term(<lambda>)
96
 */
97
class ParallelManager
98
{
99
private:
100
  /** @brief Lock used for critical section and condition synchronization. */
101
  std::mutex m_lock;
102
103
  /** @brief True if the current operation is cancelled. */
104
  std::atomic<bool> m_is_cancelled;
105
106
  /** @brief True if the stage init() step has been executed. */
107
  bool m_init_done;
108
109
  /** @brief True if the stage term() step has been executed. */
110
  bool m_term_done;
111
112
  /** @brief Condition variable for tracking stage processing completion. */
113
  std::condition_variable m_complete;
114
115
  /** @brief Number of tasks started, but not necessarily finished. */
116
  std::atomic<size_t> m_start_count;
117
118
  /** @brief Number of tasks finished. */
119
  size_t m_done_count;
120
121
  /** @brief Number of tasks that need to be processed. */
122
  size_t m_task_count;
123
124
  /** @brief Progress callback (optional). */
125
  astcenc_progress_callback m_callback;
126
127
  /** @brief Lock used for callback synchronization. */
128
  std::mutex m_callback_lock;
129
130
  /** @brief Minimum progress before making a callback. */
131
  float m_callback_min_diff;
132
133
  /** @brief Last progress callback value. */
134
  float m_callback_last_value;
135
136
public:
137
  /** @brief Create a new ParallelManager. */
138
  ParallelManager()
139
10.6k
  {
140
10.6k
    reset();
141
10.6k
  }
142
143
  /**
144
   * @brief Reset the tracker for a new processing batch.
145
   *
146
   * This must be called from single-threaded code before starting the multi-threaded processing
147
   * operations.
148
   */
149
  void reset()
150
16.1k
  {
151
16.1k
    m_init_done = false;
152
16.1k
    m_term_done = false;
153
16.1k
    m_is_cancelled = false;
154
16.1k
    m_start_count = 0;
155
16.1k
    m_done_count = 0;
156
16.1k
    m_task_count = 0;
157
16.1k
    m_callback = nullptr;
158
16.1k
    m_callback_last_value = 0.0f;
159
16.1k
    m_callback_min_diff = 1.0f;
160
16.1k
  }
161
162
  /**
163
   * @brief Clear the tracker and stop new tasks being assigned.
164
   *
165
   * Note, all in-flight tasks in a worker will still complete normally.
166
   */
167
  void cancel()
168
0
  {
169
0
    m_is_cancelled = true;
170
0
  }
171
172
  /**
173
   * @brief Trigger the pipeline stage init step.
174
   *
175
   * This can be called from multi-threaded code. The first thread to hit this will process the
176
   * initialization. Other threads will block and wait for it to complete.
177
   *
178
   * @param init_func   Callable which executes the stage initialization. It must return the
179
   *                    total number of tasks in the stage.
180
   */
181
  void init(std::function<size_t(void)> init_func)
182
0
  {
183
0
    std::lock_guard<std::mutex> lck(m_lock);
184
0
    if (!m_init_done)
185
0
    {
186
0
      m_task_count = init_func();
187
0
      m_init_done = true;
188
0
    }
189
0
  }
190
191
  /**
192
   * @brief Trigger the pipeline stage init step.
193
   *
194
   * This can be called from multi-threaded code. The first thread to hit this will process the
195
   * initialization. Other threads will block and wait for it to complete.
196
   *
197
   * @param task_count   Total number of tasks needing processing.
198
   * @param callback     Function pointer for progress status callbacks.
199
   */
200
  void init(size_t task_count, astcenc_progress_callback callback)
201
3.29k
  {
202
3.29k
    std::lock_guard<std::mutex> lck(m_lock);
203
3.29k
    if (!m_init_done)
204
3.29k
    {
205
3.29k
      m_callback = callback;
206
3.29k
      m_task_count = task_count;
207
3.29k
      m_init_done = true;
208
209
      // Report every 1% or 4096 blocks, whichever is larger, to avoid callback overhead
210
3.29k
      float min_diff = (4096.0f / static_cast<float>(task_count)) * 100.0f;
211
3.29k
      m_callback_min_diff = astc::max(min_diff, 1.0f);
212
3.29k
    }
213
3.29k
  }
214
215
  /**
216
   * @brief Request a task assignment.
217
   *
218
   * Assign up to @c granule tasks to the caller for processing.
219
   *
220
   * @param      granule   Maximum number of tasks that can be assigned.
221
   * @param[out] count     Actual number of tasks assigned, or zero if no tasks were assigned.
222
   *
223
   * @return Task index of the first assigned task; assigned tasks increment from this.
224
   */
225
  size_t get_task_assignment(size_t granule, size_t& count)
226
6.58k
  {
227
6.58k
    size_t base = m_start_count.fetch_add(granule, std::memory_order_relaxed);
228
6.58k
    if (m_is_cancelled || base >= m_task_count)
229
3.29k
    {
230
3.29k
      count = 0;
231
3.29k
      return 0;
232
3.29k
    }
233
234
3.29k
    count = astc::min(m_task_count - base, granule);
235
3.29k
    return base;
236
6.58k
  }
237
238
  /**
239
   * @brief Complete a task assignment.
240
   *
241
   * Mark @c count tasks as complete. This will notify all threads blocked on @c wait() if this
242
   * completes the processing of the stage.
243
   *
244
   * @param count   The number of completed tasks.
245
   */
246
  void complete_task_assignment(size_t count)
247
3.29k
  {
248
    // Note: m_done_count cannot use an atomic without the mutex; this has a race between the
249
    // update here and the wait() for other threads
250
3.29k
    size_t local_count;
251
3.29k
    float local_last_value;
252
3.29k
    {
253
3.29k
      std::unique_lock<std::mutex> lck(m_lock);
254
3.29k
      m_done_count += count;
255
3.29k
      local_count = m_done_count;
256
3.29k
      local_last_value = m_callback_last_value;
257
258
      // Ensure the progress bar hits 100%
259
3.29k
      if (m_callback && m_done_count == m_task_count)
260
0
      {
261
0
        std::unique_lock<std::mutex> cblck(m_callback_lock);
262
0
        m_callback(100.0f);
263
0
        m_callback_last_value = 100.0f;
264
0
      }
265
266
      // Notify if nothing left to do
267
3.29k
      if (m_is_cancelled || m_done_count == m_task_count)
268
3.29k
      {
269
3.29k
        lck.unlock();
270
3.29k
        m_complete.notify_all();
271
3.29k
      }
272
3.29k
    }
273
274
    // Process progress callback if we have one
275
3.29k
    if (m_callback)
276
0
    {
277
      // Initial lockless test - have we progressed enough to emit?
278
0
      float num = static_cast<float>(local_count);
279
0
      float den = static_cast<float>(m_task_count);
280
0
      float this_value =  (num / den) * 100.0f;
281
0
      bool report_test = (this_value - local_last_value) > m_callback_min_diff;
282
283
      // Recheck under lock, because another thread might report first
284
0
      if (report_test)
285
0
      {
286
0
        std::unique_lock<std::mutex> cblck(m_callback_lock);
287
0
        bool report_retest = (this_value - m_callback_last_value) > m_callback_min_diff;
288
0
        if (report_retest)
289
0
        {
290
0
          m_callback(this_value);
291
0
          m_callback_last_value = this_value;
292
0
        }
293
0
      }
294
0
    }
295
3.29k
  }
296
297
  /**
298
   * @brief Wait for stage processing to complete.
299
   */
300
  void wait()
301
4.46k
  {
302
4.46k
    std::unique_lock<std::mutex> lck(m_lock);
303
4.46k
    m_complete.wait(lck, [this]{ return m_is_cancelled || m_done_count == m_task_count; });
304
4.46k
  }
305
306
  /**
307
   * @brief Trigger the pipeline stage term step.
308
   *
309
   * This can be called from multi-threaded code. The first thread to hit this will process the
310
   * work pool termination. Caller must have called @c wait() prior to calling this function to
311
   * ensure that processing is complete.
312
   *
313
   * @param term_func   Callable which executes the stage termination.
314
   */
315
  void term(std::function<void(void)> term_func)
316
2.23k
  {
317
2.23k
    std::lock_guard<std::mutex> lck(m_lock);
318
2.23k
    if (!m_term_done)
319
2.23k
    {
320
2.23k
      term_func();
321
2.23k
      m_term_done = true;
322
2.23k
    }
323
2.23k
  }
324
};
325
326
/**
327
 * @brief The astcenc compression context.
328
 */
329
struct astcenc_context
330
{
331
  /** @brief The context internal state. */
332
  astcenc_contexti context;
333
334
#if !defined(ASTCENC_DECOMPRESS_ONLY)
335
  /** @brief The parallel manager for averages computation. */
336
  ParallelManager manage_avg;
337
338
  /** @brief The parallel manager for compression. */
339
  ParallelManager manage_compress;
340
#endif
341
342
  /** @brief The parallel manager for decompression. */
343
  ParallelManager manage_decompress;
344
};
345
346
#endif