/src/rocksdb/db/compaction/compaction_service_job.cc
Line | Count | Source |
1 | | // Copyright (c) Meta Platforms, Inc. and affiliates. |
2 | | // |
3 | | // This source code is licensed under both the GPLv2 (found in the |
4 | | // COPYING file in the root directory) and Apache 2.0 License |
5 | | // (found in the LICENSE.Apache file in the root directory). |
6 | | // |
7 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
8 | | // Use of this source code is governed by a BSD-style license that can be |
9 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
10 | | |
11 | | #include "db/compaction/compaction_job.h" |
12 | | #include "db/compaction/compaction_state.h" |
13 | | #include "logging/logging.h" |
14 | | #include "monitoring/iostats_context_imp.h" |
15 | | #include "monitoring/thread_status_util.h" |
16 | | #include "options/options_helper.h" |
17 | | #include "rocksdb/utilities/options_type.h" |
18 | | |
19 | | namespace ROCKSDB_NAMESPACE { |
20 | | class SubcompactionState; |
21 | | |
22 | | CompactionServiceJobStatus |
23 | | CompactionJob::ProcessKeyValueCompactionWithCompactionService( |
24 | 0 | SubcompactionState* sub_compact) { |
25 | 0 | assert(sub_compact); |
26 | 0 | assert(sub_compact->compaction); |
27 | 0 | assert(db_options_.compaction_service); |
28 | |
|
29 | 0 | const Compaction* compaction = sub_compact->compaction; |
30 | 0 | CompactionServiceInput compaction_input; |
31 | 0 | compaction_input.output_level = compaction->output_level(); |
32 | 0 | compaction_input.db_id = db_id_; |
33 | |
|
34 | 0 | const std::vector<CompactionInputFiles>& inputs = |
35 | 0 | *(compact_->compaction->inputs()); |
36 | 0 | for (const auto& files_per_level : inputs) { |
37 | 0 | for (const auto& file : files_per_level.files) { |
38 | 0 | compaction_input.input_files.emplace_back( |
39 | 0 | MakeTableFileName(file->fd.GetNumber())); |
40 | 0 | } |
41 | 0 | } |
42 | |
|
43 | 0 | compaction_input.cf_name = compaction->column_family_data()->GetName(); |
44 | 0 | compaction_input.snapshots = job_context_->snapshot_seqs; |
45 | 0 | compaction_input.has_begin = sub_compact->start.has_value(); |
46 | 0 | compaction_input.begin = |
47 | 0 | compaction_input.has_begin ? sub_compact->start->ToString() : ""; |
48 | 0 | compaction_input.has_end = sub_compact->end.has_value(); |
49 | 0 | compaction_input.end = |
50 | 0 | compaction_input.has_end ? sub_compact->end->ToString() : ""; |
51 | 0 | compaction_input.options_file_number = options_file_number_; |
52 | |
|
53 | 0 | TEST_SYNC_POINT_CALLBACK( |
54 | 0 | "CompactionServiceJob::ProcessKeyValueCompactionWithCompactionService", |
55 | 0 | &compaction_input); |
56 | |
|
57 | 0 | std::string compaction_input_binary; |
58 | 0 | Status s = compaction_input.Write(&compaction_input_binary); |
59 | 0 | if (!s.ok()) { |
60 | 0 | sub_compact->status = s; |
61 | 0 | return CompactionServiceJobStatus::kFailure; |
62 | 0 | } |
63 | | |
64 | 0 | std::ostringstream input_files_oss; |
65 | 0 | bool is_first_one = true; |
66 | 0 | for (const auto& file : compaction_input.input_files) { |
67 | 0 | input_files_oss << (is_first_one ? "" : ", ") << file; |
68 | 0 | is_first_one = false; |
69 | 0 | } |
70 | |
|
71 | 0 | ROCKS_LOG_INFO( |
72 | 0 | db_options_.info_log, |
73 | 0 | "[%s] [JOB %d] Starting remote compaction (output level: %d): %s", |
74 | 0 | compaction->column_family_data()->GetName().c_str(), job_id_, |
75 | 0 | compaction_input.output_level, input_files_oss.str().c_str()); |
76 | 0 | CompactionServiceJobInfo info( |
77 | 0 | dbname_, db_id_, db_session_id_, |
78 | 0 | compaction->column_family_data()->GetID(), |
79 | 0 | compaction->column_family_data()->GetName(), GetCompactionId(sub_compact), |
80 | 0 | thread_pri_, compaction->compaction_reason(), |
81 | 0 | compaction->is_full_compaction(), compaction->is_manual_compaction(), |
82 | 0 | compaction->bottommost_level(), compaction->start_level(), |
83 | 0 | compaction->output_level()); |
84 | |
|
85 | 0 | CompactionServiceScheduleResponse response = |
86 | 0 | db_options_.compaction_service->Schedule(info, compaction_input_binary); |
87 | 0 | switch (response.status) { |
88 | 0 | case CompactionServiceJobStatus::kSuccess: |
89 | 0 | break; |
90 | 0 | case CompactionServiceJobStatus::kAborted: |
91 | 0 | sub_compact->status = |
92 | 0 | Status::Aborted("Scheduling a remote compaction job was aborted"); |
93 | 0 | ROCKS_LOG_WARN( |
94 | 0 | db_options_.info_log, |
95 | 0 | "[%s] [JOB %d] Remote compaction was aborted at Schedule()", |
96 | 0 | compaction->column_family_data()->GetName().c_str(), job_id_); |
97 | 0 | return response.status; |
98 | 0 | case CompactionServiceJobStatus::kFailure: |
99 | 0 | sub_compact->status = Status::Incomplete( |
100 | 0 | "CompactionService failed to schedule a remote compaction job."); |
101 | 0 | ROCKS_LOG_WARN(db_options_.info_log, |
102 | 0 | "[%s] [JOB %d] Remote compaction failed to start.", |
103 | 0 | compaction->column_family_data()->GetName().c_str(), |
104 | 0 | job_id_); |
105 | 0 | return response.status; |
106 | 0 | case CompactionServiceJobStatus::kUseLocal: |
107 | 0 | ROCKS_LOG_INFO( |
108 | 0 | db_options_.info_log, |
109 | 0 | "[%s] [JOB %d] Remote compaction fallback to local by API (Schedule)", |
110 | 0 | compaction->column_family_data()->GetName().c_str(), job_id_); |
111 | 0 | return response.status; |
112 | 0 | default: |
113 | 0 | assert(false); // unknown status |
114 | 0 | break; |
115 | 0 | } |
116 | | |
117 | 0 | std::string debug_str_before_wait = |
118 | 0 | compaction->input_version()->DebugString(/*hex=*/true); |
119 | |
|
120 | 0 | ROCKS_LOG_INFO(db_options_.info_log, |
121 | 0 | "[%s] [JOB %d] Waiting for remote compaction...", |
122 | 0 | compaction->column_family_data()->GetName().c_str(), job_id_); |
123 | 0 | std::string compaction_result_binary; |
124 | 0 | CompactionServiceJobStatus compaction_status = |
125 | 0 | db_options_.compaction_service->Wait(response.scheduled_job_id, |
126 | 0 | &compaction_result_binary); |
127 | |
|
128 | 0 | if (compaction_status != CompactionServiceJobStatus::kSuccess) { |
129 | 0 | ROCKS_LOG_ERROR( |
130 | 0 | db_options_.info_log, |
131 | 0 | "[%s] [JOB %d] Wait() status is not kSuccess. " |
132 | 0 | "\nDebugString Before Wait():\n%s" |
133 | 0 | "\nDebugString After Wait():\n%s", |
134 | 0 | compaction->column_family_data()->GetName().c_str(), job_id_, |
135 | 0 | debug_str_before_wait.c_str(), |
136 | 0 | compaction->input_version()->DebugString(/*hex=*/true).c_str()); |
137 | 0 | } |
138 | |
|
139 | 0 | if (compaction_status == CompactionServiceJobStatus::kUseLocal) { |
140 | 0 | ROCKS_LOG_INFO( |
141 | 0 | db_options_.info_log, |
142 | 0 | "[%s] [JOB %d] Remote compaction fallback to local by API (Wait)", |
143 | 0 | compaction->column_family_data()->GetName().c_str(), job_id_); |
144 | 0 | return compaction_status; |
145 | 0 | } |
146 | | |
147 | 0 | if (compaction_status == CompactionServiceJobStatus::kAborted) { |
148 | 0 | sub_compact->status = |
149 | 0 | Status::Aborted("Waiting a remote compaction job was aborted"); |
150 | 0 | ROCKS_LOG_INFO(db_options_.info_log, |
151 | 0 | "[%s] [JOB %d] Remote compaction was aborted during Wait()", |
152 | 0 | compaction->column_family_data()->GetName().c_str(), |
153 | 0 | job_id_); |
154 | 0 | return compaction_status; |
155 | 0 | } |
156 | | |
157 | 0 | CompactionServiceResult compaction_result; |
158 | 0 | s = CompactionServiceResult::Read(compaction_result_binary, |
159 | 0 | &compaction_result); |
160 | |
|
161 | 0 | if (compaction_status == CompactionServiceJobStatus::kFailure) { |
162 | 0 | if (s.ok()) { |
163 | 0 | if (compaction_result.status.ok()) { |
164 | 0 | sub_compact->status = Status::Incomplete( |
165 | 0 | "CompactionService failed to run the compaction job (even though " |
166 | 0 | "the internal status is okay)."); |
167 | 0 | } else { |
168 | | // set the current sub compaction status with the status returned from |
169 | | // remote |
170 | 0 | sub_compact->status = compaction_result.status; |
171 | 0 | } |
172 | 0 | } else { |
173 | 0 | sub_compact->status = Status::Incomplete( |
174 | 0 | "CompactionService failed to run the compaction job (and no valid " |
175 | 0 | "result is returned)."); |
176 | 0 | compaction_result.status.PermitUncheckedError(); |
177 | 0 | } |
178 | 0 | ROCKS_LOG_WARN( |
179 | 0 | db_options_.info_log, "[%s] [JOB %d] Remote compaction failed.", |
180 | 0 | compaction->column_family_data()->GetName().c_str(), job_id_); |
181 | 0 | return compaction_status; |
182 | 0 | } |
183 | | |
184 | | // CompactionServiceJobStatus::kSuccess was returned, but somehow we failed to |
185 | | // read the result. Consider this as an installation failure |
186 | 0 | if (!s.ok()) { |
187 | 0 | sub_compact->status = s; |
188 | 0 | compaction_result.status.PermitUncheckedError(); |
189 | 0 | db_options_.compaction_service->OnInstallation( |
190 | 0 | response.scheduled_job_id, CompactionServiceJobStatus::kFailure); |
191 | 0 | return CompactionServiceJobStatus::kFailure; |
192 | 0 | } |
193 | 0 | sub_compact->status = compaction_result.status; |
194 | |
|
195 | 0 | std::ostringstream output_files_oss; |
196 | 0 | is_first_one = true; |
197 | 0 | for (const auto& file : compaction_result.output_files) { |
198 | 0 | output_files_oss << (is_first_one ? "" : ", ") << file.file_name; |
199 | 0 | is_first_one = false; |
200 | 0 | } |
201 | |
|
202 | 0 | ROCKS_LOG_INFO( |
203 | 0 | db_options_.info_log, |
204 | 0 | "[%s] [JOB %d] Received remote compaction result, output path: " |
205 | 0 | "%s, files: %s", |
206 | 0 | compaction->column_family_data()->GetName().c_str(), job_id_, |
207 | 0 | compaction_result.output_path.c_str(), output_files_oss.str().c_str()); |
208 | | |
209 | | // Installation Starts |
210 | 0 | for (const auto& file : compaction_result.output_files) { |
211 | 0 | uint64_t file_num = versions_->NewFileNumber(); |
212 | 0 | auto src_file = compaction_result.output_path + "/" + file.file_name; |
213 | 0 | auto tgt_file = TableFileName(compaction->immutable_options().cf_paths, |
214 | 0 | file_num, compaction->output_path_id()); |
215 | 0 | s = fs_->RenameFile(src_file, tgt_file, IOOptions(), nullptr); |
216 | 0 | if (!s.ok()) { |
217 | 0 | sub_compact->status = s; |
218 | 0 | db_options_.compaction_service->OnInstallation( |
219 | 0 | response.scheduled_job_id, CompactionServiceJobStatus::kFailure); |
220 | 0 | return CompactionServiceJobStatus::kFailure; |
221 | 0 | } |
222 | | |
223 | 0 | FileMetaData meta; |
224 | 0 | uint64_t file_size = file.file_size; |
225 | | |
226 | | // TODO - Clean this up in the next release. |
227 | | // For backward compatibility - in case the remote worker does not populate |
228 | | // the file_size yet. If missing, continue to populate this from the file |
229 | | // system. |
230 | 0 | if (file_size == 0) { |
231 | 0 | s = fs_->GetFileSize(tgt_file, IOOptions(), &file_size, nullptr); |
232 | 0 | } |
233 | |
|
234 | 0 | if (!s.ok()) { |
235 | 0 | sub_compact->status = s; |
236 | 0 | db_options_.compaction_service->OnInstallation( |
237 | 0 | response.scheduled_job_id, CompactionServiceJobStatus::kFailure); |
238 | 0 | return CompactionServiceJobStatus::kFailure; |
239 | 0 | } |
240 | 0 | assert(file_size > 0); |
241 | |
|
242 | 0 | meta.fd = FileDescriptor(file_num, compaction->output_path_id(), file_size, |
243 | 0 | file.smallest_seqno, file.largest_seqno); |
244 | 0 | meta.smallest.DecodeFrom(file.smallest_internal_key); |
245 | 0 | meta.largest.DecodeFrom(file.largest_internal_key); |
246 | 0 | meta.oldest_ancester_time = file.oldest_ancester_time; |
247 | 0 | meta.file_creation_time = file.file_creation_time; |
248 | 0 | meta.epoch_number = file.epoch_number; |
249 | 0 | meta.file_checksum = file.file_checksum; |
250 | 0 | meta.file_checksum_func_name = file.file_checksum_func_name; |
251 | 0 | meta.marked_for_compaction = file.marked_for_compaction; |
252 | 0 | meta.unique_id = file.unique_id; |
253 | 0 | meta.temperature = file.file_temperature; |
254 | 0 | meta.tail_size = |
255 | 0 | FileMetaData::CalculateTailSize(file_size, file.table_properties); |
256 | 0 | auto cfd = compaction->column_family_data(); |
257 | 0 | CompactionOutputs* compaction_outputs = |
258 | 0 | sub_compact->Outputs(file.is_proximal_level_output); |
259 | 0 | assert(compaction_outputs); |
260 | 0 | compaction_outputs->AddOutput(std::move(meta), cfd->internal_comparator(), |
261 | 0 | false, true, file.paranoid_hash); |
262 | 0 | compaction_outputs->UpdateTableProperties(file.table_properties); |
263 | 0 | } |
264 | | |
265 | | // Set per-level stats |
266 | 0 | auto compaction_output_stats = |
267 | 0 | sub_compact->OutputStats(false /* is_proximal_level */); |
268 | 0 | assert(compaction_output_stats); |
269 | 0 | compaction_output_stats->Add( |
270 | 0 | compaction_result.internal_stats.output_level_stats); |
271 | 0 | if (compaction->SupportsPerKeyPlacement()) { |
272 | 0 | compaction_output_stats = |
273 | 0 | sub_compact->OutputStats(true /* is_proximal_level */); |
274 | 0 | assert(compaction_output_stats); |
275 | 0 | compaction_output_stats->Add( |
276 | 0 | compaction_result.internal_stats.proximal_level_stats); |
277 | 0 | } |
278 | | |
279 | | // Set job stats |
280 | 0 | sub_compact->compaction_job_stats = compaction_result.stats; |
281 | |
|
282 | 0 | RecordTick(stats_, REMOTE_COMPACT_READ_BYTES, compaction_result.bytes_read); |
283 | 0 | RecordTick(stats_, REMOTE_COMPACT_WRITE_BYTES, |
284 | 0 | compaction_result.bytes_written); |
285 | 0 | db_options_.compaction_service->OnInstallation( |
286 | 0 | response.scheduled_job_id, CompactionServiceJobStatus::kSuccess); |
287 | 0 | return CompactionServiceJobStatus::kSuccess; |
288 | 0 | } |
289 | | |
290 | | std::string CompactionServiceCompactionJob::GetTableFileName( |
291 | 0 | uint64_t file_number) { |
292 | 0 | return MakeTableFileName(output_path_, file_number); |
293 | 0 | } |
294 | | |
295 | 0 | void CompactionServiceCompactionJob::RecordCompactionIOStats() { |
296 | 0 | compaction_result_->bytes_read += IOSTATS(bytes_read); |
297 | 0 | compaction_result_->bytes_written += IOSTATS(bytes_written); |
298 | 0 | CompactionJob::RecordCompactionIOStats(); |
299 | 0 | } |
300 | | |
301 | | CompactionServiceCompactionJob::CompactionServiceCompactionJob( |
302 | | int job_id, Compaction* compaction, const ImmutableDBOptions& db_options, |
303 | | const MutableDBOptions& mutable_db_options, const FileOptions& file_options, |
304 | | VersionSet* versions, const std::atomic<bool>* shutting_down, |
305 | | LogBuffer* log_buffer, FSDirectory* output_directory, Statistics* stats, |
306 | | InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler, |
307 | | JobContext* job_context, std::shared_ptr<Cache> table_cache, |
308 | | EventLogger* event_logger, const std::string& dbname, |
309 | | const std::shared_ptr<IOTracer>& io_tracer, |
310 | | const std::atomic<bool>& manual_compaction_canceled, |
311 | | const std::string& db_id, const std::string& db_session_id, |
312 | | std::string output_path, |
313 | | const CompactionServiceInput& compaction_service_input, |
314 | | CompactionServiceResult* compaction_service_result) |
315 | 0 | : CompactionJob(job_id, compaction, db_options, mutable_db_options, |
316 | 0 | file_options, versions, shutting_down, log_buffer, nullptr, |
317 | 0 | output_directory, nullptr, stats, db_mutex, |
318 | 0 | db_error_handler, job_context, std::move(table_cache), |
319 | 0 | event_logger, |
320 | 0 | compaction->mutable_cf_options().paranoid_file_checks, |
321 | 0 | compaction->mutable_cf_options().report_bg_io_stats, dbname, |
322 | 0 | &(compaction_service_result->stats), Env::Priority::USER, |
323 | 0 | io_tracer, manual_compaction_canceled, db_id, db_session_id, |
324 | 0 | compaction->column_family_data()->GetFullHistoryTsLow()), |
325 | 0 | output_path_(std::move(output_path)), |
326 | 0 | compaction_input_(compaction_service_input), |
327 | 0 | compaction_result_(compaction_service_result) {} |
328 | | |
329 | | void CompactionServiceCompactionJob::Prepare( |
330 | | const CompactionProgress& compaction_progress, |
331 | 0 | log::Writer* compaction_progress_writer) { |
332 | 0 | std::optional<Slice> begin; |
333 | 0 | if (compaction_input_.has_begin) { |
334 | 0 | begin = compaction_input_.begin; |
335 | 0 | } |
336 | 0 | std::optional<Slice> end; |
337 | 0 | if (compaction_input_.has_end) { |
338 | 0 | end = compaction_input_.end; |
339 | 0 | } |
340 | 0 | CompactionJob::Prepare(std::make_pair(begin, end), compaction_progress, |
341 | 0 | compaction_progress_writer); |
342 | 0 | } |
343 | | |
344 | 0 | Status CompactionServiceCompactionJob::Run() { |
345 | 0 | AutoThreadOperationStageUpdater stage_updater( |
346 | 0 | ThreadStatus::STAGE_COMPACTION_RUN); |
347 | |
|
348 | 0 | auto* c = compact_->compaction; |
349 | |
|
350 | 0 | log_buffer_->FlushBufferToLog(); |
351 | 0 | LogCompaction(); |
352 | |
|
353 | 0 | compaction_result_->stats.Reset(); |
354 | |
|
355 | 0 | const uint64_t start_micros = db_options_.clock->NowMicros(); |
356 | 0 | c->GetOrInitInputTableProperties(); |
357 | | |
358 | | // Pick the only sub-compaction we should have |
359 | 0 | assert(compact_->sub_compact_states.size() == 1); |
360 | 0 | SubcompactionState* sub_compact = compact_->sub_compact_states.data(); |
361 | |
|
362 | 0 | ProcessKeyValueCompaction(sub_compact); |
363 | |
|
364 | 0 | uint64_t elapsed_micros = db_options_.clock->NowMicros() - start_micros; |
365 | 0 | internal_stats_.SetMicros(elapsed_micros); |
366 | 0 | internal_stats_.AddCpuMicros(elapsed_micros); |
367 | |
|
368 | 0 | RecordTimeToHistogram(stats_, COMPACTION_TIME, |
369 | 0 | internal_stats_.output_level_stats.micros); |
370 | 0 | RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME, |
371 | 0 | internal_stats_.output_level_stats.cpu_micros); |
372 | |
|
373 | 0 | Status status = sub_compact->status; |
374 | 0 | IOStatus io_s = sub_compact->io_status; |
375 | |
|
376 | 0 | if (io_status_.ok()) { |
377 | 0 | io_status_ = io_s; |
378 | 0 | } |
379 | |
|
380 | 0 | if (status.ok()) { |
381 | 0 | constexpr IODebugContext* dbg = nullptr; |
382 | |
|
383 | 0 | if (output_directory_) { |
384 | 0 | io_s = output_directory_->FsyncWithDirOptions(IOOptions(), dbg, |
385 | 0 | DirFsyncOptions()); |
386 | 0 | } |
387 | 0 | } |
388 | 0 | if (io_status_.ok()) { |
389 | 0 | io_status_ = io_s; |
390 | 0 | } |
391 | 0 | if (status.ok()) { |
392 | 0 | status = io_s; |
393 | 0 | } |
394 | |
|
395 | 0 | LogFlush(db_options_.info_log); |
396 | 0 | compact_->status = status; |
397 | 0 | compact_->status.PermitUncheckedError(); |
398 | | |
399 | | // Build Compaction Job Stats |
400 | | |
401 | | // 1. Aggregate internal stats and job stats for all subcompactions |
402 | | // internal stats: sub_compact.proximal_level_outputs_.stats and |
403 | | // sub_compact.compaction_outputs_.stats into |
404 | | // internal_stats_.output_level_stats and |
405 | | // internal_stats_.proximal_level_stats |
406 | | // job-level stats: sub_compact.compaction_job_stats into compact.job_stats_ |
407 | | // |
408 | | // For remote compaction, there's only one subcompaction. |
409 | 0 | compact_->AggregateCompactionStats(internal_stats_, *job_stats_); |
410 | | |
411 | | // 2. Update job-level output stats with the aggregated internal_stats_ |
412 | | // Please note that input stats will be updated by primary host when all |
413 | | // subcompactions are finished |
414 | 0 | UpdateCompactionJobOutputStatsFromInternalStats(status, internal_stats_); |
415 | | // and set fields that are not propagated as part of the update |
416 | 0 | compaction_result_->stats.is_manual_compaction = c->is_manual_compaction(); |
417 | 0 | compaction_result_->stats.is_full_compaction = c->is_full_compaction(); |
418 | 0 | compaction_result_->stats.is_remote_compaction = true; |
419 | | |
420 | | // 3. Update IO Stats that are not part of the the update above |
421 | | // (bytes_read, bytes_written) |
422 | 0 | RecordCompactionIOStats(); |
423 | | |
424 | | // Build Output |
425 | 0 | compaction_result_->internal_stats = internal_stats_; |
426 | 0 | compaction_result_->output_level = compact_->compaction->output_level(); |
427 | 0 | compaction_result_->output_path = output_path_; |
428 | 0 | if (status.ok()) { |
429 | 0 | for (const auto& output_file : sub_compact->GetOutputs()) { |
430 | 0 | auto& meta = output_file.meta; |
431 | 0 | compaction_result_->output_files.emplace_back( |
432 | 0 | MakeTableFileName(meta.fd.GetNumber()), meta.fd.GetFileSize(), |
433 | 0 | meta.fd.smallest_seqno, meta.fd.largest_seqno, |
434 | 0 | meta.smallest.Encode().ToString(), meta.largest.Encode().ToString(), |
435 | 0 | meta.oldest_ancester_time, meta.file_creation_time, meta.epoch_number, |
436 | 0 | meta.file_checksum, meta.file_checksum_func_name, |
437 | 0 | output_file.validator.GetHash(), meta.marked_for_compaction, |
438 | 0 | meta.unique_id, *output_file.table_properties, |
439 | 0 | output_file.is_proximal_level, meta.temperature); |
440 | 0 | } |
441 | 0 | } |
442 | |
|
443 | 0 | TEST_SYNC_POINT_CALLBACK("CompactionServiceCompactionJob::Run:0", |
444 | 0 | &compaction_result_); |
445 | 0 | return status; |
446 | 0 | } |
447 | | |
448 | 0 | void CompactionServiceCompactionJob::CleanupCompaction() { |
449 | 0 | CompactionJob::CleanupCompaction(); |
450 | 0 | } |
451 | | |
// Version tag of the internal binary format used for the compaction service
// input and result data.
enum BinaryFormatVersion : uint32_t {
  // Payload is a string in a format similar to the Option string format.
  kOptionsString = 1,
};
456 | | |
457 | | static std::unordered_map<std::string, OptionTypeInfo> cfd_type_info = { |
458 | | {"name", |
459 | | {offsetof(struct ColumnFamilyDescriptor, name), OptionType::kEncodedString, |
460 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, |
461 | | {"options", |
462 | | {offsetof(struct ColumnFamilyDescriptor, options), |
463 | | OptionType::kConfigurable, OptionVerificationType::kNormal, |
464 | | OptionTypeFlags::kNone, |
465 | | [](const ConfigOptions& opts, const std::string& /*name*/, |
466 | 0 | const std::string& value, void* addr) { |
467 | 0 | auto cf_options = static_cast<ColumnFamilyOptions*>(addr); |
468 | 0 | return GetColumnFamilyOptionsFromString(opts, ColumnFamilyOptions(), |
469 | 0 | value, cf_options); |
470 | 0 | }, |
471 | | [](const ConfigOptions& opts, const std::string& /*name*/, |
472 | 0 | const void* addr, std::string* value) { |
473 | 0 | const auto cf_options = static_cast<const ColumnFamilyOptions*>(addr); |
474 | 0 | std::string result; |
475 | 0 | auto status = |
476 | 0 | GetStringFromColumnFamilyOptions(opts, *cf_options, &result); |
477 | 0 | *value = "{" + result + "}"; |
478 | 0 | return status; |
479 | 0 | }, |
480 | | [](const ConfigOptions& opts, const std::string& name, const void* addr1, |
481 | 0 | const void* addr2, std::string* mismatch) { |
482 | 0 | const auto this_one = static_cast<const ColumnFamilyOptions*>(addr1); |
483 | 0 | const auto that_one = static_cast<const ColumnFamilyOptions*>(addr2); |
484 | 0 | auto this_conf = CFOptionsAsConfigurable(*this_one); |
485 | 0 | auto that_conf = CFOptionsAsConfigurable(*that_one); |
486 | 0 | std::string mismatch_opt; |
487 | 0 | bool result = |
488 | 0 | this_conf->AreEquivalent(opts, that_conf.get(), &mismatch_opt); |
489 | 0 | if (!result) { |
490 | 0 | *mismatch = name + "." + mismatch_opt; |
491 | 0 | } |
492 | 0 | return result; |
493 | 0 | }}}, |
494 | | }; |
495 | | |
496 | | static std::unordered_map<std::string, OptionTypeInfo> cs_input_type_info = { |
497 | | {"cf_name", |
498 | | {offsetof(struct CompactionServiceInput, cf_name), |
499 | | OptionType::kEncodedString}}, |
500 | | {"snapshots", OptionTypeInfo::Vector<uint64_t>( |
501 | | offsetof(struct CompactionServiceInput, snapshots), |
502 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone, |
503 | | {0, OptionType::kUInt64T})}, |
504 | | {"input_files", OptionTypeInfo::Vector<std::string>( |
505 | | offsetof(struct CompactionServiceInput, input_files), |
506 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone, |
507 | | {0, OptionType::kEncodedString})}, |
508 | | {"output_level", |
509 | | {offsetof(struct CompactionServiceInput, output_level), OptionType::kInt, |
510 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, |
511 | | {"db_id", |
512 | | {offsetof(struct CompactionServiceInput, db_id), |
513 | | OptionType::kEncodedString}}, |
514 | | {"has_begin", |
515 | | {offsetof(struct CompactionServiceInput, has_begin), OptionType::kBoolean, |
516 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, |
517 | | {"begin", |
518 | | {offsetof(struct CompactionServiceInput, begin), |
519 | | OptionType::kEncodedString, OptionVerificationType::kNormal, |
520 | | OptionTypeFlags::kNone}}, |
521 | | {"has_end", |
522 | | {offsetof(struct CompactionServiceInput, has_end), OptionType::kBoolean, |
523 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, |
524 | | {"end", |
525 | | {offsetof(struct CompactionServiceInput, end), OptionType::kEncodedString, |
526 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone}}, |
527 | | {"options_file_number", |
528 | | {offsetof(struct CompactionServiceInput, options_file_number), |
529 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
530 | | OptionTypeFlags::kNone}}, |
531 | | }; |
532 | | |
533 | | static std::unordered_map<std::string, OptionTypeInfo> |
534 | | cs_output_file_type_info = { |
535 | | {"file_name", |
536 | | {offsetof(struct CompactionServiceOutputFile, file_name), |
537 | | OptionType::kEncodedString, OptionVerificationType::kNormal, |
538 | | OptionTypeFlags::kNone}}, |
539 | | {"file_size", |
540 | | {offsetof(struct CompactionServiceOutputFile, file_size), |
541 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
542 | | OptionTypeFlags::kNone}}, |
543 | | {"smallest_seqno", |
544 | | {offsetof(struct CompactionServiceOutputFile, smallest_seqno), |
545 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
546 | | OptionTypeFlags::kNone}}, |
547 | | {"largest_seqno", |
548 | | {offsetof(struct CompactionServiceOutputFile, largest_seqno), |
549 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
550 | | OptionTypeFlags::kNone}}, |
551 | | {"smallest_internal_key", |
552 | | {offsetof(struct CompactionServiceOutputFile, smallest_internal_key), |
553 | | OptionType::kEncodedString, OptionVerificationType::kNormal, |
554 | | OptionTypeFlags::kNone}}, |
555 | | {"largest_internal_key", |
556 | | {offsetof(struct CompactionServiceOutputFile, largest_internal_key), |
557 | | OptionType::kEncodedString, OptionVerificationType::kNormal, |
558 | | OptionTypeFlags::kNone}}, |
559 | | {"oldest_ancester_time", |
560 | | {offsetof(struct CompactionServiceOutputFile, oldest_ancester_time), |
561 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
562 | | OptionTypeFlags::kNone}}, |
563 | | {"file_creation_time", |
564 | | {offsetof(struct CompactionServiceOutputFile, file_creation_time), |
565 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
566 | | OptionTypeFlags::kNone}}, |
567 | | {"epoch_number", |
568 | | {offsetof(struct CompactionServiceOutputFile, epoch_number), |
569 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
570 | | OptionTypeFlags::kNone}}, |
571 | | {"file_checksum", |
572 | | {offsetof(struct CompactionServiceOutputFile, file_checksum), |
573 | | OptionType::kEncodedString, OptionVerificationType::kNormal, |
574 | | OptionTypeFlags::kNone}}, |
575 | | {"file_checksum_func_name", |
576 | | {offsetof(struct CompactionServiceOutputFile, file_checksum_func_name), |
577 | | OptionType::kEncodedString, OptionVerificationType::kNormal, |
578 | | OptionTypeFlags::kNone}}, |
579 | | {"paranoid_hash", |
580 | | {offsetof(struct CompactionServiceOutputFile, paranoid_hash), |
581 | | OptionType::kUInt64T, OptionVerificationType::kNormal, |
582 | | OptionTypeFlags::kNone}}, |
583 | | {"marked_for_compaction", |
584 | | {offsetof(struct CompactionServiceOutputFile, marked_for_compaction), |
585 | | OptionType::kBoolean, OptionVerificationType::kNormal, |
586 | | OptionTypeFlags::kNone}}, |
587 | | {"unique_id", |
588 | | OptionTypeInfo::Array<uint64_t, 2>( |
589 | | offsetof(struct CompactionServiceOutputFile, unique_id), |
590 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone, |
591 | | {0, OptionType::kUInt64T})}, |
592 | | {"table_properties", |
593 | | {offsetof(struct CompactionServiceOutputFile, table_properties), |
594 | | OptionType::kStruct, OptionVerificationType::kNormal, |
595 | | OptionTypeFlags::kNone, |
596 | | [](const ConfigOptions& opts, const std::string& /*name*/, |
597 | 0 | const std::string& value, void* addr) { |
598 | 0 | auto table_properties = static_cast<TableProperties*>(addr); |
599 | 0 | return TableProperties::Parse(opts, value, table_properties); |
600 | 0 | }, |
601 | | [](const ConfigOptions& opts, const std::string& /*name*/, |
602 | 0 | const void* addr, std::string* value) { |
603 | 0 | const auto table_properties = |
604 | 0 | static_cast<const TableProperties*>(addr); |
605 | 0 | std::string result; |
606 | 0 | auto status = table_properties->Serialize(opts, &result); |
607 | 0 | *value = "{" + result + "}"; |
608 | 0 | return status; |
609 | 0 | }, |
610 | | [](const ConfigOptions& opts, const std::string& /*name*/, |
611 | 0 | const void* addr1, const void* addr2, std::string* mismatch) { |
612 | 0 | const auto this_one = static_cast<const TableProperties*>(addr1); |
613 | 0 | const auto that_one = static_cast<const TableProperties*>(addr2); |
614 | 0 | return this_one->AreEqual(opts, that_one, mismatch); |
615 | 0 | }}}, |
616 | | {"is_proximal_level_output", |
617 | | {offsetof(struct CompactionServiceOutputFile, |
618 | | is_proximal_level_output), |
619 | | OptionType::kBoolean, OptionVerificationType::kNormal, |
620 | | OptionTypeFlags::kNone}}, |
621 | | {"file_temperature", |
622 | | {offsetof(struct CompactionServiceOutputFile, file_temperature), |
623 | | OptionType::kTemperature, OptionVerificationType::kNormal, |
624 | | OptionTypeFlags::kNone}}}; |
625 | | |
// Serialization schema for CompactionJobStats. Each entry maps the field name
// used in the serialized text to the member's byte offset inside
// CompactionJobStats plus the OptionType used to parse and print it; every
// field uses normal verification and no special flags. The "stats" entry of
// cs_result_type_info points at this table, so it is what
// CompactionServiceResult::Read()/Write() use for the nested stats struct.
626 | | static std::unordered_map<std::string, OptionTypeInfo>
627 | | compaction_job_stats_type_info = {
628 | | {"elapsed_micros",
629 | | {offsetof(struct CompactionJobStats, elapsed_micros),
630 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
631 | | OptionTypeFlags::kNone}},
632 | | {"cpu_micros",
633 | | {offsetof(struct CompactionJobStats, cpu_micros), OptionType::kUInt64T,
634 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
635 | | {"num_input_records",
636 | | {offsetof(struct CompactionJobStats, num_input_records),
637 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
638 | | OptionTypeFlags::kNone}},
639 | | {"num_blobs_read",
640 | | {offsetof(struct CompactionJobStats, num_blobs_read),
641 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
642 | | OptionTypeFlags::kNone}},
643 | | {"num_input_files",
644 | | {offsetof(struct CompactionJobStats, num_input_files),
645 | | OptionType::kSizeT, OptionVerificationType::kNormal,
646 | | OptionTypeFlags::kNone}},
647 | | {"num_input_files_at_output_level",
648 | | {offsetof(struct CompactionJobStats, num_input_files_at_output_level),
649 | | OptionType::kSizeT, OptionVerificationType::kNormal,
650 | | OptionTypeFlags::kNone}},
651 | | {"num_output_records",
652 | | {offsetof(struct CompactionJobStats, num_output_records),
653 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
654 | | OptionTypeFlags::kNone}},
655 | | {"num_output_files",
656 | | {offsetof(struct CompactionJobStats, num_output_files),
657 | | OptionType::kSizeT, OptionVerificationType::kNormal,
658 | | OptionTypeFlags::kNone}},
659 | | {"num_output_files_blob",
660 | | {offsetof(struct CompactionJobStats, num_output_files_blob),
661 | | OptionType::kSizeT, OptionVerificationType::kNormal,
662 | | OptionTypeFlags::kNone}},
663 | | {"is_full_compaction",
664 | | {offsetof(struct CompactionJobStats, is_full_compaction),
665 | | OptionType::kBoolean, OptionVerificationType::kNormal,
666 | | OptionTypeFlags::kNone}},
667 | | {"is_manual_compaction",
668 | | {offsetof(struct CompactionJobStats, is_manual_compaction),
669 | | OptionType::kBoolean, OptionVerificationType::kNormal,
670 | | OptionTypeFlags::kNone}},
671 | | {"is_remote_compaction",
672 | | {offsetof(struct CompactionJobStats, is_remote_compaction),
673 | | OptionType::kBoolean, OptionVerificationType::kNormal,
674 | | OptionTypeFlags::kNone}},
675 | | {"total_input_bytes",
676 | | {offsetof(struct CompactionJobStats, total_input_bytes),
677 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
678 | | OptionTypeFlags::kNone}},
679 | | {"total_blob_bytes_read",
680 | | {offsetof(struct CompactionJobStats, total_blob_bytes_read),
681 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
682 | | OptionTypeFlags::kNone}},
683 | | {"total_output_bytes",
684 | | {offsetof(struct CompactionJobStats, total_output_bytes),
685 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
686 | | OptionTypeFlags::kNone}},
687 | | {"total_output_bytes_blob",
688 | | {offsetof(struct CompactionJobStats, total_output_bytes_blob),
689 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
690 | | OptionTypeFlags::kNone}},
691 | | {"num_records_replaced",
692 | | {offsetof(struct CompactionJobStats, num_records_replaced),
693 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
694 | | OptionTypeFlags::kNone}},
695 | | {"total_input_raw_key_bytes",
696 | | {offsetof(struct CompactionJobStats, total_input_raw_key_bytes),
697 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
698 | | OptionTypeFlags::kNone}},
699 | | {"total_input_raw_value_bytes",
700 | | {offsetof(struct CompactionJobStats, total_input_raw_value_bytes),
701 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
702 | | OptionTypeFlags::kNone}},
703 | | {"num_input_deletion_records",
704 | | {offsetof(struct CompactionJobStats, num_input_deletion_records),
705 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
706 | | OptionTypeFlags::kNone}},
707 | | {"num_expired_deletion_records",
708 | | {offsetof(struct CompactionJobStats, num_expired_deletion_records),
709 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
710 | | OptionTypeFlags::kNone}},
711 | | {"num_corrupt_keys",
712 | | {offsetof(struct CompactionJobStats, num_corrupt_keys),
713 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
714 | | OptionTypeFlags::kNone}},
715 | | {"file_write_nanos",
716 | | {offsetof(struct CompactionJobStats, file_write_nanos),
717 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
718 | | OptionTypeFlags::kNone}},
719 | | {"file_range_sync_nanos",
720 | | {offsetof(struct CompactionJobStats, file_range_sync_nanos),
721 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
722 | | OptionTypeFlags::kNone}},
723 | | {"file_fsync_nanos",
724 | | {offsetof(struct CompactionJobStats, file_fsync_nanos),
725 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
726 | | OptionTypeFlags::kNone}},
727 | | {"file_prepare_write_nanos",
728 | | {offsetof(struct CompactionJobStats, file_prepare_write_nanos),
729 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
730 | | OptionTypeFlags::kNone}},
// The two key-prefix members are the only string fields in this table and are
// handled as kEncodedString.
// NOTE(review): presumably because key bytes are not guaranteed to be
// printable text - confirm against the kEncodedString implementation.
731 | | {"smallest_output_key_prefix",
732 | | {offsetof(struct CompactionJobStats, smallest_output_key_prefix),
733 | | OptionType::kEncodedString, OptionVerificationType::kNormal,
734 | | OptionTypeFlags::kNone}},
735 | | {"largest_output_key_prefix",
736 | | {offsetof(struct CompactionJobStats, largest_output_key_prefix),
737 | | OptionType::kEncodedString, OptionVerificationType::kNormal,
738 | | OptionTypeFlags::kNone}},
739 | | {"num_single_del_fallthru",
740 | | {offsetof(struct CompactionJobStats, num_single_del_fallthru),
741 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
742 | | OptionTypeFlags::kNone}},
743 | | {"num_single_del_mismatch",
744 | | {offsetof(struct CompactionJobStats, num_single_del_mismatch),
745 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
746 | | OptionTypeFlags::kNone}},
747 | | };
748 | | |
// Serialization schema for InternalStats::CompactionStats (serialized field
// name -> member offset and OptionType). The same table describes both nested
// structs of compaction_internal_stats_type_info ("output_level_stats" and
// "proximal_level_stats"). Byte and record counters are kUInt64T; the
// file-count members are kInt.
749 | | static std::unordered_map<std::string, OptionTypeInfo>
750 | | compaction_stats_type_info = {
751 | | {"micros",
752 | | {offsetof(struct InternalStats::CompactionStats, micros),
753 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
754 | | OptionTypeFlags::kNone}},
755 | | {"cpu_micros",
756 | | {offsetof(struct InternalStats::CompactionStats, cpu_micros),
757 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
758 | | OptionTypeFlags::kNone}},
759 | | {"bytes_read_non_output_levels",
760 | | {offsetof(struct InternalStats::CompactionStats,
761 | | bytes_read_non_output_levels),
762 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
763 | | OptionTypeFlags::kNone}},
764 | | {"bytes_read_output_level",
765 | | {offsetof(struct InternalStats::CompactionStats,
766 | | bytes_read_output_level),
767 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
768 | | OptionTypeFlags::kNone}},
769 | | {"bytes_skipped_non_output_levels",
770 | | {offsetof(struct InternalStats::CompactionStats,
771 | | bytes_skipped_non_output_levels),
772 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
773 | | OptionTypeFlags::kNone}},
774 | | {"bytes_skipped_output_level",
775 | | {offsetof(struct InternalStats::CompactionStats,
776 | | bytes_skipped_output_level),
777 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
778 | | OptionTypeFlags::kNone}},
779 | | {"bytes_read_blob",
780 | | {offsetof(struct InternalStats::CompactionStats, bytes_read_blob),
781 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
782 | | OptionTypeFlags::kNone}},
783 | | {"bytes_written",
784 | | {offsetof(struct InternalStats::CompactionStats, bytes_written),
785 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
786 | | OptionTypeFlags::kNone}},
787 | | {"bytes_written_blob",
788 | | {offsetof(struct InternalStats::CompactionStats, bytes_written_blob),
789 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
790 | | OptionTypeFlags::kNone}},
791 | | {"bytes_moved",
792 | | {offsetof(struct InternalStats::CompactionStats, bytes_moved),
793 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
794 | | OptionTypeFlags::kNone}},
795 | | {"num_input_files_in_non_output_levels",
796 | | {offsetof(struct InternalStats::CompactionStats,
797 | | num_input_files_in_non_output_levels),
798 | | OptionType::kInt, OptionVerificationType::kNormal,
799 | | OptionTypeFlags::kNone}},
800 | | {"num_input_files_in_output_level",
801 | | {offsetof(struct InternalStats::CompactionStats,
802 | | num_input_files_in_output_level),
803 | | OptionType::kInt, OptionVerificationType::kNormal,
804 | | OptionTypeFlags::kNone}},
805 | | {"num_filtered_input_files_in_non_output_levels",
806 | | {offsetof(struct InternalStats::CompactionStats,
807 | | num_filtered_input_files_in_non_output_levels),
808 | | OptionType::kInt, OptionVerificationType::kNormal,
809 | | OptionTypeFlags::kNone}},
810 | | {"num_filtered_input_files_in_output_level",
811 | | {offsetof(struct InternalStats::CompactionStats,
812 | | num_filtered_input_files_in_output_level),
813 | | OptionType::kInt, OptionVerificationType::kNormal,
814 | | OptionTypeFlags::kNone}},
815 | | {"num_output_files",
816 | | {offsetof(struct InternalStats::CompactionStats, num_output_files),
817 | | OptionType::kInt, OptionVerificationType::kNormal,
818 | | OptionTypeFlags::kNone}},
819 | | {"num_output_files_blob",
820 | | {offsetof(struct InternalStats::CompactionStats,
821 | | num_output_files_blob),
822 | | OptionType::kInt, OptionVerificationType::kNormal,
823 | | OptionTypeFlags::kNone}},
824 | | {"num_input_records",
825 | | {offsetof(struct InternalStats::CompactionStats, num_input_records),
826 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
827 | | OptionTypeFlags::kNone}},
828 | | {"num_dropped_records",
829 | | {offsetof(struct InternalStats::CompactionStats, num_dropped_records),
830 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
831 | | OptionTypeFlags::kNone}},
832 | | {"num_output_records",
833 | | {offsetof(struct InternalStats::CompactionStats, num_output_records),
834 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
835 | | OptionTypeFlags::kNone}},
836 | | {"count",
837 | | {offsetof(struct InternalStats::CompactionStats, count),
838 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
839 | | OptionTypeFlags::kNone}},
// "counts" is a fixed-size int array whose length is
// CompactionReason::kNumOfReasons; each element is handled as kInt.
840 | | {"counts", OptionTypeInfo::Array<
841 | | int, static_cast<int>(CompactionReason::kNumOfReasons)>(
842 | | offsetof(struct InternalStats::CompactionStats, counts),
843 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone,
844 | | {0, OptionType::kInt})},
845 | | };
846 | | |
// Serialization schema for InternalStats::CompactionStatsFull: two nested
// CompactionStats structs, both described by compaction_stats_type_info, and
// the boolean has_proximal_level_output. The "internal_stats" entry of
// cs_result_type_info refers to this table.
847 | | static std::unordered_map<std::string, OptionTypeInfo>
848 | | compaction_internal_stats_type_info = {
849 | | {"output_level_stats",
850 | | OptionTypeInfo::Struct(
851 | | "output_level_stats", &compaction_stats_type_info,
852 | | offsetof(struct InternalStats::CompactionStatsFull,
853 | | output_level_stats),
854 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
855 | | {"has_proximal_level_output",
856 | | {offsetof(struct InternalStats::CompactionStatsFull,
857 | | has_proximal_level_output),
858 | | OptionType::kBoolean, OptionVerificationType::kNormal,
859 | | OptionTypeFlags::kNone}},
860 | | {"proximal_level_stats",
861 | | OptionTypeInfo::Struct(
862 | | "proximal_level_stats", &compaction_stats_type_info,
863 | | offsetof(struct InternalStats::CompactionStatsFull,
864 | | proximal_level_stats),
865 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
866 | | };
867 | | |
868 | | namespace {
869 | | // Plain-data mirror of Status used to serialize and deserialize it, because
870 | | // Status's members are not public.
// The numeric members carry default initializers so that an adapter which is
// default-constructed and then only partially filled in by parsing never
// hands indeterminate values to GetStatus().
871 | | struct StatusSerializationAdapter {
872 | | uint8_t code = 0;
873 | | uint8_t subcode = 0;
874 | | uint8_t severity = 0;
875 | | std::string message;
876 | |
877 | 0 | StatusSerializationAdapter() = default;
// Captures the code, subcode, severity and message of `s`. A null state
// pointer is stored as an empty message.
878 | 0 | explicit StatusSerializationAdapter(const Status& s) {
879 | 0 | code = s.code();
880 | 0 | subcode = s.subcode();
881 | 0 | severity = s.severity();
882 | 0 | auto msg = s.getState();
883 | 0 | message = msg ? msg : "";
884 | 0 | }
885 | |
// Rebuilds a Status from the stored fields by casting the three numeric
// members back to the corresponding Status enums.
886 | 0 | Status GetStatus() const {
887 | 0 | return Status{static_cast<Status::Code>(code),
888 | 0 | static_cast<Status::SubCode>(subcode),
889 | 0 | static_cast<Status::Severity>(severity), message};
890 | 0 | }
891 | | };
892 | | } // namespace
893 | | |
// Serialization schema for StatusSerializationAdapter: the three numeric
// fields are handled as kUInt8T and the message as kEncodedString. The
// "status" callbacks in cs_result_type_info use this table to convert a
// Status to and from text through the adapter.
894 | | static std::unordered_map<std::string, OptionTypeInfo>
895 | | status_adapter_type_info = {
896 | | {"code",
897 | | {offsetof(struct StatusSerializationAdapter, code),
898 | | OptionType::kUInt8T, OptionVerificationType::kNormal,
899 | | OptionTypeFlags::kNone}},
900 | | {"subcode",
901 | | {offsetof(struct StatusSerializationAdapter, subcode),
902 | | OptionType::kUInt8T, OptionVerificationType::kNormal,
903 | | OptionTypeFlags::kNone}},
904 | | {"severity",
905 | | {offsetof(struct StatusSerializationAdapter, severity),
906 | | OptionType::kUInt8T, OptionVerificationType::kNormal,
907 | | OptionTypeFlags::kNone}},
908 | | {"message",
909 | | {offsetof(struct StatusSerializationAdapter, message),
910 | | OptionType::kEncodedString, OptionVerificationType::kNormal,
911 | | OptionTypeFlags::kNone}},
912 | | };
913 | | |
// Serialization schema for CompactionServiceResult, used by
// CompactionServiceResult::Read(), Write() and TEST_Equals().
914 | | static std::unordered_map<std::string, OptionTypeInfo> cs_result_type_info = {
// Status keeps its members private, so it is converted through
// StatusSerializationAdapter by the three custom callbacks below
// (parse, serialize, compare).
915 | | {"status",
916 | | {offsetof(struct CompactionServiceResult, status),
917 | | OptionType::kCustomizable, OptionVerificationType::kNormal,
918 | | OptionTypeFlags::kNone,
// Parse: text -> adapter -> Status stored at addr. The parse status is
// returned to the caller.
919 | | [](const ConfigOptions& opts, const std::string& /*name*/,
920 | 0 | const std::string& value, void* addr) {
921 | 0 | auto status_obj = static_cast<Status*>(addr);
// Value-initialize the adapter so code/subcode/severity are zero instead of
// indeterminate when the parsed text does not set every field.
922 | 0 | StatusSerializationAdapter adapter{};
923 | 0 | Status s = OptionTypeInfo::ParseType(
924 | 0 | opts, value, status_adapter_type_info, &adapter);
925 | 0 | *status_obj = adapter.GetStatus();
926 | 0 | return s;
927 | 0 | },
// Serialize: Status -> adapter -> text, wrapped in "{" ... "}".
928 | | [](const ConfigOptions& opts, const std::string& /*name*/,
929 | 0 | const void* addr, std::string* value) {
930 | 0 | const auto status_obj = static_cast<const Status*>(addr);
931 | 0 | StatusSerializationAdapter adapter(*status_obj);
932 | 0 | std::string result;
933 | 0 | Status s = OptionTypeInfo::SerializeType(opts, status_adapter_type_info,
934 | 0 | &adapter, &result);
935 | 0 | *value = "{" + result + "}";
936 | 0 | return s;
937 | 0 | },
// Compare: both Status objects are copied into adapters and compared field by
// field through status_adapter_type_info.
938 | | [](const ConfigOptions& opts, const std::string& /*name*/,
939 | 0 | const void* addr1, const void* addr2, std::string* mismatch) {
940 | 0 | const auto status1 = static_cast<const Status*>(addr1);
941 | 0 | const auto status2 = static_cast<const Status*>(addr2);
942 | |

943 | 0 | StatusSerializationAdapter adapter1(*status1);
944 | 0 | StatusSerializationAdapter adapter2(*status2);
945 | 0 | return OptionTypeInfo::TypesAreEqual(opts, status_adapter_type_info,
946 | 0 | &adapter1, &adapter2, mismatch);
947 | 0 | }}},
// A vector of CompactionServiceOutputFile; each element is described by
// cs_output_file_type_info.
948 | | {"output_files",
949 | | OptionTypeInfo::Vector<CompactionServiceOutputFile>(
950 | | offsetof(struct CompactionServiceResult, output_files),
951 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone,
952 | | OptionTypeInfo::Struct("output_files", &cs_output_file_type_info, 0,
953 | | OptionVerificationType::kNormal,
954 | | OptionTypeFlags::kNone))},
955 | | {"output_level",
956 | | {offsetof(struct CompactionServiceResult, output_level), OptionType::kInt,
957 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
958 | | {"output_path",
959 | | {offsetof(struct CompactionServiceResult, output_path),
960 | | OptionType::kEncodedString, OptionVerificationType::kNormal,
961 | | OptionTypeFlags::kNone}},
962 | | {"bytes_read",
963 | | {offsetof(struct CompactionServiceResult, bytes_read),
964 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
965 | | OptionTypeFlags::kNone}},
966 | | {"bytes_written",
967 | | {offsetof(struct CompactionServiceResult, bytes_written),
968 | | OptionType::kUInt64T, OptionVerificationType::kNormal,
969 | | OptionTypeFlags::kNone}},
// Nested structs: CompactionJobStats and InternalStats::CompactionStatsFull,
// each described by its own table.
970 | | {"stats", OptionTypeInfo::Struct(
971 | | "stats", &compaction_job_stats_type_info,
972 | | offsetof(struct CompactionServiceResult, stats),
973 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
974 | | {"internal_stats",
975 | | OptionTypeInfo::Struct(
976 | | "internal_stats", &compaction_internal_stats_type_info,
977 | | offsetof(struct CompactionServiceResult, internal_stats),
978 | | OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
979 | | };
980 | | |
// Deserializes a CompactionServiceInput from `data_str` into `*obj`.
//
// The data must be longer than sizeof(BinaryFormatVersion); otherwise
// InvalidArgument is returned (so a header with no payload is rejected too).
// The format version is decoded from the start of the data with
// DecodeFixed32. For kOptionsString the remainder is parsed with
// cs_input_type_info and the parse status is returned; any other version
// yields NotSupported with the version number in the message.
981 | | Status CompactionServiceInput::Read(const std::string& data_str,
982 | 0 | CompactionServiceInput* obj) {
983 | 0 | if (data_str.size() <= sizeof(BinaryFormatVersion)) {
984 | 0 | return Status::InvalidArgument("Invalid CompactionServiceInput string");
985 | 0 | }
986 | 0 | auto format_version = DecodeFixed32(data_str.data());
987 | 0 | if (format_version == kOptionsString) {
// Parse without running option preparation and skip field names that are not
// in cs_input_type_info instead of failing on them.
988 | 0 | ConfigOptions cf;
989 | 0 | cf.invoke_prepare_options = false;
990 | 0 | cf.ignore_unknown_options = true;
991 | 0 | return OptionTypeInfo::ParseType(
992 | 0 | cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_input_type_info,
993 | 0 | obj);
994 | 0 | } else {
995 | 0 | return Status::NotSupported(
996 | 0 | "Compaction Service Input data version not supported: " +
997 | 0 | std::to_string(format_version));
998 | 0 | }
999 | 0 | }
1000 | | |
// Serializes this CompactionServiceInput into `*output`.
//
// First appends the kOptionsString format version, encoded with EncodeFixed32
// into sizeof(BinaryFormatVersion) bytes, then hands the same string to
// OptionTypeInfo::SerializeType with cs_input_type_info and returns its
// status.
// NOTE(review): SerializeType presumably appends after the version header
// rather than overwriting it (Read() expects header + payload) - confirm.
1001 | 0 | Status CompactionServiceInput::Write(std::string* output) {
1002 | 0 | char buf[sizeof(BinaryFormatVersion)];
1003 | 0 | EncodeFixed32(buf, kOptionsString);
1004 | 0 | output->append(buf, sizeof(BinaryFormatVersion));
1005 | 0 | ConfigOptions cf;
1006 | 0 | cf.invoke_prepare_options = false;
1007 | 0 | return OptionTypeInfo::SerializeType(cf, cs_input_type_info, this, output);
1008 | 0 | }
1009 | | |
// Deserializes a CompactionServiceResult from `data_str` into `*obj`.
//
// The data must be longer than sizeof(BinaryFormatVersion); otherwise
// InvalidArgument is returned (so a header with no payload is rejected too).
// The format version is decoded from the start of the data with
// DecodeFixed32. For kOptionsString the remainder is parsed with
// cs_result_type_info and the parse status is returned; any other version
// yields NotSupported with the version number in the message.
1010 | | Status CompactionServiceResult::Read(const std::string& data_str,
1011 | 0 | CompactionServiceResult* obj) {
1012 | 0 | if (data_str.size() <= sizeof(BinaryFormatVersion)) {
1013 | 0 | return Status::InvalidArgument("Invalid CompactionServiceResult string");
1014 | 0 | }
1015 | 0 | auto format_version = DecodeFixed32(data_str.data());
1016 | 0 | if (format_version == kOptionsString) {
// Parse without running option preparation and skip field names that are not
// in cs_result_type_info instead of failing on them.
1017 | 0 | ConfigOptions cf;
1018 | 0 | cf.invoke_prepare_options = false;
1019 | 0 | cf.ignore_unknown_options = true;
1020 | 0 | return OptionTypeInfo::ParseType(
1021 | 0 | cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_result_type_info,
1022 | 0 | obj);
1023 | 0 | } else {
1024 | 0 | return Status::NotSupported(
1025 | 0 | "Compaction Service Result data version not supported: " +
1026 | 0 | std::to_string(format_version));
1027 | 0 | }
1028 | 0 | }
1029 | | |
// Serializes this CompactionServiceResult into `*output`.
//
// First appends the kOptionsString format version, encoded with EncodeFixed32
// into sizeof(BinaryFormatVersion) bytes, then hands the same string to
// OptionTypeInfo::SerializeType with cs_result_type_info and returns its
// status.
// NOTE(review): SerializeType presumably appends after the version header
// rather than overwriting it (Read() expects header + payload) - confirm.
1030 | 0 | Status CompactionServiceResult::Write(std::string* output) {
1031 | 0 | char buf[sizeof(BinaryFormatVersion)];
1032 | 0 | EncodeFixed32(buf, kOptionsString);
1033 | 0 | output->append(buf, sizeof(BinaryFormatVersion));
1034 | 0 | ConfigOptions cf;
1035 | 0 | cf.invoke_prepare_options = false;
1036 | 0 | return OptionTypeInfo::SerializeType(cf, cs_result_type_info, this, output);
1037 | 0 | }
1038 | | |
1039 | | #ifndef NDEBUG |
// Test helper, compiled only when NDEBUG is not defined. Convenience overload
// that forwards to the two-argument TEST_Equals and discards the mismatch
// string.
1040 | | bool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other) {
1041 | | std::string mismatch;
1042 | | return TEST_Equals(other, &mismatch);
1043 | | }
1044 | | |
// Test helper, compiled only when NDEBUG is not defined. Compares this result
// with `other` through OptionTypeInfo::TypesAreEqual using
// cs_result_type_info, so only the fields listed in that table take part.
// `mismatch` is passed through to TypesAreEqual.
// NOTE(review): presumably it receives the name of the first differing field
// - confirm against TypesAreEqual.
1045 | | bool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other,
1046 | | std::string* mismatch) {
1047 | | ConfigOptions cf;
1048 | | cf.invoke_prepare_options = false;
1049 | | return OptionTypeInfo::TypesAreEqual(cf, cs_result_type_info, this, other,
1050 | | mismatch);
1051 | | }
1052 | | |
// Test helper, compiled only when NDEBUG is not defined. Convenience overload
// that forwards to the two-argument TEST_Equals and discards the mismatch
// string.
1053 | | bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other) {
1054 | | std::string mismatch;
1055 | | return TEST_Equals(other, &mismatch);
1056 | | }
1057 | | |
// Test helper, compiled only when NDEBUG is not defined. Compares this input
// with `other` through OptionTypeInfo::TypesAreEqual using
// cs_input_type_info, so only the fields listed in that table take part.
// `mismatch` is passed through to TypesAreEqual.
// NOTE(review): presumably it receives the name of the first differing field
// - confirm against TypesAreEqual.
1058 | | bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other,
1059 | | std::string* mismatch) {
1060 | | ConfigOptions cf;
1061 | | cf.invoke_prepare_options = false;
1062 | | return OptionTypeInfo::TypesAreEqual(cf, cs_input_type_info, this, other,
1063 | | mismatch);
1064 | | }
1065 | | #endif // NDEBUG |
1066 | | } // namespace ROCKSDB_NAMESPACE |