Coverage Report

Created: 2025-10-26 07:13

/src/rocksdb/util/threadpool_imp.cc
Line| Count|Source
   1|      |//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
   2|      |//  This source code is licensed under both the GPLv2 (found in the
   3|      |//  COPYING file in the root directory) and Apache 2.0 License
   4|      |//  (found in the LICENSE.Apache file in the root directory).
   5|      |//
   6|      |// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
   7|      |// Use of this source code is governed by a BSD-style license that can be
   8|      |// found in the LICENSE file. See the AUTHORS file for names of contributors.
   9|      |
  10|      |#include "util/threadpool_imp.h"
  11|      |
  12|      |#ifndef OS_WIN
  13|      |#include <unistd.h>
  14|      |#endif
  15|      |
  16|      |#ifdef OS_LINUX
  17|      |#include <sys/resource.h>
  18|      |#include <sys/syscall.h>
  19|      |#endif
  20|      |
  21|      |#include <algorithm>
  22|      |#include <atomic>
  23|      |#include <condition_variable>
  24|      |#include <cstdlib>
  25|      |#include <deque>
  26|      |#include <mutex>
  27|      |#include <sstream>
  28|      |#include <thread>
  29|      |#include <vector>
  30|      |
  31|      |#include "monitoring/thread_status_util.h"
  32|      |#include "port/port.h"
  33|      |#include "test_util/sync_point.h"
  34|      |#include "util/string_util.h"
  35|      |
  36|      |namespace ROCKSDB_NAMESPACE {
  37|      |
  38|     2|void ThreadPoolImpl::PthreadCall(const char* label, int result) {
  39|     2|  if (result != 0) {
  40|     0|    fprintf(stderr, "pthread %s: %s\n", label, errnoStr(result).c_str());
  41|     0|    abort();
  42|     0|  }
  43|     2|}
  44|      |
  45|      |struct ThreadPoolImpl::Impl {
  46|      |  Impl();
  47|      |  ~Impl();
  48|      |
  49|      |  void JoinThreads(bool wait_for_jobs_to_complete);
  50|      |
  51|      |  void SetBackgroundThreadsInternal(int num, bool allow_reduce);
  52|      |  int GetBackgroundThreads();
  53|      |
  54|     0|  unsigned int GetQueueLen() const {
  55|     0|    return queue_len_.load(std::memory_order_relaxed);
  56|     0|  }
  57|      |
  58|      |  void LowerIOPriority();
  59|      |
  60|      |  void LowerCPUPriority(CpuPriority pri);
  61|      |
  62|     4|  void WakeUpAllThreads() { bgsignal_.notify_all(); }
  63|      |
  64|      |  void BGThread(size_t thread_id);
  65|      |
  66|      |  void StartBGThreads();
  67|      |
  68|      |  void Submit(std::function<void()>&& schedule,
  69|      |              std::function<void()>&& unschedule, void* tag);
  70|      |
  71|      |  int UnSchedule(void* arg);
  72|      |
  73|     8|  void SetHostEnv(Env* env) { env_ = env; }
  74|      |
  75|     4|  Env* GetHostEnv() const { return env_; }
  76|      |
  77| 33.5k|  bool HasExcessiveThread() const {
  78| 33.5k|    return static_cast<int>(bgthreads_.size()) > total_threads_limit_;
  79| 33.5k|  }
  80|      |
  81|      |  // Return true iff the current thread is the excessive thread to terminate.
  82|      |  // Always terminate the running thread that was added last, even if there
  83|      |  // is more than one thread to terminate.
  84| 25.0k|  bool IsLastExcessiveThread(size_t thread_id) const {
  85| 25.0k|    return HasExcessiveThread() && thread_id == bgthreads_.size() - 1;
  86| 25.0k|  }
  87|      |
  88| 8.46k|  bool IsExcessiveThread(size_t thread_id) const {
  89| 8.46k|    return static_cast<int>(thread_id) >= total_threads_limit_;
  90| 8.46k|  }
  91|      |
  92|      |  // Return the thread priority.
  93|      |  // This would allow its member-thread to know its priority.
  94|     8|  Env::Priority GetThreadPriority() const { return priority_; }
  95|      |
  96|      |  // Set the thread priority.
  97|     8|  void SetThreadPriority(Env::Priority priority) { priority_ = priority; }
  98|      |
  99|     0|  int ReserveThreads(int threads_to_be_reserved) {
 100|     0|    std::unique_lock<std::mutex> lock(mu_);
 101|      |    // We can reserve at most num_waiting_threads_ in total, so the number of
 102|      |    // threads that can be reserved might be fewer than the desired one. In
 103|      |    // rare cases, num_waiting_threads_ could be less than reserved_threads_
 104|      |    // due to SetBackgroundThreadsInternal or last excessive threads. If that
 105|      |    // happens, we cannot reserve any other threads.
 106|     0|    int reserved_threads_in_success =
 107|     0|        std::min(std::max(num_waiting_threads_ - reserved_threads_, 0),
 108|     0|                 threads_to_be_reserved);
 109|     0|    reserved_threads_ += reserved_threads_in_success;
 110|     0|    return reserved_threads_in_success;
 111|     0|  }
 112|      |
 113|     0|  int ReleaseThreads(int threads_to_be_released) {
 114|     0|    std::unique_lock<std::mutex> lock(mu_);
 115|      |    // We cannot release more than reserved_threads_
 116|     0|    int released_threads_in_success =
 117|     0|        std::min(reserved_threads_, threads_to_be_released);
 118|     0|    reserved_threads_ -= released_threads_in_success;
 119|     0|    WakeUpAllThreads();
 120|     0|    return released_threads_in_success;
 121|     0|  }
 122|      |
 123|      | private:
 124|      |  static void BGThreadWrapper(void* arg);
 125|      |
 126|      |  bool low_io_priority_;
 127|      |  CpuPriority cpu_priority_;
 128|      |  Env::Priority priority_;
 129|      |  Env* env_;
 130|      |
 131|      |  int total_threads_limit_;
 132|      |  std::atomic_uint queue_len_;  // Queue length. Used for stats reporting
 133|      |  // Number of reserved threads, managed by ReserveThreads(..) and
 134|      |  // ReleaseThreads(..); if num_waiting_threads_ is no larger than
 135|      |  // reserved_threads_, a thread will be blocked to honor the reservation
 136|      |  // mechanism
 137|      |  int reserved_threads_;
 138|      |  // Number of waiting threads (maximum number of threads that can be
 139|      |  // reserved); in rare cases, num_waiting_threads_ could be less than
 140|      |  // reserved_threads_ due to SetBackgroundThreadsInternal or last
 141|      |  // excessive threads.
 142|      |  int num_waiting_threads_;
 143|      |  bool exit_all_threads_;
 144|      |  bool wait_for_jobs_to_complete_;
 145|      |
 146|      |  // Entry per Schedule()/Submit() call
 147|      |  struct BGItem {
 148|      |    void* tag = nullptr;
 149|      |    std::function<void()> function;
 150|      |    std::function<void()> unschedFunction;
 151|      |  };
 152|      |
 153|      |  using BGQueue = std::deque<BGItem>;
 154|      |  BGQueue queue_;
 155|      |
 156|      |  std::mutex mu_;
 157|      |  std::condition_variable bgsignal_;
 158|      |  std::vector<port::Thread> bgthreads_;
 159|      |};
 160|      |
 161|      |inline ThreadPoolImpl::Impl::Impl()
 162|     8|    : low_io_priority_(false),
 163|     8|      cpu_priority_(CpuPriority::kNormal),
 164|     8|      priority_(Env::LOW),
 165|     8|      env_(nullptr),
 166|     8|      total_threads_limit_(0),
 167|     8|      queue_len_(),
 168|     8|      reserved_threads_(0),
 169|     8|      num_waiting_threads_(0),
 170|     8|      exit_all_threads_(false),
 171|     8|      wait_for_jobs_to_complete_(false),
 172|     8|      queue_(),
 173|     8|      mu_(),
 174|     8|      bgsignal_(),
 175|     8|      bgthreads_() {}
 176|      |
 177|     0|inline ThreadPoolImpl::Impl::~Impl() { assert(bgthreads_.size() == 0U); }
 178|      |
 179|     8|void ThreadPoolImpl::Impl::JoinThreads(bool wait_for_jobs_to_complete) {
 180|     8|  std::unique_lock<std::mutex> lock(mu_);
 181|     8|  assert(!exit_all_threads_);
 182|      |
 183|     8|  wait_for_jobs_to_complete_ = wait_for_jobs_to_complete;
 184|     8|  exit_all_threads_ = true;
 185|      |  // prevent threads from being recreated right after they're joined, in case
 186|      |  // the user is concurrently submitting jobs.
 187|     8|  total_threads_limit_ = 0;
 188|     8|  reserved_threads_ = 0;
 189|     8|  num_waiting_threads_ = 0;
 190|      |
 191|     8|  lock.unlock();
 192|      |
 193|     8|  bgsignal_.notify_all();
 194|      |
 195|     8|  for (auto& th : bgthreads_) {
 196|     4|    th.join();
 197|     4|  }
 198|      |
 199|     8|  bgthreads_.clear();
 200|      |
 201|     8|  exit_all_threads_ = false;
 202|     8|  wait_for_jobs_to_complete_ = false;
 203|     8|}
 204|      |
 205|     0|inline void ThreadPoolImpl::Impl::LowerIOPriority() {
 206|     0|  std::lock_guard<std::mutex> lock(mu_);
 207|     0|  low_io_priority_ = true;
 208|     0|}
 209|      |
 210|     0|inline void ThreadPoolImpl::Impl::LowerCPUPriority(CpuPriority pri) {
 211|     0|  std::lock_guard<std::mutex> lock(mu_);
 212|     0|  cpu_priority_ = pri;
 213|     0|}
 214|      |
 215|     4|void ThreadPoolImpl::Impl::BGThread(size_t thread_id) {
 216|     4|  bool low_io_priority = false;
 217|     4|  CpuPriority current_cpu_priority = CpuPriority::kNormal;
 218|      |
 219| 8.46k|  while (true) {
 220|      |    // Wait until there is an item that is ready to run
 221| 8.46k|    std::unique_lock<std::mutex> lock(mu_);
 222|      |    // Stop waiting if the thread needs to do work or needs to terminate.
 223|      |    // Increase num_waiting_threads_ once this task has started waiting
 224| 8.46k|    num_waiting_threads_++;
 225|      |
 226| 8.46k|    TEST_SYNC_POINT("ThreadPoolImpl::BGThread::WaitingThreadsInc");
 227| 8.46k|    TEST_IDX_SYNC_POINT("ThreadPoolImpl::BGThread::Start:th", thread_id);
 228|      |    // When exit_all_threads_ is not set and the current thread is not the
 229|      |    // last excessive thread, it may be blocked for 3 reasons: 1) the queue
 230|      |    // is empty; 2) it is an excessive thread (but not the last one);
 231|      |    // 3) the number of waiting threads is not greater than reserved threads
 232|      |    // (i.e., no threads are available because of full reservation)
 233| 16.5k|    while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) &&
 234| 16.5k|           (queue_.empty() || IsExcessiveThread(thread_id) ||
 235| 8.46k|            num_waiting_threads_ <= reserved_threads_)) {
 236| 8.11k|      bgsignal_.wait(lock);
 237| 8.11k|    }
 238|      |    // Decrease num_waiting_threads_ once the thread is not waiting
 239| 8.46k|    num_waiting_threads_--;
 240|      |
 241| 8.46k|    if (exit_all_threads_) {  // mechanism to let BG threads exit safely
 242|      |
 243|     4|      if (!wait_for_jobs_to_complete_ || queue_.empty()) {
 244|     4|        break;
 245|     4|      }
 246| 8.46k|    } else if (IsLastExcessiveThread(thread_id)) {
 247|      |      // Current thread is the last generated one and is excessive.
 248|      |      // We always terminate excessive threads in the reverse order of
 249|      |      // generation time. But not when `exit_all_threads_ == true`,
 250|      |      // otherwise `JoinThreads()` could try to `join()` a `detach()`ed
 251|      |      // thread.
 252|     0|      auto& terminating_thread = bgthreads_.back();
 253|     0|      terminating_thread.detach();
 254|     0|      bgthreads_.pop_back();
 255|     0|      if (HasExcessiveThread()) {
 256|      |        // There is still at least one more excessive thread to terminate.
 257|     0|        WakeUpAllThreads();
 258|     0|      }
 259|     0|      TEST_IDX_SYNC_POINT("ThreadPoolImpl::BGThread::Termination:th",
 260|     0|                          thread_id);
 261|     0|      TEST_SYNC_POINT("ThreadPoolImpl::BGThread::Termination");
 262|     0|      break;
 263|     0|    }
 264|      |
 265| 8.46k|    auto func = std::move(queue_.front().function);
 266| 8.46k|    queue_.pop_front();
 267|      |
 268| 8.46k|    queue_len_.store(static_cast<unsigned int>(queue_.size()),
 269| 8.46k|                     std::memory_order_relaxed);
 270|      |
 271| 8.46k|    bool decrease_io_priority = (low_io_priority != low_io_priority_);
 272| 8.46k|    CpuPriority cpu_priority = cpu_priority_;
 273| 8.46k|    lock.unlock();
 274|      |
 275| 8.46k|    if (cpu_priority < current_cpu_priority) {
 276|     0|      TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::BGThread::BeforeSetCpuPriority",
 277|     0|                               &current_cpu_priority);
 278|      |      // 0 means current thread.
 279|     0|      port::SetCpuPriority(0, cpu_priority);
 280|     0|      current_cpu_priority = cpu_priority;
 281|     0|      TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::BGThread::AfterSetCpuPriority",
 282|     0|                               &current_cpu_priority);
 283|     0|    }
 284|      |
 285| 8.46k|#ifdef OS_LINUX
 286| 8.46k|    if (decrease_io_priority) {
 287|     0|#define IOPRIO_CLASS_SHIFT (13)
 288|     0|#define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data)
 289|      |      // Put schedule into IOPRIO_CLASS_IDLE class (lowest)
 290|      |      // These system calls only have an effect when used in conjunction
 291|      |      // with an I/O scheduler that supports I/O priorities. As at
 292|      |      // kernel 2.6.17 the only such scheduler is the Completely
 293|      |      // Fair Queuing (CFQ) I/O scheduler.
 294|      |      // To change scheduler:
 295|      |      //  echo cfq > /sys/block/<device_name>/queue/scheduler
 296|      |      // Tunables to consider:
 297|      |      //  /sys/block/<device_name>/queue/slice_idle
 298|      |      //  /sys/block/<device_name>/queue/slice_sync
 299|     0|      syscall(SYS_ioprio_set, 1,  // IOPRIO_WHO_PROCESS
 300|     0|              0,                  // current thread
 301|     0|              IOPRIO_PRIO_VALUE(3, 0));
 302|     0|      low_io_priority = true;
 303|     0|    }
 304|      |#else
 305|      |    (void)decrease_io_priority;  // avoid 'unused variable' error
 306|      |#endif
 307|      |
 308| 8.46k|    TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::Impl::BGThread:BeforeRun",
 309| 8.46k|                             &priority_);
 310|      |
 311| 8.46k|    func();
 312| 8.46k|  }
 313|     4|}
 314|      |
 315|      |// Helper struct for passing arguments when creating threads.
 316|      |struct BGThreadMetadata {
 317|      |  ThreadPoolImpl::Impl* thread_pool_;
 318|      |  size_t thread_id_;  // Index of the thread in the pool.
 319|      |  BGThreadMetadata(ThreadPoolImpl::Impl* thread_pool, size_t thread_id)
 320|     4|      : thread_pool_(thread_pool), thread_id_(thread_id) {}
 321|      |};
 322|      |
 323|     4|void ThreadPoolImpl::Impl::BGThreadWrapper(void* arg) {
 324|     4|  BGThreadMetadata* meta = static_cast<BGThreadMetadata*>(arg);
 325|     4|  size_t thread_id = meta->thread_id_;
 326|     4|  ThreadPoolImpl::Impl* tp = meta->thread_pool_;
 327|     4|#ifdef ROCKSDB_USING_THREAD_STATUS
 328|      |  // initialize it because compiler isn't good enough to see we don't use it
 329|      |  // uninitialized
 330|     4|  ThreadStatus::ThreadType thread_type = ThreadStatus::NUM_THREAD_TYPES;
 331|     4|  switch (tp->GetThreadPriority()) {
 332|     2|    case Env::Priority::HIGH:
 333|     2|      thread_type = ThreadStatus::HIGH_PRIORITY;
 334|     2|      break;
 335|     2|    case Env::Priority::LOW:
 336|     2|      thread_type = ThreadStatus::LOW_PRIORITY;
 337|     2|      break;
 338|     0|    case Env::Priority::BOTTOM:
 339|     0|      thread_type = ThreadStatus::BOTTOM_PRIORITY;
 340|     0|      break;
 341|     0|    case Env::Priority::USER:
 342|     0|      thread_type = ThreadStatus::USER;
 343|     0|      break;
 344|     0|    case Env::Priority::TOTAL:
 345|     0|      assert(false);
 346|     0|      return;
 347|     4|  }
 348|     4|  assert(thread_type != ThreadStatus::NUM_THREAD_TYPES);
 349|     4|  ThreadStatusUtil::RegisterThread(tp->GetHostEnv(), thread_type);
 350|     4|#endif
 351|     4|  delete meta;
 352|     4|  tp->BGThread(thread_id);
 353|     4|#ifdef ROCKSDB_USING_THREAD_STATUS
 354|     4|  ThreadStatusUtil::UnregisterThread();
 355|     4|#endif
 356|     4|  return;
 357|     4|}
 358|      |
 359|      |void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(int num,
 360|  115k|                                                        bool allow_reduce) {
 361|  115k|  std::lock_guard<std::mutex> lock(mu_);
 362|  115k|  if (exit_all_threads_) {
 363|     0|    return;
 364|     0|  }
 365|  115k|  if (num > total_threads_limit_ ||
 366|  115k|      (num < total_threads_limit_ && allow_reduce)) {
 367|     4|    total_threads_limit_ = std::max(0, num);
 368|     4|    WakeUpAllThreads();
 369|     4|    StartBGThreads();
 370|     4|  }
 371|  115k|}
 372|      |
 373| 93.2k|int ThreadPoolImpl::Impl::GetBackgroundThreads() {
 374| 93.2k|  std::unique_lock<std::mutex> lock(mu_);
 375| 93.2k|  return total_threads_limit_;
 376| 93.2k|}
 377|      |
 378| 8.47k|void ThreadPoolImpl::Impl::StartBGThreads() {
 379|      |  // Start background threads if necessary
 380| 8.47k|  while ((int)bgthreads_.size() < total_threads_limit_) {
 381|     4|    port::Thread p_t(&BGThreadWrapper,
 382|     4|                     new BGThreadMetadata(this, bgthreads_.size()));
 383|      |
 384|      |// Set the thread name to aid debugging
 385|     4|#if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ)
 386|     4|#if __GLIBC_PREREQ(2, 12)
 387|     4|    auto th_handle = p_t.native_handle();
 388|     4|    std::string thread_priority = Env::PriorityToString(GetThreadPriority());
 389|     4|    std::ostringstream thread_name_stream;
 390|     4|    thread_name_stream << "rocksdb:";
 391|    14|    for (char c : thread_priority) {
 392|    14|      thread_name_stream << static_cast<char>(tolower(c));
 393|    14|    }
 394|     4|    pthread_setname_np(th_handle, thread_name_stream.str().c_str());
 395|     4|#endif
 396|     4|#endif
 397|     4|    bgthreads_.push_back(std::move(p_t));
 398|     4|  }
 399| 8.47k|}
 400|      |
 401|      |void ThreadPoolImpl::Impl::Submit(std::function<void()>&& schedule,
 402|      |                                  std::function<void()>&& unschedule,
 403| 8.46k|                                  void* tag) {
 404| 8.46k|  std::lock_guard<std::mutex> lock(mu_);
 405|      |
 406| 8.46k|  if (exit_all_threads_) {
 407|     0|    return;
 408|     0|  }
 409|      |
 410| 8.46k|  StartBGThreads();
 411|      |
 412|      |  // Add to priority queue
 413| 8.46k|  queue_.push_back(BGItem());
 414| 8.46k|  TEST_SYNC_POINT("ThreadPoolImpl::Submit::Enqueue");
 415| 8.46k|  auto& item = queue_.back();
 416| 8.46k|  item.tag = tag;
 417| 8.46k|  item.function = std::move(schedule);
 418| 8.46k|  item.unschedFunction = std::move(unschedule);
 419|      |
 420| 8.46k|  queue_len_.store(static_cast<unsigned int>(queue_.size()),
 421| 8.46k|                   std::memory_order_relaxed);
 422|      |
 423| 8.46k|  if (!HasExcessiveThread()) {
 424|      |    // Wake up at least one waiting thread.
 425| 8.46k|    bgsignal_.notify_one();
 426| 8.46k|  } else {
 427|      |    // Need to wake up all threads to make sure the one woken
 428|      |    // up is not the one to terminate.
 429|     0|    WakeUpAllThreads();
 430|     0|  }
 431| 8.46k|}
 432|      |
 433|  290k|int ThreadPoolImpl::Impl::UnSchedule(void* arg) {
 434|  290k|  int count = 0;
 435|      |
 436|  290k|  std::vector<std::function<void()>> candidates;
 437|  290k|  {
 438|  290k|    std::lock_guard<std::mutex> lock(mu_);
 439|      |
 440|      |    // Remove from priority queue
 441|  290k|    BGQueue::iterator it = queue_.begin();
 442|  290k|    while (it != queue_.end()) {
 443|     5|      if (arg == (*it).tag) {
 444|     5|        if (it->unschedFunction) {
 445|     5|          candidates.push_back(std::move(it->unschedFunction));
 446|     5|        }
 447|     5|        it = queue_.erase(it);
 448|     5|        count++;
 449|     5|      } else {
 450|     0|        ++it;
 451|     0|      }
 452|     5|    }
 453|  290k|    queue_len_.store(static_cast<unsigned int>(queue_.size()),
 454|  290k|                     std::memory_order_relaxed);
 455|  290k|  }
 456|      |
 457|      |  // Run unschedule functions outside the mutex
 458|  290k|  for (auto& f : candidates) {
 459|     5|    f();
 460|     5|  }
 461|      |
 462|  290k|  return count;
 463|  290k|}
 464|      |
 465|     8|ThreadPoolImpl::ThreadPoolImpl() : impl_(new Impl()) {}
 466|      |
 467|     0|ThreadPoolImpl::~ThreadPoolImpl() = default;
 468|      |
 469|     8|void ThreadPoolImpl::JoinAllThreads() { impl_->JoinThreads(false); }
 470|      |
 471|     0|void ThreadPoolImpl::SetBackgroundThreads(int num) {
 472|     0|  impl_->SetBackgroundThreadsInternal(num, true);
 473|     0|}
 474|      |
 475| 93.2k|int ThreadPoolImpl::GetBackgroundThreads() {
 476| 93.2k|  return impl_->GetBackgroundThreads();
 477| 93.2k|}
 478|      |
 479|     0|unsigned int ThreadPoolImpl::GetQueueLen() const {
 480|     0|  return impl_->GetQueueLen();
 481|     0|}
 482|      |
 483|     0|void ThreadPoolImpl::WaitForJobsAndJoinAllThreads() {
 484|     0|  impl_->JoinThreads(true);
 485|     0|}
 486|      |
 487|     0|void ThreadPoolImpl::LowerIOPriority() { impl_->LowerIOPriority(); }
 488|      |
 489|     0|void ThreadPoolImpl::LowerCPUPriority(CpuPriority pri) {
 490|     0|  impl_->LowerCPUPriority(pri);
 491|     0|}
 492|      |
 493|  115k|void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) {
 494|  115k|  impl_->SetBackgroundThreadsInternal(num, false);
 495|  115k|}
 496|      |
 497|     0|void ThreadPoolImpl::SubmitJob(const std::function<void()>& job) {
 498|     0|  auto copy(job);
 499|     0|  impl_->Submit(std::move(copy), std::function<void()>(), nullptr);
 500|     0|}
 501|      |
 502|     0|void ThreadPoolImpl::SubmitJob(std::function<void()>&& job) {
 503|     0|  impl_->Submit(std::move(job), std::function<void()>(), nullptr);
 504|     0|}
 505|      |
 506|      |void ThreadPoolImpl::Schedule(void (*function)(void* arg1), void* arg,
 507| 8.46k|                              void* tag, void (*unschedFunction)(void* arg)) {
 508| 8.46k|  if (unschedFunction == nullptr) {
 509|     0|    impl_->Submit(std::bind(function, arg), std::function<void()>(), tag);
 510| 8.46k|  } else {
 511| 8.46k|    impl_->Submit(std::bind(function, arg), std::bind(unschedFunction, arg),
 512| 8.46k|                  tag);
 513| 8.46k|  }
 514| 8.46k|}
 515|      |
 516|  290k|int ThreadPoolImpl::UnSchedule(void* arg) { return impl_->UnSchedule(arg); }
 517|      |
 518|     8|void ThreadPoolImpl::SetHostEnv(Env* env) { impl_->SetHostEnv(env); }
 519|      |
 520|     0|Env* ThreadPoolImpl::GetHostEnv() const { return impl_->GetHostEnv(); }
 521|      |
 522|      |// Return the thread priority.
 523|      |// This would allow its member-thread to know its priority.
 524|     0|Env::Priority ThreadPoolImpl::GetThreadPriority() const {
 525|     0|  return impl_->GetThreadPriority();
 526|     0|}
 527|      |
 528|      |// Set the thread priority.
 529|     8|void ThreadPoolImpl::SetThreadPriority(Env::Priority priority) {
 530|     8|  impl_->SetThreadPriority(priority);
 531|     8|}
 532|      |
 533|      |// Reserve a specific number of threads, preventing them from running other
 534|      |// functions. The number of reserved threads could be fewer than the desired one.
 535|     0|int ThreadPoolImpl::ReserveThreads(int threads_to_be_reserved) {
 536|     0|  return impl_->ReserveThreads(threads_to_be_reserved);
 537|     0|}
 538|      |
 539|      |// Release a specific number of threads
 540|     0|int ThreadPoolImpl::ReleaseThreads(int threads_to_be_released) {
 541|     0|  return impl_->ReleaseThreads(threads_to_be_released);
 542|     0|}
 543|      |
 544|     0|ThreadPool* NewThreadPool(int num_threads) {
 545|     0|  ThreadPoolImpl* thread_pool = new ThreadPoolImpl();
 546|     0|  thread_pool->SetBackgroundThreads(num_threads);
 547|     0|  return thread_pool;
 548|     0|}
 549|      |
 550|      |}  // namespace ROCKSDB_NAMESPACE
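
The zero-count rows above cluster around a handful of public entry points this run never reaches: SetBackgroundThreads(), GetQueueLen(), LowerIOPriority(), LowerCPUPriority(), WaitForJobsAndJoinAllThreads(), SubmitJob(), ReserveThreads()/ReleaseThreads(), and NewThreadPool(). Below is a minimal sketch of a test that would exercise them, assuming it is compiled inside the RocksDB source tree; the pool API is taken from the listing, while main(), the includes, and the assertion scaffolding are illustrative assumptions, not part of the report.

// A minimal sketch, not part of the report: one path through the entry
// points with zero counts above. Assumes a RocksDB source-tree build.
#include <atomic>
#include <cassert>
#include <memory>

#include "rocksdb/env.h"
#include "util/threadpool_imp.h"

using namespace ROCKSDB_NAMESPACE;

int main() {
  ThreadPoolImpl pool;
  pool.SetHostEnv(Env::Default());
  pool.SetThreadPriority(Env::Priority::LOW);
  pool.SetBackgroundThreads(2);              // line 471: count 0
  pool.LowerIOPriority();                    // line 487: count 0
  pool.LowerCPUPriority(CpuPriority::kLow);  // line 489: count 0

  // ReserveThreads() may grant fewer threads than requested if the workers
  // have not reached their wait loop yet, so release exactly what was granted.
  int reserved = pool.ReserveThreads(1);     // line 535: count 0
  pool.ReleaseThreads(reserved);             // line 540: count 0

  std::atomic<int> ran{0};
  pool.SubmitJob([&ran] { ran.fetch_add(1); });  // line 502: count 0
  (void)pool.GetQueueLen();                      // line 479: count 0

  pool.WaitForJobsAndJoinAllThreads();  // line 483: covers JoinThreads(true)
  assert(ran.load() == 1);  // the join above waited for the job to complete

  // NewThreadPool() (line 544, count 0) wraps the same class behind the
  // abstract ThreadPool interface.
  std::unique_ptr<ThreadPool> tp(NewThreadPool(1));
  tp->JoinAllThreads();
  return 0;
}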
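Separately, the tag argument threaded through Schedule(), Submit(), and UnSchedule() is compared only by pointer identity: UnSchedule() (lines 433-463) erases every queued item whose tag matches and runs the associated unschedFunction callbacks outside the mutex. A short sketch of that cancellation path follows, under the same source-tree assumptions; Work() and Cancelled() are made-up helpers.

// Sketch of tag-based cancellation. With no background threads configured,
// total_threads_limit_ is 0, StartBGThreads() starts nothing, and both jobs
// stay queued, so UnSchedule() removes them deterministically.
#include <cstdio>

#include "util/threadpool_imp.h"

using namespace ROCKSDB_NAMESPACE;

static void Work(void* /*arg*/) { std::printf("ran\n"); }
static void Cancelled(void* /*arg*/) { std::printf("unscheduled\n"); }

int main() {
  ThreadPoolImpl pool;
  int tag = 0;  // only the address matters: it identifies the submitter

  pool.Schedule(&Work, nullptr, &tag, &Cancelled);
  pool.Schedule(&Work, nullptr, &tag, &Cancelled);

  // Erases both queued items, runs Cancelled() twice outside the mutex
  // (lines 457-460), and returns the number of removed jobs: 2 here.
  int removed = pool.UnSchedule(&tag);
  std::printf("removed %d\n", removed);

  pool.JoinAllThreads();
  return 0;
}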