/src/rocksdb/db/event_helpers.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | #include "db/event_helpers.h" |
7 | | |
8 | | #include "rocksdb/convenience.h" |
9 | | #include "rocksdb/listener.h" |
10 | | #include "rocksdb/utilities/customizable_util.h" |
11 | | |
12 | | namespace ROCKSDB_NAMESPACE { |
13 | | Status EventListener::CreateFromString(const ConfigOptions& config_options, |
14 | | const std::string& id, |
15 | 0 | std::shared_ptr<EventListener>* result) { |
16 | 0 | return LoadSharedObject<EventListener>(config_options, id, result); |
17 | 0 | } |
18 | | |
namespace {
// Returns `a / b`, except that a zero divisor yields zero instead of
// performing the division.
template <class T>
inline T SafeDivide(T a, T b) {
  if (b == 0) {
    return static_cast<T>(0);
  }
  return a / b;
}
}  // anonymous namespace
25 | | |
26 | 4.48k | void EventHelpers::AppendCurrentTime(JSONWriter* jwriter) { |
27 | 4.48k | *jwriter << "time_micros" |
28 | 4.48k | << std::chrono::duration_cast<std::chrono::microseconds>( |
29 | 4.48k | std::chrono::system_clock::now().time_since_epoch()) |
30 | 4.48k | .count(); |
31 | 4.48k | } |
32 | | |
33 | | void EventHelpers::NotifyTableFileCreationStarted( |
34 | | const std::vector<std::shared_ptr<EventListener>>& listeners, |
35 | | const std::string& db_name, const std::string& cf_name, |
36 | 4.48k | const std::string& file_path, int job_id, TableFileCreationReason reason) { |
37 | 4.48k | if (listeners.empty()) { |
38 | 4.48k | return; |
39 | 4.48k | } |
40 | 0 | TableFileCreationBriefInfo info; |
41 | 0 | info.db_name = db_name; |
42 | 0 | info.cf_name = cf_name; |
43 | 0 | info.file_path = file_path; |
44 | 0 | info.job_id = job_id; |
45 | 0 | info.reason = reason; |
46 | 0 | for (auto& listener : listeners) { |
47 | 0 | listener->OnTableFileCreationStarted(info); |
48 | 0 | } |
49 | 0 | } |
50 | | |
51 | | void EventHelpers::NotifyOnBackgroundError( |
52 | | const std::vector<std::shared_ptr<EventListener>>& listeners, |
53 | | BackgroundErrorReason reason, Status* bg_error, InstrumentedMutex* db_mutex, |
54 | 0 | bool* auto_recovery) { |
55 | 0 | if (listeners.empty()) { |
56 | 0 | return; |
57 | 0 | } |
58 | 0 | db_mutex->AssertHeld(); |
59 | | // release lock while notifying events |
60 | 0 | db_mutex->Unlock(); |
61 | 0 | for (auto& listener : listeners) { |
62 | 0 | listener->OnBackgroundError(reason, bg_error); |
63 | 0 | bg_error->PermitUncheckedError(); |
64 | 0 | if (*auto_recovery) { |
65 | 0 | listener->OnErrorRecoveryBegin(reason, *bg_error, auto_recovery); |
66 | 0 | } |
67 | 0 | } |
68 | 0 | db_mutex->Lock(); |
69 | 0 | } |
70 | | |
71 | | void EventHelpers::LogAndNotifyTableFileCreationFinished( |
72 | | EventLogger* event_logger, |
73 | | const std::vector<std::shared_ptr<EventListener>>& listeners, |
74 | | const std::string& db_name, const std::string& cf_name, |
75 | | const std::string& file_path, int job_id, const FileDescriptor& fd, |
76 | | uint64_t oldest_blob_file_number, const TableProperties& table_properties, |
77 | | TableFileCreationReason reason, const Status& s, |
78 | | const std::string& file_checksum, |
79 | 4.48k | const std::string& file_checksum_func_name) { |
80 | 4.48k | if (s.ok() && event_logger) { |
81 | 4.48k | JSONWriter jwriter; |
82 | 4.48k | AppendCurrentTime(&jwriter); |
83 | 4.48k | jwriter << "cf_name" << cf_name << "job" << job_id << "event" |
84 | 4.48k | << "table_file_creation" |
85 | 4.48k | << "file_number" << fd.GetNumber() << "file_size" |
86 | 4.48k | << fd.GetFileSize() << "file_checksum" |
87 | 4.48k | << Slice(file_checksum).ToString(true) << "file_checksum_func_name" |
88 | 4.48k | << file_checksum_func_name << "smallest_seqno" << fd.smallest_seqno |
89 | 4.48k | << "largest_seqno" << fd.largest_seqno; |
90 | | |
91 | | // table_properties |
92 | 4.48k | { |
93 | 4.48k | jwriter << "table_properties"; |
94 | 4.48k | jwriter.StartObject(); |
95 | | |
96 | | // basic properties: |
97 | 4.48k | jwriter << "data_size" << table_properties.data_size << "index_size" |
98 | 4.48k | << table_properties.index_size << "index_partitions" |
99 | 4.48k | << table_properties.index_partitions << "top_level_index_size" |
100 | 4.48k | << table_properties.top_level_index_size |
101 | 4.48k | << "index_key_is_user_key" |
102 | 4.48k | << table_properties.index_key_is_user_key |
103 | 4.48k | << "index_value_is_delta_encoded" |
104 | 4.48k | << table_properties.index_value_is_delta_encoded << "filter_size" |
105 | 4.48k | << table_properties.filter_size << "raw_key_size" |
106 | 4.48k | << table_properties.raw_key_size << "raw_average_key_size" |
107 | 4.48k | << SafeDivide(table_properties.raw_key_size, |
108 | 4.48k | table_properties.num_entries) |
109 | 4.48k | << "raw_value_size" << table_properties.raw_value_size |
110 | 4.48k | << "raw_average_value_size" |
111 | 4.48k | << SafeDivide(table_properties.raw_value_size, |
112 | 4.48k | table_properties.num_entries) |
113 | 4.48k | << "num_data_blocks" << table_properties.num_data_blocks |
114 | 4.48k | << "num_entries" << table_properties.num_entries |
115 | 4.48k | << "num_filter_entries" << table_properties.num_filter_entries |
116 | 4.48k | << "num_deletions" << table_properties.num_deletions |
117 | 4.48k | << "num_merge_operands" << table_properties.num_merge_operands |
118 | 4.48k | << "num_range_deletions" << table_properties.num_range_deletions |
119 | 4.48k | << "format_version" << table_properties.format_version |
120 | 4.48k | << "fixed_key_len" << table_properties.fixed_key_len |
121 | 4.48k | << "filter_policy" << table_properties.filter_policy_name |
122 | 4.48k | << "column_family_name" << table_properties.column_family_name |
123 | 4.48k | << "column_family_id" << table_properties.column_family_id |
124 | 4.48k | << "comparator" << table_properties.comparator_name |
125 | 4.48k | << "user_defined_timestamps_persisted" |
126 | 4.48k | << table_properties.user_defined_timestamps_persisted |
127 | 4.48k | << "merge_operator" << table_properties.merge_operator_name |
128 | 4.48k | << "prefix_extractor_name" |
129 | 4.48k | << table_properties.prefix_extractor_name << "property_collectors" |
130 | 4.48k | << table_properties.property_collectors_names << "compression" |
131 | 4.48k | << table_properties.compression_name << "compression_options" |
132 | 4.48k | << table_properties.compression_options << "creation_time" |
133 | 4.48k | << table_properties.creation_time << "oldest_key_time" |
134 | 4.48k | << table_properties.oldest_key_time << "file_creation_time" |
135 | 4.48k | << table_properties.file_creation_time |
136 | 4.48k | << "slow_compression_estimated_data_size" |
137 | 4.48k | << table_properties.slow_compression_estimated_data_size |
138 | 4.48k | << "fast_compression_estimated_data_size" |
139 | 4.48k | << table_properties.fast_compression_estimated_data_size |
140 | 4.48k | << "db_id" << table_properties.db_id << "db_session_id" |
141 | 4.48k | << table_properties.db_session_id << "orig_file_number" |
142 | 4.48k | << table_properties.orig_file_number << "seqno_to_time_mapping"; |
143 | | |
144 | 4.48k | if (table_properties.seqno_to_time_mapping.empty()) { |
145 | 4.48k | jwriter << "N/A"; |
146 | 4.48k | } else { |
147 | 0 | SeqnoToTimeMapping tmp; |
148 | 0 | Status status = tmp.DecodeFrom(table_properties.seqno_to_time_mapping); |
149 | 0 | if (status.ok()) { |
150 | 0 | jwriter << tmp.ToHumanString(); |
151 | 0 | } else { |
152 | 0 | jwriter << "Invalid"; |
153 | 0 | } |
154 | 0 | } |
155 | | |
156 | | // user collected properties |
157 | 4.48k | for (const auto& prop : table_properties.readable_properties) { |
158 | 0 | jwriter << prop.first << prop.second; |
159 | 0 | } |
160 | 4.48k | jwriter.EndObject(); |
161 | 4.48k | } |
162 | | |
163 | 4.48k | if (oldest_blob_file_number != kInvalidBlobFileNumber) { |
164 | 0 | jwriter << "oldest_blob_file_number" << oldest_blob_file_number; |
165 | 0 | } |
166 | | |
167 | 4.48k | jwriter.EndObject(); |
168 | | |
169 | 4.48k | event_logger->Log(jwriter); |
170 | 4.48k | } |
171 | | |
172 | 4.48k | if (listeners.empty()) { |
173 | 4.48k | return; |
174 | 4.48k | } |
175 | 0 | TableFileCreationInfo info; |
176 | 0 | info.db_name = db_name; |
177 | 0 | info.cf_name = cf_name; |
178 | 0 | info.file_path = file_path; |
179 | 0 | info.file_size = fd.file_size; |
180 | 0 | info.job_id = job_id; |
181 | 0 | info.table_properties = table_properties; |
182 | 0 | info.reason = reason; |
183 | 0 | info.status = s; |
184 | 0 | info.file_checksum = file_checksum; |
185 | 0 | info.file_checksum_func_name = file_checksum_func_name; |
186 | 0 | for (auto& listener : listeners) { |
187 | 0 | listener->OnTableFileCreated(info); |
188 | 0 | } |
189 | 0 | info.status.PermitUncheckedError(); |
190 | 0 | } |
191 | | |
192 | | void EventHelpers::LogAndNotifyTableFileDeletion( |
193 | | EventLogger* event_logger, int job_id, uint64_t file_number, |
194 | | const std::string& file_path, const Status& status, |
195 | | const std::string& dbname, |
196 | 0 | const std::vector<std::shared_ptr<EventListener>>& listeners) { |
197 | 0 | JSONWriter jwriter; |
198 | 0 | AppendCurrentTime(&jwriter); |
199 | |
|
200 | 0 | jwriter << "job" << job_id << "event" |
201 | 0 | << "table_file_deletion" |
202 | 0 | << "file_number" << file_number; |
203 | 0 | if (!status.ok()) { |
204 | 0 | jwriter << "status" << status.ToString(); |
205 | 0 | } |
206 | |
|
207 | 0 | jwriter.EndObject(); |
208 | |
|
209 | 0 | event_logger->Log(jwriter); |
210 | |
|
211 | 0 | if (listeners.empty()) { |
212 | 0 | return; |
213 | 0 | } |
214 | 0 | TableFileDeletionInfo info; |
215 | 0 | info.db_name = dbname; |
216 | 0 | info.job_id = job_id; |
217 | 0 | info.file_path = file_path; |
218 | 0 | info.status = status; |
219 | 0 | for (auto& listener : listeners) { |
220 | 0 | listener->OnTableFileDeleted(info); |
221 | 0 | } |
222 | 0 | info.status.PermitUncheckedError(); |
223 | 0 | } |
224 | | |
225 | | void EventHelpers::NotifyOnErrorRecoveryEnd( |
226 | | const std::vector<std::shared_ptr<EventListener>>& listeners, |
227 | | const Status& old_bg_error, const Status& new_bg_error, |
228 | 0 | InstrumentedMutex* db_mutex) { |
229 | 0 | if (!listeners.empty()) { |
230 | 0 | db_mutex->AssertHeld(); |
231 | | // Make copies before releasing mutex to avoid race. |
232 | 0 | Status old_bg_error_cp = old_bg_error; |
233 | 0 | Status new_bg_error_cp = new_bg_error; |
234 | | // release lock while notifying events |
235 | 0 | db_mutex->Unlock(); |
236 | 0 | TEST_SYNC_POINT("NotifyOnErrorRecoveryEnd:MutexUnlocked:1"); |
237 | 0 | TEST_SYNC_POINT("NotifyOnErrorRecoveryEnd:MutexUnlocked:2"); |
238 | 0 | for (auto& listener : listeners) { |
239 | 0 | BackgroundErrorRecoveryInfo info; |
240 | 0 | info.old_bg_error = old_bg_error_cp; |
241 | 0 | info.new_bg_error = new_bg_error_cp; |
242 | 0 | listener->OnErrorRecoveryCompleted(old_bg_error_cp); |
243 | 0 | listener->OnErrorRecoveryEnd(info); |
244 | 0 | info.old_bg_error.PermitUncheckedError(); |
245 | 0 | info.new_bg_error.PermitUncheckedError(); |
246 | 0 | } |
247 | 0 | db_mutex->Lock(); |
248 | 0 | } else { |
249 | 0 | old_bg_error.PermitUncheckedError(); |
250 | 0 | } |
251 | 0 | } |
252 | | |
253 | | void EventHelpers::NotifyBlobFileCreationStarted( |
254 | | const std::vector<std::shared_ptr<EventListener>>& listeners, |
255 | | const std::string& db_name, const std::string& cf_name, |
256 | | const std::string& file_path, int job_id, |
257 | 0 | BlobFileCreationReason creation_reason) { |
258 | 0 | if (listeners.empty()) { |
259 | 0 | return; |
260 | 0 | } |
261 | 0 | BlobFileCreationBriefInfo info(db_name, cf_name, file_path, job_id, |
262 | 0 | creation_reason); |
263 | 0 | for (const auto& listener : listeners) { |
264 | 0 | listener->OnBlobFileCreationStarted(info); |
265 | 0 | } |
266 | 0 | } |
267 | | |
268 | | void EventHelpers::LogAndNotifyBlobFileCreationFinished( |
269 | | EventLogger* event_logger, |
270 | | const std::vector<std::shared_ptr<EventListener>>& listeners, |
271 | | const std::string& db_name, const std::string& cf_name, |
272 | | const std::string& file_path, int job_id, uint64_t file_number, |
273 | | BlobFileCreationReason creation_reason, const Status& s, |
274 | | const std::string& file_checksum, |
275 | | const std::string& file_checksum_func_name, uint64_t total_blob_count, |
276 | 0 | uint64_t total_blob_bytes) { |
277 | 0 | if (s.ok() && event_logger) { |
278 | 0 | JSONWriter jwriter; |
279 | 0 | AppendCurrentTime(&jwriter); |
280 | 0 | jwriter << "cf_name" << cf_name << "job" << job_id << "event" |
281 | 0 | << "blob_file_creation" |
282 | 0 | << "file_number" << file_number << "total_blob_count" |
283 | 0 | << total_blob_count << "total_blob_bytes" << total_blob_bytes |
284 | 0 | << "file_checksum" << file_checksum << "file_checksum_func_name" |
285 | 0 | << file_checksum_func_name << "status" << s.ToString(); |
286 | |
|
287 | 0 | jwriter.EndObject(); |
288 | 0 | event_logger->Log(jwriter); |
289 | 0 | } |
290 | |
|
291 | 0 | if (listeners.empty()) { |
292 | 0 | return; |
293 | 0 | } |
294 | 0 | BlobFileCreationInfo info(db_name, cf_name, file_path, job_id, |
295 | 0 | creation_reason, total_blob_count, total_blob_bytes, |
296 | 0 | s, file_checksum, file_checksum_func_name); |
297 | 0 | for (const auto& listener : listeners) { |
298 | 0 | listener->OnBlobFileCreated(info); |
299 | 0 | } |
300 | 0 | info.status.PermitUncheckedError(); |
301 | 0 | } |
302 | | |
303 | | void EventHelpers::LogAndNotifyBlobFileDeletion( |
304 | | EventLogger* event_logger, |
305 | | const std::vector<std::shared_ptr<EventListener>>& listeners, int job_id, |
306 | | uint64_t file_number, const std::string& file_path, const Status& status, |
307 | 0 | const std::string& dbname) { |
308 | 0 | if (event_logger) { |
309 | 0 | JSONWriter jwriter; |
310 | 0 | AppendCurrentTime(&jwriter); |
311 | |
|
312 | 0 | jwriter << "job" << job_id << "event" |
313 | 0 | << "blob_file_deletion" |
314 | 0 | << "file_number" << file_number; |
315 | 0 | if (!status.ok()) { |
316 | 0 | jwriter << "status" << status.ToString(); |
317 | 0 | } |
318 | |
|
319 | 0 | jwriter.EndObject(); |
320 | 0 | event_logger->Log(jwriter); |
321 | 0 | } |
322 | 0 | if (listeners.empty()) { |
323 | 0 | return; |
324 | 0 | } |
325 | 0 | BlobFileDeletionInfo info(dbname, file_path, job_id, status); |
326 | 0 | for (const auto& listener : listeners) { |
327 | 0 | listener->OnBlobFileDeleted(info); |
328 | 0 | } |
329 | 0 | info.status.PermitUncheckedError(); |
330 | 0 | } |
331 | | |
332 | | } // namespace ROCKSDB_NAMESPACE |