Coverage Report

Created: 2024-07-27 06:53

/src/rocksdb/db/event_helpers.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#include "db/event_helpers.h"
7
8
#include "rocksdb/convenience.h"
9
#include "rocksdb/listener.h"
10
#include "rocksdb/utilities/customizable_util.h"
11
12
namespace ROCKSDB_NAMESPACE {
13
// Creates an EventListener from the identifier `id`.
//
// All three arguments are forwarded unchanged to
// LoadSharedObject<EventListener>, and its Status is returned as-is.
Status EventListener::CreateFromString(
    const ConfigOptions& config_options, const std::string& id,
    std::shared_ptr<EventListener>* result) {
  return LoadSharedObject<EventListener>(config_options, id, result);
}
18
19
namespace {
// Returns a / b, or 0 when b is zero, so callers computing averages do not
// have to guard against an empty denominator themselves.
template <class T>
inline T SafeDivide(T a, T b) {
  if (b == 0) {
    return 0;
  }
  return a / b;
}
}  // anonymous namespace
25
26
4.48k
// Writes the key "time_micros" to `jwriter`, followed by the current
// std::chrono::system_clock time as a count of microseconds since the
// clock's epoch.
void EventHelpers::AppendCurrentTime(JSONWriter* jwriter) {
  // The key is emitted first, then the clock is sampled for the value.
  *jwriter << "time_micros";
  const auto since_epoch = std::chrono::system_clock::now().time_since_epoch();
  const auto micros =
      std::chrono::duration_cast<std::chrono::microseconds>(since_epoch)
          .count();
  *jwriter << micros;
}
32
33
void EventHelpers::NotifyTableFileCreationStarted(
34
    const std::vector<std::shared_ptr<EventListener>>& listeners,
35
    const std::string& db_name, const std::string& cf_name,
36
4.48k
    const std::string& file_path, int job_id, TableFileCreationReason reason) {
37
4.48k
  if (listeners.empty()) {
38
4.48k
    return;
39
4.48k
  }
40
0
  TableFileCreationBriefInfo info;
41
0
  info.db_name = db_name;
42
0
  info.cf_name = cf_name;
43
0
  info.file_path = file_path;
44
0
  info.job_id = job_id;
45
0
  info.reason = reason;
46
0
  for (auto& listener : listeners) {
47
0
    listener->OnTableFileCreationStarted(info);
48
0
  }
49
0
}
50
51
void EventHelpers::NotifyOnBackgroundError(
52
    const std::vector<std::shared_ptr<EventListener>>& listeners,
53
    BackgroundErrorReason reason, Status* bg_error, InstrumentedMutex* db_mutex,
54
0
    bool* auto_recovery) {
55
0
  if (listeners.empty()) {
56
0
    return;
57
0
  }
58
0
  db_mutex->AssertHeld();
59
  // release lock while notifying events
60
0
  db_mutex->Unlock();
61
0
  for (auto& listener : listeners) {
62
0
    listener->OnBackgroundError(reason, bg_error);
63
0
    bg_error->PermitUncheckedError();
64
0
    if (*auto_recovery) {
65
0
      listener->OnErrorRecoveryBegin(reason, *bg_error, auto_recovery);
66
0
    }
67
0
  }
68
0
  db_mutex->Lock();
69
0
}
70
71
void EventHelpers::LogAndNotifyTableFileCreationFinished(
72
    EventLogger* event_logger,
73
    const std::vector<std::shared_ptr<EventListener>>& listeners,
74
    const std::string& db_name, const std::string& cf_name,
75
    const std::string& file_path, int job_id, const FileDescriptor& fd,
76
    uint64_t oldest_blob_file_number, const TableProperties& table_properties,
77
    TableFileCreationReason reason, const Status& s,
78
    const std::string& file_checksum,
79
4.48k
    const std::string& file_checksum_func_name) {
80
4.48k
  if (s.ok() && event_logger) {
81
4.48k
    JSONWriter jwriter;
82
4.48k
    AppendCurrentTime(&jwriter);
83
4.48k
    jwriter << "cf_name" << cf_name << "job" << job_id << "event"
84
4.48k
            << "table_file_creation"
85
4.48k
            << "file_number" << fd.GetNumber() << "file_size"
86
4.48k
            << fd.GetFileSize() << "file_checksum"
87
4.48k
            << Slice(file_checksum).ToString(true) << "file_checksum_func_name"
88
4.48k
            << file_checksum_func_name << "smallest_seqno" << fd.smallest_seqno
89
4.48k
            << "largest_seqno" << fd.largest_seqno;
90
91
    // table_properties
92
4.48k
    {
93
4.48k
      jwriter << "table_properties";
94
4.48k
      jwriter.StartObject();
95
96
      // basic properties:
97
4.48k
      jwriter << "data_size" << table_properties.data_size << "index_size"
98
4.48k
              << table_properties.index_size << "index_partitions"
99
4.48k
              << table_properties.index_partitions << "top_level_index_size"
100
4.48k
              << table_properties.top_level_index_size
101
4.48k
              << "index_key_is_user_key"
102
4.48k
              << table_properties.index_key_is_user_key
103
4.48k
              << "index_value_is_delta_encoded"
104
4.48k
              << table_properties.index_value_is_delta_encoded << "filter_size"
105
4.48k
              << table_properties.filter_size << "raw_key_size"
106
4.48k
              << table_properties.raw_key_size << "raw_average_key_size"
107
4.48k
              << SafeDivide(table_properties.raw_key_size,
108
4.48k
                            table_properties.num_entries)
109
4.48k
              << "raw_value_size" << table_properties.raw_value_size
110
4.48k
              << "raw_average_value_size"
111
4.48k
              << SafeDivide(table_properties.raw_value_size,
112
4.48k
                            table_properties.num_entries)
113
4.48k
              << "num_data_blocks" << table_properties.num_data_blocks
114
4.48k
              << "num_entries" << table_properties.num_entries
115
4.48k
              << "num_filter_entries" << table_properties.num_filter_entries
116
4.48k
              << "num_deletions" << table_properties.num_deletions
117
4.48k
              << "num_merge_operands" << table_properties.num_merge_operands
118
4.48k
              << "num_range_deletions" << table_properties.num_range_deletions
119
4.48k
              << "format_version" << table_properties.format_version
120
4.48k
              << "fixed_key_len" << table_properties.fixed_key_len
121
4.48k
              << "filter_policy" << table_properties.filter_policy_name
122
4.48k
              << "column_family_name" << table_properties.column_family_name
123
4.48k
              << "column_family_id" << table_properties.column_family_id
124
4.48k
              << "comparator" << table_properties.comparator_name
125
4.48k
              << "user_defined_timestamps_persisted"
126
4.48k
              << table_properties.user_defined_timestamps_persisted
127
4.48k
              << "merge_operator" << table_properties.merge_operator_name
128
4.48k
              << "prefix_extractor_name"
129
4.48k
              << table_properties.prefix_extractor_name << "property_collectors"
130
4.48k
              << table_properties.property_collectors_names << "compression"
131
4.48k
              << table_properties.compression_name << "compression_options"
132
4.48k
              << table_properties.compression_options << "creation_time"
133
4.48k
              << table_properties.creation_time << "oldest_key_time"
134
4.48k
              << table_properties.oldest_key_time << "file_creation_time"
135
4.48k
              << table_properties.file_creation_time
136
4.48k
              << "slow_compression_estimated_data_size"
137
4.48k
              << table_properties.slow_compression_estimated_data_size
138
4.48k
              << "fast_compression_estimated_data_size"
139
4.48k
              << table_properties.fast_compression_estimated_data_size
140
4.48k
              << "db_id" << table_properties.db_id << "db_session_id"
141
4.48k
              << table_properties.db_session_id << "orig_file_number"
142
4.48k
              << table_properties.orig_file_number << "seqno_to_time_mapping";
143
144
4.48k
      if (table_properties.seqno_to_time_mapping.empty()) {
145
4.48k
        jwriter << "N/A";
146
4.48k
      } else {
147
0
        SeqnoToTimeMapping tmp;
148
0
        Status status = tmp.DecodeFrom(table_properties.seqno_to_time_mapping);
149
0
        if (status.ok()) {
150
0
          jwriter << tmp.ToHumanString();
151
0
        } else {
152
0
          jwriter << "Invalid";
153
0
        }
154
0
      }
155
156
      // user collected properties
157
4.48k
      for (const auto& prop : table_properties.readable_properties) {
158
0
        jwriter << prop.first << prop.second;
159
0
      }
160
4.48k
      jwriter.EndObject();
161
4.48k
    }
162
163
4.48k
    if (oldest_blob_file_number != kInvalidBlobFileNumber) {
164
0
      jwriter << "oldest_blob_file_number" << oldest_blob_file_number;
165
0
    }
166
167
4.48k
    jwriter.EndObject();
168
169
4.48k
    event_logger->Log(jwriter);
170
4.48k
  }
171
172
4.48k
  if (listeners.empty()) {
173
4.48k
    return;
174
4.48k
  }
175
0
  TableFileCreationInfo info;
176
0
  info.db_name = db_name;
177
0
  info.cf_name = cf_name;
178
0
  info.file_path = file_path;
179
0
  info.file_size = fd.file_size;
180
0
  info.job_id = job_id;
181
0
  info.table_properties = table_properties;
182
0
  info.reason = reason;
183
0
  info.status = s;
184
0
  info.file_checksum = file_checksum;
185
0
  info.file_checksum_func_name = file_checksum_func_name;
186
0
  for (auto& listener : listeners) {
187
0
    listener->OnTableFileCreated(info);
188
0
  }
189
0
  info.status.PermitUncheckedError();
190
0
}
191
192
void EventHelpers::LogAndNotifyTableFileDeletion(
193
    EventLogger* event_logger, int job_id, uint64_t file_number,
194
    const std::string& file_path, const Status& status,
195
    const std::string& dbname,
196
0
    const std::vector<std::shared_ptr<EventListener>>& listeners) {
197
0
  JSONWriter jwriter;
198
0
  AppendCurrentTime(&jwriter);
199
200
0
  jwriter << "job" << job_id << "event"
201
0
          << "table_file_deletion"
202
0
          << "file_number" << file_number;
203
0
  if (!status.ok()) {
204
0
    jwriter << "status" << status.ToString();
205
0
  }
206
207
0
  jwriter.EndObject();
208
209
0
  event_logger->Log(jwriter);
210
211
0
  if (listeners.empty()) {
212
0
    return;
213
0
  }
214
0
  TableFileDeletionInfo info;
215
0
  info.db_name = dbname;
216
0
  info.job_id = job_id;
217
0
  info.file_path = file_path;
218
0
  info.status = status;
219
0
  for (auto& listener : listeners) {
220
0
    listener->OnTableFileDeleted(info);
221
0
  }
222
0
  info.status.PermitUncheckedError();
223
0
}
224
225
void EventHelpers::NotifyOnErrorRecoveryEnd(
226
    const std::vector<std::shared_ptr<EventListener>>& listeners,
227
    const Status& old_bg_error, const Status& new_bg_error,
228
0
    InstrumentedMutex* db_mutex) {
229
0
  if (!listeners.empty()) {
230
0
    db_mutex->AssertHeld();
231
    // Make copies before releasing mutex to avoid race.
232
0
    Status old_bg_error_cp = old_bg_error;
233
0
    Status new_bg_error_cp = new_bg_error;
234
    // release lock while notifying events
235
0
    db_mutex->Unlock();
236
0
    TEST_SYNC_POINT("NotifyOnErrorRecoveryEnd:MutexUnlocked:1");
237
0
    TEST_SYNC_POINT("NotifyOnErrorRecoveryEnd:MutexUnlocked:2");
238
0
    for (auto& listener : listeners) {
239
0
      BackgroundErrorRecoveryInfo info;
240
0
      info.old_bg_error = old_bg_error_cp;
241
0
      info.new_bg_error = new_bg_error_cp;
242
0
      listener->OnErrorRecoveryCompleted(old_bg_error_cp);
243
0
      listener->OnErrorRecoveryEnd(info);
244
0
      info.old_bg_error.PermitUncheckedError();
245
0
      info.new_bg_error.PermitUncheckedError();
246
0
    }
247
0
    db_mutex->Lock();
248
0
  } else {
249
0
    old_bg_error.PermitUncheckedError();
250
0
  }
251
0
}
252
253
void EventHelpers::NotifyBlobFileCreationStarted(
254
    const std::vector<std::shared_ptr<EventListener>>& listeners,
255
    const std::string& db_name, const std::string& cf_name,
256
    const std::string& file_path, int job_id,
257
0
    BlobFileCreationReason creation_reason) {
258
0
  if (listeners.empty()) {
259
0
    return;
260
0
  }
261
0
  BlobFileCreationBriefInfo info(db_name, cf_name, file_path, job_id,
262
0
                                 creation_reason);
263
0
  for (const auto& listener : listeners) {
264
0
    listener->OnBlobFileCreationStarted(info);
265
0
  }
266
0
}
267
268
void EventHelpers::LogAndNotifyBlobFileCreationFinished(
269
    EventLogger* event_logger,
270
    const std::vector<std::shared_ptr<EventListener>>& listeners,
271
    const std::string& db_name, const std::string& cf_name,
272
    const std::string& file_path, int job_id, uint64_t file_number,
273
    BlobFileCreationReason creation_reason, const Status& s,
274
    const std::string& file_checksum,
275
    const std::string& file_checksum_func_name, uint64_t total_blob_count,
276
0
    uint64_t total_blob_bytes) {
277
0
  if (s.ok() && event_logger) {
278
0
    JSONWriter jwriter;
279
0
    AppendCurrentTime(&jwriter);
280
0
    jwriter << "cf_name" << cf_name << "job" << job_id << "event"
281
0
            << "blob_file_creation"
282
0
            << "file_number" << file_number << "total_blob_count"
283
0
            << total_blob_count << "total_blob_bytes" << total_blob_bytes
284
0
            << "file_checksum" << file_checksum << "file_checksum_func_name"
285
0
            << file_checksum_func_name << "status" << s.ToString();
286
287
0
    jwriter.EndObject();
288
0
    event_logger->Log(jwriter);
289
0
  }
290
291
0
  if (listeners.empty()) {
292
0
    return;
293
0
  }
294
0
  BlobFileCreationInfo info(db_name, cf_name, file_path, job_id,
295
0
                            creation_reason, total_blob_count, total_blob_bytes,
296
0
                            s, file_checksum, file_checksum_func_name);
297
0
  for (const auto& listener : listeners) {
298
0
    listener->OnBlobFileCreated(info);
299
0
  }
300
0
  info.status.PermitUncheckedError();
301
0
}
302
303
void EventHelpers::LogAndNotifyBlobFileDeletion(
304
    EventLogger* event_logger,
305
    const std::vector<std::shared_ptr<EventListener>>& listeners, int job_id,
306
    uint64_t file_number, const std::string& file_path, const Status& status,
307
0
    const std::string& dbname) {
308
0
  if (event_logger) {
309
0
    JSONWriter jwriter;
310
0
    AppendCurrentTime(&jwriter);
311
312
0
    jwriter << "job" << job_id << "event"
313
0
            << "blob_file_deletion"
314
0
            << "file_number" << file_number;
315
0
    if (!status.ok()) {
316
0
      jwriter << "status" << status.ToString();
317
0
    }
318
319
0
    jwriter.EndObject();
320
0
    event_logger->Log(jwriter);
321
0
  }
322
0
  if (listeners.empty()) {
323
0
    return;
324
0
  }
325
0
  BlobFileDeletionInfo info(dbname, file_path, job_id, status);
326
0
  for (const auto& listener : listeners) {
327
0
    listener->OnBlobFileDeleted(info);
328
0
  }
329
0
  info.status.PermitUncheckedError();
330
0
}
331
332
}  // namespace ROCKSDB_NAMESPACE