Coverage Report

Created: 2026-06-30 07:12

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/libjxl/lib/threads/thread_parallel_runner_internal.cc
Line
Count
Source
1
// Copyright (c) the JPEG XL Project Authors. All rights reserved.
2
//
3
// Use of this source code is governed by a BSD-style
4
// license that can be found in the LICENSE file.
5
6
#include "lib/threads/thread_parallel_runner_internal.h"
7
8
#include <jxl/parallel_runner.h>
9
#include <jxl/types.h>
10
11
#include <algorithm>
12
#include <atomic>
13
#include <cstddef>
14
#include <cstdint>
15
#include <mutex>
16
#include <thread>
17
18
namespace jpegxl {
19
20
// static
21
JxlParallelRetCode ThreadParallelRunner::Runner(
22
    void* runner_opaque, void* jpegxl_opaque, JxlParallelRunInit init,
23
224k
    JxlParallelRunFunction func, uint32_t start_range, uint32_t end_range) {
24
224k
  ThreadParallelRunner* self =
25
224k
      static_cast<ThreadParallelRunner*>(runner_opaque);
26
224k
  if (start_range > end_range) return JXL_PARALLEL_RET_RUNNER_ERROR;
27
224k
  if (start_range == end_range) return JXL_PARALLEL_RET_SUCCESS;
28
29
224k
  int ret = init(jpegxl_opaque, std::max<size_t>(self->num_worker_threads_, 1));
30
224k
  if (ret != JXL_PARALLEL_RET_SUCCESS) return ret;
31
32
  // Use a sequential run when num_worker_threads_ is zero since we have no
33
  // worker threads.
34
224k
  if (self->num_worker_threads_ == 0) {
35
0
    const size_t thread = 0;
36
0
    for (uint32_t task = start_range; task < end_range; ++task) {
37
0
      func(jpegxl_opaque, task, thread);
38
0
    }
39
0
    return JXL_PARALLEL_RET_SUCCESS;
40
0
  }
41
42
224k
  if (self->depth_.fetch_add(1, std::memory_order_acq_rel) != 0) {
43
0
    return JXL_PARALLEL_RET_RUNNER_ERROR;  // Must not re-enter.
44
0
  }
45
46
224k
  const WorkerCommand worker_command =
47
224k
      (static_cast<WorkerCommand>(start_range) << 32) + end_range;
48
  // Ensure the inputs do not result in a reserved command.
49
224k
  if ((worker_command == kWorkerWait) || (worker_command == kWorkerOnce) ||
50
224k
      (worker_command == kWorkerExit)) {
51
0
    return JXL_PARALLEL_RET_RUNNER_ERROR;
52
0
  }
53
54
224k
  self->data_func_ = func;
55
224k
  self->jpegxl_opaque_ = jpegxl_opaque;
56
224k
  self->num_reserved_.store(0, std::memory_order_relaxed);
57
58
224k
  self->StartWorkers(worker_command);
59
224k
  self->WorkersReadyBarrier();
60
61
224k
  if (self->depth_.fetch_add(-1, std::memory_order_acq_rel) != 1) {
62
0
    return JXL_PARALLEL_RET_RUNNER_ERROR;
63
0
  }
64
224k
  return JXL_PARALLEL_RET_SUCCESS;
65
224k
}
66
67
// static
68
void ThreadParallelRunner::RunRange(ThreadParallelRunner* self,
69
                                    const WorkerCommand command,
70
224k
                                    const int thread) {
71
224k
  const uint32_t begin = command >> 32;
72
224k
  const uint32_t end = command & 0xFFFFFFFF;
73
224k
  const uint32_t num_tasks = end - begin;
74
224k
  const uint32_t num_worker_threads = self->num_worker_threads_;
75
76
  // OpenMP introduced several "schedule" strategies:
77
  // "single" (static assignment of exactly one chunk per thread): slower.
78
  // "dynamic" (allocates k tasks at a time): competitive for well-chosen k.
79
  // "guided" (allocates k tasks, decreases k): computing k = remaining/n
80
  //   is faster than halving k each iteration. We prefer this strategy
81
  //   because it avoids user-specified parameters.
82
83
1.22M
  for (;;) {
84
#if JXL_FALSE
85
    // dynamic
86
    const uint32_t my_size = std::max(num_tasks / (num_worker_threads * 4), 1);
87
#else
88
    // guided
89
1.22M
    const uint32_t num_reserved =
90
1.22M
        self->num_reserved_.load(std::memory_order_relaxed);
91
    // It is possible that more tasks are reserved than ready to run.
92
1.22M
    const uint32_t num_remaining =
93
1.22M
        num_tasks - std::min(num_reserved, num_tasks);
94
1.22M
    const uint32_t my_size =
95
1.22M
        std::max(num_remaining / (num_worker_threads * 4), 1u);
96
1.22M
#endif
97
1.22M
    const uint32_t my_begin = begin + self->num_reserved_.fetch_add(
98
1.22M
                                          my_size, std::memory_order_relaxed);
99
1.22M
    const uint32_t my_end = std::min(my_begin + my_size, begin + num_tasks);
100
    // Another thread already reserved the last task.
101
1.22M
    if (my_begin >= my_end) {
102
224k
      break;
103
224k
    }
104
8.99M
    for (uint32_t task = my_begin; task < my_end; ++task) {
105
7.98M
      self->data_func_(self->jpegxl_opaque_, task, thread);
106
7.98M
    }
107
1.00M
  }
108
224k
}
109
110
// static
111
void ThreadParallelRunner::ThreadFunc(ThreadParallelRunner* self,
112
20.1k
                                      const int thread) {
113
  // Until kWorkerExit command received:
114
244k
  for (;;) {
115
244k
    std::unique_lock<std::mutex> lock(self->mutex_);
116
    // Notify main thread that this thread is ready.
117
244k
    if (++self->workers_ready_ == self->num_threads_) {
118
244k
      self->workers_ready_cv_.notify_one();
119
244k
    }
120
244k
  RESUME_WAIT:
121
    // Wait for a command.
122
244k
    self->worker_start_cv_.wait(lock);
123
244k
    const WorkerCommand command = self->worker_start_command_;
124
244k
    switch (command) {
125
0
      case kWorkerWait:    // spurious wakeup:
126
0
        goto RESUME_WAIT;  // lock still held, avoid incrementing ready.
127
0
      case kWorkerOnce:
128
0
        lock.unlock();
129
0
        self->data_func_(self->jpegxl_opaque_, thread, thread);
130
0
        break;
131
20.1k
      case kWorkerExit:
132
20.1k
        return;  // exits thread
133
224k
      default:
134
224k
        lock.unlock();
135
224k
        RunRange(self, command, thread);
136
224k
        break;
137
244k
    }
138
244k
  }
139
20.1k
}
140
141
// Creates `num_worker_threads` worker threads running ThreadFunc and, when
// there is at least one, waits in WorkersReadyBarrier() before returning.
// With zero workers no thread is created.
ThreadParallelRunner::ThreadParallelRunner(const int num_worker_threads)
    : num_worker_threads_(num_worker_threads),
      num_threads_(std::max(num_worker_threads, 1)) {
  threads_.reserve(num_worker_threads_);

  // The padding members are otherwise unreferenced; mention them here to
  // suppress the "unused-private-field" warning.
  (void)padding1;
  (void)padding2;

  // Start out in the "wait" state so that a worker waking up spuriously sees
  // kWorkerWait and simply goes back to waiting.
  worker_start_command_ = kWorkerWait;

  for (uint32_t worker = 0; worker < num_worker_threads_; ++worker) {
    threads_.emplace_back(ThreadFunc, this, worker);
  }

  if (num_worker_threads_ == 0) {
    return;  // No workers to wait for.
  }
  WorkersReadyBarrier();
}
161
162
20.1k
// Sends kWorkerExit to the workers (if there are any) and joins every thread
// that was started by the constructor.
ThreadParallelRunner::~ThreadParallelRunner() {
  const bool has_workers = (num_worker_threads_ != 0);
  if (has_workers) {
    StartWorkers(kWorkerExit);
  }

  for (std::thread& worker : threads_) {
    // A non-joinable thread should not ever happen; it is skipped silently.
    // TODO(eustas): do we need to log some alert?
    if (worker.joinable()) {
      worker.join();
    }
  }
}
176
}  // namespace jpegxl