/src/rocksdb/util/threadpool_imp.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | // |
6 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
7 | | // Use of this source code is governed by a BSD-style license that can be |
8 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
9 | | |
10 | | #include "util/threadpool_imp.h" |
11 | | |
12 | | #ifndef OS_WIN |
13 | | #include <unistd.h> |
14 | | #endif |
15 | | |
16 | | #ifdef OS_LINUX |
17 | | #include <sys/resource.h> |
18 | | #include <sys/syscall.h> |
19 | | #endif |
20 | | |
21 | | #include <algorithm> |
22 | | #include <atomic> |
23 | | #include <condition_variable> |
24 | | #include <cstdlib> |
25 | | #include <deque> |
26 | | #include <mutex> |
27 | | #include <sstream> |
28 | | #include <thread> |
29 | | #include <vector> |
30 | | |
31 | | #include "monitoring/thread_status_util.h" |
32 | | #include "port/port.h" |
33 | | #include "test_util/sync_point.h" |
34 | | #include "util/string_util.h" |
35 | | |
36 | | namespace ROCKSDB_NAMESPACE { |
37 | | |
38 | 2 | void ThreadPoolImpl::PthreadCall(const char* label, int result) { |
39 | 2 | if (result != 0) { |
40 | 0 | fprintf(stderr, "pthread %s: %s\n", label, errnoStr(result).c_str()); |
41 | 0 | abort(); |
42 | 0 | } |
43 | 2 | } |
44 | | |
45 | | struct ThreadPoolImpl::Impl { |
46 | | Impl(); |
47 | | ~Impl(); |
48 | | |
49 | | void JoinThreads(bool wait_for_jobs_to_complete); |
50 | | |
51 | | void SetBackgroundThreadsInternal(int num, bool allow_reduce); |
52 | | int GetBackgroundThreads(); |
53 | | |
54 | 0 | unsigned int GetQueueLen() const { |
55 | 0 | return queue_len_.load(std::memory_order_relaxed); |
56 | 0 | } |
57 | | |
58 | | void LowerIOPriority(); |
59 | | |
60 | | void LowerCPUPriority(CpuPriority pri); |
61 | | |
62 | 4 | void WakeUpAllThreads() { bgsignal_.notify_all(); } |
63 | | |
64 | | void BGThread(size_t thread_id); |
65 | | |
66 | | void StartBGThreads(); |
67 | | |
68 | | void Submit(std::function<void()>&& schedule, |
69 | | std::function<void()>&& unschedule, void* tag); |
70 | | |
71 | | int UnSchedule(void* arg); |
72 | | |
73 | 8 | void SetHostEnv(Env* env) { env_ = env; } |
74 | | |
75 | 4 | Env* GetHostEnv() const { return env_; } |
76 | | |
77 | 33.5k | bool HasExcessiveThread() const { |
78 | 33.5k | return static_cast<int>(bgthreads_.size()) > total_threads_limit_; |
79 | 33.5k | } |
80 | | |
81 | | // Return true iff the current thread is the excessive thread to terminate. |
82 | | // Always terminate the running thread that was added last, even if
83 | | // more than one thread needs to terminate.
84 | 25.0k | bool IsLastExcessiveThread(size_t thread_id) const { |
85 | 25.0k | return HasExcessiveThread() && thread_id == bgthreads_.size() - 1; |
86 | 25.0k | } |
87 | | |
88 | 8.46k | bool IsExcessiveThread(size_t thread_id) const { |
89 | 8.46k | return static_cast<int>(thread_id) >= total_threads_limit_; |
90 | 8.46k | } |
91 | | |
92 | | // Return the thread priority. |
93 | | // This lets a worker thread discover its own priority.
94 | 8 | Env::Priority GetThreadPriority() const { return priority_; } |
95 | | |
96 | | // Set the thread priority. |
97 | 8 | void SetThreadPriority(Env::Priority priority) { priority_ = priority; } |
98 | | |
99 | 0 | int ReserveThreads(int threads_to_be_reserved) { |
100 | 0 | std::unique_lock<std::mutex> lock(mu_); |
101 | | // At most num_waiting_threads_ threads can be reserved in total, so the
102 | | // number actually reserved may be fewer than requested. In rare cases,
103 | | // num_waiting_threads_ can drop below reserved_threads_ (because of
104 | | // SetBackgroundThreadsInternal or the last excessive thread exiting);
105 | | // when that happens, no further threads can be reserved.
106 | 0 | int reserved_threads_in_success = |
107 | 0 | std::min(std::max(num_waiting_threads_ - reserved_threads_, 0), |
108 | 0 | threads_to_be_reserved); |
109 | 0 | reserved_threads_ += reserved_threads_in_success; |
110 | 0 | return reserved_threads_in_success; |
111 | 0 | } |
112 | | |
113 | 0 | int ReleaseThreads(int threads_to_be_released) { |
114 | 0 | std::unique_lock<std::mutex> lock(mu_); |
115 | | // We cannot release more than reserved_threads_ |
116 | 0 | int released_threads_in_success = |
117 | 0 | std::min(reserved_threads_, threads_to_be_released); |
118 | 0 | reserved_threads_ -= released_threads_in_success; |
119 | 0 | WakeUpAllThreads(); |
120 | 0 | return released_threads_in_success; |
121 | 0 | } |
122 | | |
123 | | private: |
124 | | static void BGThreadWrapper(void* arg); |
125 | | |
126 | | bool low_io_priority_; |
127 | | CpuPriority cpu_priority_; |
128 | | Env::Priority priority_; |
129 | | Env* env_; |
130 | | |
131 | | int total_threads_limit_; |
132 | | std::atomic_uint queue_len_; // Queue length. Used for stats reporting |
133 | | // Number of reserved threads, managed by ReserveThreads(..) and
134 | | // ReleaseThreads(..). While num_waiting_threads_ is no larger than
135 | | // reserved_threads_, waiting threads stay blocked so that the
136 | | // reservation is honored.
137 | | int reserved_threads_; |
138 | | // Number of waiting threads (i.e. the maximum number of threads that
139 | | // can be reserved). In rare cases, num_waiting_threads_ can be less
140 | | // than reserved_threads_ because of SetBackgroundThreadsInternal or
141 | | // the last excessive thread exiting.
142 | | int num_waiting_threads_; |
143 | | bool exit_all_threads_; |
144 | | bool wait_for_jobs_to_complete_; |
145 | | |
146 | | // Entry per Schedule()/Submit() call |
147 | | struct BGItem { |
148 | | void* tag = nullptr; |
149 | | std::function<void()> function; |
150 | | std::function<void()> unschedFunction; |
151 | | }; |
152 | | |
153 | | using BGQueue = std::deque<BGItem>; |
154 | | BGQueue queue_; |
155 | | |
156 | | std::mutex mu_; |
157 | | std::condition_variable bgsignal_; |
158 | | std::vector<port::Thread> bgthreads_; |
159 | | }; |
160 | | |
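The reservation mechanism above is easiest to see from the caller's side. Below is a minimal, hypothetical sketch (not part of this file) of the intended round trip: ReserveThreads() can only claim threads that are already waiting, so the grant may be anywhere from 0 to the requested count, and every granted thread should eventually be handed back via ReleaseThreads().

    // Hypothetical caller-side sketch; the pool methods are the real
    // ThreadPoolImpl members defined in this file.
    #include "util/threadpool_imp.h"

    int ReservationRoundTrip() {
      ROCKSDB_NAMESPACE::ThreadPoolImpl pool;
      pool.SetHostEnv(ROCKSDB_NAMESPACE::Env::Default());
      pool.SetThreadPriority(ROCKSDB_NAMESPACE::Env::Priority::LOW);
      pool.SetBackgroundThreads(4);  // spawn up to 4 workers

      // Only threads already parked in the wait loop can be reserved, so
      // the grant may be smaller than the request (possibly 0 right after
      // startup, before any worker reaches the wait loop).
      int granted = pool.ReserveThreads(6);

      // ... reserved threads will not pick up queued jobs here ...

      int released = pool.ReleaseThreads(granted);  // returns `granted`
      pool.JoinAllThreads();
      return released;
    }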
161 | | inline ThreadPoolImpl::Impl::Impl() |
162 | 8 | : low_io_priority_(false), |
163 | 8 | cpu_priority_(CpuPriority::kNormal), |
164 | 8 | priority_(Env::LOW), |
165 | 8 | env_(nullptr), |
166 | 8 | total_threads_limit_(0), |
167 | 8 | queue_len_(), |
168 | 8 | reserved_threads_(0), |
169 | 8 | num_waiting_threads_(0), |
170 | 8 | exit_all_threads_(false), |
171 | 8 | wait_for_jobs_to_complete_(false), |
172 | 8 | queue_(), |
173 | 8 | mu_(), |
174 | 8 | bgsignal_(), |
175 | 8 | bgthreads_() {} |
176 | | |
177 | 0 | inline ThreadPoolImpl::Impl::~Impl() { assert(bgthreads_.size() == 0U); } |
178 | | |
179 | 8 | void ThreadPoolImpl::Impl::JoinThreads(bool wait_for_jobs_to_complete) { |
180 | 8 | std::unique_lock<std::mutex> lock(mu_); |
181 | 8 | assert(!exit_all_threads_); |
182 | | |
183 | 8 | wait_for_jobs_to_complete_ = wait_for_jobs_to_complete; |
184 | 8 | exit_all_threads_ = true; |
185 | | // prevent threads from being recreated right after they're joined, in case |
186 | | // the user is concurrently submitting jobs. |
187 | 8 | total_threads_limit_ = 0; |
188 | 8 | reserved_threads_ = 0; |
189 | 8 | num_waiting_threads_ = 0; |
190 | | |
191 | 8 | lock.unlock(); |
192 | | |
193 | 8 | bgsignal_.notify_all(); |
194 | | |
195 | 8 | for (auto& th : bgthreads_) { |
196 | 4 | th.join(); |
197 | 4 | } |
198 | | |
199 | 8 | bgthreads_.clear(); |
200 | | |
201 | 8 | exit_all_threads_ = false; |
202 | 8 | wait_for_jobs_to_complete_ = false; |
203 | 8 | } |
204 | | |
205 | 0 | inline void ThreadPoolImpl::Impl::LowerIOPriority() { |
206 | 0 | std::lock_guard<std::mutex> lock(mu_); |
207 | 0 | low_io_priority_ = true; |
208 | 0 | } |
209 | | |
210 | 0 | inline void ThreadPoolImpl::Impl::LowerCPUPriority(CpuPriority pri) { |
211 | 0 | std::lock_guard<std::mutex> lock(mu_); |
212 | 0 | cpu_priority_ = pri; |
213 | 0 | } |
214 | | |
215 | 4 | void ThreadPoolImpl::Impl::BGThread(size_t thread_id) { |
216 | 4 | bool low_io_priority = false; |
217 | 4 | CpuPriority current_cpu_priority = CpuPriority::kNormal; |
218 | | |
219 | 8.46k | while (true) { |
220 | | // Wait until there is an item that is ready to run |
221 | 8.46k | std::unique_lock<std::mutex> lock(mu_); |
222 | | // Stop waiting if the thread needs to do work or needs to terminate.
223 | | // Increase num_waiting_threads_ once this thread has started waiting.
224 | 8.46k | num_waiting_threads_++; |
225 | | |
226 | 8.46k | TEST_SYNC_POINT("ThreadPoolImpl::BGThread::WaitingThreadsInc"); |
227 | 8.46k | TEST_IDX_SYNC_POINT("ThreadPoolImpl::BGThread::Start:th", thread_id); |
228 | | // While exit_all_threads_ is false and the current thread is not the
229 | | // last excessive thread, it may block for three reasons: 1) the queue
230 | | // is empty; 2) it is an excessive thread (but not the last one);
231 | | // 3) the number of waiting threads is not greater than the reserved
232 | | // threads (i.e., no threads are available due to full reservation).
233 | 16.5k | while (!exit_all_threads_ && !IsLastExcessiveThread(thread_id) && |
234 | 16.5k | (queue_.empty() || IsExcessiveThread(thread_id) || |
235 | 8.46k | num_waiting_threads_ <= reserved_threads_)) { |
236 | 8.11k | bgsignal_.wait(lock); |
237 | 8.11k | } |
238 | | // Decrease num_waiting_threads_ once the thread is not waiting |
239 | 8.46k | num_waiting_threads_--; |
240 | | |
241 | 8.46k | if (exit_all_threads_) { // mechanism to let BG threads exit safely |
242 | | |
243 | 4 | if (!wait_for_jobs_to_complete_ || queue_.empty()) { |
244 | 4 | break; |
245 | 4 | } |
246 | 8.46k | } else if (IsLastExcessiveThread(thread_id)) { |
247 | | // Current thread is the last generated one and is excessive. |
248 | | // We always terminate excessive threads in the reverse order of
249 | | // their creation. But not when `exit_all_threads_ == true`,
250 | | // otherwise `JoinThreads()` could try to `join()` a `detach()`ed |
251 | | // thread. |
252 | 0 | auto& terminating_thread = bgthreads_.back(); |
253 | 0 | terminating_thread.detach(); |
254 | 0 | bgthreads_.pop_back(); |
255 | 0 | if (HasExcessiveThread()) { |
256 | | // There is still at least one more excessive thread to terminate.
257 | 0 | WakeUpAllThreads(); |
258 | 0 | } |
259 | 0 | TEST_IDX_SYNC_POINT("ThreadPoolImpl::BGThread::Termination:th", |
260 | 0 | thread_id); |
261 | 0 | TEST_SYNC_POINT("ThreadPoolImpl::BGThread::Termination"); |
262 | 0 | break; |
263 | 0 | } |
264 | | |
265 | 8.46k | auto func = std::move(queue_.front().function); |
266 | 8.46k | queue_.pop_front(); |
267 | | |
268 | 8.46k | queue_len_.store(static_cast<unsigned int>(queue_.size()), |
269 | 8.46k | std::memory_order_relaxed); |
270 | | |
271 | 8.46k | bool decrease_io_priority = (low_io_priority != low_io_priority_); |
272 | 8.46k | CpuPriority cpu_priority = cpu_priority_; |
273 | 8.46k | lock.unlock(); |
274 | | |
275 | 8.46k | if (cpu_priority < current_cpu_priority) { |
276 | 0 | TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::BGThread::BeforeSetCpuPriority", |
277 | 0 | &current_cpu_priority);
278 | | // 0 means current thread. |
279 | 0 | port::SetCpuPriority(0, cpu_priority); |
280 | 0 | current_cpu_priority = cpu_priority; |
281 | 0 | TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::BGThread::AfterSetCpuPriority", |
282 | 0 | &current_cpu_priority);
283 | 0 | } |
284 | | |
285 | 8.46k | #ifdef OS_LINUX |
286 | 8.46k | if (decrease_io_priority) { |
287 | 0 | #define IOPRIO_CLASS_SHIFT (13) |
288 | 0 | #define IOPRIO_PRIO_VALUE(class, data) (((class) << IOPRIO_CLASS_SHIFT) | data) |
289 | | // Put schedule into IOPRIO_CLASS_IDLE class (lowest) |
290 | | // These system calls only have an effect when used in conjunction |
291 | | // with an I/O scheduler that supports I/O priorities. As of
292 | | // kernel 2.6.17 the only such scheduler is the Completely
293 | | // Fair Queuing (CFQ) I/O scheduler.
294 | | // To change the scheduler:
295 | | // echo cfq > /sys/block/<device_name>/queue/scheduler
296 | | // Tunables to consider: |
297 | | // /sys/block/<device_name>/queue/slice_idle |
298 | | // /sys/block/<device_name>/queue/slice_sync |
299 | 0 | syscall(SYS_ioprio_set, 1, // IOPRIO_WHO_PROCESS |
300 | 0 | 0, // current thread |
301 | 0 | IOPRIO_PRIO_VALUE(3, 0)); |
302 | 0 | low_io_priority = true; |
303 | 0 | } |
304 | | #else |
305 | | (void)decrease_io_priority; // avoid 'unused variable' error |
306 | | #endif |
307 | | |
308 | 8.46k | TEST_SYNC_POINT_CALLBACK("ThreadPoolImpl::Impl::BGThread:BeforeRun", |
309 | 8.46k | &priority_); |
310 | | |
311 | 8.46k | func(); |
312 | 8.46k | } |
313 | 4 | } |
314 | | |
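The raw syscall above packs an I/O class and per-class data into a single integer via IOPRIO_PRIO_VALUE. A standalone, Linux-only sketch of the same ABI encoding (class 3 is IOPRIO_CLASS_IDLE, shifted left by 13 bits):

    #include <sys/syscall.h>
    #include <unistd.h>

    // Same packing as the IOPRIO_PRIO_VALUE macro above:
    // the class occupies the bits above bit 13, the data the bits below.
    static inline int MakeIoprio(int io_class, int data) {
      return (io_class << 13) | data;
    }

    int main() {
      // who_type 1 == IOPRIO_WHO_PROCESS, who 0 == the calling thread,
      // class 3 == IOPRIO_CLASS_IDLE (lowest), data 0.
      long rc = syscall(SYS_ioprio_set, 1, 0, MakeIoprio(3, 0));
      return rc == 0 ? 0 : 1;
    }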
315 | | // Helper struct for passing arguments when creating threads. |
316 | | struct BGThreadMetadata { |
317 | | ThreadPoolImpl::Impl* thread_pool_; |
318 | | size_t thread_id_; // Ordinal id of the thread within the pool.
319 | | BGThreadMetadata(ThreadPoolImpl::Impl* thread_pool, size_t thread_id) |
320 | 4 | : thread_pool_(thread_pool), thread_id_(thread_id) {} |
321 | | }; |
322 | | |
323 | 4 | void ThreadPoolImpl::Impl::BGThreadWrapper(void* arg) { |
324 | 4 | BGThreadMetadata* meta = static_cast<BGThreadMetadata*>(arg); |
325 | 4 | size_t thread_id = meta->thread_id_; |
326 | 4 | ThreadPoolImpl::Impl* tp = meta->thread_pool_; |
327 | 4 | #ifdef ROCKSDB_USING_THREAD_STATUS |
328 | | // initialize it because compiler isn't good enough to see we don't use it |
329 | | // uninitialized |
330 | 4 | ThreadStatus::ThreadType thread_type = ThreadStatus::NUM_THREAD_TYPES; |
331 | 4 | switch (tp->GetThreadPriority()) { |
332 | 2 | case Env::Priority::HIGH: |
333 | 2 | thread_type = ThreadStatus::HIGH_PRIORITY; |
334 | 2 | break; |
335 | 2 | case Env::Priority::LOW: |
336 | 2 | thread_type = ThreadStatus::LOW_PRIORITY; |
337 | 2 | break; |
338 | 0 | case Env::Priority::BOTTOM: |
339 | 0 | thread_type = ThreadStatus::BOTTOM_PRIORITY; |
340 | 0 | break; |
341 | 0 | case Env::Priority::USER: |
342 | 0 | thread_type = ThreadStatus::USER; |
343 | 0 | break; |
344 | 0 | case Env::Priority::TOTAL: |
345 | 0 | assert(false); |
346 | 0 | return; |
347 | 4 | } |
348 | 4 | assert(thread_type != ThreadStatus::NUM_THREAD_TYPES); |
349 | 4 | ThreadStatusUtil::RegisterThread(tp->GetHostEnv(), thread_type); |
350 | 4 | #endif |
351 | 4 | delete meta; |
352 | 4 | tp->BGThread(thread_id); |
353 | 4 | #ifdef ROCKSDB_USING_THREAD_STATUS |
354 | 4 | ThreadStatusUtil::UnregisterThread(); |
355 | 4 | #endif |
356 | 4 | return; |
357 | 4 | } |
358 | | |
359 | | void ThreadPoolImpl::Impl::SetBackgroundThreadsInternal(int num, |
360 | 115k | bool allow_reduce) { |
361 | 115k | std::lock_guard<std::mutex> lock(mu_); |
362 | 115k | if (exit_all_threads_) { |
363 | 0 | return; |
364 | 0 | } |
365 | 115k | if (num > total_threads_limit_ || |
366 | 115k | (num < total_threads_limit_ && allow_reduce)) { |
367 | 4 | total_threads_limit_ = std::max(0, num); |
368 | 4 | WakeUpAllThreads(); |
369 | 4 | StartBGThreads(); |
370 | 4 | } |
371 | 115k | } |
372 | | |
373 | 93.2k | int ThreadPoolImpl::Impl::GetBackgroundThreads() { |
374 | 93.2k | std::unique_lock<std::mutex> lock(mu_); |
375 | 93.2k | return total_threads_limit_; |
376 | 93.2k | } |
377 | | |
378 | 8.47k | void ThreadPoolImpl::Impl::StartBGThreads() { |
379 | | // Start background threads if necessary
380 | 8.47k | while (static_cast<int>(bgthreads_.size()) < total_threads_limit_) {
381 | 4 | port::Thread p_t(&BGThreadWrapper, |
382 | 4 | new BGThreadMetadata(this, bgthreads_.size())); |
383 | | |
384 | | // Set the thread name to aid debugging |
385 | 4 | #if defined(_GNU_SOURCE) && defined(__GLIBC_PREREQ) |
386 | 4 | #if __GLIBC_PREREQ(2, 12) |
387 | 4 | auto th_handle = p_t.native_handle(); |
388 | 4 | std::string thread_priority = Env::PriorityToString(GetThreadPriority()); |
389 | 4 | std::ostringstream thread_name_stream; |
390 | 4 | thread_name_stream << "rocksdb:"; |
391 | 14 | for (char c : thread_priority) { |
392 | 14 | thread_name_stream << static_cast<char>(tolower(c)); |
393 | 14 | } |
394 | 4 | pthread_setname_np(th_handle, thread_name_stream.str().c_str()); |
395 | 4 | #endif |
396 | 4 | #endif |
397 | 4 | bgthreads_.push_back(std::move(p_t)); |
398 | 4 | } |
399 | 8.47k | } |
400 | | |
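The naming loop above yields names such as "rocksdb:low" and "rocksdb:high". A minimal sketch of the same glibc call (Linux, glibc >= 2.12); Linux caps thread names at 15 characters plus the terminating NUL, which is why the short "rocksdb:<priority>" form is used:

    #include <pthread.h>
    #include <thread>

    int main() {
      std::thread t([] { /* background work */ });
      // Longer names are rejected with ERANGE on Linux.
      pthread_setname_np(t.native_handle(), "rocksdb:low");
      t.join();
      return 0;
    }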
401 | | void ThreadPoolImpl::Impl::Submit(std::function<void()>&& schedule, |
402 | | std::function<void()>&& unschedule, |
403 | 8.46k | void* tag) { |
404 | 8.46k | std::lock_guard<std::mutex> lock(mu_); |
405 | | |
406 | 8.46k | if (exit_all_threads_) { |
407 | 0 | return; |
408 | 0 | } |
409 | | |
410 | 8.46k | StartBGThreads(); |
411 | | |
412 | | // Add to the queue of this priority's pool
413 | 8.46k | queue_.push_back(BGItem()); |
414 | 8.46k | TEST_SYNC_POINT("ThreadPoolImpl::Submit::Enqueue"); |
415 | 8.46k | auto& item = queue_.back(); |
416 | 8.46k | item.tag = tag; |
417 | 8.46k | item.function = std::move(schedule); |
418 | 8.46k | item.unschedFunction = std::move(unschedule); |
419 | | |
420 | 8.46k | queue_len_.store(static_cast<unsigned int>(queue_.size()), |
421 | 8.46k | std::memory_order_relaxed); |
422 | | |
423 | 8.46k | if (!HasExcessiveThread()) { |
424 | | // Wake up at least one waiting thread. |
425 | 8.46k | bgsignal_.notify_one(); |
426 | 8.46k | } else { |
427 | | // Need to wake up all threads to make sure the one woken |
428 | | // up is not the one to terminate. |
429 | 0 | WakeUpAllThreads(); |
430 | 0 | } |
431 | 8.46k | } |
432 | | |
433 | 290k | int ThreadPoolImpl::Impl::UnSchedule(void* arg) { |
434 | 290k | int count = 0; |
435 | | |
436 | 290k | std::vector<std::function<void()>> candidates; |
437 | 290k | { |
438 | 290k | std::lock_guard<std::mutex> lock(mu_); |
439 | | |
440 | | // Remove matching jobs from the queue
441 | 290k | BGQueue::iterator it = queue_.begin(); |
442 | 290k | while (it != queue_.end()) { |
443 | 5 | if (arg == (*it).tag) { |
444 | 5 | if (it->unschedFunction) { |
445 | 5 | candidates.push_back(std::move(it->unschedFunction)); |
446 | 5 | } |
447 | 5 | it = queue_.erase(it); |
448 | 5 | count++; |
449 | 5 | } else { |
450 | 0 | ++it; |
451 | 0 | } |
452 | 5 | } |
453 | 290k | queue_len_.store(static_cast<unsigned int>(queue_.size()), |
454 | 290k | std::memory_order_relaxed); |
455 | 290k | } |
456 | | |
457 | | // Run unschedule functions outside the mutex |
458 | 290k | for (auto& f : candidates) { |
459 | 5 | f(); |
460 | 5 | } |
461 | | |
462 | 290k | return count; |
463 | 290k | } |
464 | | |
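Tags tie Schedule() and UnSchedule() together: every still-queued job whose tag matches is removed, its unschedule callback (if any) runs outside the pool mutex, and the number of removed jobs is returned. A hypothetical caller-side sketch (the function names are illustrative):

    // Hypothetical sketch: cancelling queued work by tag.
    static void DoWork(void* /*arg*/) { /* job body */ }
    static void OnCancel(void* /*arg*/) { /* cleanup for a job that never ran */ }

    int CancelPending(ROCKSDB_NAMESPACE::ThreadPoolImpl& pool) {
      static int tag;  // any stable address can serve as a tag
      pool.Schedule(&DoWork, /*arg=*/nullptr, /*tag=*/&tag, &OnCancel);
      // Only jobs still in the queue are affected; a job a worker has
      // already dequeued will run to completion.
      return pool.UnSchedule(&tag);
    }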
465 | 8 | ThreadPoolImpl::ThreadPoolImpl() : impl_(new Impl()) {} |
466 | | |
467 | 0 | ThreadPoolImpl::~ThreadPoolImpl() = default; |
468 | | |
469 | 8 | void ThreadPoolImpl::JoinAllThreads() { impl_->JoinThreads(false); } |
470 | | |
471 | 0 | void ThreadPoolImpl::SetBackgroundThreads(int num) { |
472 | 0 | impl_->SetBackgroundThreadsInternal(num, true); |
473 | 0 | } |
474 | | |
475 | 93.2k | int ThreadPoolImpl::GetBackgroundThreads() { |
476 | 93.2k | return impl_->GetBackgroundThreads(); |
477 | 93.2k | } |
478 | | |
479 | 0 | unsigned int ThreadPoolImpl::GetQueueLen() const { |
480 | 0 | return impl_->GetQueueLen(); |
481 | 0 | } |
482 | | |
483 | 0 | void ThreadPoolImpl::WaitForJobsAndJoinAllThreads() { |
484 | 0 | impl_->JoinThreads(true); |
485 | 0 | } |
486 | | |
487 | 0 | void ThreadPoolImpl::LowerIOPriority() { impl_->LowerIOPriority(); } |
488 | | |
489 | 0 | void ThreadPoolImpl::LowerCPUPriority(CpuPriority pri) { |
490 | 0 | impl_->LowerCPUPriority(pri); |
491 | 0 | } |
492 | | |
493 | 115k | void ThreadPoolImpl::IncBackgroundThreadsIfNeeded(int num) { |
494 | 115k | impl_->SetBackgroundThreadsInternal(num, false); |
495 | 115k | } |
496 | | |
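SetBackgroundThreads() calls SetBackgroundThreadsInternal() with allow_reduce = true and may grow or shrink the pool; IncBackgroundThreadsIfNeeded() passes false and only ever grows it. A short worked sequence on a fresh pool:

    #include "util/threadpool_imp.h"

    void LimitDemo() {
      ROCKSDB_NAMESPACE::ThreadPoolImpl pool;
      pool.SetBackgroundThreads(8);           // limit: 0 -> 8
      pool.IncBackgroundThreadsIfNeeded(4);   // no-op: 4 < 8
      pool.IncBackgroundThreadsIfNeeded(16);  // limit: 8 -> 16
      pool.SetBackgroundThreads(2);           // limit: 16 -> 2 (allow_reduce)
      pool.JoinAllThreads();                  // workers must be joined before ~Impl
    }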
497 | 0 | void ThreadPoolImpl::SubmitJob(const std::function<void()>& job) { |
498 | 0 | auto copy(job); |
499 | 0 | impl_->Submit(std::move(copy), std::function<void()>(), nullptr); |
500 | 0 | } |
501 | | |
502 | 0 | void ThreadPoolImpl::SubmitJob(std::function<void()>&& job) { |
503 | 0 | impl_->Submit(std::move(job), std::function<void()>(), nullptr); |
504 | 0 | } |
505 | | |
506 | | void ThreadPoolImpl::Schedule(void (*function)(void* arg1), void* arg, |
507 | 8.46k | void* tag, void (*unschedFunction)(void* arg)) { |
508 | 8.46k | if (unschedFunction == nullptr) { |
509 | 0 | impl_->Submit(std::bind(function, arg), std::function<void()>(), tag); |
510 | 8.46k | } else { |
511 | 8.46k | impl_->Submit(std::bind(function, arg), std::bind(unschedFunction, arg), |
512 | 8.46k | tag); |
513 | 8.46k | } |
514 | 8.46k | } |
515 | | |
516 | 290k | int ThreadPoolImpl::UnSchedule(void* arg) { return impl_->UnSchedule(arg); } |
517 | | |
518 | 8 | void ThreadPoolImpl::SetHostEnv(Env* env) { impl_->SetHostEnv(env); } |
519 | | |
520 | 0 | Env* ThreadPoolImpl::GetHostEnv() const { return impl_->GetHostEnv(); } |
521 | | |
522 | | // Return the thread priority. |
523 | | // This lets a worker thread discover its own priority.
524 | 0 | Env::Priority ThreadPoolImpl::GetThreadPriority() const { |
525 | 0 | return impl_->GetThreadPriority(); |
526 | 0 | } |
527 | | |
528 | | // Set the thread priority. |
529 | 8 | void ThreadPoolImpl::SetThreadPriority(Env::Priority priority) { |
530 | 8 | impl_->SetThreadPriority(priority); |
531 | 8 | } |
532 | | |
533 | | // Reserve a specific number of threads, preventing them from running other
534 | | // functions. The number of threads reserved may be fewer than requested.
535 | 0 | int ThreadPoolImpl::ReserveThreads(int threads_to_be_reserved) { |
536 | 0 | return impl_->ReserveThreads(threads_to_be_reserved); |
537 | 0 | } |
538 | | |
539 | | // Release a specific number of threads |
540 | 0 | int ThreadPoolImpl::ReleaseThreads(int threads_to_be_released) { |
541 | 0 | return impl_->ReleaseThreads(threads_to_be_released); |
542 | 0 | } |
543 | | |
544 | 0 | ThreadPool* NewThreadPool(int num_threads) { |
545 | 0 | ThreadPoolImpl* thread_pool = new ThreadPoolImpl(); |
546 | 0 | thread_pool->SetBackgroundThreads(num_threads); |
547 | 0 | return thread_pool; |
548 | 0 | } |
549 | | |
550 | | } // namespace ROCKSDB_NAMESPACE |
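Finally, an end-to-end sketch of the public surface implemented here, assuming the usual public header rocksdb/threadpool.h that declares ThreadPool and NewThreadPool(); WaitForJobsAndJoinAllThreads() maps to JoinThreads(true), so queued jobs are drained before the workers exit:

    #include <memory>

    #include "rocksdb/threadpool.h"

    int main() {
      std::unique_ptr<ROCKSDB_NAMESPACE::ThreadPool> pool(
          ROCKSDB_NAMESPACE::NewThreadPool(/*num_threads=*/2));
      for (int i = 0; i < 8; ++i) {
        pool->SubmitJob([i] { /* background work for job i */ (void)i; });
      }
      pool->WaitForJobsAndJoinAllThreads();  // drain the queue, then join
      return 0;
    }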