/src/rocksdb/util/work_queue.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | /* |
7 | | * Copyright (c) 2016-present, Facebook, Inc. |
8 | | * All rights reserved. |
9 | | * |
10 | | * This source code is licensed under both the BSD-style license (found in the |
11 | | * LICENSE file in the root directory of this source tree) and the GPLv2 (found |
12 | | * in the COPYING file in the root directory of this source tree). |
13 | | */ |
14 | | #pragma once |
15 | | |
16 | | #include <atomic> |
17 | | #include <cassert> |
18 | | #include <condition_variable> |
19 | | #include <cstddef> |
20 | | #include <functional> |
21 | | #include <mutex> |
22 | | #include <queue> |
23 | | |
24 | | #include "rocksdb/rocksdb_namespace.h" |
25 | | |
26 | | namespace ROCKSDB_NAMESPACE { |
27 | | |
28 | | /// Unbounded thread-safe work queue. |
29 | | // |
30 | | // This file is an excerpt from Facebook's zstd repo at |
31 | | // https://github.com/facebook/zstd/. The relevant file is |
32 | | // contrib/pzstd/utils/WorkQueue.h. |
33 | | |
34 | | template <typename T> |
35 | | class WorkQueue { |
36 | | // Protects all member variable access |
37 | | std::mutex mutex_; |
38 | | std::condition_variable readerCv_; |
39 | | std::condition_variable writerCv_; |
40 | | std::condition_variable finishCv_; |
41 | | |
42 | | std::queue<T> queue_; |
43 | | bool done_; |
44 | | std::size_t maxSize_; |
45 | | |
46 | | // Must have lock to call this function |
47 | 0 | bool full() const { |
48 | 0 | if (maxSize_ == 0) { |
49 | 0 | return false; |
50 | 0 | } |
51 | 0 | return queue_.size() >= maxSize_; |
52 | 0 | } Unexecuted instantiation: rocksdb::WorkQueue<rocksdb::BlockBasedTableBuilder::ParallelCompressionRep::BlockRepSlot*>::full() const Unexecuted instantiation: rocksdb::WorkQueue<rocksdb::BlockBasedTableBuilder::ParallelCompressionRep::BlockRep*>::full() const |
53 | | |
54 | | public: |
55 | | /** |
56 | | * Constructs an empty work queue with an optional max size. |
57 | | * If `maxSize == 0` the queue size is unbounded. |
58 | | * |
59 | | * @param maxSize The maximum allowed size of the work queue. |
60 | | */ |
61 | 0 | WorkQueue(std::size_t maxSize = 0) : done_(false), maxSize_(maxSize) {} Unexecuted instantiation: rocksdb::WorkQueue<rocksdb::BlockBasedTableBuilder::ParallelCompressionRep::BlockRep*>::WorkQueue(unsigned long) Unexecuted instantiation: rocksdb::WorkQueue<rocksdb::BlockBasedTableBuilder::ParallelCompressionRep::BlockRepSlot*>::WorkQueue(unsigned long) |
62 | | |
63 | | /** |
64 | | * Push an item onto the work queue. Notify a single thread that work is |
65 | | * available. If `finish()` has been called, do nothing and return false. |
66 | | * If `push()` returns false, then `item` has not been copied from. |
67 | | * |
68 | | * @param item Item to push onto the queue. |
69 | | * @returns True upon success, false if `finish()` has been called. An |
70 | | * item was pushed iff `push()` returns true. |
71 | | */ |
72 | | template <typename U> |
73 | 0 | bool push(U&& item) { |
74 | 0 | { |
75 | 0 | std::unique_lock<std::mutex> lock(mutex_); |
76 | 0 | while (full() && !done_) { |
77 | 0 | writerCv_.wait(lock); |
78 | 0 | } |
79 | 0 | if (done_) { |
80 | 0 | return false; |
81 | 0 | } |
82 | 0 | queue_.push(std::forward<U>(item)); |
83 | 0 | } |
84 | 0 | readerCv_.notify_one(); |
85 | 0 | return true; |
86 | 0 | } Unexecuted instantiation: bool rocksdb::WorkQueue<rocksdb::BlockBasedTableBuilder::ParallelCompressionRep::BlockRepSlot*>::push<rocksdb::BlockBasedTableBuilder::ParallelCompressionRep::BlockRepSlot*>(rocksdb::BlockBasedTableBuilder::ParallelCompressionRep::BlockRepSlot*&&) Unexecuted instantiation: bool rocksdb::WorkQueue<rocksdb::BlockBasedTableBuilder::ParallelCompressionRep::BlockRep*>::push<rocksdb::BlockBasedTableBuilder::ParallelCompressionRep::BlockRep*&>(rocksdb::BlockBasedTableBuilder::ParallelCompressionRep::BlockRep*&) Unexecuted instantiation: bool rocksdb::WorkQueue<rocksdb::BlockBasedTableBuilder::ParallelCompressionRep::BlockRep*>::push<rocksdb::BlockBasedTableBuilder::ParallelCompressionRep::BlockRep*>(rocksdb::BlockBasedTableBuilder::ParallelCompressionRep::BlockRep*&&) |
87 | | |
88 | | /** |
89 | | * Attempts to pop an item off the work queue. It will block until data is |
90 | | * available or `finish()` has been called. |
91 | | * |
92 | | * @param[out] item If `pop` returns `true`, it contains the popped item. |
93 | | * If `pop` returns `false`, it is unmodified. |
94 | | * @returns True upon success. False if the queue is empty and |
95 | | * `finish()` has been called. |
96 | | */ |
97 | 0 | bool pop(T& item) { |
98 | 0 | { |
99 | 0 | std::unique_lock<std::mutex> lock(mutex_); |
100 | 0 | while (queue_.empty() && !done_) { |
101 | 0 | readerCv_.wait(lock); |
102 | 0 | } |
103 | 0 | if (queue_.empty()) { |
104 | 0 | assert(done_); |
105 | 0 | return false; |
106 | 0 | } |
107 | 0 | item = queue_.front(); |
108 | 0 | queue_.pop(); |
109 | 0 | } |
110 | 0 | writerCv_.notify_one(); |
111 | 0 | return true; |
112 | 0 | } Unexecuted instantiation: rocksdb::WorkQueue<rocksdb::BlockBasedTableBuilder::ParallelCompressionRep::BlockRep*>::pop(rocksdb::BlockBasedTableBuilder::ParallelCompressionRep::BlockRep*&) Unexecuted instantiation: rocksdb::WorkQueue<rocksdb::BlockBasedTableBuilder::ParallelCompressionRep::BlockRepSlot*>::pop(rocksdb::BlockBasedTableBuilder::ParallelCompressionRep::BlockRepSlot*&) |
113 | | |
114 | | /** |
115 | | * Sets the maximum queue size. If `maxSize == 0` then it is unbounded. |
116 | | * |
117 | | * @param maxSize The new maximum queue size. |
118 | | */ |
119 | | void setMaxSize(std::size_t maxSize) { |
120 | | { |
121 | | std::lock_guard<std::mutex> lock(mutex_); |
122 | | maxSize_ = maxSize; |
123 | | } |
124 | | writerCv_.notify_all(); |
125 | | } |
126 | | |
127 | | /** |
128 | | * Promise that `push()` won't be called again, so once the queue is empty |
129 | | * there will never any more work. |
130 | | */ |
131 | 0 | void finish() { |
132 | 0 | { |
133 | 0 | std::lock_guard<std::mutex> lock(mutex_); |
134 | 0 | assert(!done_); |
135 | 0 | done_ = true; |
136 | 0 | } |
137 | 0 | readerCv_.notify_all(); |
138 | 0 | writerCv_.notify_all(); |
139 | 0 | finishCv_.notify_all(); |
140 | 0 | } Unexecuted instantiation: rocksdb::WorkQueue<rocksdb::BlockBasedTableBuilder::ParallelCompressionRep::BlockRep*>::finish() Unexecuted instantiation: rocksdb::WorkQueue<rocksdb::BlockBasedTableBuilder::ParallelCompressionRep::BlockRepSlot*>::finish() |
141 | | |
142 | | /// Blocks until `finish()` has been called (but the queue may not be empty). |
143 | | void waitUntilFinished() { |
144 | | std::unique_lock<std::mutex> lock(mutex_); |
145 | | while (!done_) { |
146 | | finishCv_.wait(lock); |
147 | | } |
148 | | } |
149 | | }; |
150 | | } // namespace ROCKSDB_NAMESPACE |