/src/rocksdb/db/flush_scheduler.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | #include "db/flush_scheduler.h" |
7 | | |
8 | | #include <cassert> |
9 | | |
10 | | #include "db/column_family.h" |
11 | | |
12 | | namespace ROCKSDB_NAMESPACE { |
13 | | |
// Enqueues `cfd` for a future flush. Safe to call from multiple producer
// threads concurrently: the list push is a lock-free CAS loop.
void FlushScheduler::ScheduleWork(ColumnFamilyData* cfd) {
#ifndef NDEBUG
  {
    // Debug-only bookkeeping: a column family must not be scheduled twice
    // before it is taken again (see TakeNextColumnFamily, which erases it).
    std::lock_guard<std::mutex> lock(checking_mutex_);
    assert(checking_set_.count(cfd) == 0);
    checking_set_.insert(cfd);
  }
#endif  // NDEBUG
  // Pin the column family while it sits in the queue; the matching
  // UnrefAndTryDelete() happens in TakeNextColumnFamily/Clear (or by the
  // caller that receives it).
  cfd->Ref();
// Suppress false positive clang analyzer warnings.
#ifndef __clang_analyzer__
  // Lock-free push onto the head of the singly-linked list: create a node
  // pointing at the current head, then CAS it in.
  Node* node = new Node{cfd, head_.load(std::memory_order_relaxed)};
  while (!head_.compare_exchange_strong(
      node->next, node, std::memory_order_relaxed, std::memory_order_relaxed)) {
    // failing CAS updates the first param, so we are already set for
    // retry. TakeNextColumnFamily won't happen until after another
    // inter-thread synchronization, so we don't even need release
    // semantics for this CAS
  }
#endif  // __clang_analyzer__
}
35 | | |
36 | 548k | ColumnFamilyData* FlushScheduler::TakeNextColumnFamily() { |
37 | 548k | while (true) { |
38 | 548k | if (head_.load(std::memory_order_relaxed) == nullptr) { |
39 | 548k | return nullptr; |
40 | 548k | } |
41 | | |
42 | | // dequeue the head |
43 | 0 | Node* node = head_.load(std::memory_order_relaxed); |
44 | 0 | head_.store(node->next, std::memory_order_relaxed); |
45 | 0 | ColumnFamilyData* cfd = node->column_family; |
46 | 0 | delete node; |
47 | |
|
48 | | #ifndef NDEBUG |
49 | | { |
50 | | std::lock_guard<std::mutex> lock(checking_mutex_); |
51 | | auto iter = checking_set_.find(cfd); |
52 | | assert(iter != checking_set_.end()); |
53 | | checking_set_.erase(iter); |
54 | | } |
55 | | #endif // NDEBUG |
56 | |
|
57 | 0 | if (!cfd->IsDropped()) { |
58 | | // success |
59 | 0 | return cfd; |
60 | 0 | } |
61 | | |
62 | | // no longer relevant, retry |
63 | 0 | cfd->UnrefAndTryDelete(); |
64 | 0 | } |
65 | 548k | } |
66 | | |
67 | 495k | bool FlushScheduler::Empty() { |
68 | 495k | auto rv = head_.load(std::memory_order_relaxed) == nullptr; |
69 | | #ifndef NDEBUG |
70 | | std::lock_guard<std::mutex> lock(checking_mutex_); |
71 | | // Empty is allowed to be called concurrnetly with ScheduleFlush. It would |
72 | | // only miss the recent schedules. |
73 | | assert((rv == checking_set_.empty()) || rv); |
74 | | #endif // NDEBUG |
75 | 495k | return rv; |
76 | 495k | } |
77 | | |
78 | 73.0k | void FlushScheduler::Clear() { |
79 | 73.0k | ColumnFamilyData* cfd; |
80 | 73.0k | while ((cfd = TakeNextColumnFamily()) != nullptr) { |
81 | 0 | cfd->UnrefAndTryDelete(); |
82 | 0 | } |
83 | | assert(head_.load(std::memory_order_relaxed) == nullptr); |
84 | 73.0k | } |
85 | | |
86 | | } // namespace ROCKSDB_NAMESPACE |