Coverage Report

Created: 2026-04-10 07:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/monitoring/thread_status_updater.cc
Line
Count
Source
1
// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#include "monitoring/thread_status_updater.h"
7
8
#include <memory>
9
10
#include "port/likely.h"
11
#include "rocksdb/env.h"
12
#include "rocksdb/system_clock.h"
13
#include "util/mutexlock.h"
14
15
namespace ROCKSDB_NAMESPACE {
16
17
#ifndef NROCKSDB_THREAD_STATUS
18
19
// Per-thread pointer to the calling thread's status record.  It is null until
// RegisterThread() allocates a record and is set back to null by
// UnregisterThread().
thread_local ThreadStatusData*
    ThreadStatusUpdater::thread_status_data_{nullptr};
21
22
void ThreadStatusUpdater::RegisterThread(ThreadStatus::ThreadType ttype,
23
4
                                         uint64_t thread_id) {
24
4
  if (UNLIKELY(thread_status_data_ == nullptr)) {
25
4
    thread_status_data_ = new ThreadStatusData();
26
4
    thread_status_data_->thread_type = ttype;
27
4
    thread_status_data_->thread_id = thread_id;
28
4
    std::lock_guard<std::mutex> lck(thread_list_mutex_);
29
4
    thread_data_set_.insert(thread_status_data_);
30
4
  }
31
32
4
  ClearThreadOperationProperties();
33
4
}
34
35
4
void ThreadStatusUpdater::UnregisterThread() {
36
4
  if (thread_status_data_ != nullptr) {
37
4
    std::lock_guard<std::mutex> lck(thread_list_mutex_);
38
4
    thread_data_set_.erase(thread_status_data_);
39
4
    delete thread_status_data_;
40
4
    thread_status_data_ = nullptr;
41
4
  }
42
4
}
43
44
8.15k
void ThreadStatusUpdater::ResetThreadStatus() {
45
8.15k
  ClearThreadState();
46
8.15k
  ClearThreadOperation();
47
8.15k
  SetColumnFamilyInfoKey(nullptr);
48
8.15k
}
49
50
5.26k
void ThreadStatusUpdater::SetEnableTracking(bool enable_tracking) {
51
5.26k
  auto* data = Get();
52
5.26k
  if (data == nullptr) {
53
0
    return;
54
0
  }
55
5.26k
  data->enable_tracking.store(enable_tracking, std::memory_order_relaxed);
56
5.26k
}
57
58
16.3k
void ThreadStatusUpdater::SetColumnFamilyInfoKey(const void* cf_key) {
59
16.3k
  auto* data = Get();
60
16.3k
  if (data == nullptr) {
61
0
    return;
62
0
  }
63
16.3k
  data->cf_key.store(const_cast<void*>(cf_key), std::memory_order_relaxed);
64
16.3k
}
65
66
0
const void* ThreadStatusUpdater::GetColumnFamilyInfoKey() {
67
0
  auto* data = GetLocalThreadStatus();
68
0
  if (data == nullptr) {
69
0
    return nullptr;
70
0
  }
71
0
  return data->cf_key.load(std::memory_order_relaxed);
72
0
}
73
74
void ThreadStatusUpdater::SetThreadOperation(
75
15.2k
    const ThreadStatus::OperationType type) {
76
15.2k
  auto* data = GetLocalThreadStatus();
77
15.2k
  if (data == nullptr) {
78
15.2k
    return;
79
15.2k
  }
80
  // NOTE: Our practice here is to set all the thread operation properties
81
  //       and stage before we set thread operation, and thread operation
82
  //       will be set in std::memory_order_release.  This is to ensure
83
  //       whenever a thread operation is not OP_UNKNOWN, we will always
84
  //       have a consistent information on its properties.
85
0
  data->operation_type.store(type, std::memory_order_release);
86
0
  if (type == ThreadStatus::OP_UNKNOWN) {
87
0
    data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN,
88
0
                                std::memory_order_relaxed);
89
0
    ClearThreadOperationProperties();
90
0
  }
91
0
}
92
93
1.83k
// Reads the operation type of the calling thread.
//
// @return the stored operation type, or OP_UNKNOWN when
//         GetLocalThreadStatus() yields no record.
ThreadStatus::OperationType ThreadStatusUpdater::GetThreadOperation() {
  if (ThreadStatusData* record = GetLocalThreadStatus()) {
    return record->operation_type.load(std::memory_order_relaxed);
  }
  return ThreadStatus::OperationType::OP_UNKNOWN;
}
100
101
22.3k
void ThreadStatusUpdater::SetThreadOperationProperty(int i, uint64_t value) {
102
22.3k
  auto* data = GetLocalThreadStatus();
103
22.3k
  if (data == nullptr) {
104
22.3k
    return;
105
22.3k
  }
106
0
  data->op_properties[i].store(value, std::memory_order_relaxed);
107
0
}
108
109
void ThreadStatusUpdater::IncreaseThreadOperationProperty(int i,
110
19.2k
                                                          uint64_t delta) {
111
19.2k
  auto* data = GetLocalThreadStatus();
112
19.2k
  if (data == nullptr) {
113
19.2k
    return;
114
19.2k
  }
115
18.4E
  data->op_properties[i].fetch_add(delta, std::memory_order_relaxed);
116
18.4E
}
117
118
15.2k
void ThreadStatusUpdater::SetOperationStartTime(const uint64_t start_time) {
119
15.2k
  auto* data = GetLocalThreadStatus();
120
15.2k
  if (data == nullptr) {
121
15.2k
    return;
122
15.2k
  }
123
0
  data->op_start_time.store(start_time, std::memory_order_relaxed);
124
0
}
125
126
8.15k
void ThreadStatusUpdater::ClearThreadOperation() {
127
8.15k
  auto* data = GetLocalThreadStatus();
128
8.15k
  if (data == nullptr) {
129
8.15k
    return;
130
8.15k
  }
131
0
  data->operation_stage.store(ThreadStatus::STAGE_UNKNOWN,
132
0
                              std::memory_order_relaxed);
133
0
  data->operation_type.store(ThreadStatus::OP_UNKNOWN,
134
0
                             std::memory_order_relaxed);
135
0
  ClearThreadOperationProperties();
136
0
}
137
138
4
void ThreadStatusUpdater::ClearThreadOperationProperties() {
139
4
  auto* data = GetLocalThreadStatus();
140
4
  if (data == nullptr) {
141
4
    return;
142
4
  }
143
0
  for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) {
144
0
    data->op_properties[i].store(0, std::memory_order_relaxed);
145
0
  }
