/src/rocksdb/db/compaction/compaction_service_job.cc
Line | Count | Source |
1 | | // Copyright (c) Meta Platforms, Inc. and affiliates. |
2 | | // |
3 | | // This source code is licensed under both the GPLv2 (found in the |
4 | | // COPYING file in the root directory) and Apache 2.0 License |
5 | | // (found in the LICENSE.Apache file in the root directory). |
6 | | // |
7 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
8 | | // Use of this source code is governed by a BSD-style license that can be |
9 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
10 | | |
11 | | #include "db/compaction/compaction_job.h" |
12 | | #include "db/compaction/compaction_state.h" |
13 | | #include "logging/logging.h" |
14 | | #include "monitoring/iostats_context_imp.h" |
15 | | #include "monitoring/thread_status_util.h" |
16 | | #include "options/options_helper.h" |
17 | | #include "rocksdb/utilities/options_type.h" |
18 | | |
19 | | namespace ROCKSDB_NAMESPACE { |
20 | | class SubcompactionState; |
21 | | |
// Runs one subcompaction through db_options_.compaction_service.
//
// Steps, in order: build and serialize a CompactionServiceInput, Schedule()
// the job, Wait() for it, parse the returned CompactionServiceResult, move
// each reported output file into the DB's table-file location under a newly
// allocated file number, register it as an output of `sub_compact`, and copy
// the returned stats.
//
// Return value / side effects visible in this function:
// - kUseLocal: returned when Schedule() or Wait() reports kUseLocal;
//   sub_compact->status is not assigned on these paths.
// - kAborted / kFailure: sub_compact->status is assigned before returning.
// - kSuccess: all output files were installed; sub_compact->status holds the
//   status carried inside the parsed result.
// OnInstallation() is only invoked after Wait() returned kSuccess: with
// kFailure if parsing, renaming or the file-size lookup fails, and with
// kSuccess at the very end.
22 | | CompactionServiceJobStatus |
23 | | CompactionJob::ProcessKeyValueCompactionWithCompactionService( |
24 | 0 | SubcompactionState* sub_compact) { |
25 | 0 | assert(sub_compact); |
26 | 0 | assert(sub_compact->compaction); |
27 | 0 | assert(db_options_.compaction_service); |
28 | |

29 | 0 | const Compaction* compaction = sub_compact->compaction; |
30 | 0 | CompactionServiceInput compaction_input; |
31 | 0 | compaction_input.output_level = compaction->output_level(); |
32 | 0 | compaction_input.db_id = db_id_; |
33 | |

// Input files are taken from compact_->compaction (every level) and sent as
// table file names built from the file numbers.
34 | 0 | const std::vector<CompactionInputFiles>& inputs = |
35 | 0 | *(compact_->compaction->inputs()); |
36 | 0 | for (const auto& files_per_level : inputs) { |
37 | 0 | for (const auto& file : files_per_level.files) { |
38 | 0 | compaction_input.input_files.emplace_back( |
39 | 0 | MakeTableFileName(file->fd.GetNumber())); |
40 | 0 | } |
41 | 0 | } |
42 | |

// begin/end are sent as empty strings when the subcompaction has no bound on
// that side; has_begin/has_end tell the two cases apart.
43 | 0 | compaction_input.cf_name = compaction->column_family_data()->GetName(); |
44 | 0 | compaction_input.snapshots = job_context_->snapshot_seqs; |
45 | 0 | compaction_input.has_begin = sub_compact->start.has_value(); |
46 | 0 | compaction_input.begin = |
47 | 0 | compaction_input.has_begin ? sub_compact->start->ToString() : ""; |
48 | 0 | compaction_input.has_end = sub_compact->end.has_value(); |
49 | 0 | compaction_input.end = |
50 | 0 | compaction_input.has_end ? sub_compact->end->ToString() : ""; |
51 | 0 | compaction_input.options_file_number = options_file_number_; |
52 | |

53 | 0 | TEST_SYNC_POINT_CALLBACK( |
54 | 0 | "CompactionServiceJob::ProcessKeyValueCompactionWithCompactionService", |
55 | 0 | &compaction_input); |
56 | |

// A serialization failure is reported as kFailure before anything is
// scheduled.
57 | 0 | std::string compaction_input_binary; |
58 | 0 | Status s = compaction_input.Write(&compaction_input_binary); |
59 | 0 | if (!s.ok()) { |
60 | 0 | sub_compact->status = s; |
61 | 0 | return CompactionServiceJobStatus::kFailure; |
62 | 0 | } |
63 | | |
// Comma-separated input file list; used only for the log line below.
64 | 0 | std::ostringstream input_files_oss; |
65 | 0 | bool is_first_one = true; |
66 | 0 | for (const auto& file : compaction_input.input_files) { |
67 | 0 | input_files_oss << (is_first_one ? "" : ", ") << file; |
68 | 0 | is_first_one = false; |
69 | 0 | } |
70 | |

71 | 0 | ROCKS_LOG_INFO( |
72 | 0 | db_options_.info_log, |
73 | 0 | "[%s] [JOB %d] Starting remote compaction (output level: %d): %s", |
74 | 0 | compaction->column_family_data()->GetName().c_str(), job_id_, |
75 | 0 | compaction_input.output_level, input_files_oss.str().c_str()); |
76 | 0 | CompactionServiceJobInfo info( |
77 | 0 | dbname_, db_id_, db_session_id_, |
78 | 0 | compaction->column_family_data()->GetID(), |
79 | 0 | compaction->column_family_data()->GetName(), GetCompactionId(sub_compact), |
80 | 0 | thread_pri_, compaction->compaction_reason(), |
81 | 0 | compaction->is_full_compaction(), compaction->is_manual_compaction(), |
82 | 0 | compaction->bottommost_level(), compaction->start_level(), |
83 | 0 | compaction->output_level()); |
84 | |

// Hand the serialized input to the service. Only kSuccess continues to
// Wait(); every other known status returns immediately.
85 | 0 | CompactionServiceScheduleResponse response = |
86 | 0 | db_options_.compaction_service->Schedule(info, compaction_input_binary); |
87 | 0 | switch (response.status) { |
88 | 0 | case CompactionServiceJobStatus::kSuccess: |
89 | 0 | break; |
90 | 0 | case CompactionServiceJobStatus::kAborted: |
91 | 0 | sub_compact->status = |
92 | 0 | Status::Aborted("Scheduling a remote compaction job was aborted"); |
93 | 0 | ROCKS_LOG_WARN( |
94 | 0 | db_options_.info_log, |
95 | 0 | "[%s] [JOB %d] Remote compaction was aborted at Schedule()", |
96 | 0 | compaction->column_family_data()->GetName().c_str(), job_id_); |
97 | 0 | return response.status; |
98 | 0 | case CompactionServiceJobStatus::kFailure: |
99 | 0 | sub_compact->status = Status::Incomplete( |
100 | 0 | "CompactionService failed to schedule a remote compaction job."); |
101 | 0 | ROCKS_LOG_WARN(db_options_.info_log, |
102 | 0 | "[%s] [JOB %d] Remote compaction failed to start.", |
103 | 0 | compaction->column_family_data()->GetName().c_str(), |
104 | 0 | job_id_); |
105 | 0 | return response.status; |
106 | 0 | case CompactionServiceJobStatus::kUseLocal: |
107 | 0 | ROCKS_LOG_INFO( |
108 | 0 | db_options_.info_log, |
109 | 0 | "[%s] [JOB %d] Remote compaction fallback to local by API (Schedule)", |
110 | 0 | compaction->column_family_data()->GetName().c_str(), job_id_); |
111 | 0 | return response.status; |
// NOTE(review): when assert() is compiled out, an unrecognized status breaks
// out of the switch and continues to Wait() below.
112 | 0 | default: |
113 | 0 | assert(false); // unknown status |
114 | 0 | break; |
115 | 0 | } |
116 | | |
// Captured before waiting so it can be logged next to a fresh DebugString if
// Wait() does not return kSuccess.
117 | 0 | std::string debug_str_before_wait = |
118 | 0 | compaction->input_version()->DebugString(/*hex=*/true); |
119 | | |
120 | | // TODO: Update CompactionService API to support abort and resume |
121 | | // functionality. Currently, remote compaction jobs cannot be aborted via |
122 | | // AbortAllCompactions() because the CompactionService interface lacks methods |
123 | | // to signal abort to remote workers and to properly resume after an abort. |
124 | | // The API needs to be extended with: |
125 | | // - A method to signal abort to running remote compaction jobs |
126 | | // - A method to resume/re-enable scheduling after an abort is lifted |
127 | |

128 | 0 | ROCKS_LOG_INFO(db_options_.info_log, |
129 | 0 | "[%s] [JOB %d] Waiting for remote compaction...", |
130 | 0 | compaction->column_family_data()->GetName().c_str(), job_id_); |
131 | 0 | std::string compaction_result_binary; |
132 | 0 | CompactionServiceJobStatus compaction_status = |
133 | 0 | db_options_.compaction_service->Wait(response.scheduled_job_id, |
134 | 0 | &compaction_result_binary); |
135 | |

136 | 0 | if (compaction_status != CompactionServiceJobStatus::kSuccess) { |
137 | 0 | ROCKS_LOG_ERROR( |
138 | 0 | db_options_.info_log, |
139 | 0 | "[%s] [JOB %d] Wait() status is not kSuccess. " |
140 | 0 | "\nDebugString Before Wait():\n%s" |
141 | 0 | "\nDebugString After Wait():\n%s", |
142 | 0 | compaction->column_family_data()->GetName().c_str(), job_id_, |
143 | 0 | debug_str_before_wait.c_str(), |
144 | 0 | compaction->input_version()->DebugString(/*hex=*/true).c_str()); |
145 | 0 | } |
146 | |

147 | 0 | if (compaction_status == CompactionServiceJobStatus::kUseLocal) { |
148 | 0 | ROCKS_LOG_INFO( |
149 | 0 | db_options_.info_log, |
150 | 0 | "[%s] [JOB %d] Remote compaction fallback to local by API (Wait)", |
151 | 0 | compaction->column_family_data()->GetName().c_str(), job_id_); |
152 | 0 | return compaction_status; |
153 | 0 | } |
154 | | |
155 | 0 | if (compaction_status == CompactionServiceJobStatus::kAborted) { |
156 | 0 | sub_compact->status = |
157 | 0 | Status::Aborted("Waiting a remote compaction job was aborted"); |
158 | 0 | ROCKS_LOG_INFO(db_options_.info_log, |
159 | 0 | "[%s] [JOB %d] Remote compaction was aborted during Wait()", |
160 | 0 | compaction->column_family_data()->GetName().c_str(), |
161 | 0 | job_id_); |
162 | 0 | return compaction_status; |
163 | 0 | } |
164 | | |
// The result is parsed before the kFailure check so that, when parsing
// works, the status reported by the remote side can be propagated.
165 | 0 | CompactionServiceResult compaction_result; |
166 | 0 | s = CompactionServiceResult::Read(compaction_result_binary, |
167 | 0 | &compaction_result); |
168 | |

169 | 0 | if (compaction_status == CompactionServiceJobStatus::kFailure) { |
170 | 0 | if (s.ok()) { |
171 | 0 | if (compaction_result.status.ok()) { |
172 | 0 | sub_compact->status = Status::Incomplete( |
173 | 0 | "CompactionService failed to run the compaction job (even though " |
174 | 0 | "the internal status is okay)."); |
175 | 0 | } else { |
176 | | // set the current sub compaction status with the status returned from |
177 | | // remote |
178 | 0 | sub_compact->status = compaction_result.status; |
179 | 0 | } |
180 | 0 | } else { |
181 | 0 | sub_compact->status = Status::Incomplete( |
182 | 0 | "CompactionService failed to run the compaction job (and no valid " |
183 | 0 | "result is returned)."); |
// compaction_result.status is deliberately not inspected on this path.
184 | 0 | compaction_result.status.PermitUncheckedError(); |
185 | 0 | } |
186 | 0 | ROCKS_LOG_WARN( |
187 | 0 | db_options_.info_log, "[%s] [JOB %d] Remote compaction failed.", |
188 | 0 | compaction->column_family_data()->GetName().c_str(), job_id_); |
189 | 0 | return compaction_status; |
190 | 0 | } |
191 | | |
192 | | // CompactionServiceJobStatus::kSuccess was returned, but somehow we failed to |
193 | | // read the result. Consider this as an installation failure |
194 | 0 | if (!s.ok()) { |
195 | 0 | sub_compact->status = s; |
196 | 0 | compaction_result.status.PermitUncheckedError(); |
197 | 0 | db_options_.compaction_service->OnInstallation( |
198 | 0 | response.scheduled_job_id, CompactionServiceJobStatus::kFailure); |
199 | 0 | return CompactionServiceJobStatus::kFailure; |
200 | 0 | } |
201 | 0 | sub_compact->status = compaction_result.status; |
202 | |

// Comma-separated output file list; used only for the log line below.
203 | 0 | std::ostringstream output_files_oss; |
204 | 0 | is_first_one = true; |
205 | 0 | for (const auto& file : compaction_result.output_files) { |
206 | 0 | output_files_oss << (is_first_one ? "" : ", ") << file.file_name; |
207 | 0 | is_first_one = false; |
208 | 0 | } |
209 | |

210 | 0 | ROCKS_LOG_INFO( |
211 | 0 | db_options_.info_log, |
212 | 0 | "[%s] [JOB %d] Received remote compaction result, output path: " |
213 | 0 | "%s, files: %s", |
214 | 0 | compaction->column_family_data()->GetName().c_str(), job_id_, |
215 | 0 | compaction_result.output_path.c_str(), output_files_oss.str().c_str()); |
216 | | |
217 | | // Installation Starts |
// Each reported file gets a new file number from versions_ and is renamed
// from <output_path>/<file_name> to the table file path derived from
// cf_paths and the compaction's output path id. The first failure stops the
// loop and reports kFailure through OnInstallation().
218 | 0 | for (const auto& file : compaction_result.output_files) { |
219 | 0 | uint64_t file_num = versions_->NewFileNumber(); |
220 | 0 | auto src_file = compaction_result.output_path + "/" + file.file_name; |
221 | 0 | auto tgt_file = TableFileName(compaction->immutable_options().cf_paths, |
222 | 0 | file_num, compaction->output_path_id()); |
223 | 0 | s = fs_->RenameFile(src_file, tgt_file, IOOptions(), nullptr); |
224 | 0 | if (!s.ok()) { |
225 | 0 | sub_compact->status = s; |
226 | 0 | db_options_.compaction_service->OnInstallation( |
227 | 0 | response.scheduled_job_id, CompactionServiceJobStatus::kFailure); |
228 | 0 | return CompactionServiceJobStatus::kFailure; |
229 | 0 | } |
230 | | |
231 | 0 | FileMetaData meta; |
232 | 0 | uint64_t file_size = file.file_size; |
233 | | |
234 | | // TODO - Clean this up in the next release. |
235 | | // For backward compatibility - in case the remote worker does not populate |
236 | | // the file_size yet. If missing, continue to populate this from the file |
237 | | // system. |
238 | 0 | if (file_size == 0) { |
239 | 0 | s = fs_->GetFileSize(tgt_file, IOOptions(), &file_size, nullptr); |
240 | 0 | } |
241 | |

// At this point `s` is either the (OK) rename status or the GetFileSize
// status from the fallback above.
242 | 0 | if (!s.ok()) { |
243 | 0 | sub_compact->status = s; |
244 | 0 | db_options_.compaction_service->OnInstallation( |
245 | 0 | response.scheduled_job_id, CompactionServiceJobStatus::kFailure); |
246 | 0 | return CompactionServiceJobStatus::kFailure; |
247 | 0 | } |
248 | 0 | assert(file_size > 0); |
249 | |

// Rebuild the file metadata from the fields carried in the result, using the
// locally assigned file number and output path id.
250 | 0 | meta.fd = FileDescriptor(file_num, compaction->output_path_id(), file_size, |
251 | 0 | file.smallest_seqno, file.largest_seqno); |
252 | 0 | meta.smallest.DecodeFrom(file.smallest_internal_key); |
253 | 0 | meta.largest.DecodeFrom(file.largest_internal_key); |
254 | 0 | meta.oldest_ancester_time = file.oldest_ancester_time; |
255 | 0 | meta.file_creation_time = file.file_creation_time; |
256 | 0 | meta.epoch_number = file.epoch_number; |
257 | 0 | meta.file_checksum = file.file_checksum; |
258 | 0 | meta.file_checksum_func_name = file.file_checksum_func_name; |
259 | 0 | meta.marked_for_compaction = file.marked_for_compaction; |
260 | 0 | meta.unique_id = file.unique_id; |
261 | 0 | meta.temperature = file.file_temperature; |
262 | 0 | meta.tail_size = |
263 | 0 | FileMetaData::CalculateTailSize(file_size, file.table_properties); |
264 | 0 | auto cfd = compaction->column_family_data(); |
// The output group is selected by the file's is_proximal_level_output flag.
265 | 0 | CompactionOutputs* compaction_outputs = |
266 | 0 | sub_compact->Outputs(file.is_proximal_level_output); |
267 | 0 | assert(compaction_outputs); |
268 | 0 | compaction_outputs->AddOutput(std::move(meta), cfd->internal_comparator(), |
269 | 0 | false, true, file.paranoid_hash); |
270 | 0 | compaction_outputs->UpdateTableProperties(file.table_properties); |
271 | 0 | } |
272 | | |
273 | | // Set per-level stats |
// Proximal-level stats are only added when the compaction supports per-key
// placement.
274 | 0 | auto compaction_output_stats = |
275 | 0 | sub_compact->OutputStats(false /* is_proximal_level */); |
276 | 0 | assert(compaction_output_stats); |
277 | 0 | compaction_output_stats->Add( |
278 | 0 | compaction_result.internal_stats.output_level_stats); |
279 | 0 | if (compaction->SupportsPerKeyPlacement()) { |
280 | 0 | compaction_output_stats = |
281 | 0 | sub_compact->OutputStats(true /* is_proximal_level */); |
282 | 0 | assert(compaction_output_stats); |
283 | 0 | compaction_output_stats->Add( |
284 | 0 | compaction_result.internal_stats.proximal_level_stats); |
285 | 0 | } |
286 | | |
287 | | // Set job stats |
288 | 0 | sub_compact->compaction_job_stats = compaction_result.stats; |
289 | |

290 | 0 | RecordTick(stats_, REMOTE_COMPACT_READ_BYTES, compaction_result.bytes_read); |
291 | 0 | RecordTick(stats_, REMOTE_COMPACT_WRITE_BYTES, |
292 | 0 | compaction_result.bytes_written); |
293 | 0 | db_options_.compaction_service->OnInstallation( |
294 | 0 | response.scheduled_job_id, CompactionServiceJobStatus::kSuccess); |
295 | 0 | return CompactionServiceJobStatus::kSuccess; |
296 | 0 | } |
297 | | |
298 | | std::string CompactionServiceCompactionJob::GetTableFileName( |
299 | 0 | uint64_t file_number) { |
300 | 0 | return MakeTableFileName(output_path_, file_number); |
301 | 0 | } |
302 | | |
303 | 0 | void CompactionServiceCompactionJob::RecordCompactionIOStats() { |
304 | 0 | compaction_result_->bytes_read += IOSTATS(bytes_read); |
305 | 0 | compaction_result_->bytes_written += IOSTATS(bytes_written); |
306 | 0 | CompactionJob::RecordCompactionIOStats(); |
307 | 0 | } |
308 | | |
// Constructs a CompactionJob specialization that writes its outputs under
// `output_path` and reports into `*compaction_service_result`.
//
// Most arguments are forwarded positionally to the CompactionJob base
// constructor. Values fixed here rather than taken from the caller:
// - two base-constructor arguments are passed as nullptr;
// - paranoid_file_checks and report_bg_io_stats come from
//   compaction->mutable_cf_options();
// - the job-stats pointer is &compaction_service_result->stats, so
//   `compaction_service_result` must be non-null;
// - priority is Env::Priority::USER;
// - CompactionJob::kCompactionAbortedFalse is used for the abort flag;
// - full_history_ts_low comes from the compaction's column family data.
// `compaction` is dereferenced in the initializer list and must be non-null.
309 | | CompactionServiceCompactionJob::CompactionServiceCompactionJob( |
310 | | int job_id, Compaction* compaction, const ImmutableDBOptions& db_options, |
311 | | const MutableDBOptions& mutable_db_options, const FileOptions& file_options, |
312 | | VersionSet* versions, const std::atomic<bool>* shutting_down, |
313 | | LogBuffer* log_buffer, FSDirectory* output_directory, Statistics* stats, |
314 | | InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler, |
315 | | JobContext* job_context, std::shared_ptr<Cache> table_cache, |
316 | | EventLogger* event_logger, const std::string& dbname, |
317 | | const std::shared_ptr<IOTracer>& io_tracer, |
318 | | const std::atomic<bool>& manual_compaction_canceled, |
319 | | const std::string& db_id, const std::string& db_session_id, |
320 | | std::string output_path, |
321 | | const CompactionServiceInput& compaction_service_input, |
322 | | CompactionServiceResult* compaction_service_result) |
323 | 0 | : CompactionJob( |
324 | 0 | job_id, compaction, db_options, mutable_db_options, file_options, |
325 | 0 | versions, shutting_down, log_buffer, nullptr, output_directory, |
326 | 0 | nullptr, stats, db_mutex, db_error_handler, job_context, |
327 | 0 | std::move(table_cache), event_logger, |
328 | 0 | compaction->mutable_cf_options().paranoid_file_checks, |
329 | 0 | compaction->mutable_cf_options().report_bg_io_stats, dbname, |
330 | 0 | &(compaction_service_result->stats), Env::Priority::USER, io_tracer, |
331 | 0 | manual_compaction_canceled, CompactionJob::kCompactionAbortedFalse, |
332 | 0 | db_id, db_session_id, |
333 | 0 | compaction->column_family_data()->GetFullHistoryTsLow()), |
334 | 0 | output_path_(std::move(output_path)), |
335 | 0 | compaction_input_(compaction_service_input), |
336 | 0 | compaction_result_(compaction_service_result) {} |
337 | | |
338 | | void CompactionServiceCompactionJob::Prepare( |
339 | | const CompactionProgress& compaction_progress, |
340 | 0 | log::Writer* compaction_progress_writer) { |
341 | 0 | std::optional<Slice> begin; |
342 | 0 | if (compaction_input_.has_begin) { |
343 | 0 | begin = compaction_input_.begin; |
344 | 0 | } |
345 | 0 | std::optional<Slice> end; |
346 | 0 | if (compaction_input_.has_end) { |
347 | 0 | end = compaction_input_.end; |
348 | 0 | } |
349 | 0 | CompactionJob::Prepare(std::make_pair(begin, end), compaction_progress, |
350 | 0 | compaction_progress_writer); |
351 | 0 | } |
352 | | |
// Runs the single subcompaction owned by this job and fills
// *compaction_result_ with stats, output level, output path and, when the
// final status is OK, one entry per output file.
//
// Returns the final status, which is also stored in compact_->status.
353 | 0 | Status CompactionServiceCompactionJob::Run() { |
354 | 0 | AutoThreadOperationStageUpdater stage_updater( |
355 | 0 | ThreadStatus::STAGE_COMPACTION_RUN); |
356 | |

357 | 0 | auto* c = compact_->compaction; |
358 | |

359 | 0 | log_buffer_->FlushBufferToLog(); |
360 | 0 | LogCompaction(); |
361 | |

362 | 0 | compaction_result_->stats.Reset(); |
363 | |

364 | 0 | const uint64_t start_micros = db_options_.clock->NowMicros(); |
365 | 0 | c->GetOrInitInputTableProperties(); |
366 | | |
367 | | // Pick the only sub-compaction we should have |
368 | 0 | assert(compact_->sub_compact_states.size() == 1); |
369 | 0 | SubcompactionState* sub_compact = compact_->sub_compact_states.data(); |
370 | |

371 | 0 | ProcessKeyValueCompaction(sub_compact); |
372 | |

// NOTE(review): the same clock-based elapsed value is recorded both as micros
// and as CPU micros — confirm this is intended.
373 | 0 | uint64_t elapsed_micros = db_options_.clock->NowMicros() - start_micros; |
374 | 0 | internal_stats_.SetMicros(elapsed_micros); |
375 | 0 | internal_stats_.AddCpuMicros(elapsed_micros); |
376 | |

377 | 0 | RecordTimeToHistogram(stats_, COMPACTION_TIME, |
378 | 0 | internal_stats_.output_level_stats.micros); |
379 | 0 | RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME, |
380 | 0 | internal_stats_.output_level_stats.cpu_micros); |
381 | |

382 | 0 | Status status = sub_compact->status; |
383 | 0 | IOStatus io_s = sub_compact->io_status; |
384 | |

// io_status_ is only overwritten while it is still OK, so the first non-OK
// IO status is the one that is kept.
385 | 0 | if (io_status_.ok()) { |
386 | 0 | io_status_ = io_s; |
387 | 0 | } |
388 | |

// The output directory is fsynced only when the subcompaction succeeded and
// an output directory was provided.
389 | 0 | if (status.ok()) { |
390 | 0 | constexpr IODebugContext* dbg = nullptr; |
391 | |

392 | 0 | if (output_directory_) { |
393 | 0 | io_s = output_directory_->FsyncWithDirOptions(IOOptions(), dbg, |
394 | 0 | DirFsyncOptions()); |
395 | 0 | } |
396 | 0 | } |
397 | 0 | if (io_status_.ok()) { |
398 | 0 | io_status_ = io_s; |
399 | 0 | } |
// An IO failure turns an otherwise OK status into that IO status.
400 | 0 | if (status.ok()) { |
401 | 0 | status = io_s; |
402 | 0 | } |
403 | |

404 | 0 | LogFlush(db_options_.info_log); |
405 | 0 | compact_->status = status; |
406 | 0 | compact_->status.PermitUncheckedError(); |
407 | | |
408 | | // Build Compaction Job Stats |
409 | | |
410 | | // 1. Aggregate internal stats and job stats for all subcompactions |
411 | | // internal stats: sub_compact.proximal_level_outputs_.stats and |
412 | | // sub_compact.compaction_outputs_.stats into |
413 | | // internal_stats_.output_level_stats and |
414 | | // internal_stats_.proximal_level_stats |
415 | | // job-level stats: sub_compact.compaction_job_stats into compact.job_stats_ |
416 | | // |
417 | | // For remote compaction, there's only one subcompaction. |
418 | 0 | compact_->AggregateCompactionStats(internal_stats_, *job_stats_); |
419 | | |
420 | | // 2. Update job-level output stats with the aggregated internal_stats_ |
421 | | // Please note that input stats will be updated by primary host when all |
422 | | // subcompactions are finished |
423 | 0 | UpdateCompactionJobOutputStatsFromInternalStats(status, internal_stats_); |
424 | | // and set fields that are not propagated as part of the update |
425 | 0 | compaction_result_->stats.is_manual_compaction = c->is_manual_compaction(); |
426 | 0 | compaction_result_->stats.is_full_compaction = c->is_full_compaction(); |
427 | 0 | compaction_result_->stats.is_remote_compaction = true; |
428 | | |
429 | | // 3. Update IO Stats that are not part of the update above |
430 | | // (bytes_read, bytes_written) |
431 | 0 | RecordCompactionIOStats(); |
432 | | |
433 | | // Build Output |
// Output file entries are only emitted when the final status is OK; the other
// result fields are filled in either way.
434 | 0 | compaction_result_->internal_stats = internal_stats_; |
435 | 0 | compaction_result_->output_level = compact_->compaction->output_level(); |
436 | 0 | compaction_result_->output_path = output_path_; |
437 | 0 | if (status.ok()) { |
438 | 0 | for (const auto& output_file : sub_compact->GetOutputs()) { |
439 | 0 | auto& meta = output_file.meta; |
440 | 0 | compaction_result_->output_files.emplace_back( |
441 | 0 | MakeTableFileName(meta.fd.GetNumber()), meta.fd.GetFileSize(), |
442 | 0 | meta.fd.smallest_seqno, meta.fd.largest_seqno, |
443 | 0 | meta.smallest.Encode().ToString(), meta.largest.Encode().ToString(), |
444 | 0 | meta.oldest_ancester_time, meta.file_creation_time, meta.epoch_number, |
445 | 0 | meta.file_checksum, meta.file_checksum_func_name, |
446 | 0 | output_file.validator.GetHash(), meta.marked_for_compaction, |
447 | 0 | meta.unique_id, *output_file.table_properties, |
448 | 0 | output_file.is_proximal_level, meta.temperature); |
449 | 0 | } |
450 | 0 | } |
451 | |

// The callback receives the address of the compaction_result_ pointer member,
// not the pointer itself.
452 | 0 | TEST_SYNC_POINT_CALLBACK("CompactionServiceCompactionJob::Run:0", |
453 | 0 | &compaction_result_); |
454 | 0 | return status; |
455 | 0 | } |
456 | | |
457 | 0 | void CompactionServiceCompactionJob::CleanupCompaction() { |
458 | 0 | CompactionJob::CleanupCompaction(); |
459 | 0 | } |
460 | | |
461 | | // Internal binary format for the input and result data |
// Unscoped enum with a fixed uint32_t underlying type; kOptionsString (1) is
// the only version defined here.
// NOTE(review): where this value is written/read is outside this excerpt —
// presumably by the Write()/Read() serialization code; confirm.
462 | | enum BinaryFormatVersion : uint32_t { |
463 | | kOptionsString = 1, // Use string format similar to Option string format |
464 | | }; |
465 | | |
// Field-name -> OptionTypeInfo table for ColumnFamilyDescriptor.
//
// "name" is handled as an encoded string. "options" is a configurable with
// three custom callbacks:
// - parse: builds the options from `value`, starting from a
//   default-constructed ColumnFamilyOptions;
// - serialize: writes the options string wrapped in braces into *value (the
//   assignment happens regardless of the returned status);
// - compare: wraps both sides with CFOptionsAsConfigurable and, on mismatch,
//   reports "<name>.<mismatching option>" through *mismatch.
466 | | static std::unordered_map<std::string, OptionTypeInfo> cfd_type_info = { |
467 | | {"name", |
468 | | {offsetof(struct ColumnFamilyDescriptor, name), OptionType::kEncodedString, |
469 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, |
470 | | {"options", |
471 | | {offsetof(struct ColumnFamilyDescriptor, options), |
472 | | OptionType::kConfigurable, OptionVerificationType::kNormal, |
473 | | OptionTypeFlags::kNone, |
474 | | [](const ConfigOptions& opts, const std::string& /*name*/, |
475 | 0 | const std::string& value, void* addr) { |
476 | 0 | auto cf_options = static_cast<ColumnFamilyOptions*>(addr); |
477 | 0 | return GetColumnFamilyOptionsFromString(opts, ColumnFamilyOptions(), |
478 | 0 | value, cf_options); |
479 | 0 | }, |
480 | | [](const ConfigOptions& opts, const std::string& /*name*/, |
481 | 0 | const void* addr, std::string* value) { |
482 | 0 | const auto cf_options = static_cast<const ColumnFamilyOptions*>(addr); |
483 | 0 | std::string result; |
484 | 0 | auto status = |
485 | 0 | GetStringFromColumnFamilyOptions(opts, *cf_options, &result); |
486 | 0 | *value = "{" + result + "}"; |
487 | 0 | return status; |
488 | 0 | }, |
489 | | [](const ConfigOptions& opts, const std::string& name, const void* addr1, |
490 | 0 | const void* addr2, std::string* mismatch) { |
491 | 0 | const auto this_one = static_cast<const ColumnFamilyOptions*>(addr1); |
492 | 0 | const auto that_one = static_cast<const ColumnFamilyOptions*>(addr2); |
493 | 0 | auto this_conf = CFOptionsAsConfigurable(*this_one); |
494 | 0 | auto that_conf = CFOptionsAsConfigurable(*that_one); |
495 | 0 | std::string mismatch_opt; |
496 | 0 | bool result = |
497 | 0 | this_conf->AreEquivalent(opts, that_conf.get(), &mismatch_opt); |
498 | 0 | if (!result) { |
499 | 0 | *mismatch = name + "." + mismatch_opt; |
500 | 0 | } |
501 | 0 | return result; |
502 | 0 | }}}, |
503 | | }; |
504 | | |
// Field-name -> OptionTypeInfo table for CompactionServiceInput. Each entry
// records the member's offset within the struct and its option type.
//
// Strings use kEncodedString; snapshots and input_files are vectors (of
// uint64_t and encoded strings respectively). "cf_name" and "db_id" use the
// two-argument form, leaving verification type and flags unspecified here.
// NOTE(review): do not reorder entries casually — iteration order of an
// unordered_map can depend on insertion order, which may affect serialized
// output; confirm before changing.
505 | | static std::unordered_map<std::string, OptionTypeInfo> cs_input_type_info = { |
506 | | {"cf_name", |
507 | | {offsetof(struct CompactionServiceInput, cf_name), |
508 | | OptionType::kEncodedString}}, |
509 | | {"snapshots", OptionTypeInfo::Vector<uint64_t>( |
510 | | offsetof(struct CompactionServiceInput, snapshots), |
511 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone, |
512 | | {0, OptionType::kUInt64T})}, |
513 | | {"input_files", OptionTypeInfo::Vector<std::string>( |
514 | | offsetof(struct CompactionServiceInput, input_files), |
515 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone, |
516 | | {0, OptionType::kEncodedString})}, |
517 | | {"output_level", |
518 | | {offsetof(struct CompactionServiceInput, output_level), OptionType::kInt, |
519 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, |
520 | | {"db_id", |
521 | | {offsetof(struct CompactionServiceInput, db_id), |
522 | | OptionType::kEncodedString}}, |
523 | | {"has_begin", |
524 | | {offsetof(struct CompactionServiceInput, has_begin), OptionType::kBoolean, |
525 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, |
526 | | {"begin", |
527 | | {offsetof(struct CompactionServiceInput, begin), |
528 | | OptionType::kEncodedString, OptionVerificationType::kNormal, |
529 | | OptionTypeFlags::kNone}}, |
530 | | {"has_end", |
531 | | {offsetof(struct CompactionServiceInput, has_end), OptionType::kBoolean, |
532 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, |
533 | | {"end", |
534 | | {offsetof(struct CompactionServiceInput, end), OptionType::kEncodedString, |
535 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, |
536 | | {"options_file_number", |
537 | | {offsetof(struct CompactionServiceInput, options_file_number), |
538 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
539 | | OptionTypeFlags::kNone}}, |
540 | | }; |
541 | | |
// Field-name -> OptionTypeInfo table for CompactionServiceOutputFile. Each
// entry records the member's offset within the struct and its option type.
//
// Most members are plain scalars or encoded strings. Two entries differ:
// - "unique_id" is a fixed-size array of two uint64_t values;
// - "table_properties" is a struct with custom callbacks that delegate to
//   TableProperties::Parse, TableProperties::Serialize (result wrapped in
//   braces, assigned regardless of the returned status) and
//   TableProperties::AreEqual.
542 | | static std::unordered_map<std::string, OptionTypeInfo> |
543 | | cs_output_file_type_info = { |
544 | | {"file_name", |
545 | | {offsetof(struct CompactionServiceOutputFile, file_name), |
546 | | OptionType::kEncodedString, OptionVerificationType::kNormal, |
547 | | OptionTypeFlags::kNone}}, |
548 | | {"file_size", |
549 | | {offsetof(struct CompactionServiceOutputFile, file_size), |
550 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
551 | | OptionTypeFlags::kNone}}, |
552 | | {"smallest_seqno", |
553 | | {offsetof(struct CompactionServiceOutputFile, smallest_seqno), |
554 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
555 | | OptionTypeFlags::kNone}}, |
556 | | {"largest_seqno", |
557 | | {offsetof(struct CompactionServiceOutputFile, largest_seqno), |
558 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
559 | | OptionTypeFlags::kNone}}, |
560 | | {"smallest_internal_key", |
561 | | {offsetof(struct CompactionServiceOutputFile, smallest_internal_key), |
562 | | OptionType::kEncodedString, OptionVerificationType::kNormal, |
563 | | OptionTypeFlags::kNone}}, |
564 | | {"largest_internal_key", |
565 | | {offsetof(struct CompactionServiceOutputFile, largest_internal_key), |
566 | | OptionType::kEncodedString, OptionVerificationType::kNormal, |
567 | | OptionTypeFlags::kNone}}, |
568 | | {"oldest_ancester_time", |
569 | | {offsetof(struct CompactionServiceOutputFile, oldest_ancester_time), |
570 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
571 | | OptionTypeFlags::kNone}}, |
572 | | {"file_creation_time", |
573 | | {offsetof(struct CompactionServiceOutputFile, file_creation_time), |
574 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
575 | | OptionTypeFlags::kNone}}, |
576 | | {"epoch_number", |
577 | | {offsetof(struct CompactionServiceOutputFile, epoch_number), |
578 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
579 | | OptionTypeFlags::kNone}}, |
580 | | {"file_checksum", |
581 | | {offsetof(struct CompactionServiceOutputFile, file_checksum), |
582 | | OptionType::kEncodedString, OptionVerificationType::kNormal, |
583 | | OptionTypeFlags::kNone}}, |
584 | | {"file_checksum_func_name", |
585 | | {offsetof(struct CompactionServiceOutputFile, file_checksum_func_name), |
586 | | OptionType::kEncodedString, OptionVerificationType::kNormal, |
587 | | OptionTypeFlags::kNone}}, |
588 | | {"paranoid_hash", |
589 | | {offsetof(struct CompactionServiceOutputFile, paranoid_hash), |
590 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
591 | | OptionTypeFlags::kNone}}, |
592 | | {"marked_for_compaction", |
593 | | {offsetof(struct CompactionServiceOutputFile, marked_for_compaction), |
594 | | OptionType::kBoolean, OptionVerificationType::kNormal, |
595 | | OptionTypeFlags::kNone}}, |
596 | | {"unique_id", |
597 | | OptionTypeInfo::Array<uint64_t, 2>( |
598 | | offsetof(struct CompactionServiceOutputFile, unique_id), |
599 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone, |
600 | | {0, OptionType::kUInt64T})}, |
601 | | {"table_properties", |
602 | | {offsetof(struct CompactionServiceOutputFile, table_properties), |
603 | | OptionType::kStruct, OptionVerificationType::kNormal, |
604 | | OptionTypeFlags::kNone, |
605 | | [](const ConfigOptions& opts, const std::string& /*name*/, |
606 | 0 | const std::string& value, void* addr) { |
607 | 0 | auto table_properties = static_cast<TableProperties*>(addr); |
608 | 0 | return TableProperties::Parse(opts, value, table_properties); |
609 | 0 | }, |
610 | | [](const ConfigOptions& opts, const std::string& /*name*/, |
611 | 0 | const void* addr, std::string* value) { |
612 | 0 | const auto table_properties = |
613 | 0 | static_cast<const TableProperties*>(addr); |
614 | 0 | std::string result; |
615 | 0 | auto status = table_properties->Serialize(opts, &result); |
616 | 0 | *value = "{" + result + "}"; |
617 | 0 | return status; |
618 | 0 | }, |
619 | | [](const ConfigOptions& opts, const std::string& /*name*/, |
620 | 0 | const void* addr1, const void* addr2, std::string* mismatch) { |
621 | 0 | const auto this_one = static_cast<const TableProperties*>(addr1); |
622 | 0 | const auto that_one = static_cast<const TableProperties*>(addr2); |
623 | 0 | return this_one->AreEqual(opts, that_one, mismatch); |
624 | 0 | }}}, |
625 | | {"is_proximal_level_output", |
626 | | {offsetof(struct CompactionServiceOutputFile, |
627 | | is_proximal_level_output), |
628 | | OptionType::kBoolean, OptionVerificationType::kNormal, |
629 | | OptionTypeFlags::kNone}}, |
630 | | {"file_temperature", |
631 | | {offsetof(struct CompactionServiceOutputFile, file_temperature), |
632 | | OptionType::kTemperature, OptionVerificationType::kNormal, |
633 | | OptionTypeFlags::kNone}}}; |
634 | | |
635 | | static std::unordered_map<std::string, OptionTypeInfo> |
636 | | compaction_job_stats_type_info = { |
637 | | {"elapsed_micros", |
638 | | {offsetof(struct CompactionJobStats, elapsed_micros), |
639 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
640 | | OptionTypeFlags::kNone}}, |
641 | | {"cpu_micros", |
642 | | {offsetof(struct CompactionJobStats, cpu_micros), OptionType::kUInt64T, |
643 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, |
644 | | {"num_input_records", |
645 | | {offsetof(struct CompactionJobStats, num_input_records), |
646 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
647 | | OptionTypeFlags::kNone}}, |
648 | | {"num_blobs_read", |
649 | | {offsetof(struct CompactionJobStats, num_blobs_read), |
650 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
651 | | OptionTypeFlags::kNone}}, |
652 | | {"num_input_files", |
653 | | {offsetof(struct CompactionJobStats, num_input_files), |
654 | | OptionType::kSizeT, OptionVerificationType::kNormal, |
655 | | OptionTypeFlags::kNone}}, |
656 | | {"num_input_files_at_output_level", |
657 | | {offsetof(struct CompactionJobStats, num_input_files_at_output_level), |
658 | | OptionType::kSizeT, OptionVerificationType::kNormal, |
659 | | OptionTypeFlags::kNone}}, |
660 | | {"num_output_records", |
661 | | {offsetof(struct CompactionJobStats, num_output_records), |
662 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
663 | | OptionTypeFlags::kNone}}, |
664 | | {"num_output_files", |
665 | | {offsetof(struct CompactionJobStats, num_output_files), |
666 | | OptionType::kSizeT, OptionVerificationType::kNormal, |
667 | | OptionTypeFlags::kNone}}, |
668 | | {"num_output_files_blob", |
669 | | {offsetof(struct CompactionJobStats, num_output_files_blob), |
670 | | OptionType::kSizeT, OptionVerificationType::kNormal, |
671 | | OptionTypeFlags::kNone}}, |
672 | | {"is_full_compaction", |
673 | | {offsetof(struct CompactionJobStats, is_full_compaction), |
674 | | OptionType::kBoolean, OptionVerificationType::kNormal, |
675 | | OptionTypeFlags::kNone}}, |
676 | | {"is_manual_compaction", |
677 | | {offsetof(struct CompactionJobStats, is_manual_compaction), |
678 | | OptionType::kBoolean, OptionVerificationType::kNormal, |
679 | | OptionTypeFlags::kNone}}, |
680 | | {"is_remote_compaction", |
681 | | {offsetof(struct CompactionJobStats, is_remote_compaction), |
682 | | OptionType::kBoolean, OptionVerificationType::kNormal, |
683 | | OptionTypeFlags::kNone}}, |
684 | | {"total_input_bytes", |
685 | | {offsetof(struct CompactionJobStats, total_input_bytes), |
686 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
687 | | OptionTypeFlags::kNone}}, |
688 | | {"total_blob_bytes_read", |
689 | | {offsetof(struct CompactionJobStats, total_blob_bytes_read), |
690 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
691 | | OptionTypeFlags::kNone}}, |
692 | | {"total_output_bytes", |
693 | | {offsetof(struct CompactionJobStats, total_output_bytes), |
694 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
695 | | OptionTypeFlags::kNone}}, |
696 | | {"total_output_bytes_blob", |
697 | | {offsetof(struct CompactionJobStats, total_output_bytes_blob), |
698 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
699 | | OptionTypeFlags::kNone}}, |
700 | | {"num_records_replaced", |
701 | | {offsetof(struct CompactionJobStats, num_records_replaced), |
702 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
703 | | OptionTypeFlags::kNone}}, |
704 | | {"total_input_raw_key_bytes", |
705 | | {offsetof(struct CompactionJobStats, total_input_raw_key_bytes), |
706 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
707 | | OptionTypeFlags::kNone}}, |
708 | | {"total_input_raw_value_bytes", |
709 | | {offsetof(struct CompactionJobStats, total_input_raw_value_bytes), |
710 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
711 | | OptionTypeFlags::kNone}}, |
712 | | {"num_input_deletion_records", |
713 | | {offsetof(struct CompactionJobStats, num_input_deletion_records), |
714 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
715 | | OptionTypeFlags::kNone}}, |
716 | | {"num_expired_deletion_records", |
717 | | {offsetof(struct CompactionJobStats, num_expired_deletion_records), |
718 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
719 | | OptionTypeFlags::kNone}}, |
720 | | {"num_corrupt_keys", |
721 | | {offsetof(struct CompactionJobStats, num_corrupt_keys), |
722 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
723 | | OptionTypeFlags::kNone}}, |
724 | | {"file_write_nanos", |
725 | | {offsetof(struct CompactionJobStats, file_write_nanos), |
726 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
727 | | OptionTypeFlags::kNone}}, |
728 | | {"file_range_sync_nanos", |
729 | | {offsetof(struct CompactionJobStats, file_range_sync_nanos), |
730 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
731 | | OptionTypeFlags::kNone}}, |
732 | | {"file_fsync_nanos", |
733 | | {offsetof(struct CompactionJobStats, file_fsync_nanos), |
734 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
735 | | OptionTypeFlags::kNone}}, |
736 | | {"file_prepare_write_nanos", |
737 | | {offsetof(struct CompactionJobStats, file_prepare_write_nanos), |
738 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
739 | | OptionTypeFlags::kNone}}, |
740 | | {"smallest_output_key_prefix", |
741 | | {offsetof(struct CompactionJobStats, smallest_output_key_prefix), |
742 | | OptionType::kEncodedString, OptionVerificationType::kNormal, |
743 | | OptionTypeFlags::kNone}}, |
744 | | {"largest_output_key_prefix", |
745 | | {offsetof(struct CompactionJobStats, largest_output_key_prefix), |
746 | | OptionType::kEncodedString, OptionVerificationType::kNormal, |
747 | | OptionTypeFlags::kNone}}, |
748 | | {"num_single_del_fallthru", |
749 | | {offsetof(struct CompactionJobStats, num_single_del_fallthru), |
750 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
751 | | OptionTypeFlags::kNone}}, |
752 | | {"num_single_del_mismatch", |
753 | | {offsetof(struct CompactionJobStats, num_single_del_mismatch), |
754 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
755 | | OptionTypeFlags::kNone}}, |
756 | | }; |
757 | | |
758 | | static std::unordered_map<std::string, OptionTypeInfo> |
759 | | compaction_stats_type_info = { |
760 | | {"micros", |
761 | | {offsetof(struct InternalStats::CompactionStats, micros), |
762 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
763 | | OptionTypeFlags::kNone}}, |
764 | | {"cpu_micros", |
765 | | {offsetof(struct InternalStats::CompactionStats, cpu_micros), |
766 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
767 | | OptionTypeFlags::kNone}}, |
768 | | {"bytes_read_non_output_levels", |
769 | | {offsetof(struct InternalStats::CompactionStats, |
770 | | bytes_read_non_output_levels), |
771 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
772 | | OptionTypeFlags::kNone}}, |
773 | | {"bytes_read_output_level", |
774 | | {offsetof(struct InternalStats::CompactionStats, |
775 | | bytes_read_output_level), |
776 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
777 | | OptionTypeFlags::kNone}}, |
778 | | {"bytes_skipped_non_output_levels", |
779 | | {offsetof(struct InternalStats::CompactionStats, |
780 | | bytes_skipped_non_output_levels), |
781 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
782 | | OptionTypeFlags::kNone}}, |
783 | | {"bytes_skipped_output_level", |
784 | | {offsetof(struct InternalStats::CompactionStats, |
785 | | bytes_skipped_output_level), |
786 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
787 | | OptionTypeFlags::kNone}}, |
788 | | {"bytes_read_blob", |
789 | | {offsetof(struct InternalStats::CompactionStats, bytes_read_blob), |
790 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
791 | | OptionTypeFlags::kNone}}, |
792 | | {"bytes_written", |
793 | | {offsetof(struct InternalStats::CompactionStats, bytes_written), |
794 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
795 | | OptionTypeFlags::kNone}}, |
796 | | {"bytes_written_blob", |
797 | | {offsetof(struct InternalStats::CompactionStats, bytes_written_blob), |
798 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
799 | | OptionTypeFlags::kNone}}, |
800 | | {"bytes_moved", |
801 | | {offsetof(struct InternalStats::CompactionStats, bytes_moved), |
802 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
803 | | OptionTypeFlags::kNone}}, |
804 | | {"num_input_files_in_non_output_levels", |
805 | | {offsetof(struct InternalStats::CompactionStats, |
806 | | num_input_files_in_non_output_levels), |
807 | | OptionType::kInt, OptionVerificationType::kNormal, |
808 | | OptionTypeFlags::kNone}}, |
809 | | {"num_input_files_in_output_level", |
810 | | {offsetof(struct InternalStats::CompactionStats, |
811 | | num_input_files_in_output_level), |
812 | | OptionType::kInt, OptionVerificationType::kNormal, |
813 | | OptionTypeFlags::kNone}}, |
814 | | {"num_filtered_input_files_in_non_output_levels", |
815 | | {offsetof(struct InternalStats::CompactionStats, |
816 | | num_filtered_input_files_in_non_output_levels), |
817 | | OptionType::kInt, OptionVerificationType::kNormal, |
818 | | OptionTypeFlags::kNone}}, |
819 | | {"num_filtered_input_files_in_output_level", |
820 | | {offsetof(struct InternalStats::CompactionStats, |
821 | | num_filtered_input_files_in_output_level), |
822 | | OptionType::kInt, OptionVerificationType::kNormal, |
823 | | OptionTypeFlags::kNone}}, |
824 | | {"num_output_files", |
825 | | {offsetof(struct InternalStats::CompactionStats, num_output_files), |
826 | | OptionType::kInt, OptionVerificationType::kNormal, |
827 | | OptionTypeFlags::kNone}}, |
828 | | {"num_output_files_blob", |
829 | | {offsetof(struct InternalStats::CompactionStats, |
830 | | num_output_files_blob), |
831 | | OptionType::kInt, OptionVerificationType::kNormal, |
832 | | OptionTypeFlags::kNone}}, |
833 | | {"num_input_records", |
834 | | {offsetof(struct InternalStats::CompactionStats, num_input_records), |
835 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
836 | | OptionTypeFlags::kNone}}, |
837 | | {"num_dropped_records", |
838 | | {offsetof(struct InternalStats::CompactionStats, num_dropped_records), |
839 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
840 | | OptionTypeFlags::kNone}}, |
841 | | {"num_output_records", |
842 | | {offsetof(struct InternalStats::CompactionStats, num_output_records), |
843 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
844 | | OptionTypeFlags::kNone}}, |
845 | | {"count", |
846 | | {offsetof(struct InternalStats::CompactionStats, count), |
847 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
848 | | OptionTypeFlags::kNone}}, |
849 | | {"counts", OptionTypeInfo::Array< |
850 | | int, static_cast<int>(CompactionReason::kNumOfReasons)>( |
851 | | offsetof(struct InternalStats::CompactionStats, counts), |
852 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone, |
853 | | {0, OptionType::kInt})}, |
854 | | }; |
855 | | |
856 | | static std::unordered_map<std::string, OptionTypeInfo> |
857 | | compaction_internal_stats_type_info = { |
858 | | {"output_level_stats", |
859 | | OptionTypeInfo::Struct( |
860 | | "output_level_stats", &compaction_stats_type_info, |
861 | | offsetof(struct InternalStats::CompactionStatsFull, |
862 | | output_level_stats), |
863 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone)}, |
864 | | {"has_proximal_level_output", |
865 | | {offsetof(struct InternalStats::CompactionStatsFull, |
866 | | has_proximal_level_output), |
867 | | OptionType::kBoolean, OptionVerificationType::kNormal, |
868 | | OptionTypeFlags::kNone}}, |
869 | | {"proximal_level_stats", |
870 | | OptionTypeInfo::Struct( |
871 | | "proximal_level_stats", &compaction_stats_type_info, |
872 | | offsetof(struct InternalStats::CompactionStatsFull, |
873 | | proximal_level_stats), |
874 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone)}, |
875 | | }; |
876 | | |
877 | | namespace { |
878 | | // this is a helper struct to serialize and deserialize class Status, because |
879 | | // Status's members are not public. |
880 | | struct StatusSerializationAdapter { |
881 | | uint8_t code; |
882 | | uint8_t subcode; |
883 | | uint8_t severity; |
884 | | std::string message; |
885 | | |
886 | 0 | StatusSerializationAdapter() = default; |
887 | 0 | explicit StatusSerializationAdapter(const Status& s) { |
888 | 0 | code = s.code(); |
889 | 0 | subcode = s.subcode(); |
890 | 0 | severity = s.severity(); |
891 | 0 | auto msg = s.getState(); |
892 | 0 | message = msg ? msg : ""; |
893 | 0 | } |
894 | | |
895 | 0 | Status GetStatus() const { |
896 | 0 | return Status{static_cast<Status::Code>(code), |
897 | 0 | static_cast<Status::SubCode>(subcode), |
898 | 0 | static_cast<Status::Severity>(severity), message}; |
899 | 0 | } |
900 | | }; |
901 | | } // namespace |
902 | | |
903 | | static std::unordered_map<std::string, OptionTypeInfo> |
904 | | status_adapter_type_info = { |
905 | | {"code", |
906 | | {offsetof(struct StatusSerializationAdapter, code), |
907 | | OptionType::kUInt8T, OptionVerificationType::kNormal, |
908 | | OptionTypeFlags::kNone}}, |
909 | | {"subcode", |
910 | | {offsetof(struct StatusSerializationAdapter, subcode), |
911 | | OptionType::kUInt8T, OptionVerificationType::kNormal, |
912 | | OptionTypeFlags::kNone}}, |
913 | | {"severity", |
914 | | {offsetof(struct StatusSerializationAdapter, severity), |
915 | | OptionType::kUInt8T, OptionVerificationType::kNormal, |
916 | | OptionTypeFlags::kNone}}, |
917 | | {"message", |
918 | | {offsetof(struct StatusSerializationAdapter, message), |
919 | | OptionType::kEncodedString, OptionVerificationType::kNormal, |
920 | | OptionTypeFlags::kNone}}, |
921 | | }; |
922 | | |
923 | | static std::unordered_map<std::string, OptionTypeInfo> cs_result_type_info = { |
924 | | {"status", |
925 | | {offsetof(struct CompactionServiceResult, status), |
926 | | OptionType::kCustomizable, OptionVerificationType::kNormal, |
927 | | OptionTypeFlags::kNone, |
928 | | [](const ConfigOptions& opts, const std::string& /*name*/, |
929 | 0 | const std::string& value, void* addr) { |
930 | 0 | auto status_obj = static_cast<Status*>(addr); |
931 | 0 | StatusSerializationAdapter adapter; |
932 | 0 | Status s = OptionTypeInfo::ParseType( |
933 | 0 | opts, value, status_adapter_type_info, &adapter); |
934 | 0 | *status_obj = adapter.GetStatus(); |
935 | 0 | return s; |
936 | 0 | }, |
937 | | [](const ConfigOptions& opts, const std::string& /*name*/, |
938 | 0 | const void* addr, std::string* value) { |
939 | 0 | const auto status_obj = static_cast<const Status*>(addr); |
940 | 0 | StatusSerializationAdapter adapter(*status_obj); |
941 | 0 | std::string result; |
942 | 0 | Status s = OptionTypeInfo::SerializeType(opts, status_adapter_type_info, |
943 | 0 | &adapter, &result); |
944 | 0 | *value = "{" + result + "}"; |
945 | 0 | return s; |
946 | 0 | }, |
947 | | [](const ConfigOptions& opts, const std::string& /*name*/, |
948 | 0 | const void* addr1, const void* addr2, std::string* mismatch) { |
949 | 0 | const auto status1 = static_cast<const Status*>(addr1); |
950 | 0 | const auto status2 = static_cast<const Status*>(addr2); |
951 | |
|
952 | 0 | StatusSerializationAdapter adatper1(*status1); |
953 | 0 | StatusSerializationAdapter adapter2(*status2); |
954 | 0 | return OptionTypeInfo::TypesAreEqual(opts, status_adapter_type_info, |
955 | 0 | &adatper1, &adapter2, mismatch); |
956 | 0 | }}}, |
957 | | {"output_files", |
958 | | OptionTypeInfo::Vector<CompactionServiceOutputFile>( |
959 | | offsetof(struct CompactionServiceResult, output_files), |
960 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone, |
961 | | OptionTypeInfo::Struct("output_files", &cs_output_file_type_info, 0, |
962 | | OptionVerificationType::kNormal, |
963 | | OptionTypeFlags::kNone))}, |
964 | | {"output_level", |
965 | | {offsetof(struct CompactionServiceResult, output_level), OptionType::kInt, |
966 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, |
967 | | {"output_path", |
968 | | {offsetof(struct CompactionServiceResult, output_path), |
969 | | OptionType::kEncodedString, OptionVerificationType::kNormal, |
970 | | OptionTypeFlags::kNone}}, |
971 | | {"bytes_read", |
972 | | {offsetof(struct CompactionServiceResult, bytes_read), |
973 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
974 | | OptionTypeFlags::kNone}}, |
975 | | {"bytes_written", |
976 | | {offsetof(struct CompactionServiceResult, bytes_written), |
977 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
978 | | OptionTypeFlags::kNone}}, |
979 | | {"stats", OptionTypeInfo::Struct( |
980 | | "stats", &compaction_job_stats_type_info, |
981 | | offsetof(struct CompactionServiceResult, stats), |
982 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone)}, |
983 | | {"internal_stats", |
984 | | OptionTypeInfo::Struct( |
985 | | "internal_stats", &compaction_internal_stats_type_info, |
986 | | offsetof(struct CompactionServiceResult, internal_stats), |
987 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone)}, |
988 | | }; |
989 | | |
990 | | Status CompactionServiceInput::Read(const std::string& data_str, |
991 | 0 | CompactionServiceInput* obj) { |
992 | 0 | if (data_str.size() <= sizeof(BinaryFormatVersion)) { |
993 | 0 | return Status::InvalidArgument("Invalid CompactionServiceInput string"); |
994 | 0 | } |
995 | 0 | auto format_version = DecodeFixed32(data_str.data()); |
996 | 0 | if (format_version == kOptionsString) { |
997 | 0 | ConfigOptions cf; |
998 | 0 | cf.invoke_prepare_options = false; |
999 | 0 | cf.ignore_unknown_options = true; |
1000 | 0 | return OptionTypeInfo::ParseType( |
1001 | 0 | cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_input_type_info, |
1002 | 0 | obj); |
1003 | 0 | } else { |
1004 | 0 | return Status::NotSupported( |
1005 | 0 | "Compaction Service Input data version not supported: " + |
1006 | 0 | std::to_string(format_version)); |
1007 | 0 | } |
1008 | 0 | } |
1009 | | |
1010 | 0 | Status CompactionServiceInput::Write(std::string* output) { |
1011 | 0 | char buf[sizeof(BinaryFormatVersion)]; |
1012 | 0 | EncodeFixed32(buf, kOptionsString); |
1013 | 0 | output->append(buf, sizeof(BinaryFormatVersion)); |
1014 | 0 | ConfigOptions cf; |
1015 | 0 | cf.invoke_prepare_options = false; |
1016 | 0 | return OptionTypeInfo::SerializeType(cf, cs_input_type_info, this, output); |
1017 | 0 | } |
1018 | | |
1019 | | Status CompactionServiceResult::Read(const std::string& data_str, |
1020 | 0 | CompactionServiceResult* obj) { |
1021 | 0 | if (data_str.size() <= sizeof(BinaryFormatVersion)) { |
1022 | 0 | return Status::InvalidArgument("Invalid CompactionServiceResult string"); |
1023 | 0 | } |
1024 | 0 | auto format_version = DecodeFixed32(data_str.data()); |
1025 | 0 | if (format_version == kOptionsString) { |
1026 | 0 | ConfigOptions cf; |
1027 | 0 | cf.invoke_prepare_options = false; |
1028 | 0 | cf.ignore_unknown_options = true; |
1029 | 0 | return OptionTypeInfo::ParseType( |
1030 | 0 | cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_result_type_info, |
1031 | 0 | obj); |
1032 | 0 | } else { |
1033 | 0 | return Status::NotSupported( |
1034 | 0 | "Compaction Service Result data version not supported: " + |
1035 | 0 | std::to_string(format_version)); |
1036 | 0 | } |
1037 | 0 | } |
1038 | | |
1039 | 0 | Status CompactionServiceResult::Write(std::string* output) { |
1040 | 0 | char buf[sizeof(BinaryFormatVersion)]; |
1041 | 0 | EncodeFixed32(buf, kOptionsString); |
1042 | 0 | output->append(buf, sizeof(BinaryFormatVersion)); |
1043 | 0 | ConfigOptions cf; |
1044 | 0 | cf.invoke_prepare_options = false; |
1045 | 0 | return OptionTypeInfo::SerializeType(cf, cs_result_type_info, this, output); |
1046 | 0 | } |
1047 | | |
1048 | | #ifndef NDEBUG |
1049 | | bool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other) { |
1050 | | std::string mismatch; |
1051 | | return TEST_Equals(other, &mismatch); |
1052 | | } |
1053 | | |
1054 | | bool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other, |
1055 | | std::string* mismatch) { |
1056 | | ConfigOptions cf; |
1057 | | cf.invoke_prepare_options = false; |
1058 | | return OptionTypeInfo::TypesAreEqual(cf, cs_result_type_info, this, other, |
1059 | | mismatch); |
1060 | | } |
1061 | | |
1062 | | bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other) { |
1063 | | std::string mismatch; |
1064 | | return TEST_Equals(other, &mismatch); |
1065 | | } |
1066 | | |
1067 | | bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other, |
1068 | | std::string* mismatch) { |
1069 | | ConfigOptions cf; |
1070 | | cf.invoke_prepare_options = false; |
1071 | | return OptionTypeInfo::TypesAreEqual(cf, cs_input_type_info, this, other, |
1072 | | mismatch); |
1073 | | } |
1074 | | #endif // NDEBUG |
1075 | | } // namespace ROCKSDB_NAMESPACE |