Coverage Report

Created: 2025-06-16 07:00

/src/libjxl/lib/threads/thread_parallel_runner_internal.cc
Line | Count | Source
   1 |       | // Copyright (c) the JPEG XL Project Authors. All rights reserved.
   2 |       | //
   3 |       | // Use of this source code is governed by a BSD-style
   4 |       | // license that can be found in the LICENSE file.
   5 |       |
   6 |       | #include "lib/threads/thread_parallel_runner_internal.h"
   7 |       |
   8 |       | #include <jxl/parallel_runner.h>
   9 |       | #include <jxl/types.h>
  10 |       |
  11 |       | #include <algorithm>
  12 |       | #include <atomic>
  13 |       | #include <cstddef>
  14 |       | #include <cstdint>
  15 |       | #include <mutex>
  16 |       | #include <thread>
  17 |       |
  18 |       | #include "lib/jxl/base/compiler_specific.h"
  19 |       |
  20 |       | namespace jpegxl {
  21 |       |
  22 |       | // static
  23 |       | JxlParallelRetCode ThreadParallelRunner::Runner(
  24 |       |     void* runner_opaque, void* jpegxl_opaque, JxlParallelRunInit init,
  25 | 65.5k |     JxlParallelRunFunction func, uint32_t start_range, uint32_t end_range) {
  26 | 65.5k |   ThreadParallelRunner* self =
  27 | 65.5k |       static_cast<ThreadParallelRunner*>(runner_opaque);
  28 | 65.5k |   if (start_range > end_range) return JXL_PARALLEL_RET_RUNNER_ERROR;
  29 | 65.5k |   if (start_range == end_range) return JXL_PARALLEL_RET_SUCCESS;
  30 |       |
  31 | 65.5k |   int ret = init(jpegxl_opaque, std::max<size_t>(self->num_worker_threads_, 1));
  32 | 65.5k |   if (ret != JXL_PARALLEL_RET_SUCCESS) return ret;
  33 |       |
  34 |       |   // Use a sequential run when num_worker_threads_ is zero since we have no
  35 |       |   // worker threads.
  36 | 65.5k |   if (self->num_worker_threads_ == 0) {
  37 |     0 |     const size_t thread = 0;
  38 |     0 |     for (uint32_t task = start_range; task < end_range; ++task) {
  39 |     0 |       func(jpegxl_opaque, task, thread);
  40 |     0 |     }
  41 |     0 |     return JXL_PARALLEL_RET_SUCCESS;
  42 |     0 |   }
  43 |       |
  44 | 65.5k |   if (self->depth_.fetch_add(1, std::memory_order_acq_rel) != 0) {
  45 |     0 |     return JXL_PARALLEL_RET_RUNNER_ERROR;  // Must not re-enter.
  46 |     0 |   }
  47 |       |
  48 | 65.5k |   const WorkerCommand worker_command =
  49 | 65.5k |       (static_cast<WorkerCommand>(start_range) << 32) + end_range;
  50 |       |   // Ensure the inputs do not result in a reserved command.
  51 | 65.5k |   if ((worker_command == kWorkerWait) || (worker_command == kWorkerOnce) ||
  52 | 65.5k |       (worker_command == kWorkerExit)) {
  53 |     0 |     return JXL_PARALLEL_RET_RUNNER_ERROR;
  54 |     0 |   }
  55 |       |
  56 | 65.5k |   self->data_func_ = func;
  57 | 65.5k |   self->jpegxl_opaque_ = jpegxl_opaque;
  58 | 65.5k |   self->num_reserved_.store(0, std::memory_order_relaxed);
  59 |       |
  60 | 65.5k |   self->StartWorkers(worker_command);
  61 | 65.5k |   self->WorkersReadyBarrier();
  62 |       |
  63 | 65.5k |   if (self->depth_.fetch_add(-1, std::memory_order_acq_rel) != 1) {
  64 |     0 |     return JXL_PARALLEL_RET_RUNNER_ERROR;
  65 |     0 |   }
  66 | 65.5k |   return JXL_PARALLEL_RET_SUCCESS;
  67 | 65.5k | }
  68 |       |
  69 |       | // static
  70 |       | void ThreadParallelRunner::RunRange(ThreadParallelRunner* self,
  71 |       |                                     const WorkerCommand command,
  72 | 65.5k |                                     const int thread) {
  73 | 65.5k |   const uint32_t begin = command >> 32;
  74 | 65.5k |   const uint32_t end = command & 0xFFFFFFFF;
  75 | 65.5k |   const uint32_t num_tasks = end - begin;
  76 | 65.5k |   const uint32_t num_worker_threads = self->num_worker_threads_;
  77 |       |
  78 |       |   // OpenMP introduced several "schedule" strategies:
  79 |       |   // "single" (static assignment of exactly one chunk per thread): slower.
  80 |       |   // "dynamic" (allocates k tasks at a time): competitive for well-chosen k.
  81 |       |   // "guided" (allocates k tasks, decreases k): computing k = remaining/n
  82 |       |   //   is faster than halving k each iteration. We prefer this strategy
  83 |       |   //   because it avoids user-specified parameters.
  84 |       |
  85 |  213k |   for (;;) {
  86 |       | #if JXL_FALSE
  87 |       |     // dynamic
  88 |       |     const uint32_t my_size = std::max(num_tasks / (num_worker_threads * 4), 1);
  89 |       | #else
  90 |       |     // guided
  91 |  213k |     const uint32_t num_reserved =
  92 |  213k |         self->num_reserved_.load(std::memory_order_relaxed);
  93 |       |     // It is possible that more tasks are reserved than ready to run.
  94 |  213k |     const uint32_t num_remaining =
  95 |  213k |         num_tasks - std::min(num_reserved, num_tasks);
  96 |  213k |     const uint32_t my_size =
  97 |  213k |         std::max(num_remaining / (num_worker_threads * 4), 1u);
  98 |  213k | #endif
  99 |  213k |     const uint32_t my_begin = begin + self->num_reserved_.fetch_add(
 100 |  213k |                                           my_size, std::memory_order_relaxed);
 101 |  213k |     const uint32_t my_end = std::min(my_begin + my_size, begin + num_tasks);
 102 |       |     // Another thread already reserved the last task.
 103 |  213k |     if (my_begin >= my_end) {
 104 | 65.5k |       break;
 105 | 65.5k |     }
 106 |  807k |     for (uint32_t task = my_begin; task < my_end; ++task) {
 107 |  660k |       self->data_func_(self->jpegxl_opaque_, task, thread);
 108 |  660k |     }
 109 |  147k |   }
 110 | 65.5k | }
 111 |       |
 112 |       | // static
 113 |       | void ThreadParallelRunner::ThreadFunc(ThreadParallelRunner* self,
 114 | 11.7k |                                       const int thread) {
 115 |       |   // Until kWorkerExit command received:
 116 | 77.3k |   for (;;) {
 117 | 77.3k |     std::unique_lock<std::mutex> lock(self->mutex_);
 118 |       |     // Notify main thread that this thread is ready.
 119 | 77.3k |     if (++self->workers_ready_ == self->num_threads_) {
 120 | 77.3k |       self->workers_ready_cv_.notify_one();
 121 | 77.3k |     }
 122 | 77.3k |   RESUME_WAIT:
 123 |       |     // Wait for a command.
 124 | 77.3k |     self->worker_start_cv_.wait(lock);
 125 | 77.3k |     const WorkerCommand command = self->worker_start_command_;
 126 | 77.3k |     switch (command) {
 127 |     0 |       case kWorkerWait:    // spurious wakeup:
 128 |     0 |         goto RESUME_WAIT;  // lock still held, avoid incrementing ready.
 129 |     0 |       case kWorkerOnce:
 130 |     0 |         lock.unlock();
 131 |     0 |         self->data_func_(self->jpegxl_opaque_, thread, thread);
 132 |     0 |         break;
 133 | 11.7k |       case kWorkerExit:
 134 | 11.7k |         return;  // exits thread
 135 | 65.5k |       default:
 136 | 65.5k |         lock.unlock();
 137 | 65.5k |         RunRange(self, command, thread);
 138 | 65.5k |         break;
 139 | 77.3k |     }
 140 | 77.3k |   }
 141 | 11.7k | }
 142 |       |
 143 |       | ThreadParallelRunner::ThreadParallelRunner(const int num_worker_threads)
 144 | 11.7k |     : num_worker_threads_(num_worker_threads),
 145 | 11.7k |       num_threads_(std::max(num_worker_threads, 1)) {
 146 | 11.7k |   threads_.reserve(num_worker_threads_);
 147 |       |
 148 |       |   // Suppress "unused-private-field" warning.
 149 | 11.7k |   (void)padding1;
 150 | 11.7k |   (void)padding2;
 151 |       |
 152 |       |   // Safely handle spurious worker wakeups.
 153 | 11.7k |   worker_start_command_ = kWorkerWait;
 154 |       |
 155 | 23.5k |   for (uint32_t i = 0; i < num_worker_threads_; ++i) {
 156 | 11.7k |     threads_.emplace_back(ThreadFunc, this, i);
 157 | 11.7k |   }
 158 |       |
 159 | 11.7k |   if (num_worker_threads_ != 0) {
 160 | 11.7k |     WorkersReadyBarrier();
 161 | 11.7k |   }
 162 | 11.7k | }
 163 |       |
 164 | 11.7k | ThreadParallelRunner::~ThreadParallelRunner() {
 165 | 11.7k |   if (num_worker_threads_ != 0) {
 166 | 11.7k |     StartWorkers(kWorkerExit);
 167 | 11.7k |   }
 168 |       |
 169 | 11.7k |   for (std::thread& thread : threads_) {
 170 | 11.7k |     if (thread.joinable()) {
 171 | 11.7k |       thread.join();
 172 | 11.7k |     } else {
 173 |       | #if JXL_IS_DEBUG_BUILD
 174 |       |       JXL_PRINT_STACK_TRACE();
 175 |       |       JXL_CRASH();
 176 |       | #endif
 177 |     0 |     }
 178 | 11.7k |   }
 179 | 11.7k | }
 180 |       | }  // namespace jpegxl
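
Notes on the listing above.

Runner() packs the half-open task range [start_range, end_range) into a single 64-bit WorkerCommand, (start << 32) + end (lines 48-49), rejects packed values that would collide with the reserved wait/once/exit commands (lines 51-54), and RunRange() later unpacks the two halves (lines 73-74). The standalone sketch below mirrors that encoding; the reserved constants used here are hypothetical stand-ins, since the real values are defined in thread_parallel_runner_internal.h rather than in this file.

#include <cstdint>
#include <cstdio>

// Sketch of the 64-bit command encoding used by Runner()/RunRange().
// The reserved values below are assumed placeholders, not the library's
// actual constants (those live in thread_parallel_runner_internal.h).
using WorkerCommand = uint64_t;
constexpr WorkerCommand kWorkerWait = ~0ULL;
constexpr WorkerCommand kWorkerOnce = ~1ULL;
constexpr WorkerCommand kWorkerExit = ~2ULL;

// Packs [start, end) into one command, mirroring lines 48-49.
constexpr WorkerCommand MakeCommand(uint32_t start, uint32_t end) {
  return (static_cast<WorkerCommand>(start) << 32) + end;
}

// Unpacks it again, mirroring lines 73-74.
constexpr uint32_t CommandBegin(WorkerCommand c) {
  return static_cast<uint32_t>(c >> 32);
}
constexpr uint32_t CommandEnd(WorkerCommand c) {
  return static_cast<uint32_t>(c & 0xFFFFFFFF);
}

int main() {
  const WorkerCommand c = MakeCommand(10, 1000);
  std::printf("begin=%u end=%u\n", CommandBegin(c), CommandEnd(c));  // begin=10 end=1000
  // Runner() additionally rejects any packed value equal to a reserved
  // command, which is the check on lines 51-54 of the listing.
  return 0;
}

With the high sentinel values assumed here, an ordinary range can only collide with a reserved command when start_range is near UINT32_MAX, which is consistent with the zero hit counts on lines 53-54.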
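
RunRange() implements the "guided" schedule described in the comment on lines 78-83: each worker repeatedly reserves remaining / (num_worker_threads * 4) tasks (at least one) with a relaxed fetch_add on num_reserved_ and runs that chunk, so chunk sizes shrink as the range drains and no tuning parameter is needed. The self-contained sketch below reproduces that loop outside the class; the names (RunGuided, reserved, done) are illustrative only, not part of the library.

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <cstdio>
#include <functional>
#include <thread>
#include <vector>

// Illustrative sketch of the guided chunking in RunRange() (lines 85-109).
void RunGuided(uint32_t begin, uint32_t end, uint32_t num_workers,
               std::atomic<uint32_t>& reserved,
               const std::function<void(uint32_t, int)>& func,
               const int thread) {
  const uint32_t num_tasks = end - begin;
  for (;;) {
    // More tasks may already be reserved than exist; clamp before subtracting.
    const uint32_t already = reserved.load(std::memory_order_relaxed);
    const uint32_t remaining = num_tasks - std::min(already, num_tasks);
    const uint32_t my_size = std::max(remaining / (num_workers * 4), 1u);
    const uint32_t my_begin =
        begin + reserved.fetch_add(my_size, std::memory_order_relaxed);
    const uint32_t my_end = std::min(my_begin + my_size, begin + num_tasks);
    if (my_begin >= my_end) break;  // another worker reserved the last task
    for (uint32_t task = my_begin; task < my_end; ++task) func(task, thread);
  }
}

int main() {
  constexpr uint32_t kWorkers = 4;
  std::atomic<uint32_t> reserved{0};
  std::atomic<uint32_t> done{0};
  auto task = [&](uint32_t /*index*/, int /*thread*/) {
    done.fetch_add(1, std::memory_order_relaxed);
  };
  std::vector<std::thread> workers;
  for (int t = 0; t < static_cast<int>(kWorkers); ++t) {
    workers.emplace_back(RunGuided, 0u, 1000u, kWorkers, std::ref(reserved),
                         task, t);
  }
  for (std::thread& w : workers) w.join();
  std::printf("ran %u of 1000 tasks\n", done.load());  // expect 1000
  return 0;
}

The relaxed load of the shared counter only influences chunk sizing; disjoint sub-ranges are guaranteed by the atomic fetch_add itself, which is why a stale num_reserved value is harmless.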
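
ThreadFunc() (lines 113-141) is the worker side of a command handshake: each worker increments workers_ready_ under the mutex, the last one signals the main thread, and every worker then waits on worker_start_cv_ for the next command; the kWorkerWait sentinel (set in the constructor on line 153) makes spurious wakeups jump back to the wait without touching the ready counter. StartWorkers() and WorkersReadyBarrier() are declared in thread_parallel_runner_internal.h and do not appear in this listing, so the sketch below is an assumed reconstruction of the main-thread half of the protocol, not the library's implementation.

#include <condition_variable>
#include <cstdint>
#include <mutex>

using WorkerCommand = uint64_t;
constexpr WorkerCommand kWorkerWait = ~0ULL;  // hypothetical sentinel value

// Hedged sketch of the state ThreadFunc() relies on and the two main-thread
// operations it pairs with; names mimic the listing but details are assumed.
struct RunnerStateSketch {
  std::mutex mutex;
  std::condition_variable workers_ready_cv;
  std::condition_variable worker_start_cv;
  uint32_t workers_ready = 0;
  uint32_t num_threads = 1;
  WorkerCommand worker_start_command = kWorkerWait;

  // Publish a command and wake every worker blocked at line 124.
  void StartWorkers(WorkerCommand command) {
    std::lock_guard<std::mutex> lock(mutex);
    worker_start_command = command;
    worker_start_cv.notify_all();
  }

  // Wait until all workers have checked in (lines 119-121), then reset the
  // counter and park future spurious wakeups on kWorkerWait, which the goto
  // at lines 127-128 handles without re-incrementing workers_ready.
  void WorkersReadyBarrier() {
    std::unique_lock<std::mutex> lock(mutex);
    workers_ready_cv.wait(lock,
                          [this] { return workers_ready == num_threads; });
    workers_ready = 0;
    worker_start_command = kWorkerWait;
  }
};

In this scheme Runner() (lines 60-61) issues StartWorkers(worker_command) followed by WorkersReadyBarrier(), so a parallel run is a single broadcast-then-barrier round trip per call.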