146
0
}
147
148
// Replaces the calling thread's operation stage.
//
// @param stage  new stage to store.
// @return the stage that was stored before the call, or STAGE_UNKNOWN when
//         GetLocalThreadStatus() yields no record (nothing is stored then).
ThreadStatus::OperationStage ThreadStatusUpdater::SetThreadOperationStage(
    ThreadStatus::OperationStage stage) {
  if (auto* record = GetLocalThreadStatus()) {
    return record->operation_stage.exchange(stage, std::memory_order_relaxed);
  }
  return ThreadStatus::STAGE_UNKNOWN;
}
156
157
0
// Stores the calling thread's state.
//
// @param type  state to store.
//
// Does nothing when GetLocalThreadStatus() yields no record.
void ThreadStatusUpdater::SetThreadState(const ThreadStatus::StateType type) {
  if (auto* record = GetLocalThreadStatus()) {
    record->state_type.store(type, std::memory_order_relaxed);
  }
}
164
165
8.15k
void ThreadStatusUpdater::ClearThreadState() {
166
8.15k
  auto* data = GetLocalThreadStatus();
167
8.15k
  if (data == nullptr) {
168
8.15k
    return;
169
8.15k
  }
170
0
  data->state_type.store(ThreadStatus::STATE_UNKNOWN,
171
0
                         std::memory_order_relaxed);
172
0
}
173
174
// Fills *thread_list with a snapshot of every record in thread_data_set_.
//
// @param thread_list  output vector; cleared first, then one ThreadStatus is
//                     appended per registered thread.  Must not be null.
// @return Status::OK() in all cases.
//
// The db/cf names are filled in only when the thread's cf_key is present in
// cf_info_map_.  Operation type is read only in that case, and elapsed time,
// stage, state and properties are read only when the type is not OP_UNKNOWN;
// otherwise the UNKNOWN / zero defaults are reported.
Status ThreadStatusUpdater::GetThreadList(
    std::vector<ThreadStatus>* thread_list) {
  thread_list->clear();
  // Sampled once, before taking the lock, and shared by all entries.
  const uint64_t now_micros = SystemClock::Default()->NowMicros();

  std::lock_guard<std::mutex> lck(thread_list_mutex_);
  for (auto* thread_data : thread_data_set_) {
    assert(thread_data);
    auto thread_id = thread_data->thread_id.load(std::memory_order_relaxed);
    auto thread_type = thread_data->thread_type.load(std::memory_order_relaxed);
    // Since any change to cf_info_map requires thread_list_mutex,
    // which is currently held by GetThreadList(), here we can safely
    // use "memory_order_relaxed" to load the cf_key.
    auto cf_key = thread_data->cf_key.load(std::memory_order_relaxed);

    ThreadStatus::OperationType op_type = ThreadStatus::OP_UNKNOWN;
    ThreadStatus::OperationStage op_stage = ThreadStatus::STAGE_UNKNOWN;
    ThreadStatus::StateType state_type = ThreadStatus::STATE_UNKNOWN;
    uint64_t op_elapsed_micros = 0;
    uint64_t op_props[ThreadStatus::kNumOperationProperties] = {0};

    auto iter = cf_info_map_.find(cf_key);
    const bool has_cf_info = iter != cf_info_map_.end();
    if (has_cf_info) {
      // Acquire pairs with the release store in SetThreadOperation().
      op_type = thread_data->operation_type.load(std::memory_order_acquire);
      // display lower-level info only when higher-level info is available.
      if (op_type != ThreadStatus::OP_UNKNOWN) {
        // An operation may have started after now_micros was sampled; clamp
        // to zero instead of letting the unsigned subtraction wrap around.
        const uint64_t op_start =
            thread_data->op_start_time.load(std::memory_order_relaxed);
        op_elapsed_micros = now_micros > op_start ? now_micros - op_start : 0;
        op_stage = thread_data->operation_stage.load(std::memory_order_relaxed);
        state_type = thread_data->state_type.load(std::memory_order_relaxed);
        for (int i = 0; i < ThreadStatus::kNumOperationProperties; ++i) {
          op_props[i] =
              thread_data->op_properties[i].load(std::memory_order_relaxed);
        }
      }
    }

    thread_list->emplace_back(
        thread_id, thread_type, has_cf_info ? iter->second.db_name : "",
        has_cf_info ? iter->second.cf_name : "", op_type, op_elapsed_micros,
        op_stage, op_props, state_type);
  }

  return Status::OK();
}
221
222
136k
// Returns the calling thread's status record when it may be updated.
//
// @return the thread-local record, or null when the thread has no record or
//         the record's enable_tracking flag is false.
ThreadStatusData* ThreadStatusUpdater::GetLocalThreadStatus() {
  ThreadStatusData* const record = thread_status_data_;
  const bool tracked =
      record != nullptr &&
      record->enable_tracking.load(std::memory_order_relaxed);
  return tracked ? record : nullptr;
}
231
232
void ThreadStatusUpdater::NewColumnFamilyInfo(const void* db_key,
233
                                              const std::string& db_name,
234
                                              const void* cf_key,
235
0
                                              const std::string& cf_name) {
236
  // Acquiring same lock as GetThreadList() to guarantee
237
  // a consistent view of global column family table (cf_info_map).
238
0
  std::lock_guard<std::mutex> lck(thread_list_mutex_);
239
240
0
  cf_info_map_.emplace(std::piecewise_construct, std::make_tuple(cf_key),
241
0
                       std::make_tuple(db_key, db_name, cf_name));
242
0
  db_key_map_[db_key].insert(cf_key);
243
0
}
244
245
0
void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* cf_key) {
246
  // Acquiring same lock as GetThreadList() to guarantee
247
  // a consistent view of global column family table (cf_info_map).
248
0
  std::lock_guard<std::mutex> lck(thread_list_mutex_);
249
250
0
  auto cf_pair = cf_info_map_.find(cf_key);
251
0
  if (cf_pair != cf_info_map_.end()) {
252
    // Remove its entry from db_key_map_ by the following steps:
253
    // 1. Obtain the entry in db_key_map_ whose set contains cf_key
254
    // 2. Remove it from the set.
255
0
    ConstantColumnFamilyInfo& cf_info = cf_pair->second;
256
0
    auto db_pair = db_key_map_.find(cf_info.db_key);
257
0
    assert(db_pair != db_key_map_.end());
258
0
    size_t result __attribute__((__unused__));
259
0
    result = db_pair->second.erase(cf_key);
260
0
    assert(result);
261
0
    cf_info_map_.erase(cf_pair);
262
0
  }
263
0
}
264
265
0
void ThreadStatusUpdater::EraseDatabaseInfo(const void* db_key) {
266
  // Acquiring same lock as GetThreadList() to guarantee
267
  // a consistent view of global column family table (cf_info_map).
268
0
  std::lock_guard<std::mutex> lck(thread_list_mutex_);
269
0
  auto db_pair = db_key_map_.find(db_key);
270
0
  if (UNLIKELY(db_pair == db_key_map_.end())) {
271
    // In some occasional cases such as DB::Open fails, we won't
272
    // register ColumnFamilyInfo for a db.
273
0
    return;
274
0
  }
275
276
0
  for (auto cf_key : db_pair->second) {
277
0
    auto cf_pair = cf_info_map_.find(cf_key);
278
0
    if (cf_pair != cf_info_map_.end()) {
279
0
      cf_info_map_.erase(cf_pair);
280
0
    }
281
0
  }
282
0
  db_key_map_.erase(db_key);
283
0
}
284
285
#else
286
287
void ThreadStatusUpdater::RegisterThread(ThreadStatus::ThreadType /*ttype*/,
288
                                         uint64_t /*thread_id*/) {}
