/src/rocksdb/monitoring/thread_status_updater.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | #include "monitoring/thread_status_updater.h" |
7 | | |
8 | | #include <memory> |
9 | | |
10 | | #include "port/likely.h" |
11 | | #include "rocksdb/env.h" |
12 | | #include "rocksdb/system_clock.h" |
13 | | #include "util/mutexlock.h" |
14 | | |
15 | | namespace ROCKSDB_NAMESPACE { |
16 | | |
17 | | #ifndef NROCKSDB_THREAD_STATUS |
18 | | |
// Per-thread slot holding the calling thread's status record.  Remains
// nullptr until RegisterThread() is invoked on that thread; reset to
// nullptr again by UnregisterThread().
thread_local ThreadStatusData* ThreadStatusUpdater::thread_status_data_ =
    nullptr;
21 | | |
22 | | void ThreadStatusUpdater::RegisterThread(ThreadStatus::ThreadType ttype, |
23 | 4 | uint64_t thread_id) { |
24 | 4 | if (UNLIKELY(thread_status_data_ == nullptr)) { |
25 | 4 | thread_status_data_ = new ThreadStatusData(); |
26 | 4 | thread_status_data_->thread_type = ttype; |
27 | 4 | thread_status_data_->thread_id = thread_id; |
28 | 4 | std::lock_guard<std::mutex> lck(thread_list_mutex_); |
29 | 4 | thread_data_set_.insert(thread_status_data_); |
30 | 4 | } |
31 | | |
32 | 4 | ClearThreadOperationProperties(); |
33 | 4 | } |
34 | | |
35 | 4 | void ThreadStatusUpdater::UnregisterThread() { |
36 | 4 | if (thread_status_data_ != nullptr) { |
37 | 4 | std::lock_guard<std::mutex> lck(thread_list_mutex_); |
38 | 4 | thread_data_set_.erase(thread_status_data_); |
39 | 4 | delete thread_status_data_; |
40 | 4 | thread_status_data_ = nullptr; |
41 | 4 | } |
42 | 4 | } |
43 | | |
// Restores the calling thread's published status to the idle baseline:
// clears its state, its operation (including stage and properties), and
// finally detaches it from any column family.
void ThreadStatusUpdater::ResetThreadStatus() {
  ClearThreadState();
  ClearThreadOperation();
  SetColumnFamilyInfoKey(nullptr);
}
49 | | |
50 | 5.26k | void ThreadStatusUpdater::SetEnableTracking(bool enable_tracking) { |
51 | 5.26k | auto* data = Get(); |
52 | 5.26k | if (data == nullptr) { |
53 | 0 | return; |
54 | 0 | } |
55 | 5.26k | data->enable_tracking.store(enable_tracking, std::memory_order_relaxed); |
56 | 5.26k | } |
57 | | |
58 | 16.3k | void ThreadStatusUpdater::SetColumnFamilyInfoKey(const void* cf_key) { |
59 | 16.3k | auto* data = Get(); |
60 | 16.3k | if (data == nullptr) { |
61 | 0 | return; |
62 | 0 | } |
63 | 16.3k | data->cf_key.store(const_cast<void*>(cf_key), std::memory_order_relaxed); |
64 | 16.3k | } |
65 | | |
66 | 0 | const void* ThreadStatusUpdater::GetColumnFamilyInfoKey() { |
67 | 0 | auto* data = GetLocalThreadStatus(); |
68 | 0 | if (data == nullptr) { |
69 | 0 | return nullptr; |
70 | 0 | } |
71 | 0 | return data->cf_key.load(std::memory_order_relaxed); |
72 | 0 | } |
73 | | |
// Publishes the calling thread's current high-level operation.  Passing
// OP_UNKNOWN also resets the stage and all operation properties.  No-op
// when the thread is unregistered or tracking is disabled.
void ThreadStatusUpdater::SetThreadOperation(
    const ThreadStatus::OperationType type) {
  auto* data = GetLocalThreadStatus();
  if (data == nullptr) {
    return;
  }
  // NOTE: Our practice here is to set all the thread operation properties
  // and stage before we set thread operation, and thread operation
  // will be set in std::memory_order_release.  This is to ensure
  // whenever a thread operation is not OP_UNKNOWN, we will always
  // have a consistent information on its properties.
  data->operation_type.store(type, std::memory_order_release);
  if (type == ThreadStatus::OP_UNKNOWN) {
    // Stage and properties are only meaningful while an operation is in
    // progress (readers skip them when op is OP_UNKNOWN — see
    // GetThreadList()), so relaxed stores suffice here.
    data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN,
                                std::memory_order_relaxed);
    ClearThreadOperationProperties();
  }
}
92 | | |
93 | 1.83k | ThreadStatus::OperationType ThreadStatusUpdater::GetThreadOperation() { |
94 | 1.83k | ThreadStatusData* data = GetLocalThreadStatus(); |
95 | 1.83k | if (data == nullptr) { |
96 | 1.83k | return ThreadStatus::OperationType::OP_UNKNOWN; |
97 | 1.83k | } |
98 | 0 | return data->operation_type.load(std::memory_order_relaxed); |
99 | 1.83k | } |
100 | | |
101 | 22.3k | void ThreadStatusUpdater::SetThreadOperationProperty(int i, uint64_t value) { |
102 | 22.3k | auto* data = GetLocalThreadStatus(); |
103 | 22.3k | if (data == nullptr) { |
104 | 22.3k | return; |
105 | 22.3k | } |
106 | 0 | data->op_properties[i].store(value, std::memory_order_relaxed); |
107 | 0 | } |
108 | | |
109 | | void ThreadStatusUpdater::IncreaseThreadOperationProperty(int i, |
110 | 19.2k | uint64_t delta) { |
111 | 19.2k | auto* data = GetLocalThreadStatus(); |
112 | 19.2k | if (data == nullptr) { |
113 | 19.2k | return; |
114 | 19.2k | } |
115 | 18.4E | data->op_properties[i].fetch_add(delta, std::memory_order_relaxed); |
116 | 18.4E | } |
117 | | |
118 | 15.2k | void ThreadStatusUpdater::SetOperationStartTime(const uint64_t start_time) { |
119 | 15.2k | auto* data = GetLocalThreadStatus(); |
120 | 15.2k | if (data == nullptr) { |
121 | 15.2k | return; |
122 | 15.2k | } |
123 | 0 | data->op_start_time.store(start_time, std::memory_order_relaxed); |
124 | 0 | } |
125 | | |
// Resets the calling thread's published operation back to OP_UNKNOWN and
// clears its stage and all operation properties.  No-op when the thread is
// unregistered or tracking is disabled.
void ThreadStatusUpdater::ClearThreadOperation() {
  auto* data = GetLocalThreadStatus();
  if (data == nullptr) {
    return;
  }
  // Relaxed stores are sufficient: once operation_type reads OP_UNKNOWN,
  // GetThreadList() ignores stage/properties entirely.
  data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN,
                              std::memory_order_relaxed);
  data->operation_type.store(ThreadStatus::OP_UNKNOWN,
                             std::memory_order_relaxed);
  ClearThreadOperationProperties();
}
137 | | |
138 | 4 | void ThreadStatusUpdater::ClearThreadOperationProperties() { |
139 | 4 | auto* data = GetLocalThreadStatus(); |
140 | 4 | if (data == nullptr) { |
141 | 4 | return; |
142 | 4 | } |
143 | 0 | for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) { |
144 | 0 | data->op_properties[i].store(0, std::memory_order_relaxed); |
145 | 0 | } |
146 | 0 | } |
147 | | |
148 | | ThreadStatus::OperationStage ThreadStatusUpdater::SetThreadOperationStage( |
149 | 45.7k | ThreadStatus::OperationStage stage) { |
150 | 45.7k | auto* data = GetLocalThreadStatus(); |
151 | 45.7k | if (data == nullptr) { |
152 | 45.7k | return ThreadStatus::STAGE_UNKNOWN; |
153 | 45.7k | } |
154 | 0 | return data->operation_stage.exchange(stage, std::memory_order_relaxed); |
155 | 45.7k | } |
156 | | |
157 | 0 | void ThreadStatusUpdater::SetThreadState(const ThreadStatus::StateType type) { |
158 | 0 | auto* data = GetLocalThreadStatus(); |
159 | 0 | if (data == nullptr) { |
160 | 0 | return; |
161 | 0 | } |
162 | 0 | data->state_type.store(type, std::memory_order_relaxed); |
163 | 0 | } |
164 | | |
165 | 8.15k | void ThreadStatusUpdater::ClearThreadState() { |
166 | 8.15k | auto* data = GetLocalThreadStatus(); |
167 | 8.15k | if (data == nullptr) { |
168 | 8.15k | return; |
169 | 8.15k | } |
170 | 0 | data->state_type.store(ThreadStatus::STATE_UNKNOWN, |
171 | 0 | std::memory_order_relaxed); |
172 | 0 | } |
173 | | |
// Snapshots the status of every registered thread into *thread_list.
// Always returns Status::OK().  Holds thread_list_mutex_ for the whole
// scan, which also freezes cf_info_map_ (all writers take the same lock).
Status ThreadStatusUpdater::GetThreadList(
    std::vector<ThreadStatus>* thread_list) {
  thread_list->clear();
  // NOTE(review): valid_list is never used below — looks like dead code;
  // confirm before removing.
  std::vector<std::shared_ptr<ThreadStatusData>> valid_list;
  uint64_t now_micros = SystemClock::Default()->NowMicros();

  std::lock_guard<std::mutex> lck(thread_list_mutex_);
  for (auto* thread_data : thread_data_set_) {
    assert(thread_data);
    auto thread_id = thread_data->thread_id.load(std::memory_order_relaxed);
    auto thread_type = thread_data->thread_type.load(std::memory_order_relaxed);
    // Since any change to cf_info_map requires thread_list_mutex,
    // which is currently held by GetThreadList(), here we can safely
    // use "memory_order_relaxed" to load the cf_key.
    auto cf_key = thread_data->cf_key.load(std::memory_order_relaxed);

    // Defaults used when the thread has no known column family or no
    // in-progress operation.
    ThreadStatus::OperationType op_type = ThreadStatus::OP_UNKNOWN;
    ThreadStatus::OperationStage op_stage = ThreadStatus::STAGE_UNKNOWN;
    ThreadStatus::StateType state_type = ThreadStatus::STATE_UNKNOWN;
    uint64_t op_elapsed_micros = 0;
    uint64_t op_props[ThreadStatus::kNumOperationProperties] = {0};

    auto iter = cf_info_map_.find(cf_key);
    if (iter != cf_info_map_.end()) {
      // Acquire pairs with the release store in SetThreadOperation(): if we
      // observe a non-UNKNOWN op, its stage/properties are consistent.
      op_type = thread_data->operation_type.load(std::memory_order_acquire);
      // display lower-level info only when higher-level info is available.
      if (op_type != ThreadStatus::OP_UNKNOWN) {
        op_elapsed_micros = now_micros - thread_data->op_start_time.load(
                                             std::memory_order_relaxed);
        op_stage = thread_data->operation_stage.load(std::memory_order_relaxed);
        state_type = thread_data->state_type.load(std::memory_order_relaxed);
        for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) {
          op_props[i] =
              thread_data->op_properties[i].load(std::memory_order_relaxed);
        }
      }
    }

    // Threads with no registered column family report empty db/cf names.
    thread_list->emplace_back(
        thread_id, thread_type,
        iter != cf_info_map_.end() ? iter->second.db_name : "",
        iter != cf_info_map_.end() ? iter->second.cf_name : "", op_type,
        op_elapsed_micros, op_stage, op_props, state_type);
  }

  return Status::OK();
}
221 | | |
222 | 136k | ThreadStatusData* ThreadStatusUpdater::GetLocalThreadStatus() { |
223 | 136k | if (thread_status_data_ == nullptr) { |
224 | 0 | return nullptr; |
225 | 0 | } |
226 | 136k | if (!thread_status_data_->enable_tracking.load(std::memory_order_relaxed)) { |
227 | 136k | return nullptr; |
228 | 136k | } |
229 | 0 | return thread_status_data_; |
230 | 136k | } |
231 | | |
232 | | void ThreadStatusUpdater::NewColumnFamilyInfo(const void* db_key, |
233 | | const std::string& db_name, |
234 | | const void* cf_key, |
235 | 0 | const std::string& cf_name) { |
236 | | // Acquiring same lock as GetThreadList() to guarantee |
237 | | // a consistent view of global column family table (cf_info_map). |
238 | 0 | std::lock_guard<std::mutex> lck(thread_list_mutex_); |
239 | |
|
240 | 0 | cf_info_map_.emplace(std::piecewise_construct, std::make_tuple(cf_key), |
241 | 0 | std::make_tuple(db_key, db_name, cf_name)); |
242 | 0 | db_key_map_[db_key].insert(cf_key); |
243 | 0 | } |
244 | | |
245 | 0 | void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* cf_key) { |
246 | | // Acquiring same lock as GetThreadList() to guarantee |
247 | | // a consistent view of global column family table (cf_info_map). |
248 | 0 | std::lock_guard<std::mutex> lck(thread_list_mutex_); |
249 | |
|
250 | 0 | auto cf_pair = cf_info_map_.find(cf_key); |
251 | 0 | if (cf_pair != cf_info_map_.end()) { |
252 | | // Remove its entry from db_key_map_ by the following steps: |
253 | | // 1. Obtain the entry in db_key_map_ whose set contains cf_key |
254 | | // 2. Remove it from the set. |
255 | 0 | ConstantColumnFamilyInfo& cf_info = cf_pair->second; |
256 | 0 | auto db_pair = db_key_map_.find(cf_info.db_key); |
257 | 0 | assert(db_pair != db_key_map_.end()); |
258 | 0 | size_t result __attribute__((__unused__)); |
259 | 0 | result = db_pair->second.erase(cf_key); |
260 | 0 | assert(result); |
261 | 0 | cf_info_map_.erase(cf_pair); |
262 | 0 | } |
263 | 0 | } |
264 | | |
265 | 0 | void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key) { |
266 | | // Acquiring same lock as GetThreadList() to guarantee |
267 | | // a consistent view of global column family table (cf_info_map). |
268 | 0 | std::lock_guard<std::mutex> lck(thread_list_mutex_); |
269 | 0 | auto db_pair = db_key_map_.find(db_key); |
270 | 0 | if (UNLIKELY(db_pair == db_key_map_.end())) { |
271 | | // In some occasional cases such as DB::Open fails, we won't |
272 | | // register ColumnFamilyInfo for a db. |
273 | 0 | return; |
274 | 0 | } |
275 | | |
276 | 0 | for (auto cf_key : db_pair->second) { |
277 | 0 | auto cf_pair = cf_info_map_.find(cf_key); |
278 | 0 | if (cf_pair != cf_info_map_.end()) { |
279 | 0 | cf_info_map_.erase(cf_pair); |
280 | 0 | } |
281 | 0 | } |
282 | 0 | db_key_map_.erase(db_key); |
283 | 0 | } |
284 | | |
285 | | #else |
286 | | |
// No-op stub: thread status tracking is compiled out in this build.
void ThreadStatusUpdater::RegisterThread(ThreadStatus::ThreadType /*ttype*/,
                                         uint64_t /*thread_id*/) {}
