Coverage Report

Created: 2024-07-27 06:53

/src/rocksdb/util/work_queue.h
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
/*
7
 * Copyright (c) 2016-present, Facebook, Inc.
8
 * All rights reserved.
9
 *
10
 * This source code is licensed under both the BSD-style license (found in the
11
 * LICENSE file in the root directory of this source tree) and the GPLv2 (found
12
 * in the COPYING file in the root directory of this source tree).
13
 */
14
#pragma once
15
16
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <utility>

#include "rocksdb/rocksdb_namespace.h"
25
26
namespace ROCKSDB_NAMESPACE {
27
28
/// Thread-safe FIFO work queue with an optional capacity limit.
///
/// With a max size of 0 (the default) the queue is unbounded; otherwise
/// `push()` blocks while the queue already holds the maximum number of items.
//
// This file is an excerpt from Facebook's zstd repo at
// https://github.com/facebook/zstd/. The relevant file is
// contrib/pzstd/utils/WorkQueue.h.

template <typename T>
class WorkQueue {
  // Protects all member variable access
  std::mutex mutex_;
  // Notified when an item is pushed and when `finish()` is called.
  std::condition_variable readerCv_;
  // Notified when an item is popped, when the max size changes, and when
  // `finish()` is called.
  std::condition_variable writerCv_;
  // Notified when `finish()` is called.
  std::condition_variable finishCv_;

  std::queue<T> queue_;
  bool done_;
  std::size_t maxSize_;

  // Must have lock to call this function
  bool full() const {
    // A max size of zero means "no limit", so such a queue is never full.
    return maxSize_ != 0 && queue_.size() >= maxSize_;
  }

 public:
  /**
   * Constructs an empty work queue with an optional max size.
   * If `maxSize == 0` the queue size is unbounded.
   *
   * @param maxSize The maximum allowed size of the work queue.
   */
  WorkQueue(std::size_t maxSize = 0) : done_(false), maxSize_(maxSize) {}

  /**
   * Push an item onto the work queue.  Notify a single thread that work is
   * available.  Blocks while the queue is full and `finish()` has not been
   * called.  If `finish()` has been called, do nothing and return false.
   * If `push()` returns false, then `item` has not been copied or moved from.
   *
   * @param item  Item to push onto the queue.
   * @returns     True upon success, false if `finish()` has been called.  An
   *               item was pushed iff `push()` returns true.
   */
  template <typename U>
  bool push(U&& item) {
    {
      std::unique_lock<std::mutex> lock(mutex_);
      // Sleep until there is room for the item or the queue is shut down.
      writerCv_.wait(lock, [this] { return done_ || !full(); });
      if (done_) {
        return false;
      }
      queue_.push(std::forward<U>(item));
    }
    // The lock is released before notifying, so the woken reader does not
    // immediately block on the mutex.
    readerCv_.notify_one();
    return true;
  }

  /**
   * Attempts to pop an item off the work queue.  It will block until data is
   * available or `finish()` has been called.  Items that are still queued
   * when `finish()` is called can still be popped.
   *
   * @param[out] item  If `pop` returns `true`, it contains the popped item.
   *                    If `pop` returns `false`, it is unmodified.
   * @returns          True upon success.  False if the queue is empty and
   *                    `finish()` has been called.
   */
  bool pop(T& item) {
    {
      std::unique_lock<std::mutex> lock(mutex_);
      // Sleep until there is something to pop or the queue is shut down.
      readerCv_.wait(lock, [this] { return done_ || !queue_.empty(); });
      if (queue_.empty()) {
        assert(done_);
        return false;
      }
      // The front element is destroyed by the `pop()` below, so move it out
      // instead of copying it.  This also allows move-only element types.
      item = std::move(queue_.front());
      queue_.pop();
    }
    // The lock is released before notifying, so the woken writer does not
    // immediately block on the mutex.
    writerCv_.notify_one();
    return true;
  }

  /**
   * Sets the maximum queue size.  If `maxSize == 0` then it is unbounded.
   * Wakes every blocked writer so it can re-check the new limit.
   *
   * @param maxSize The new maximum queue size.
   */
  void setMaxSize(std::size_t maxSize) {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      maxSize_ = maxSize;
    }
    writerCv_.notify_all();
  }

  /**
   * Promise that `push()` won't be called again, so once the queue is empty
   * there will never be any more work.  Wakes all blocked readers, writers
   * and `waitUntilFinished()` callers.  Must be called at most once
   * (checked with `assert`).
   */
  void finish() {
    {
      std::lock_guard<std::mutex> lock(mutex_);
      assert(!done_);
      done_ = true;
    }
    readerCv_.notify_all();
    writerCv_.notify_all();
    finishCv_.notify_all();
  }

  /// Blocks until `finish()` has been called (but the queue may not be empty).
  void waitUntilFinished() {
    std::unique_lock<std::mutex> lock(mutex_);
    finishCv_.wait(lock, [this] { return done_; });
  }
};
150
}  // namespace ROCKSDB_NAMESPACE