289
290
void ThreadStatusUpdater::UnregisterThread() {
  // No-op: NROCKSDB_THREAD_STATUS is defined, so nothing is tracked.
}
291
292
void ThreadStatusUpdater::ResetThreadStatus() {
  // No-op: NROCKSDB_THREAD_STATUS is defined, so nothing is tracked.
}
293
294
void ThreadStatusUpdater::SetColumnFamilyInfoKey(const void* /*cf_key*/) {
  // No-op: NROCKSDB_THREAD_STATUS is defined, so nothing is tracked.
}
295
296
void ThreadStatusUpdater::SetThreadOperation(
297
    const ThreadStatus::OperationType /*type*/) {}
298
299
void ThreadStatusUpdater::ClearThreadOperation() {
  // No-op: NROCKSDB_THREAD_STATUS is defined, so nothing is tracked.
}
300
301
void ThreadStatusUpdater::SetThreadState(
302
    const ThreadStatus::StateType /*type*/) {}
303
304
void ThreadStatusUpdater::ClearThreadState() {
  // No-op: NROCKSDB_THREAD_STATUS is defined, so nothing is tracked.
}
305
306
// Variant used when NROCKSDB_THREAD_STATUS is defined: the output vector is
// left untouched and a NotSupported status is returned.
Status ThreadStatusUpdater::GetThreadList(
    std::vector<ThreadStatus>* /*thread_list*/) {
  const char* const reason =
      "GetThreadList is not supported in the current running environment.";
  return Status::NotSupported(reason);
}
311
312
void ThreadStatusUpdater::NewColumnFamilyInfo(const void* /*db_key*/,
                                              const std::string& /*db_name*/,
                                              const void* /*cf_key*/,
                                              const std::string& /*cf_name*/) {
  // No-op: NROCKSDB_THREAD_STATUS is defined, so nothing is tracked.
}
316
317
void ThreadStatusUpdater::EraseColumnFamilyInfo(const void* /*cf_key*/) {
  // No-op: NROCKSDB_THREAD_STATUS is defined, so nothing is tracked.
}
318
319
void ThreadStatusUpdater::EraseDatabaseInfo(const void* /*db_key*/) {
  // No-op: NROCKSDB_THREAD_STATUS is defined, so nothing is tracked.
}
320
321
void ThreadStatusUpdater::SetThreadOperationProperty(int /*i*/,
                                                     uint64_t /*value*/) {
  // No-op: NROCKSDB_THREAD_STATUS is defined, so nothing is tracked.
}
323
324
void ThreadStatusUpdater::IncreaseThreadOperationProperty(int /*i*/,
                                                          uint64_t /*delta*/) {
  // No-op: NROCKSDB_THREAD_STATUS is defined, so nothing is tracked.
}
326
327
#endif  // !NROCKSDB_THREAD_STATUS
328
}  // namespace ROCKSDB_NAMESPACE