289 | | |
290 | | void ThreadStatusUpdater::UnregisterThread() {} |
291 | | |
292 | | void ThreadStatusUpdater::ResetThreadStatus() {} |
293 | | |
294 | | void ThreadStatusUpdater::SetColumnFamilyInfoKey(const void* /*cf_key*/) {} |
295 | | |
// No-op stub: thread status tracking is compiled out in this build.
void ThreadStatusUpdater::SetThreadOperation(
    const ThreadStatus::OperationType /*type*/) {}
298 | | |
299 | | void ThreadStatusUpdater::ClearThreadOperation() {} |
300 | | |
// No-op stub: thread status tracking is compiled out in this build.
void ThreadStatusUpdater::SetThreadState(
    const ThreadStatus::StateType /*type*/) {}
303 | | |
304 | | void ThreadStatusUpdater::ClearThreadState() {} |
305 | | |
// Stub: thread status tracking is compiled out, so no list can be
// produced; reports NotSupported to callers.
Status ThreadStatusUpdater::GetThreadList(
    std::vector<ThreadStatus>* /*thread_list*/) {
  return Status::NotSupported(
      "GetThreadList is not supported in the current running environment.");
}
311 | | |
// No-op stub: thread status tracking is compiled out in this build.
void ThreadStatusUpdater::NewColumnFamilyInfo(const void* /*db_key*/,
                                              const std::string& /*db_name*/,
                                              const void* /*cf_key*/,
                                              const std::string& /*cf_name*/) {}
316 | | |
317 | | void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* /*cf_key*/) {} |
318 | | |
319 | | void ThreadStatusUpdater::EraseDatabaseInfo(const void* /*db_key*/) {} |
320 | | |
// No-op stub: thread status tracking is compiled out in this build.
void ThreadStatusUpdater::SetThreadOperationProperty(int /*i*/,
                                                     uint64_t /*value*/) {}
323 | | |
// No-op stub: thread status tracking is compiled out in this build.
void ThreadStatusUpdater::IncreaseThreadOperationProperty(int /*i*/,
                                                          uint64_t /*delta*/) {}
326 | | |
327 | | #endif // !NROCKSDB_THREAD_STATUS |
328 | | } // namespace ROCKSDB_NAMESPACE |