Coverage Report

Created: 2026-04-10 07:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/event_helpers.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#include "db/event_helpers.h"
7
8
#include "rocksdb/convenience.h"
9
#include "rocksdb/listener.h"
10
#include "rocksdb/utilities/customizable_util.h"
11
12
namespace ROCKSDB_NAMESPACE {
13
Status EventListener::CreateFromString(const ConfigOptions& config_options,
14
                                       const std::string& id,
15
0
                                       std::shared_ptr<EventListener>* result) {
16
0
  return LoadSharedObject<EventListener>(config_options, id, result);
17
0
}
18
19
namespace {
20
template <class T>
21
36.7k
inline T SafeDivide(T a, T b) {
22
36.7k
  return b == 0 ? 0 : a / b;
23
36.7k
}
24
}  // anonymous namespace
25
26
24.5k
void EventHelpers::AppendCurrentTime(JSONWriter* jwriter) {
27
24.5k
  *jwriter << "time_micros"
28
24.5k
           << std::chrono::duration_cast<std::chrono::microseconds>(
29
24.5k
                  std::chrono::system_clock::now().time_since_epoch())
30
24.5k
                  .count();
31
24.5k
}
32
33
void EventHelpers::NotifyTableFileCreationStarted(
34
    const std::vector<std::shared_ptr<EventListener>>& listeners,
35
    const std::string& db_name, const std::string& cf_name,
36
18.3k
    const std::string& file_path, int job_id, TableFileCreationReason reason) {
37
18.3k
  if (listeners.empty()) {
38
18.3k
    return;
39
18.3k
  }
40
0
  TableFileCreationBriefInfo info;
41
0
  info.db_name = db_name;
42
0
  info.cf_name = cf_name;
43
0
  info.file_path = file_path;
44
0
  info.job_id = job_id;
45
0
  info.reason = reason;
46
0
  for (auto& listener : listeners) {
47
0
    listener->OnTableFileCreationStarted(info);
48
0
  }
49
0
}
50
51
void EventHelpers::NotifyOnBackgroundError(
52
    const std::vector<std::shared_ptr<EventListener>>& listeners,
53
    BackgroundErrorReason reason, Status* bg_error, InstrumentedMutex* db_mutex,
54
0
    bool* auto_recovery) {
55
0
  if (listeners.empty()) {
56
0
    return;
57
0
  }
58
0
  db_mutex->AssertHeld();
59
  // release lock while notifying events
60
0
  db_mutex->Unlock();
61
0
  for (auto& listener : listeners) {
62
0
    listener->OnBackgroundError(reason, bg_error);
63
0
    bg_error->PermitUncheckedError();
64
0
    if (*auto_recovery) {
65
0
      listener->OnErrorRecoveryBegin(reason, *bg_error, auto_recovery);
66
0
    }
67
0
  }
68
0
  db_mutex->Lock();
69
0
}
70
71
void EventHelpers::LogAndNotifyTableFileCreationFinished(
72
    EventLogger* event_logger,
73
    const std::vector<std::shared_ptr<EventListener>>& listeners,
74
    const std::string& db_name, const std::string& cf_name,
75
    const std::string& file_path, int job_id, const FileDescriptor& fd,
76
    uint64_t oldest_blob_file_number, const TableProperties& table_properties,
77
    TableFileCreationReason reason, const Status& s,
78
    const std::string& file_checksum,
79
18.3k
    const std::string& file_checksum_func_name) {
80
18.3k
  if (!event_logger && listeners.empty()) {
81
0
    s.PermitUncheckedError();
82
0
    return;
83
0
  }
84
85
18.3k
  if (event_logger) {
86
18.3k
    JSONWriter jwriter;
87
18.3k
    AppendCurrentTime(&jwriter);
88
18.3k
    jwriter << "cf_name" << cf_name << "job" << job_id << "event"
89
18.3k
            << "table_file_creation" << "file_number" << fd.GetNumber()
90
18.3k
            << "file_size" << fd.GetFileSize() << "file_checksum"
91
18.3k
            << Slice(file_checksum).ToString(true) << "file_checksum_func_name"
92
18.3k
            << file_checksum_func_name << "smallest_seqno" << fd.smallest_seqno
93
18.3k
            << "largest_seqno" << fd.largest_seqno;
94
95
    // table_properties
96
18.3k
    {
97
18.3k
      jwriter << "table_properties";
98
18.3k
      jwriter.StartObject();
99
100
      // basic properties:
101
18.3k
      jwriter << "data_size" << table_properties.data_size << "index_size"
102
18.3k
              << table_properties.index_size << "index_partitions"
103
18.3k
              << table_properties.index_partitions << "top_level_index_size"
104
18.3k
              << table_properties.top_level_index_size
105
18.3k
              << "index_key_is_user_key"
106
18.3k
              << table_properties.index_key_is_user_key
107
18.3k
              << "index_value_is_delta_encoded"
108
18.3k
              << table_properties.index_value_is_delta_encoded << "filter_size"
109
18.3k
              << table_properties.filter_size << "raw_key_size"
110
18.3k
              << table_properties.raw_key_size << "raw_average_key_size"
111
18.3k
              << SafeDivide(table_properties.raw_key_size,
112
18.3k
                            table_properties.num_entries)
113
18.3k
              << "raw_value_size" << table_properties.raw_value_size
114
18.3k
              << "raw_average_value_size"
115
18.3k
              << SafeDivide(table_properties.raw_value_size,
116
18.3k
                            table_properties.num_entries)
117
18.3k
              << "num_data_blocks" << table_properties.num_data_blocks
118
18.3k
              << "num_entries" << table_properties.num_entries
119
18.3k
              << "num_filter_entries" << table_properties.num_filter_entries
120
18.3k
              << "num_deletions" << table_properties.num_deletions
121
18.3k
              << "num_merge_operands" << table_properties.num_merge_operands
122
18.3k
              << "num_range_deletions" << table_properties.num_range_deletions
123
18.3k
              << "format_version" << table_properties.format_version
124
18.3k
              << "fixed_key_len" << table_properties.fixed_key_len
125
18.3k
              << "filter_policy" << table_properties.filter_policy_name
126
18.3k
              << "column_family_name" << table_properties.column_family_name
127
18.3k
              << "column_family_id" << table_properties.column_family_id
128
18.3k
              << "comparator" << table_properties.comparator_name
129
18.3k
              << "user_defined_timestamps_persisted"
130
18.3k
              << table_properties.user_defined_timestamps_persisted
131
18.3k
              << "key_largest_seqno" << table_properties.key_largest_seqno
132
18.3k
              << "key_smallest_seqno" << table_properties.key_smallest_seqno
133
18.3k
              << "merge_operator" << table_properties.merge_operator_name
134
18.3k
              << "prefix_extractor_name"
135
18.3k
              << table_properties.prefix_extractor_name << "property_collectors"
136
18.3k
              << table_properties.property_collectors_names << "compression"
137
18.3k
              << table_properties.compression_name << "compression_options"
138
18.3k
              << table_properties.compression_options << "creation_time"
139
18.3k
              << table_properties.creation_time << "oldest_key_time"
140
18.3k
              << table_properties.newest_key_time << "newest_key_time"
141
18.3k
              << table_properties.oldest_key_time << "file_creation_time"
142
18.3k
              << table_properties.file_creation_time
143
18.3k
              << "slow_compression_estimated_data_size"
144
18.3k
              << table_properties.slow_compression_estimated_data_size
145
18.3k
              << "fast_compression_estimated_data_size"
146
18.3k
              << table_properties.fast_compression_estimated_data_size
147
18.3k
              << "db_id" << table_properties.db_id << "db_session_id"
148
18.3k
              << table_properties.db_session_id << "orig_file_number"
149
18.3k
              << table_properties.orig_file_number << "seqno_to_time_mapping";
150
151
18.3k
      if (table_properties.seqno_to_time_mapping.empty()) {
152
18.3k
        jwriter << "N/A";
153
18.3k
      } else {
154
0
        SeqnoToTimeMapping tmp;
155
0
        Status status = tmp.DecodeFrom(table_properties.seqno_to_time_mapping);
156
0
        if (status.ok()) {
157
0
          jwriter << tmp.ToHumanString();
158
0
        } else {
159
0
          jwriter << "Invalid";
160
0
        }
161
0
      }
162
163
      // user collected properties
164
18.3k
      for (const auto& prop : table_properties.readable_properties) {
165
0
        jwriter << prop.first << prop.second;
166
0
      }
167
18.3k
      jwriter.EndObject();
168
18.3k
    }
169
170
18.3k
    if (oldest_blob_file_number != kInvalidBlobFileNumber) {
171
0
      jwriter << "oldest_blob_file_number" << oldest_blob_file_number;
172
0
    }
173
174
18.3k
    jwriter << "status" << s.ToString();
175
176
18.3k
    jwriter.EndObject();
177
178
18.3k
    event_logger->Log(jwriter);
179
18.3k
  }
180
181
18.3k
  if (listeners.empty()) {
182
18.3k
    return;
183
18.3k
  }
184
0
  TableFileCreationInfo info;
185
0
  info.db_name = db_name;
186
0
  info.cf_name = cf_name;
187
0
  info.file_path = file_path;
188
0
  info.file_size = fd.file_size;
189
0
  info.job_id = job_id;
190
0
  info.table_properties = table_properties;
191
0
  info.reason = reason;
192
0
  info.status = s;
193
0
  info.file_checksum = file_checksum;
194
0
  info.file_checksum_func_name = file_checksum_func_name;
195
0
  for (auto& listener : listeners) {
196
0
    listener->OnTableFileCreated(info);
197
0
  }
198
0
  info.status.PermitUncheckedError();
199
0
}
200
201
void EventHelpers::LogAndNotifyTableFileDeletion(
202
    EventLogger* event_logger, int job_id, uint64_t file_number,
203
    const std::string& file_path, const Status& status,
204
    const std::string& dbname,
205
6.14k
    const std::vector<std::shared_ptr<EventListener>>& listeners) {
206
6.14k
  if (!event_logger && listeners.empty()) {
207
0
    status.PermitUncheckedError();
208
0
    return;
209
0
  }
210
211
6.14k
  if (event_logger) {
212
6.14k
    JSONWriter jwriter;
213
6.14k
    AppendCurrentTime(&jwriter);
214
215
6.14k
    jwriter << "job" << job_id << "event" << "table_file_deletion"
216
6.14k
            << "file_number" << file_number << "status" << status.ToString();
217
218
6.14k
    jwriter.EndObject();
219
220
6.14k
    event_logger->Log(jwriter);
221
6.14k
  }
222
223
6.14k
  if (listeners.empty()) {
224
6.14k
    return;
225
6.14k
  }
226
0
  TableFileDeletionInfo info;
227
0
  info.db_name = dbname;
228
0
  info.job_id = job_id;
229
0
  info.file_path = file_path;
230
0
  info.status = status;
231
0
  for (auto& listener : listeners) {
232
0
    listener->OnTableFileDeleted(info);
233
0
  }
234
0
  info.status.PermitUncheckedError();
235
0
}
236
237
void EventHelpers::NotifyOnErrorRecoveryEnd(
238
    const std::vector<std::shared_ptr<EventListener>>& listeners,
239
    const Status& old_bg_error, const Status& new_bg_error,
240
0
    InstrumentedMutex* db_mutex) {
241
0
  if (!listeners.empty()) {
242
0
    db_mutex->AssertHeld();
243
    // Make copies before releasing mutex to avoid race.
244
0
    Status old_bg_error_cp = old_bg_error;
245
0
    Status new_bg_error_cp = new_bg_error;
246
    // release lock while notifying events
247
0
    db_mutex->Unlock();
248
0
    TEST_SYNC_POINT("NotifyOnErrorRecoveryEnd:MutexUnlocked:1");
249
0
    TEST_SYNC_POINT("NotifyOnErrorRecoveryEnd:MutexUnlocked:2");
250
0
    for (auto& listener : listeners) {
251
0
      BackgroundErrorRecoveryInfo info;
252
0
      info.old_bg_error = old_bg_error_cp;
253
0
      info.new_bg_error = new_bg_error_cp;
254
0
      listener->OnErrorRecoveryCompleted(old_bg_error_cp);
255
0
      listener->OnErrorRecoveryEnd(info);
256
0
      info.old_bg_error.PermitUncheckedError();
257
0
      info.new_bg_error.PermitUncheckedError();
258
0
    }
259
0
    db_mutex->Lock();
260
0
  } else {
261
0
    old_bg_error.PermitUncheckedError();
262
0
    new_bg_error.PermitUncheckedError();
263
0
  }
264
0
}
265
266
void EventHelpers::NotifyBlobFileCreationStarted(
267
    const std::vector<std::shared_ptr<EventListener>>& listeners,
268
    const std::string& db_name, const std::string& cf_name,
269
    const std::string& file_path, int job_id,
270
0
    BlobFileCreationReason creation_reason) {
271
0
  if (listeners.empty()) {
272
0
    return;
273
0
  }
274
0
  BlobFileCreationBriefInfo info(db_name, cf_name, file_path, job_id,
275
0
                                 creation_reason);
276
0
  for (const auto& listener : listeners) {
277
0
    listener->OnBlobFileCreationStarted(info);
278
0
  }
279
0
}
280
281
void EventHelpers::LogAndNotifyBlobFileCreationFinished(
282
    EventLogger* event_logger,
283
    const std::vector<std::shared_ptr<EventListener>>& listeners,
284
    const std::string& db_name, const std::string& cf_name,
285
    const std::string& file_path, int job_id, uint64_t file_number,
286
    BlobFileCreationReason creation_reason, const Status& s,
287
    const std::string& file_checksum,
288
    const std::string& file_checksum_func_name, uint64_t total_blob_count,
289
0
    uint64_t total_blob_bytes) {
290
0
  if (!event_logger && listeners.empty()) {
291
0
    s.PermitUncheckedError();
292
0
    return;
293
0
  }
294
295
0
  if (event_logger) {
296
0
    JSONWriter jwriter;
297
0
    AppendCurrentTime(&jwriter);
298
0
    jwriter << "cf_name" << cf_name << "job" << job_id << "event"
299
0
            << "blob_file_creation" << "file_number" << file_number
300
0
            << "total_blob_count" << total_blob_count << "total_blob_bytes"
301
0
            << total_blob_bytes << "file_checksum" << file_checksum
302
0
            << "file_checksum_func_name" << file_checksum_func_name << "status"
303
0
            << s.ToString();
304
305
0
    jwriter.EndObject();
306
0
    event_logger->Log(jwriter);
307
0
  }
308
309
0
  if (listeners.empty()) {
310
0
    return;
311
0
  }
312
0
  BlobFileCreationInfo info(db_name, cf_name, file_path, job_id,
313
0
                            creation_reason, total_blob_count, total_blob_bytes,
314
0
                            s, file_checksum, file_checksum_func_name);
315
0
  for (const auto& listener : listeners) {
316
0
    listener->OnBlobFileCreated(info);
317
0
  }
318
0
  info.status.PermitUncheckedError();
319
0
}
320
321
void EventHelpers::LogAndNotifyBlobFileDeletion(
322
    EventLogger* event_logger,
323
    const std::vector<std::shared_ptr<EventListener>>& listeners, int job_id,
324
    uint64_t file_number, const std::string& file_path, const Status& status,
325
0
    const std::string& dbname) {
326
0
  if (!event_logger && listeners.empty()) {
327
0
    status.PermitUncheckedError();
328
0
    return;
329
0
  }
330
331
0
  if (event_logger) {
332
0
    JSONWriter jwriter;
333
0
    AppendCurrentTime(&jwriter);
334
335
0
    jwriter << "job" << job_id << "event" << "blob_file_deletion"
336
0
            << "file_number" << file_number << "status" << status.ToString();
337
338
0
    jwriter.EndObject();
339
0
    event_logger->Log(jwriter);
340
0
  }
341
0
  if (listeners.empty()) {
342
0
    return;
343
0
  }
344
0
  BlobFileDeletionInfo info(dbname, file_path, job_id, status);
345
0
  for (const auto& listener : listeners) {
346
0
    listener->OnBlobFileDeleted(info);
347
0
  }
348
0
  info.status.PermitUncheckedError();
349
0
}
350
351
}  // namespace ROCKSDB_NAMESPACE