/src/libjxl/lib/threads/thread_parallel_runner_internal.cc
// Copyright (c) the JPEG XL Project Authors. All rights reserved.
//
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

#include "lib/threads/thread_parallel_runner_internal.h"

#include <jxl/parallel_runner.h>
#include <jxl/types.h>

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <thread>

#include "lib/jxl/base/compiler_specific.h"

namespace jpegxl {

// static
JxlParallelRetCode ThreadParallelRunner::Runner(
    void* runner_opaque, void* jpegxl_opaque, JxlParallelRunInit init,
    JxlParallelRunFunction func, uint32_t start_range, uint32_t end_range) {
  ThreadParallelRunner* self =
      static_cast<ThreadParallelRunner*>(runner_opaque);
  if (start_range > end_range) return JXL_PARALLEL_RET_RUNNER_ERROR;
  if (start_range == end_range) return JXL_PARALLEL_RET_SUCCESS;

  int ret =
      init(jpegxl_opaque, std::max<size_t>(self->num_worker_threads_, 1));
  if (ret != JXL_PARALLEL_RET_SUCCESS) return ret;

  // Run the tasks sequentially on the calling thread when there are no
  // worker threads (num_worker_threads_ == 0).
  if (self->num_worker_threads_ == 0) {
    const size_t thread = 0;
    for (uint32_t task = start_range; task < end_range; ++task) {
      func(jpegxl_opaque, task, thread);
    }
    return JXL_PARALLEL_RET_SUCCESS;
  }

  if (self->depth_.fetch_add(1, std::memory_order_acq_rel) != 0) {
    return JXL_PARALLEL_RET_RUNNER_ERROR;  // Must not re-enter.
  }

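  // Pack the half-open task range [start_range, end_range) into a single
  // 64-bit command word: start_range in the upper 32 bits, end_range in the
  // lower 32. RunRange() performs the matching unpack on the worker side.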
  const WorkerCommand worker_command =
      (static_cast<WorkerCommand>(start_range) << 32) + end_range;
  // Ensure the inputs do not result in a reserved command.
  if ((worker_command == kWorkerWait) || (worker_command == kWorkerOnce) ||
      (worker_command == kWorkerExit)) {
    return JXL_PARALLEL_RET_RUNNER_ERROR;
  }

  self->data_func_ = func;
  self->jpegxl_opaque_ = jpegxl_opaque;
  self->num_reserved_.store(0, std::memory_order_relaxed);

  self->StartWorkers(worker_command);
  self->WorkersReadyBarrier();

  if (self->depth_.fetch_add(-1, std::memory_order_acq_rel) != 1) {
    return JXL_PARALLEL_RET_RUNNER_ERROR;
  }
  return JXL_PARALLEL_RET_SUCCESS;
}
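
// Illustrative usage sketch (comment only, not part of the library): how a
// caller could drive Runner() directly through the JxlParallelRunner callback
// signatures from <jxl/parallel_runner.h>. The names InitHook, TaskHook and
// my_state are hypothetical; production code normally reaches this class via
// the public JxlThreadParallelRunner wrapper instead.
//
//   JxlParallelRetCode InitHook(void* opaque, size_t num_threads) {
//     // Allocate per-thread scratch for `num_threads` threads here.
//     return JXL_PARALLEL_RET_SUCCESS;
//   }
//   void TaskHook(void* opaque, uint32_t task, size_t thread_id) {
//     // Process item `task`; thread_id indexes the per-thread scratch.
//   }
//
//   ThreadParallelRunner pool(/*num_worker_threads=*/4);
//   JxlParallelRetCode rc = ThreadParallelRunner::Runner(
//       &pool, &my_state, InitHook, TaskHook,
//       /*start_range=*/0, /*end_range=*/1000);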

// static
void ThreadParallelRunner::RunRange(ThreadParallelRunner* self,
                                    const WorkerCommand command,
                                    const int thread) {
  // Unpack the [begin, end) task range that Runner() packed into `command`.
  const uint32_t begin = command >> 32;
  const uint32_t end = command & 0xFFFFFFFF;
  const uint32_t num_tasks = end - begin;
  const uint32_t num_worker_threads = self->num_worker_threads_;

  // OpenMP introduced several "schedule" strategies:
  // "single" (static assignment of exactly one chunk per thread): slower.
  // "dynamic" (allocates k tasks at a time): competitive for well-chosen k.
  // "guided" (allocates k tasks, decreases k): computing k = remaining/n
  // is faster than halving k each iteration. We prefer this strategy
  // because it avoids user-specified parameters.
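  //
  // Worked example (illustrative): with num_tasks = 256 and 4 worker threads,
  // the first reservation takes max(256 / 16, 1) = 16 tasks, the next sees
  // 240 remaining and takes 15, then 14, and so on: chunks shrink as work
  // drains, without any user-chosen chunk size.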

  for (;;) {
#if JXL_FALSE
    // dynamic
    const uint32_t my_size =
        std::max(num_tasks / (num_worker_threads * 4), 1u);
#else
    // guided
    const uint32_t num_reserved =
        self->num_reserved_.load(std::memory_order_relaxed);
    // It is possible that more tasks are reserved than ready to run.
    const uint32_t num_remaining =
        num_tasks - std::min(num_reserved, num_tasks);
    const uint32_t my_size =
        std::max(num_remaining / (num_worker_threads * 4), 1u);
#endif
    const uint32_t my_begin = begin + self->num_reserved_.fetch_add(
                                          my_size, std::memory_order_relaxed);
    const uint32_t my_end = std::min(my_begin + my_size, begin + num_tasks);
    // Another thread already reserved the last task.
    if (my_begin >= my_end) {
      break;
    }
    for (uint32_t task = my_begin; task < my_end; ++task) {
      self->data_func_(self->jpegxl_opaque_, task, thread);
    }
  }
}

// static
void ThreadParallelRunner::ThreadFunc(ThreadParallelRunner* self,
                                      const int thread) {
  // Until kWorkerExit command received:
  for (;;) {
    std::unique_lock<std::mutex> lock(self->mutex_);
    // Notify main thread that this thread is ready.
    if (++self->workers_ready_ == self->num_threads_) {
      self->workers_ready_cv_.notify_one();
    }
  RESUME_WAIT:
    // Wait for a command.
    self->worker_start_cv_.wait(lock);
    const WorkerCommand command = self->worker_start_command_;
    switch (command) {
      case kWorkerWait:    // spurious wakeup:
        goto RESUME_WAIT;  // lock still held, avoid incrementing ready.
      case kWorkerOnce:
        lock.unlock();
        self->data_func_(self->jpegxl_opaque_, thread, thread);
        break;
      case kWorkerExit:
        return;  // exits thread
      default:
        lock.unlock();
        RunRange(self, command, thread);
        break;
    }
  }
}
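
// Lifecycle of the handshake driven by ThreadFunc (StartWorkers() and
// WorkersReadyBarrier() are declared in the header; this summary of their
// expected behavior is an assumption, not a quote):
//   1. Each worker increments workers_ready_ and, when it is the last one,
//      notifies workers_ready_cv_, on which WorkersReadyBarrier() waits.
//   2. The main thread publishes worker_start_command_ and wakes all workers
//      via worker_start_cv_ (StartWorkers()).
//   3. Workers execute the command, loop back, and signal readiness again,
//      so WorkersReadyBarrier() doubles as the end-of-run barrier.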

ThreadParallelRunner::ThreadParallelRunner(const int num_worker_threads)
    : num_worker_threads_(num_worker_threads),
      num_threads_(std::max(num_worker_threads, 1)) {
  threads_.reserve(num_worker_threads_);

  // Suppress "unused-private-field" warning.
  (void)padding1;
  (void)padding2;

  // Safely handle spurious worker wakeups.
  worker_start_command_ = kWorkerWait;

  for (uint32_t i = 0; i < num_worker_threads_; ++i) {
    threads_.emplace_back(ThreadFunc, this, i);
  }

  if (num_worker_threads_ != 0) {
    WorkersReadyBarrier();
  }
}

ThreadParallelRunner::~ThreadParallelRunner() {
  if (num_worker_threads_ != 0) {
    StartWorkers(kWorkerExit);
  }

  for (std::thread& thread : threads_) {
    if (thread.joinable()) {
      thread.join();
    } else {
#if JXL_IS_DEBUG_BUILD
      JXL_PRINT_STACK_TRACE();
      JXL_CRASH();
#endif
    }
  }
}

}  // namespace jpegxl