Coverage Report

Created: 2025-06-16 07:00

/src/libjxl/lib/threads/thread_parallel_runner_internal.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) the JPEG XL Project Authors. All rights reserved.
2
//
3
// Use of this source code is governed by a BSD-style
4
// license that can be found in the LICENSE file.
5
//
6
7
// C++ implementation using std::thread of a ::JxlParallelRunner.
8
9
// The main class in this module, ThreadParallelRunner, implements a static
10
// method ThreadParallelRunner::Runner that can be passed as a
11
// JxlParallelRunner when using the JPEG XL library. This uses std::thread
12
// internally and related synchronization functions. The number of threads
13
// created is fixed at construction time and the threads are re-used for every
14
// ThreadParallelRunner::Runner call. Only one concurrent Runner() call per
15
// instance is allowed at a time.
16
//
17
// This is a scalable, lower-overhead thread pool runner, especially suitable
18
// for data-parallel computations in the fork-join model, where clients need to
19
// know when all tasks have completed.
20
//
21
// This thread pool can efficiently load-balance millions of tasks using an
22
// atomic counter, thus avoiding per-task virtual or system calls. With 48
23
// hyperthreads and 1M tasks that add to an atomic counter, overall runtime is
24
// 10-20x higher when using std::async, and ~200x for a queue-based thread
25
// pool.
26
//
27
// Usage:
28
//   ThreadParallelRunner runner;
29
//   JxlDecode(
30
//       ... , &ThreadParallelRunner::Runner, static_cast<void*>(&runner));
31
32
#ifndef LIB_THREADS_THREAD_PARALLEL_RUNNER_INTERNAL_H_
33
#define LIB_THREADS_THREAD_PARALLEL_RUNNER_INTERNAL_H_
34
35
#include <jxl/memory_manager.h>
36
#include <jxl/parallel_runner.h>
37
38
#include <atomic>
39
#include <condition_variable>  //NOLINT
40
#include <cstddef>
41
#include <cstdint>
42
#include <cstdlib>
43
#include <mutex>   //NOLINT
44
#include <thread>  //NOLINT
45
#include <vector>
46
47
namespace jpegxl {
48
49
// Main helper class implementing the ::JxlParallelRunner interface.
50
class ThreadParallelRunner {
51
 public:
52
  // ::JxlParallelRunner interface.
53
  static JxlParallelRetCode Runner(void* runner_opaque, void* jpegxl_opaque,
54
                                   JxlParallelRunInit init,
55
                                   JxlParallelRunFunction func,
56
                                   uint32_t start_range, uint32_t end_range);
57
58
  // Starts the given number of worker threads and blocks until they are ready.
59
  // "num_worker_threads" defaults to one per hyperthread. If zero, all tasks
60
  // run on the main thread.
61
  explicit ThreadParallelRunner(
62
      int num_worker_threads = std::thread::hardware_concurrency());
63
64
  // Waits for all threads to exit.
65
  ~ThreadParallelRunner();
66
67
  // Returns maximum number of main/worker threads that may call Func. Useful
68
  // for allocating per-thread storage.
69
0
  size_t NumThreads() const { return num_threads_; }
70
71
  // Runs func(thread, thread) on all thread(s) that may participate in Run.
72
  // If NumThreads() == 0, runs on the main thread with thread == 0, otherwise
73
  // concurrently called by each worker thread in [0, NumThreads()).
74
  template <class Func>
75
  void RunOnEachThread(const Func& func) {
76
    if (num_worker_threads_ == 0) {
77
      const int thread = 0;
78
      func(thread, thread);
79
      return;
80
    }
81
82
    data_func_ = reinterpret_cast<JxlParallelRunFunction>(&CallClosure<Func>);
83
    jpegxl_opaque_ = const_cast<void*>(static_cast<const void*>(&func));
84
    StartWorkers(kWorkerOnce);
85
    WorkersReadyBarrier();
86
  }
87
88
  JxlMemoryManager memory_manager;
89
90
 private:
91
  // After construction and between calls to Run, workers are "ready", i.e.
92
  // waiting on worker_start_cv_. They are "started" by sending a "command"
93
  // and notifying all worker_start_cv_ waiters. (That is why all workers
94
  // must be ready/waiting - otherwise, the notification will not reach all of
95
  // them and the main thread waits in vain for them to report readiness.)
96
  using WorkerCommand = uint64_t;
97
98
  // Special values; all others encode the begin/end parameters. Note that all
99
  // these are no-op ranges (begin >= end) and therefore never used to encode
100
  // ranges.
101
  static constexpr WorkerCommand kWorkerWait = ~1ULL;
102
  static constexpr WorkerCommand kWorkerOnce = ~2ULL;
103
  static constexpr WorkerCommand kWorkerExit = ~3ULL;
104
105
  // Calls f(task, thread). Used for type erasure of Func arguments. The
106
  // signature must match JxlParallelRunFunction, hence a void* argument.
107
  template <class Closure>
108
  static void CallClosure(void* f, const uint32_t task, const size_t thread) {
109
    (*reinterpret_cast<const Closure*>(f))(task, thread);
110
  }
111
112
77.3k
  void WorkersReadyBarrier() {
113
77.3k
    std::unique_lock<std::mutex> lock(mutex_);
114
    // Typically only a single iteration.
115
154k
    while (workers_ready_ != threads_.size()) {
116
77.3k
      workers_ready_cv_.wait(lock);
117
77.3k
    }
118
77.3k
    workers_ready_ = 0;
119
120
    // Safely handle spurious worker wakeups.
121
77.3k
    worker_start_command_ = kWorkerWait;
122
77.3k
  }
123
124
  // Precondition: all workers are ready.
125
77.3k
  void StartWorkers(const WorkerCommand worker_command) {
126
77.3k
    mutex_.lock();
127
77.3k
    worker_start_command_ = worker_command;
128
    // Workers will need this lock, so release it before they wake up.
129
77.3k
    mutex_.unlock();
130
77.3k
    worker_start_cv_.notify_all();
131
77.3k
  }
132
133
  // Attempts to reserve and perform some work from the global range of tasks,
134
  // which is encoded within "command". Returns after all tasks are reserved.
135
  static void RunRange(ThreadParallelRunner* self, WorkerCommand command,
136
                       int thread);
137
138
  static void ThreadFunc(ThreadParallelRunner* self, int thread);
139
140
  // Unmodified after ctor, but cannot be const because we call thread::join().
141
  std::vector<std::thread> threads_;
142
143
  const uint32_t num_worker_threads_;  // == threads_.size()
144
  const uint32_t num_threads_;
145
146
  std::atomic<uint32_t> depth_{
147
      0};  // detects if Run is re-entered (not supported).
148
149
  std::mutex mutex_;  // guards both cv and their variables.
150
  std::condition_variable workers_ready_cv_;
151
  uint32_t workers_ready_ = 0;
152
  std::condition_variable worker_start_cv_;
153
  WorkerCommand worker_start_command_;
154
155
  // Written by main thread, read by workers (after mutex lock/unlock).
156
  JxlParallelRunFunction data_func_;
157
  void* jpegxl_opaque_;
158
159
  // Updated by workers; padding avoids false sharing.
160
  uint8_t padding1[64];
161
  std::atomic<uint32_t> num_reserved_{0};
162
  uint8_t padding2[64];
163
};
164
165
}  // namespace jpegxl
166
167
#endif  // LIB_THREADS_THREAD_PARALLEL_RUNNER_INTERNAL_H_