/src/libjxl/lib/threads/thread_parallel_runner_internal.cc
Line | Count | Source |
1 | | // Copyright (c) the JPEG XL Project Authors. All rights reserved. |
2 | | // |
3 | | // Use of this source code is governed by a BSD-style |
4 | | // license that can be found in the LICENSE file. |
5 | | |
6 | | #include "lib/threads/thread_parallel_runner_internal.h" |
7 | | |
8 | | #include <jxl/parallel_runner.h> |
9 | | #include <jxl/types.h> |
10 | | |
11 | | #include <algorithm> |
12 | | #include <atomic> |
13 | | #include <cstddef> |
14 | | #include <cstdint> |
15 | | #include <mutex> |
16 | | #include <thread> |
17 | | |
18 | | namespace jpegxl { |
19 | | |
20 | | // static |
21 | | JxlParallelRetCode ThreadParallelRunner::Runner( |
22 | | void* runner_opaque, void* jpegxl_opaque, JxlParallelRunInit init, |
23 | 344k | JxlParallelRunFunction func, uint32_t start_range, uint32_t end_range) { |
24 | 344k | ThreadParallelRunner* self = |
25 | 344k | static_cast<ThreadParallelRunner*>(runner_opaque); |
26 | 344k | if (start_range > end_range) return JXL_PARALLEL_RET_RUNNER_ERROR; |
27 | 344k | if (start_range == end_range) return JXL_PARALLEL_RET_SUCCESS; |
28 | | |
29 | 344k | int ret = init(jpegxl_opaque, std::max<size_t>(self->num_worker_threads_, 1)); |
30 | 344k | if (ret != JXL_PARALLEL_RET_SUCCESS) return ret; |
31 | | |
32 | | // Use a sequential run when num_worker_threads_ is zero since we have no |
33 | | // worker threads. |
34 | 342k | if (self->num_worker_threads_ == 0) { |
35 | 0 | const size_t thread = 0; |
36 | 0 | for (uint32_t task = start_range; task < end_range; ++task) { |
37 | 0 | func(jpegxl_opaque, task, thread); |
38 | 0 | } |
39 | 0 | return JXL_PARALLEL_RET_SUCCESS; |
40 | 0 | } |
41 | | |
42 | 342k | if (self->depth_.fetch_add(1, std::memory_order_acq_rel) != 0) { |
43 | 0 | return JXL_PARALLEL_RET_RUNNER_ERROR; // Must not re-enter. |
44 | 0 | } |
45 | | |
46 | 342k | const WorkerCommand worker_command = |
47 | 342k | (static_cast<WorkerCommand>(start_range) << 32) + end_range; |
48 | | // Ensure the inputs do not result in a reserved command. |
49 | 342k | if ((worker_command == kWorkerWait) || (worker_command == kWorkerOnce) || |
50 | 342k | (worker_command == kWorkerExit)) { |
51 | 0 | return JXL_PARALLEL_RET_RUNNER_ERROR; |
52 | 0 | } |
53 | | |
54 | 342k | self->data_func_ = func; |
55 | 342k | self->jpegxl_opaque_ = jpegxl_opaque; |
56 | 342k | self->num_reserved_.store(0, std::memory_order_relaxed); |
57 | | |
58 | 342k | self->StartWorkers(worker_command); |
59 | 342k | self->WorkersReadyBarrier(); |
60 | | |
61 | 342k | if (self->depth_.fetch_add(-1, std::memory_order_acq_rel) != 1) { |
62 | 0 | return JXL_PARALLEL_RET_RUNNER_ERROR; |
63 | 0 | } |
64 | 342k | return JXL_PARALLEL_RET_SUCCESS; |
65 | 342k | } |
66 | | |
67 | | // static |
68 | | void ThreadParallelRunner::RunRange(ThreadParallelRunner* self, |
69 | | const WorkerCommand command, |
70 | 342k | const int thread) { |
71 | 342k | const uint32_t begin = command >> 32; |
72 | 342k | const uint32_t end = command & 0xFFFFFFFF; |
73 | 342k | const uint32_t num_tasks = end - begin; |
74 | 342k | const uint32_t num_worker_threads = self->num_worker_threads_; |
75 | | |
76 | | // OpenMP introduced several "schedule" strategies: |
77 | | // "single" (static assignment of exactly one chunk per thread): slower. |
78 | | // "dynamic" (allocates k tasks at a time): competitive for well-chosen k. |
79 | | // "guided" (allocates k tasks, decreases k): computing k = remaining/n |
80 | | // is faster than halving k each iteration. We prefer this strategy |
81 | | // because it avoids user-specified parameters. |
82 | | |
83 | 1.89M | for (;;) { |
84 | | #if JXL_FALSE |
85 | | // dynamic |
86 | | const uint32_t my_size = std::max(num_tasks / (num_worker_threads * 4), 1); |
87 | | #else |
88 | | // guided |
89 | 1.89M | const uint32_t num_reserved = |
90 | 1.89M | self->num_reserved_.load(std::memory_order_relaxed); |
91 | | // It is possible that more tasks are reserved than ready to run. |
92 | 1.89M | const uint32_t num_remaining = |
93 | 1.89M | num_tasks - std::min(num_reserved, num_tasks); |
94 | 1.89M | const uint32_t my_size = |
95 | 1.89M | std::max(num_remaining / (num_worker_threads * 4), 1u); |
96 | 1.89M | #endif |
97 | 1.89M | const uint32_t my_begin = begin + self->num_reserved_.fetch_add( |
98 | 1.89M | my_size, std::memory_order_relaxed); |
99 | 1.89M | const uint32_t my_end = std::min(my_begin + my_size, begin + num_tasks); |
100 | | // Another thread already reserved the last task. |
101 | 1.89M | if (my_begin >= my_end) { |
102 | 342k | break; |
103 | 342k | } |
104 | 15.9M | for (uint32_t task = my_begin; task < my_end; ++task) { |
105 | 14.4M | self->data_func_(self->jpegxl_opaque_, task, thread); |
106 | 14.4M | } |
107 | 1.55M | } |
108 | 342k | } |
109 | | |
110 | | // static |
111 | | void ThreadParallelRunner::ThreadFunc(ThreadParallelRunner* self, |
112 | 524k | const int thread) { |
113 | | // Until kWorkerExit command received: |
114 | 867k | for (;;) { |
115 | 867k | std::unique_lock<std::mutex> lock(self->mutex_); |
116 | | // Notify main thread that this thread is ready. |
117 | 867k | if (++self->workers_ready_ == self->num_threads_) { |
118 | 867k | self->workers_ready_cv_.notify_one(); |
119 | 867k | } |
120 | 867k | RESUME_WAIT: |
121 | | // Wait for a command. |
122 | 867k | self->worker_start_cv_.wait(lock); |
123 | 867k | const WorkerCommand command = self->worker_start_command_; |
124 | 867k | switch (command) { |
125 | 0 | case kWorkerWait: // spurious wakeup: |
126 | 0 | goto RESUME_WAIT; // lock still held, avoid incrementing ready. |
127 | 0 | case kWorkerOnce: |
128 | 0 | lock.unlock(); |
129 | 0 | self->data_func_(self->jpegxl_opaque_, thread, thread); |
130 | 0 | break; |
131 | 524k | case kWorkerExit: |
132 | 524k | return; // exits thread |
133 | 342k | default: |
134 | 342k | lock.unlock(); |
135 | 342k | RunRange(self, command, thread); |
136 | 342k | break; |
137 | 867k | } |
138 | 867k | } |
139 | 524k | } |
140 | | |
141 | | ThreadParallelRunner::ThreadParallelRunner(const int num_worker_threads) |
142 | 524k | : num_worker_threads_(num_worker_threads), |
143 | 524k | num_threads_(std::max(num_worker_threads, 1)) { |
144 | 524k | threads_.reserve(num_worker_threads_); |
145 | | |
146 | | // Suppress "unused-private-field" warning. |
147 | 524k | (void)padding1; |
148 | 524k | (void)padding2; |
149 | | |
150 | | // Safely handle spurious worker wakeups. |
151 | 524k | worker_start_command_ = kWorkerWait; |
152 | | |
153 | 1.04M | for (uint32_t i = 0; i < num_worker_threads_; ++i) { |
154 | 524k | threads_.emplace_back(ThreadFunc, this, i); |
155 | 524k | } |
156 | | |
157 | 524k | if (num_worker_threads_ != 0) { |
158 | 524k | WorkersReadyBarrier(); |
159 | 524k | } |
160 | 524k | } |
161 | | |
162 | 524k | ThreadParallelRunner::~ThreadParallelRunner() { |
163 | 524k | if (num_worker_threads_ != 0) { |
164 | 524k | StartWorkers(kWorkerExit); |
165 | 524k | } |
166 | | |
167 | 524k | for (std::thread& thread : threads_) { |
168 | 524k | if (!thread.joinable()) { |
169 | | // Should not ever happen. |
170 | | // TODO(eustas): do we need to log some alert? |
171 | 524k | } else { |
172 | 524k | thread.join(); |
173 | 524k | } |
174 | 524k | } |
175 | 524k | } |
176 | | } // namespace jpegxl |