Coverage Report

Created: 2026-02-14 06:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/compaction/compaction_service_job.cc
Line
Count
Source
1
//  Copyright (c) Meta Platforms, Inc. and affiliates.
2
//
3
//  This source code is licensed under both the GPLv2 (found in the
4
//  COPYING file in the root directory) and Apache 2.0 License
5
//  (found in the LICENSE.Apache file in the root directory).
6
//
7
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
8
// Use of this source code is governed by a BSD-style license that can be
9
// found in the LICENSE file. See the AUTHORS file for names of contributors.
10
11
#include "db/compaction/compaction_job.h"
12
#include "db/compaction/compaction_state.h"
13
#include "logging/logging.h"
14
#include "monitoring/iostats_context_imp.h"
15
#include "monitoring/thread_status_util.h"
16
#include "options/options_helper.h"
17
#include "rocksdb/utilities/options_type.h"
18
19
namespace ROCKSDB_NAMESPACE {
20
class SubcompactionState;
21
22
CompactionServiceJobStatus
23
CompactionJob::ProcessKeyValueCompactionWithCompactionService(
24
0
    SubcompactionState* sub_compact) {
25
0
  assert(sub_compact);
26
0
  assert(sub_compact->compaction);
27
0
  assert(db_options_.compaction_service);
28
29
0
  const Compaction* compaction = sub_compact->compaction;
30
0
  CompactionServiceInput compaction_input;
31
0
  compaction_input.output_level = compaction->output_level();
32
0
  compaction_input.db_id = db_id_;
33
34
0
  const std::vector<CompactionInputFiles>& inputs =
35
0
      *(compact_->compaction->inputs());
36
0
  for (const auto& files_per_level : inputs) {
37
0
    for (const auto& file : files_per_level.files) {
38
0
      compaction_input.input_files.emplace_back(
39
0
          MakeTableFileName(file->fd.GetNumber()));
40
0
    }
41
0
  }
42
43
0
  compaction_input.cf_name = compaction->column_family_data()->GetName();
44
0
  compaction_input.snapshots = job_context_->snapshot_seqs;
45
0
  compaction_input.has_begin = sub_compact->start.has_value();
46
0
  compaction_input.begin =
47
0
      compaction_input.has_begin ? sub_compact->start->ToString() : "";
48
0
  compaction_input.has_end = sub_compact->end.has_value();
49
0
  compaction_input.end =
50
0
      compaction_input.has_end ? sub_compact->end->ToString() : "";
51
0
  compaction_input.options_file_number = options_file_number_;
52
53
0
  TEST_SYNC_POINT_CALLBACK(
54
0
      "CompactionServiceJob::ProcessKeyValueCompactionWithCompactionService",
55
0
      &compaction_input);
56
57
0
  std::string compaction_input_binary;
58
0
  Status s = compaction_input.Write(&compaction_input_binary);
59
0
  if (!s.ok()) {
60
0
    sub_compact->status = s;
61
0
    return CompactionServiceJobStatus::kFailure;
62
0
  }
63
64
0
  std::ostringstream input_files_oss;
65
0
  bool is_first_one = true;
66
0
  for (const auto& file : compaction_input.input_files) {
67
0
    input_files_oss << (is_first_one ? "" : ", ") << file;
68
0
    is_first_one = false;
69
0
  }
70
71
0
  ROCKS_LOG_INFO(
72
0
      db_options_.info_log,
73
0
      "[%s] [JOB %d] Starting remote compaction (output level: %d): %s",
74
0
      compaction->column_family_data()->GetName().c_str(), job_id_,
75
0
      compaction_input.output_level, input_files_oss.str().c_str());
76
0
  CompactionServiceJobInfo info(
77
0
      dbname_, db_id_, db_session_id_,
78
0
      compaction->column_family_data()->GetID(),
79
0
      compaction->column_family_data()->GetName(), GetCompactionId(sub_compact),
80
0
      thread_pri_, compaction->compaction_reason(),
81
0
      compaction->is_full_compaction(), compaction->is_manual_compaction(),
82
0
      compaction->bottommost_level(), compaction->start_level(),
83
0
      compaction->output_level());
84
85
0
  CompactionServiceScheduleResponse response =
86
0
      db_options_.compaction_service->Schedule(info, compaction_input_binary);
87
0
  switch (response.status) {
88
0
    case CompactionServiceJobStatus::kSuccess:
89
0
      break;
90
0
    case CompactionServiceJobStatus::kAborted:
91
0
      sub_compact->status =
92
0
          Status::Aborted("Scheduling a remote compaction job was aborted");
93
0
      ROCKS_LOG_WARN(
94
0
          db_options_.info_log,
95
0
          "[%s] [JOB %d] Remote compaction was aborted at Schedule()",
96
0
          compaction->column_family_data()->GetName().c_str(), job_id_);
97
0
      return response.status;
98
0
    case CompactionServiceJobStatus::kFailure:
99
0
      sub_compact->status = Status::Incomplete(
100
0
          "CompactionService failed to schedule a remote compaction job.");
101
0
      ROCKS_LOG_WARN(db_options_.info_log,
102
0
                     "[%s] [JOB %d] Remote compaction failed to start.",
103
0
                     compaction->column_family_data()->GetName().c_str(),
104
0
                     job_id_);
105
0
      return response.status;
106
0
    case CompactionServiceJobStatus::kUseLocal:
107
0
      ROCKS_LOG_INFO(
108
0
          db_options_.info_log,
109
0
          "[%s] [JOB %d] Remote compaction fallback to local by API (Schedule)",
110
0
          compaction->column_family_data()->GetName().c_str(), job_id_);
111
0
      return response.status;
112
0
    default:
113
0
      assert(false);  // unknown status
114
0
      break;
115
0
  }
116
117
0
  std::string debug_str_before_wait =
118
0
      compaction->input_version()->DebugString(/*hex=*/true);
119
120
  // TODO: Update CompactionService API to support abort and resume
121
  // functionality. Currently, remote compaction jobs cannot be aborted via
122
  // AbortAllCompactions() because the CompactionService interface lacks methods
123
  // to signal abort to remote workers and to properly resume after an abort.
124
  // The API needs to be extended with:
125
  // - A method to signal abort to running remote compaction jobs
126
  // - A method to resume/re-enable scheduling after an abort is lifted
127
128
0
  ROCKS_LOG_INFO(db_options_.info_log,
129
0
                 "[%s] [JOB %d] Waiting for remote compaction...",
130
0
                 compaction->column_family_data()->GetName().c_str(), job_id_);
131
0
  std::string compaction_result_binary;
132
0
  CompactionServiceJobStatus compaction_status =
133
0
      db_options_.compaction_service->Wait(response.scheduled_job_id,
134
0
                                           &compaction_result_binary);
135
136
0
  if (compaction_status != CompactionServiceJobStatus::kSuccess) {
137
0
    ROCKS_LOG_ERROR(
138
0
        db_options_.info_log,
139
0
        "[%s] [JOB %d] Wait() status is not kSuccess. "
140
0
        "\nDebugString Before Wait():\n%s"
141
0
        "\nDebugString After Wait():\n%s",
142
0
        compaction->column_family_data()->GetName().c_str(), job_id_,
143
0
        debug_str_before_wait.c_str(),
144
0
        compaction->input_version()->DebugString(/*hex=*/true).c_str());
145
0
  }
146
147
0
  if (compaction_status == CompactionServiceJobStatus::kUseLocal) {
148
0
    ROCKS_LOG_INFO(
149
0
        db_options_.info_log,
150
0
        "[%s] [JOB %d] Remote compaction fallback to local by API (Wait)",
151
0
        compaction->column_family_data()->GetName().c_str(), job_id_);
152
0
    return compaction_status;
153
0
  }
154
155
0
  if (compaction_status == CompactionServiceJobStatus::kAborted) {
156
0
    sub_compact->status =
157
0
        Status::Aborted("Waiting a remote compaction job was aborted");
158
0
    ROCKS_LOG_INFO(db_options_.info_log,
159
0
                   "[%s] [JOB %d] Remote compaction was aborted during Wait()",
160
0
                   compaction->column_family_data()->GetName().c_str(),
161
0
                   job_id_);
162
0
    return compaction_status;
163
0
  }
164
165
0
  CompactionServiceResult compaction_result;
166
0
  s = CompactionServiceResult::Read(compaction_result_binary,
167
0
                                    &compaction_result);
168
169
0
  if (compaction_status == CompactionServiceJobStatus::kFailure) {
170
0
    if (s.ok()) {
171
0
      if (compaction_result.status.ok()) {
172
0
        sub_compact->status = Status::Incomplete(
173
0
            "CompactionService failed to run the compaction job (even though "
174
0
            "the internal status is okay).");
175
0
      } else {
176
        // set the current sub compaction status with the status returned from
177
        // remote
178
0
        sub_compact->status = compaction_result.status;
179
0
      }
180
0
    } else {
181
0
      sub_compact->status = Status::Incomplete(
182
0
          "CompactionService failed to run the compaction job (and no valid "
183
0
          "result is returned).");
184
0
      compaction_result.status.PermitUncheckedError();
185
0
    }
186
0
    ROCKS_LOG_WARN(
187
0
        db_options_.info_log, "[%s] [JOB %d] Remote compaction failed.",
188
0
        compaction->column_family_data()->GetName().c_str(), job_id_);
189
0
    return compaction_status;
190
0
  }
191
192
  // CompactionServiceJobStatus::kSuccess was returned, but somehow we failed to
193
  // read the result. Consider this as an installation failure
194
0
  if (!s.ok()) {
195
0
    sub_compact->status = s;
196
0
    compaction_result.status.PermitUncheckedError();
197
0
    db_options_.compaction_service->OnInstallation(
198
0
        response.scheduled_job_id, CompactionServiceJobStatus::kFailure);
199
0
    return CompactionServiceJobStatus::kFailure;
200
0
  }
201
0
  sub_compact->status = compaction_result.status;
202
203
0
  std::ostringstream output_files_oss;
204
0
  is_first_one = true;
205
0
  for (const auto& file : compaction_result.output_files) {
206
0
    output_files_oss << (is_first_one ? "" : ", ") << file.file_name;
207
0
    is_first_one = false;
208
0
  }
209
210
0
  ROCKS_LOG_INFO(
211
0
      db_options_.info_log,
212
0
      "[%s] [JOB %d] Received remote compaction result, output path: "
213
0
      "%s, files: %s",
214
0
      compaction->column_family_data()->GetName().c_str(), job_id_,
215
0
      compaction_result.output_path.c_str(), output_files_oss.str().c_str());
216
217
  // Installation Starts
218
0
  for (const auto& file : compaction_result.output_files) {
219
0
    uint64_t file_num = versions_->NewFileNumber();
220
0
    auto src_file = compaction_result.output_path + "/" + file.file_name;
221
0
    auto tgt_file = TableFileName(compaction->immutable_options().cf_paths,
222
0
                                  file_num, compaction->output_path_id());
223
0
    s = fs_->RenameFile(src_file, tgt_file, IOOptions(), nullptr);
224
0
    if (!s.ok()) {
225
0
      sub_compact->status = s;
226
0
      db_options_.compaction_service->OnInstallation(
227
0
          response.scheduled_job_id, CompactionServiceJobStatus::kFailure);
228
0
      return CompactionServiceJobStatus::kFailure;
229
0
    }
230
231
0
    FileMetaData meta;
232
0
    uint64_t file_size = file.file_size;
233
234
    // TODO - Clean this up in the next release.
235
    // For backward compatibility - in case the remote worker does not populate
236
    // the file_size yet. If missing, continue to populate this from the file
237
    // system.
238
0
    if (file_size == 0) {
239
0
      s = fs_->GetFileSize(tgt_file, IOOptions(), &file_size, nullptr);
240
0
    }
241
242
0
    if (!s.ok()) {
243
0
      sub_compact->status = s;
244
0
      db_options_.compaction_service->OnInstallation(
245
0
          response.scheduled_job_id, CompactionServiceJobStatus::kFailure);
246
0
      return CompactionServiceJobStatus::kFailure;
247
0
    }
248
0
    assert(file_size > 0);
249
250
0
    meta.fd = FileDescriptor(file_num, compaction->output_path_id(), file_size,
251
0
                             file.smallest_seqno, file.largest_seqno);
252
0
    meta.smallest.DecodeFrom(file.smallest_internal_key);
253
0
    meta.largest.DecodeFrom(file.largest_internal_key);
254
0
    meta.oldest_ancester_time = file.oldest_ancester_time;
255
0
    meta.file_creation_time = file.file_creation_time;
256
0
    meta.epoch_number = file.epoch_number;
257
0
    meta.file_checksum = file.file_checksum;
258
0
    meta.file_checksum_func_name = file.file_checksum_func_name;
259
0
    meta.marked_for_compaction = file.marked_for_compaction;
260
0
    meta.unique_id = file.unique_id;
261
0
    meta.temperature = file.file_temperature;
262
0
    meta.tail_size =
263
0
        FileMetaData::CalculateTailSize(file_size, file.table_properties);
264
0
    auto cfd = compaction->column_family_data();
265
0
    CompactionOutputs* compaction_outputs =
266
0
        sub_compact->Outputs(file.is_proximal_level_output);
267
0
    assert(compaction_outputs);
268
0
    compaction_outputs->AddOutput(std::move(meta), cfd->internal_comparator(),
269
0
                                  false, true, file.paranoid_hash);
270
0
    compaction_outputs->UpdateTableProperties(file.table_properties);
271
0
  }
272
273
  // Set per-level stats
274
0
  auto compaction_output_stats =
275
0
      sub_compact->OutputStats(false /* is_proximal_level */);
276
0
  assert(compaction_output_stats);
277
0
  compaction_output_stats->Add(
278
0
      compaction_result.internal_stats.output_level_stats);
279
0
  if (compaction->SupportsPerKeyPlacement()) {
280
0
    compaction_output_stats =
281
0
        sub_compact->OutputStats(true /* is_proximal_level */);
282
0
    assert(compaction_output_stats);
283
0
    compaction_output_stats->Add(
284
0
        compaction_result.internal_stats.proximal_level_stats);
285
0
  }
286
287
  // Set job stats
288
0
  sub_compact->compaction_job_stats = compaction_result.stats;
289
290
0
  RecordTick(stats_, REMOTE_COMPACT_READ_BYTES, compaction_result.bytes_read);
291
0
  RecordTick(stats_, REMOTE_COMPACT_WRITE_BYTES,
292
0
             compaction_result.bytes_written);
293
0
  db_options_.compaction_service->OnInstallation(
294
0
      response.scheduled_job_id, CompactionServiceJobStatus::kSuccess);
295
0
  return CompactionServiceJobStatus::kSuccess;
296
0
}
297
298
std::string CompactionServiceCompactionJob::GetTableFileName(
299
0
    uint64_t file_number) {
300
0
  return MakeTableFileName(output_path_, file_number);
301
0
}
302
303
0
void CompactionServiceCompactionJob::RecordCompactionIOStats() {
304
0
  compaction_result_->bytes_read += IOSTATS(bytes_read);
305
0
  compaction_result_->bytes_written += IOSTATS(bytes_written);
306
0
  CompactionJob::RecordCompactionIOStats();
307
0
}
308
309
CompactionServiceCompactionJob::CompactionServiceCompactionJob(
310
    int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
311
    const MutableDBOptions& mutable_db_options, const FileOptions& file_options,
312
    VersionSet* versions, const std::atomic<bool>* shutting_down,
313
    LogBuffer* log_buffer, FSDirectory* output_directory, Statistics* stats,
314
    InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler,
315
    JobContext* job_context, std::shared_ptr<Cache> table_cache,
316
    EventLogger* event_logger, const std::string& dbname,
317
    const std::shared_ptr<IOTracer>& io_tracer,
318
    const std::atomic<bool>& manual_compaction_canceled,
319
    const std::string& db_id, const std::string& db_session_id,
320
    std::string output_path,
321
    const CompactionServiceInput& compaction_service_input,
322
    CompactionServiceResult* compaction_service_result)
323
0
    : CompactionJob(
324
0
          job_id, compaction, db_options, mutable_db_options, file_options,
325
0
          versions, shutting_down, log_buffer, nullptr, output_directory,
326
0
          nullptr, stats, db_mutex, db_error_handler, job_context,
327
0
          std::move(table_cache), event_logger,
328
0
          compaction->mutable_cf_options().paranoid_file_checks,
329
0
          compaction->mutable_cf_options().report_bg_io_stats, dbname,
330
0
          &(compaction_service_result->stats), Env::Priority::USER, io_tracer,
331
0
          manual_compaction_canceled, CompactionJob::kCompactionAbortedFalse,
332
0
          db_id, db_session_id,
333
0
          compaction->column_family_data()->GetFullHistoryTsLow()),
334
0
      output_path_(std::move(output_path)),
335
0
      compaction_input_(compaction_service_input),
336
0
      compaction_result_(compaction_service_result) {}
337
338
void CompactionServiceCompactionJob::Prepare(
339
    const CompactionProgress& compaction_progress,
340
0
    log::Writer* compaction_progress_writer) {
341
0
  std::optional<Slice> begin;
342
0
  if (compaction_input_.has_begin) {
343
0
    begin = compaction_input_.begin;
344
0
  }
345
0
  std::optional<Slice> end;
346
0
  if (compaction_input_.has_end) {
347
0
    end = compaction_input_.end;
348
0
  }
349
0
  CompactionJob::Prepare(std::make_pair(begin, end), compaction_progress,
350
0
                         compaction_progress_writer);
351
0
}
352
353
0
Status CompactionServiceCompactionJob::Run() {
354
0
  AutoThreadOperationStageUpdater stage_updater(
355
0
      ThreadStatus::STAGE_COMPACTION_RUN);
356
357
0
  auto* c = compact_->compaction;
358
359
0
  log_buffer_->FlushBufferToLog();
360
0
  LogCompaction();
361
362
0
  compaction_result_->stats.Reset();
363
364
0
  const uint64_t start_micros = db_options_.clock->NowMicros();
365
0
  c->GetOrInitInputTableProperties();
366
367
  // Pick the only sub-compaction we should have
368
0
  assert(compact_->sub_compact_states.size() == 1);
369
0
  SubcompactionState* sub_compact = compact_->sub_compact_states.data();
370
371
0
  ProcessKeyValueCompaction(sub_compact);
372
373
0
  uint64_t elapsed_micros = db_options_.clock->NowMicros() - start_micros;
374
0
  internal_stats_.SetMicros(elapsed_micros);
375
0
  internal_stats_.AddCpuMicros(elapsed_micros);
376
377
0
  RecordTimeToHistogram(stats_, COMPACTION_TIME,
378
0
                        internal_stats_.output_level_stats.micros);
379
0
  RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
380
0
                        internal_stats_.output_level_stats.cpu_micros);
381
382
0
  Status status = sub_compact->status;
383
0
  IOStatus io_s = sub_compact->io_status;
384
385
0
  if (io_status_.ok()) {
386
0
    io_status_ = io_s;
387
0
  }
388
389
0
  if (status.ok()) {
390
0
    constexpr IODebugContext* dbg = nullptr;
391
392
0
    if (output_directory_) {
393
0
      io_s = output_directory_->FsyncWithDirOptions(IOOptions(), dbg,
394
0
                                                    DirFsyncOptions());
395
0
    }
396
0
  }
397
0
  if (io_status_.ok()) {
398
0
    io_status_ = io_s;
399
0
  }
400
0
  if (status.ok()) {
401
0
    status = io_s;
402
0
  }
403
404
0
  LogFlush(db_options_.info_log);
405
0
  compact_->status = status;
406
0
  compact_->status.PermitUncheckedError();
407
408
  // Build Compaction Job Stats
409
410
  // 1. Aggregate internal stats and job stats for all subcompactions
411
  // internal stats: sub_compact.proximal_level_outputs_.stats and
412
  //                 sub_compact.compaction_outputs_.stats into
413
  //                 internal_stats_.output_level_stats and
414
  //                 internal_stats_.proximal_level_stats
415
  // job-level stats: sub_compact.compaction_job_stats into compact.job_stats_
416
  //
417
  // For remote compaction, there's only one subcompaction.
418
0
  compact_->AggregateCompactionStats(internal_stats_, *job_stats_);
419
420
  // 2. Update job-level output stats with the aggregated internal_stats_
421
  // Please note that input stats will be updated by primary host when all
422
  // subcompactions are finished
423
0
  UpdateCompactionJobOutputStatsFromInternalStats(status, internal_stats_);
424
  // and set fields that are not propagated as part of the update
425
0
  compaction_result_->stats.is_manual_compaction = c->is_manual_compaction();
426
0
  compaction_result_->stats.is_full_compaction = c->is_full_compaction();
427
0
  compaction_result_->stats.is_remote_compaction = true;
428
429
  // 3. Update IO Stats that are not part of the the update above
430
  // (bytes_read, bytes_written)
431
0
  RecordCompactionIOStats();
432
433
  // Build Output
434
0
  compaction_result_->internal_stats = internal_stats_;
435
0
  compaction_result_->output_level = compact_->compaction->output_level();
436
0
  compaction_result_->output_path = output_path_;
437
0
  if (status.ok()) {
438
0
    for (const auto& output_file : sub_compact->GetOutputs()) {
439
0
      auto& meta = output_file.meta;
440
0
      compaction_result_->output_files.emplace_back(
441
0
          MakeTableFileName(meta.fd.GetNumber()), meta.fd.GetFileSize(),
442
0
          meta.fd.smallest_seqno, meta.fd.largest_seqno,
443
0
          meta.smallest.Encode().ToString(), meta.largest.Encode().ToString(),
444
0
          meta.oldest_ancester_time, meta.file_creation_time, meta.epoch_number,
445
0
          meta.file_checksum, meta.file_checksum_func_name,
446
0
          output_file.validator.GetHash(), meta.marked_for_compaction,
447
0
          meta.unique_id, *output_file.table_properties,
448
0
          output_file.is_proximal_level, meta.temperature);
449
0
    }
450
0
  }
451
452
0
  TEST_SYNC_POINT_CALLBACK("CompactionServiceCompactionJob::Run:0",
453
0
                           &compaction_result_);
454
0
  return status;
455
0
}
456
457
0
void CompactionServiceCompactionJob::CleanupCompaction() {
458
0
  CompactionJob::CleanupCompaction();
459
0
}
460
461
// Internal binary format for the input and result data
462
enum BinaryFormatVersion : uint32_t {
463
  kOptionsString = 1,  // Use string format similar to Option string format
464
};
465
466
static std::unordered_map<std::string, OptionTypeInfo> cfd_type_info = {
467
    {"name",
468
     {offsetof(struct ColumnFamilyDescriptor, name), OptionType::kEncodedString,
469
      OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
470
    {"options",
471
     {offsetof(struct ColumnFamilyDescriptor, options),
472
      OptionType::kConfigurable, OptionVerificationType::kNormal,
473
      OptionTypeFlags::kNone,
474
      [](const ConfigOptions& opts, const std::string& /*name*/,
475
0
         const std::string& value, void* addr) {
476
0
        auto cf_options = static_cast<ColumnFamilyOptions*>(addr);
477
0
        return GetColumnFamilyOptionsFromString(opts, ColumnFamilyOptions(),
478
0
                                                value, cf_options);
479
0
      },
480
      [](const ConfigOptions& opts, const std::string& /*name*/,
481
0
         const void* addr, std::string* value) {
482
0
        const auto cf_options = static_cast<const ColumnFamilyOptions*>(addr);
483
0
        std::string result;
484
0
        auto status =
485
0
            GetStringFromColumnFamilyOptions(opts, *cf_options, &result);
486
0
        *value = "{" + result + "}";
487
0
        return status;
488
0
      },
489
      [](const ConfigOptions& opts, const std::string& name, const void* addr1,
490
0
         const void* addr2, std::string* mismatch) {
491
0
        const auto this_one = static_cast<const ColumnFamilyOptions*>(addr1);
492
0
        const auto that_one = static_cast<const ColumnFamilyOptions*>(addr2);
493
0
        auto this_conf = CFOptionsAsConfigurable(*this_one);
494
0
        auto that_conf = CFOptionsAsConfigurable(*that_one);
495
0
        std::string mismatch_opt;
496
0
        bool result =
497
0
            this_conf->AreEquivalent(opts, that_conf.get(), &mismatch_opt);
498
0
        if (!result) {
499
0
          *mismatch = name + "." + mismatch_opt;
500
0
        }
501
0
        return result;
502
0
      }}},
503
};
504
505
static std::unordered_map<std::string, OptionTypeInfo> cs_input_type_info = {
506
    {"cf_name",
507
     {offsetof(struct CompactionServiceInput, cf_name),
508
      OptionType::kEncodedString}},
509
    {"snapshots", OptionTypeInfo::Vector<uint64_t>(
510
                      offsetof(struct CompactionServiceInput, snapshots),
511
                      OptionVerificationType::kNormal, OptionTypeFlags::kNone,
512
                      {0, OptionType::kUInt64T})},
513
    {"input_files", OptionTypeInfo::Vector<std::string>(
514
                        offsetof(struct CompactionServiceInput, input_files),
515
                        OptionVerificationType::kNormal, OptionTypeFlags::kNone,
516
                        {0, OptionType::kEncodedString})},
517
    {"output_level",
518
     {offsetof(struct CompactionServiceInput, output_level), OptionType::kInt,
519
      OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
520
    {"db_id",
521
     {offsetof(struct CompactionServiceInput, db_id),
522
      OptionType::kEncodedString}},
523
    {"has_begin",
524
     {offsetof(struct CompactionServiceInput, has_begin), OptionType::kBoolean,
525
      OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
526
    {"begin",
527
     {offsetof(struct CompactionServiceInput, begin),
528
      OptionType::kEncodedString, OptionVerificationType::kNormal,
529
      OptionTypeFlags::kNone}},
530
    {"has_end",
531
     {offsetof(struct CompactionServiceInput, has_end), OptionType::kBoolean,
532
      OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
533
    {"end",
534
     {offsetof(struct CompactionServiceInput, end), OptionType::kEncodedString,
535
      OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
536
    {"options_file_number",
537
     {offsetof(struct CompactionServiceInput, options_file_number),
538
      OptionType::kUInt64T, OptionVerificationType::kNormal,
539
      OptionTypeFlags::kNone}},
540
};
541
542
static std::unordered_map<std::string, OptionTypeInfo>
543
    cs_output_file_type_info = {
544
        {"file_name",
545
         {offsetof(struct CompactionServiceOutputFile, file_name),
546
          OptionType::kEncodedString, OptionVerificationType::kNormal,
547
          OptionTypeFlags::kNone}},
548
        {"file_size",
549
         {offsetof(struct CompactionServiceOutputFile, file_size),
550
          OptionType::kUInt64T, OptionVerificationType::kNormal,
551
          OptionTypeFlags::kNone}},
552
        {"smallest_seqno",
553
         {offsetof(struct CompactionServiceOutputFile, smallest_seqno),
554
          OptionType::kUInt64T, OptionVerificationType::kNormal,
555
          OptionTypeFlags::kNone}},
556
        {"largest_seqno",
557
         {offsetof(struct CompactionServiceOutputFile, largest_seqno),
558
          OptionType::kUInt64T, OptionVerificationType::kNormal,
559
          OptionTypeFlags::kNone}},
560
        {"smallest_internal_key",
561
         {offsetof(struct CompactionServiceOutputFile, smallest_internal_key),
562
          OptionType::kEncodedString, OptionVerificationType::kNormal,
563
          OptionTypeFlags::kNone}},
564
        {"largest_internal_key",
565
         {offsetof(struct CompactionServiceOutputFile, largest_internal_key),
566
          OptionType::kEncodedString, OptionVerificationType::kNormal,
567
          OptionTypeFlags::kNone}},
568
        {"oldest_ancester_time",
569
         {offsetof(struct CompactionServiceOutputFile, oldest_ancester_time),
570
          OptionType::kUInt64T, OptionVerificationType::kNormal,
571
          OptionTypeFlags::kNone}},
572
        {"file_creation_time",
573
         {offsetof(struct CompactionServiceOutputFile, file_creation_time),
574
          OptionType::kUInt64T, OptionVerificationType::kNormal,
575
          OptionTypeFlags::kNone}},
576
        {"epoch_number",
577
         {offsetof(struct CompactionServiceOutputFile, epoch_number),
578
          OptionType::kUInt64T, OptionVerificationType::kNormal,
579
          OptionTypeFlags::kNone}},
580
        {"file_checksum",
581
         {offsetof(struct CompactionServiceOutputFile, file_checksum),
582
          OptionType::kEncodedString, OptionVerificationType::kNormal,
583
          OptionTypeFlags::kNone}},
584
        {"file_checksum_func_name",
585
         {offsetof(struct CompactionServiceOutputFile, file_checksum_func_name),
586
          OptionType::kEncodedString, OptionVerificationType::kNormal,
587
          OptionTypeFlags::kNone}},
588
        {"paranoid_hash",
589
         {offsetof(struct CompactionServiceOutputFile, paranoid_hash),
590
          OptionType::kUInt64T, OptionVerificationType::kNormal,
591
          OptionTypeFlags::kNone}},
592
        {"marked_for_compaction",
593
         {offsetof(struct CompactionServiceOutputFile, marked_for_compaction),
594
          OptionType::kBoolean, OptionVerificationType::kNormal,
595
          OptionTypeFlags::kNone}},
596
        {"unique_id",
597
         OptionTypeInfo::Array<uint64_t, 2>(
598
             offsetof(struct CompactionServiceOutputFile, unique_id),
599
             OptionVerificationType::kNormal, OptionTypeFlags::kNone,
600
             {0, OptionType::kUInt64T})},
601
        {"table_properties",
602
         {offsetof(struct CompactionServiceOutputFile, table_properties),
603
          OptionType::kStruct, OptionVerificationType::kNormal,
604
          OptionTypeFlags::kNone,
605
          [](const ConfigOptions& opts, const std::string& /*name*/,
606
0
             const std::string& value, void* addr) {
607
0
            auto table_properties = static_cast<TableProperties*>(addr);
608
0
            return TableProperties::Parse(opts, value, table_properties);
609
0
          },
610
          [](const ConfigOptions& opts, const std::string& /*name*/,
611
0
             const void* addr, std::string* value) {
612
0
            const auto table_properties =
613
0
                static_cast<const TableProperties*>(addr);
614
0
            std::string result;
615
0
            auto status = table_properties->Serialize(opts, &result);
616
0
            *value = "{" + result + "}";
617
0
            return status;
618
0
          },
619
          [](const ConfigOptions& opts, const std::string& /*name*/,
620
0
             const void* addr1, const void* addr2, std::string* mismatch) {
621
0
            const auto this_one = static_cast<const TableProperties*>(addr1);
622
0
            const auto that_one = static_cast<const TableProperties*>(addr2);
623
0
            return this_one->AreEqual(opts, that_one, mismatch);
624
0
          }}},
625
        {"is_proximal_level_output",
626
         {offsetof(struct CompactionServiceOutputFile,
627
                   is_proximal_level_output),
628
          OptionType::kBoolean, OptionVerificationType::kNormal,
629
          OptionTypeFlags::kNone}},
630
        {"file_temperature",
631
         {offsetof(struct CompactionServiceOutputFile, file_temperature),
632
          OptionType::kTemperature, OptionVerificationType::kNormal,
633
          OptionTypeFlags::kNone}}};
634
635
static std::unordered_map<std::string, OptionTypeInfo>
636
    compaction_job_stats_type_info = {
637
        {"elapsed_micros",
638
         {offsetof(struct CompactionJobStats, elapsed_micros),
639
          OptionType::kUInt64T, OptionVerificationType::kNormal,
640
          OptionTypeFlags::kNone}},
641
        {"cpu_micros",
642
         {offsetof(struct CompactionJobStats, cpu_micros), OptionType::kUInt64T,
643
          OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
644
        {"num_input_records",
645
         {offsetof(struct CompactionJobStats, num_input_records),
646
          OptionType::kUInt64T, OptionVerificationType::kNormal,
647
          OptionTypeFlags::kNone}},
648
        {"num_blobs_read",
649
         {offsetof(struct CompactionJobStats, num_blobs_read),
650
          OptionType::kUInt64T, OptionVerificationType::kNormal,
651
          OptionTypeFlags::kNone}},
652
        {"num_input_files",
653
         {offsetof(struct CompactionJobStats, num_input_files),
654
          OptionType::kSizeT, OptionVerificationType::kNormal,
655
          OptionTypeFlags::kNone}},
656
        {"num_input_files_at_output_level",
657
         {offsetof(struct CompactionJobStats, num_input_files_at_output_level),
658
          OptionType::kSizeT, OptionVerificationType::kNormal,
659
          OptionTypeFlags::kNone}},
660
        {"num_output_records",
661
         {offsetof(struct CompactionJobStats, num_output_records),
662
          OptionType::kUInt64T, OptionVerificationType::kNormal,
663
          OptionTypeFlags::kNone}},
664
        {"num_output_files",
665
         {offsetof(struct CompactionJobStats, num_output_files),
666
          OptionType::kSizeT, OptionVerificationType::kNormal,
667
          OptionTypeFlags::kNone}},
668
        {"num_output_files_blob",
669
         {offsetof(struct CompactionJobStats, num_output_files_blob),
670
          OptionType::kSizeT, OptionVerificationType::kNormal,
671
          OptionTypeFlags::kNone}},
672
        {"is_full_compaction",
673
         {offsetof(struct CompactionJobStats, is_full_compaction),
674
          OptionType::kBoolean, OptionVerificationType::kNormal,
675
          OptionTypeFlags::kNone}},
676
        {"is_manual_compaction",
677
         {offsetof(struct CompactionJobStats, is_manual_compaction),
678
          OptionType::kBoolean, OptionVerificationType::kNormal,
679
          OptionTypeFlags::kNone}},
680
        {"is_remote_compaction",
681
         {offsetof(struct CompactionJobStats, is_remote_compaction),
682
          OptionType::kBoolean, OptionVerificationType::kNormal,
683
          OptionTypeFlags::kNone}},
684
        {"total_input_bytes",
685
         {offsetof(struct CompactionJobStats, total_input_bytes),
686
          OptionType::kUInt64T, OptionVerificationType::kNormal,
687
          OptionTypeFlags::kNone}},
688
        {"total_blob_bytes_read",
689
         {offsetof(struct CompactionJobStats, total_blob_bytes_read),
690
          OptionType::kUInt64T, OptionVerificationType::kNormal,
691
          OptionTypeFlags::kNone}},
692
        {"total_output_bytes",
693
         {offsetof(struct CompactionJobStats, total_output_bytes),
694
          OptionType::kUInt64T, OptionVerificationType::kNormal,
695
          OptionTypeFlags::kNone}},
696
        {"total_output_bytes_blob",
697
         {offsetof(struct CompactionJobStats, total_output_bytes_blob),
698
          OptionType::kUInt64T, OptionVerificationType::kNormal,
699
          OptionTypeFlags::kNone}},
700
        {"num_records_replaced",
701
         {offsetof(struct CompactionJobStats, num_records_replaced),
702
          OptionType::kUInt64T, OptionVerificationType::kNormal,
703
          OptionTypeFlags::kNone}},
704
        {"total_input_raw_key_bytes",
705
         {offsetof(struct CompactionJobStats, total_input_raw_key_bytes),
706
          OptionType::kUInt64T, OptionVerificationType::kNormal,
707
          OptionTypeFlags::kNone}},
708
        {"total_input_raw_value_bytes",
709
         {offsetof(struct CompactionJobStats, total_input_raw_value_bytes),
710
          OptionType::kUInt64T, OptionVerificationType::kNormal,
711
          OptionTypeFlags::kNone}},
712
        {"num_input_deletion_records",
713
         {offsetof(struct CompactionJobStats, num_input_deletion_records),
714
          OptionType::kUInt64T, OptionVerificationType::kNormal,
715
          OptionTypeFlags::kNone}},
716
        {"num_expired_deletion_records",
717
         {offsetof(struct CompactionJobStats, num_expired_deletion_records),
718
          OptionType::kUInt64T, OptionVerificationType::kNormal,
719
          OptionTypeFlags::kNone}},
720
        {"num_corrupt_keys",
721
         {offsetof(struct CompactionJobStats, num_corrupt_keys),
722
          OptionType::kUInt64T, OptionVerificationType::kNormal,
723
          OptionTypeFlags::kNone}},
724
        {"file_write_nanos",
725
         {offsetof(struct CompactionJobStats, file_write_nanos),
726
          OptionType::kUInt64T, OptionVerificationType::kNormal,
727
          OptionTypeFlags::kNone}},
728
        {"file_range_sync_nanos",
729
         {offsetof(struct CompactionJobStats, file_range_sync_nanos),
730
          OptionType::kUInt64T, OptionVerificationType::kNormal,
731
          OptionTypeFlags::kNone}},
732
        {"file_fsync_nanos",
733
         {offsetof(struct CompactionJobStats, file_fsync_nanos),
734
          OptionType::kUInt64T, OptionVerificationType::kNormal,
735
          OptionTypeFlags::kNone}},
736
        {"file_prepare_write_nanos",
737
         {offsetof(struct CompactionJobStats, file_prepare_write_nanos),
738
          OptionType::kUInt64T, OptionVerificationType::kNormal,
739
          OptionTypeFlags::kNone}},
740
        {"smallest_output_key_prefix",
741
         {offsetof(struct CompactionJobStats, smallest_output_key_prefix),
742
          OptionType::kEncodedString, OptionVerificationType::kNormal,
743
          OptionTypeFlags::kNone}},
744
        {"largest_output_key_prefix",
745
         {offsetof(struct CompactionJobStats, largest_output_key_prefix),
746
          OptionType::kEncodedString, OptionVerificationType::kNormal,
747
          OptionTypeFlags::kNone}},
748
        {"num_single_del_fallthru",
749
         {offsetof(struct CompactionJobStats, num_single_del_fallthru),
750
          OptionType::kUInt64T, OptionVerificationType::kNormal,
751
          OptionTypeFlags::kNone}},
752
        {"num_single_del_mismatch",
753
         {offsetof(struct CompactionJobStats, num_single_del_mismatch),
754
          OptionType::kUInt64T, OptionVerificationType::kNormal,
755
          OptionTypeFlags::kNone}},
756
};
757
758
static std::unordered_map<std::string, OptionTypeInfo>
759
    compaction_stats_type_info = {
760
        {"micros",
761
         {offsetof(struct InternalStats::CompactionStats, micros),
762
          OptionType::kUInt64T, OptionVerificationType::kNormal,
763
          OptionTypeFlags::kNone}},
764
        {"cpu_micros",
765
         {offsetof(struct InternalStats::CompactionStats, cpu_micros),
766
          OptionType::kUInt64T, OptionVerificationType::kNormal,
767
          OptionTypeFlags::kNone}},
768
        {"bytes_read_non_output_levels",
769
         {offsetof(struct InternalStats::CompactionStats,
770
                   bytes_read_non_output_levels),
771
          OptionType::kUInt64T, OptionVerificationType::kNormal,
772
          OptionTypeFlags::kNone}},
773
        {"bytes_read_output_level",
774
         {offsetof(struct InternalStats::CompactionStats,
775
                   bytes_read_output_level),
776
          OptionType::kUInt64T, OptionVerificationType::kNormal,
777
          OptionTypeFlags::kNone}},
778
        {"bytes_skipped_non_output_levels",
779
         {offsetof(struct InternalStats::CompactionStats,
780
                   bytes_skipped_non_output_levels),
781
          OptionType::kUInt64T, OptionVerificationType::kNormal,
782
          OptionTypeFlags::kNone}},
783
        {"bytes_skipped_output_level",
784
         {offsetof(struct InternalStats::CompactionStats,
785
                   bytes_skipped_output_level),
786
          OptionType::kUInt64T, OptionVerificationType::kNormal,
787
          OptionTypeFlags::kNone}},
788
        {"bytes_read_blob",
789
         {offsetof(struct InternalStats::CompactionStats, bytes_read_blob),
790
          OptionType::kUInt64T, OptionVerificationType::kNormal,
791
          OptionTypeFlags::kNone}},
792
        {"bytes_written",
793
         {offsetof(struct InternalStats::CompactionStats, bytes_written),
794
          OptionType::kUInt64T, OptionVerificationType::kNormal,
795
          OptionTypeFlags::kNone}},
796
        {"bytes_written_blob",
797
         {offsetof(struct InternalStats::CompactionStats, bytes_written_blob),
798
          OptionType::kUInt64T, OptionVerificationType::kNormal,
799
          OptionTypeFlags::kNone}},
800
        {"bytes_moved",
801
         {offsetof(struct InternalStats::CompactionStats, bytes_moved),
802
          OptionType::kUInt64T, OptionVerificationType::kNormal,
803
          OptionTypeFlags::kNone}},
804
        {"num_input_files_in_non_output_levels",
805
         {offsetof(struct InternalStats::CompactionStats,
806
                   num_input_files_in_non_output_levels),
807
          OptionType::kInt, OptionVerificationType::kNormal,
808
          OptionTypeFlags::kNone}},
809
        {"num_input_files_in_output_level",
810
         {offsetof(struct InternalStats::CompactionStats,
811
                   num_input_files_in_output_level),
812
          OptionType::kInt, OptionVerificationType::kNormal,
813
          OptionTypeFlags::kNone}},
814
        {"num_filtered_input_files_in_non_output_levels",
815
         {offsetof(struct InternalStats::CompactionStats,
816
                   num_filtered_input_files_in_non_output_levels),
817
          OptionType::kInt, OptionVerificationType::kNormal,
818
          OptionTypeFlags::kNone}},
819
        {"num_filtered_input_files_in_output_level",
820
         {offsetof(struct InternalStats::CompactionStats,
821
                   num_filtered_input_files_in_output_level),
822
          OptionType::kInt, OptionVerificationType::kNormal,
823
          OptionTypeFlags::kNone}},
824
        {"num_output_files",
825
         {offsetof(struct InternalStats::CompactionStats, num_output_files),
826
          OptionType::kInt, OptionVerificationType::kNormal,
827
          OptionTypeFlags::kNone}},
828
        {"num_output_files_blob",
829
         {offsetof(struct InternalStats::CompactionStats,
830
                   num_output_files_blob),
831
          OptionType::kInt, OptionVerificationType::kNormal,
832
          OptionTypeFlags::kNone}},
833
        {"num_input_records",
834
         {offsetof(struct InternalStats::CompactionStats, num_input_records),
835
          OptionType::kUInt64T, OptionVerificationType::kNormal,
836
          OptionTypeFlags::kNone}},
837
        {"num_dropped_records",
838
         {offsetof(struct InternalStats::CompactionStats, num_dropped_records),
839
          OptionType::kUInt64T, OptionVerificationType::kNormal,
840
          OptionTypeFlags::kNone}},
841
        {"num_output_records",
842
         {offsetof(struct InternalStats::CompactionStats, num_output_records),
843
          OptionType::kUInt64T, OptionVerificationType::kNormal,
844
          OptionTypeFlags::kNone}},
845
        {"count",
846
         {offsetof(struct InternalStats::CompactionStats, count),
847
          OptionType::kUInt64T, OptionVerificationType::kNormal,
848
          OptionTypeFlags::kNone}},
849
        {"counts", OptionTypeInfo::Array<
850
                       int, static_cast<int>(CompactionReason::kNumOfReasons)>(
851
                       offsetof(struct InternalStats::CompactionStats, counts),
852
                       OptionVerificationType::kNormal, OptionTypeFlags::kNone,
853
                       {0, OptionType::kInt})},
854
};
855
856
static std::unordered_map<std::string, OptionTypeInfo>
857
    compaction_internal_stats_type_info = {
858
        {"output_level_stats",
859
         OptionTypeInfo::Struct(
860
             "output_level_stats", &compaction_stats_type_info,
861
             offsetof(struct InternalStats::CompactionStatsFull,
862
                      output_level_stats),
863
             OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
864
        {"has_proximal_level_output",
865
         {offsetof(struct InternalStats::CompactionStatsFull,
866
                   has_proximal_level_output),
867
          OptionType::kBoolean, OptionVerificationType::kNormal,
868
          OptionTypeFlags::kNone}},
869
        {"proximal_level_stats",
870
         OptionTypeInfo::Struct(
871
             "proximal_level_stats", &compaction_stats_type_info,
872
             offsetof(struct InternalStats::CompactionStatsFull,
873
                      proximal_level_stats),
874
             OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
875
};
876
877
namespace {
878
// this is a helper struct to serialize and deserialize class Status, because
879
// Status's members are not public.
880
struct StatusSerializationAdapter {
881
  uint8_t code;
882
  uint8_t subcode;
883
  uint8_t severity;
884
  std::string message;
885
886
0
  StatusSerializationAdapter() = default;
887
0
  explicit StatusSerializationAdapter(const Status& s) {
888
0
    code = s.code();
889
0
    subcode = s.subcode();
890
0
    severity = s.severity();
891
0
    auto msg = s.getState();
892
0
    message = msg ? msg : "";
893
0
  }
894
895
0
  Status GetStatus() const {
896
0
    return Status{static_cast<Status::Code>(code),
897
0
                  static_cast<Status::SubCode>(subcode),
898
0
                  static_cast<Status::Severity>(severity), message};
899
0
  }
900
};
901
}  // namespace
902
903
static std::unordered_map<std::string, OptionTypeInfo>
904
    status_adapter_type_info = {
905
        {"code",
906
         {offsetof(struct StatusSerializationAdapter, code),
907
          OptionType::kUInt8T, OptionVerificationType::kNormal,
908
          OptionTypeFlags::kNone}},
909
        {"subcode",
910
         {offsetof(struct StatusSerializationAdapter, subcode),
911
          OptionType::kUInt8T, OptionVerificationType::kNormal,
912
          OptionTypeFlags::kNone}},
913
        {"severity",
914
         {offsetof(struct StatusSerializationAdapter, severity),
915
          OptionType::kUInt8T, OptionVerificationType::kNormal,
916
          OptionTypeFlags::kNone}},
917
        {"message",
918
         {offsetof(struct StatusSerializationAdapter, message),
919
          OptionType::kEncodedString, OptionVerificationType::kNormal,
920
          OptionTypeFlags::kNone}},
921
};
922
923
static std::unordered_map<std::string, OptionTypeInfo> cs_result_type_info = {
924
    {"status",
925
     {offsetof(struct CompactionServiceResult, status),
926
      OptionType::kCustomizable, OptionVerificationType::kNormal,
927
      OptionTypeFlags::kNone,
928
      [](const ConfigOptions& opts, const std::string& /*name*/,
929
0
         const std::string& value, void* addr) {
930
0
        auto status_obj = static_cast<Status*>(addr);
931
0
        StatusSerializationAdapter adapter;
932
0
        Status s = OptionTypeInfo::ParseType(
933
0
            opts, value, status_adapter_type_info, &adapter);
934
0
        *status_obj = adapter.GetStatus();
935
0
        return s;
936
0
      },
937
      [](const ConfigOptions& opts, const std::string& /*name*/,
938
0
         const void* addr, std::string* value) {
939
0
        const auto status_obj = static_cast<const Status*>(addr);
940
0
        StatusSerializationAdapter adapter(*status_obj);
941
0
        std::string result;
942
0
        Status s = OptionTypeInfo::SerializeType(opts, status_adapter_type_info,
943
0
                                                 &adapter, &result);
944
0
        *value = "{" + result + "}";
945
0
        return s;
946
0
      },
947
      [](const ConfigOptions& opts, const std::string& /*name*/,
948
0
         const void* addr1, const void* addr2, std::string* mismatch) {
949
0
        const auto status1 = static_cast<const Status*>(addr1);
950
0
        const auto status2 = static_cast<const Status*>(addr2);
951
952
0
        StatusSerializationAdapter adatper1(*status1);
953
0
        StatusSerializationAdapter adapter2(*status2);
954
0
        return OptionTypeInfo::TypesAreEqual(opts, status_adapter_type_info,
955
0
                                             &adatper1, &adapter2, mismatch);
956
0
      }}},
957
    {"output_files",
958
     OptionTypeInfo::Vector<CompactionServiceOutputFile>(
959
         offsetof(struct CompactionServiceResult, output_files),
960
         OptionVerificationType::kNormal, OptionTypeFlags::kNone,
961
         OptionTypeInfo::Struct("output_files", &cs_output_file_type_info, 0,
962
                                OptionVerificationType::kNormal,
963
                                OptionTypeFlags::kNone))},
964
    {"output_level",
965
     {offsetof(struct CompactionServiceResult, output_level), OptionType::kInt,
966
      OptionVerificationType::kNormal, OptionTypeFlags::kNone}},
967
    {"output_path",
968
     {offsetof(struct CompactionServiceResult, output_path),
969
      OptionType::kEncodedString, OptionVerificationType::kNormal,
970
      OptionTypeFlags::kNone}},
971
    {"bytes_read",
972
     {offsetof(struct CompactionServiceResult, bytes_read),
973
      OptionType::kUInt64T, OptionVerificationType::kNormal,
974
      OptionTypeFlags::kNone}},
975
    {"bytes_written",
976
     {offsetof(struct CompactionServiceResult, bytes_written),
977
      OptionType::kUInt64T, OptionVerificationType::kNormal,
978
      OptionTypeFlags::kNone}},
979
    {"stats", OptionTypeInfo::Struct(
980
                  "stats", &compaction_job_stats_type_info,
981
                  offsetof(struct CompactionServiceResult, stats),
982
                  OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
983
    {"internal_stats",
984
     OptionTypeInfo::Struct(
985
         "internal_stats", &compaction_internal_stats_type_info,
986
         offsetof(struct CompactionServiceResult, internal_stats),
987
         OptionVerificationType::kNormal, OptionTypeFlags::kNone)},
988
};
989
990
Status CompactionServiceInput::Read(const std::string& data_str,
991
0
                                    CompactionServiceInput* obj) {
992
0
  if (data_str.size() <= sizeof(BinaryFormatVersion)) {
993
0
    return Status::InvalidArgument("Invalid CompactionServiceInput string");
994
0
  }
995
0
  auto format_version = DecodeFixed32(data_str.data());
996
0
  if (format_version == kOptionsString) {
997
0
    ConfigOptions cf;
998
0
    cf.invoke_prepare_options = false;
999
0
    cf.ignore_unknown_options = true;
1000
0
    return OptionTypeInfo::ParseType(
1001
0
        cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_input_type_info,
1002
0
        obj);
1003
0
  } else {
1004
0
    return Status::NotSupported(
1005
0
        "Compaction Service Input data version not supported: " +
1006
0
        std::to_string(format_version));
1007
0
  }
1008
0
}
1009
1010
0
Status CompactionServiceInput::Write(std::string* output) {
1011
0
  char buf[sizeof(BinaryFormatVersion)];
1012
0
  EncodeFixed32(buf, kOptionsString);
1013
0
  output->append(buf, sizeof(BinaryFormatVersion));
1014
0
  ConfigOptions cf;
1015
0
  cf.invoke_prepare_options = false;
1016
0
  return OptionTypeInfo::SerializeType(cf, cs_input_type_info, this, output);
1017
0
}
1018
1019
Status CompactionServiceResult::Read(const std::string& data_str,
1020
0
                                     CompactionServiceResult* obj) {
1021
0
  if (data_str.size() <= sizeof(BinaryFormatVersion)) {
1022
0
    return Status::InvalidArgument("Invalid CompactionServiceResult string");
1023
0
  }
1024
0
  auto format_version = DecodeFixed32(data_str.data());
1025
0
  if (format_version == kOptionsString) {
1026
0
    ConfigOptions cf;
1027
0
    cf.invoke_prepare_options = false;
1028
0
    cf.ignore_unknown_options = true;
1029
0
    return OptionTypeInfo::ParseType(
1030
0
        cf, data_str.substr(sizeof(BinaryFormatVersion)), cs_result_type_info,
1031
0
        obj);
1032
0
  } else {
1033
0
    return Status::NotSupported(
1034
0
        "Compaction Service Result data version not supported: " +
1035
0
        std::to_string(format_version));
1036
0
  }
1037
0
}
1038
1039
0
Status CompactionServiceResult::Write(std::string* output) {
1040
0
  char buf[sizeof(BinaryFormatVersion)];
1041
0
  EncodeFixed32(buf, kOptionsString);
1042
0
  output->append(buf, sizeof(BinaryFormatVersion));
1043
0
  ConfigOptions cf;
1044
0
  cf.invoke_prepare_options = false;
1045
0
  return OptionTypeInfo::SerializeType(cf, cs_result_type_info, this, output);
1046
0
}
1047
1048
#ifndef NDEBUG
1049
bool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other) {
1050
  std::string mismatch;
1051
  return TEST_Equals(other, &mismatch);
1052
}
1053
1054
bool CompactionServiceResult::TEST_Equals(CompactionServiceResult* other,
1055
                                          std::string* mismatch) {
1056
  ConfigOptions cf;
1057
  cf.invoke_prepare_options = false;
1058
  return OptionTypeInfo::TypesAreEqual(cf, cs_result_type_info, this, other,
1059
                                       mismatch);
1060
}
1061
1062
bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other) {
1063
  std::string mismatch;
1064
  return TEST_Equals(other, &mismatch);
1065
}
1066
1067
bool CompactionServiceInput::TEST_Equals(CompactionServiceInput* other,
1068
                                         std::string* mismatch) {
1069
  ConfigOptions cf;
1070
  cf.invoke_prepare_options = false;
1071
  return OptionTypeInfo::TypesAreEqual(cf, cs_input_type_info, this, other,
1072
                                       mismatch);
1073
}
1074
#endif  // NDEBUG
1075
}  // namespace ROCKSDB_NAMESPACE