Coverage Report

Created: 2026-05-31 07:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/compaction/compaction_job.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10
#include "db/compaction/compaction_job.h"
11
12
#include <algorithm>
13
#include <cinttypes>
14
#include <memory>
15
#include <optional>
16
#include <set>
17
#include <utility>
18
#include <vector>
19
20
#include "db/blob/blob_counting_iterator.h"
21
#include "db/blob/blob_file_addition.h"
22
#include "db/blob/blob_file_builder.h"
23
#include "db/builder.h"
24
#include "db/compaction/clipping_iterator.h"
25
#include "db/compaction/compaction_state.h"
26
#include "db/db_impl/db_impl.h"
27
#include "db/dbformat.h"
28
#include "db/error_handler.h"
29
#include "db/event_helpers.h"
30
#include "db/history_trimming_iterator.h"
31
#include "db/log_writer.h"
32
#include "db/merge_helper.h"
33
#include "db/range_del_aggregator.h"
34
#include "db/version_edit.h"
35
#include "db/version_set.h"
36
#include "file/file_util.h"
37
#include "file/filename.h"
38
#include "file/read_write_util.h"
39
#include "file/sst_file_manager_impl.h"
40
#include "file/writable_file_writer.h"
41
#include "logging/log_buffer.h"
42
#include "logging/logging.h"
43
#include "monitoring/iostats_context_imp.h"
44
#include "monitoring/thread_status_util.h"
45
#include "options/configurable_helper.h"
46
#include "options/options_helper.h"
47
#include "port/port.h"
48
#include "rocksdb/db.h"
49
#include "rocksdb/env.h"
50
#include "rocksdb/options.h"
51
#include "rocksdb/statistics.h"
52
#include "rocksdb/status.h"
53
#include "rocksdb/table.h"
54
#include "rocksdb/utilities/options_type.h"
55
#include "table/format.h"
56
#include "table/merging_iterator.h"
57
#include "table/meta_blocks.h"
58
#include "table/table_builder.h"
59
#include "table/unique_id_impl.h"
60
#include "test_util/sync_point.h"
61
#include "util/hash_containers.h"
62
#include "util/stop_watch.h"
63
64
namespace ROCKSDB_NAMESPACE {
65
66
4.85k
const char* GetCompactionReasonString(CompactionReason compaction_reason) {
67
4.85k
  switch (compaction_reason) {
68
0
    case CompactionReason::kUnknown:
69
0
      return "Unknown";
70
2.16k
    case CompactionReason::kLevelL0FilesNum:
71
2.16k
      return "LevelL0FilesNum";
72
0
    case CompactionReason::kLevelMaxLevelSize:
73
0
      return "LevelMaxLevelSize";
74
0
    case CompactionReason::kUniversalSizeAmplification:
75
0
      return "UniversalSizeAmplification";
76
0
    case CompactionReason::kUniversalSizeRatio:
77
0
      return "UniversalSizeRatio";
78
0
    case CompactionReason::kUniversalSortedRunNum:
79
0
      return "UniversalSortedRunNum";
80
0
    case CompactionReason::kFIFOMaxSize:
81
0
      return "FIFOMaxSize";
82
0
    case CompactionReason::kFIFOReduceNumFiles:
83
0
      return "FIFOReduceNumFiles";
84
0
    case CompactionReason::kFIFOTtl:
85
0
      return "FIFOTtl";
86
1.09k
    case CompactionReason::kManualCompaction:
87
1.09k
      return "ManualCompaction";
88
0
    case CompactionReason::kFilesMarkedForCompaction:
89
0
      return "FilesMarkedForCompaction";
90
1.59k
    case CompactionReason::kBottommostFiles:
91
1.59k
      return "BottommostFiles";
92
0
    case CompactionReason::kTtl:
93
0
      return "Ttl";
94
0
    case CompactionReason::kFlush:
95
0
      return "Flush";
96
0
    case CompactionReason::kExternalSstIngestion:
97
0
      return "ExternalSstIngestion";
98
0
    case CompactionReason::kPeriodicCompaction:
99
0
      return "PeriodicCompaction";
100
0
    case CompactionReason::kChangeTemperature:
101
0
      return "ChangeTemperature";
102
0
    case CompactionReason::kForcedBlobGC:
103
0
      return "ForcedBlobGC";
104
0
    case CompactionReason::kRoundRobinTtl:
105
0
      return "RoundRobinTtl";
106
0
    case CompactionReason::kRefitLevel:
107
0
      return "RefitLevel";
108
0
    case CompactionReason::kReadTriggered:
109
0
      return "ReadTriggered";
110
0
    case CompactionReason::kNumOfReasons:
111
      // fall through
112
0
    default:
113
0
      assert(false);
114
0
      return "Invalid";
115
4.85k
  }
116
4.85k
}
117
118
const char* GetCompactionProximalOutputRangeTypeString(
119
0
    Compaction::ProximalOutputRangeType range_type) {
120
0
  switch (range_type) {
121
0
    case Compaction::ProximalOutputRangeType::kNotSupported:
122
0
      return "NotSupported";
123
0
    case Compaction::ProximalOutputRangeType::kFullRange:
124
0
      return "FullRange";
125
0
    case Compaction::ProximalOutputRangeType::kNonLastRange:
126
0
      return "NonLastRange";
127
0
    case Compaction::ProximalOutputRangeType::kDisabled:
128
0
      return "Disabled";
129
0
    default:
130
0
      assert(false);
131
0
      return "Invalid";
132
0
  }
133
0
}
134
135
// Static constant for compaction abort flag - always false, used for
136
// compaction service jobs that don't support abort signaling
137
const std::atomic<int> CompactionJob::kCompactionAbortedFalse{0};
138
139
CompactionJob::CompactionJob(
140
    int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
141
    const MutableDBOptions& mutable_db_options, const FileOptions& file_options,
142
    VersionSet* versions, const std::atomic<bool>* shutting_down,
143
    LogBuffer* log_buffer, FSDirectory* db_directory,
144
    FSDirectory* output_directory, FSDirectory* blob_output_directory,
145
    Statistics* stats, InstrumentedMutex* db_mutex,
146
    ErrorHandler* db_error_handler, JobContext* job_context,
147
    std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
148
    bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname,
149
    CompactionJobStats* compaction_job_stats, Env::Priority thread_pri,
150
    const std::shared_ptr<IOTracer>& io_tracer,
151
    const std::atomic<bool>& manual_compaction_canceled,
152
    const std::atomic<int>& compaction_aborted, const std::string& db_id,
153
    const std::string& db_session_id, std::string full_history_ts_low,
154
    std::string trim_ts, BlobFileCompletionCallback* blob_callback,
155
    int* bg_compaction_scheduled, int* bg_bottom_compaction_scheduled)
156
4.85k
    : compact_(new CompactionState(compaction)),
157
4.85k
      internal_stats_(compaction->compaction_reason(), 1),
158
4.85k
      db_options_(db_options),
159
4.85k
      mutable_db_options_copy_(mutable_db_options),
160
4.85k
      log_buffer_(log_buffer),
161
4.85k
      output_directory_(output_directory),
162
4.85k
      stats_(stats),
163
4.85k
      bottommost_level_(false),
164
4.85k
      write_hint_(Env::WLTH_NOT_SET),
165
4.85k
      job_stats_(compaction_job_stats),
166
4.85k
      job_id_(job_id),
167
4.85k
      dbname_(dbname),
168
4.85k
      db_id_(db_id),
169
4.85k
      db_session_id_(db_session_id),
170
4.85k
      file_options_(file_options),
171
4.85k
      env_(db_options.env),
172
4.85k
      io_tracer_(io_tracer),
173
4.85k
      fs_(db_options.fs, io_tracer),
174
      file_options_for_read_(
175
4.85k
          fs_->OptimizeForCompactionTableRead(file_options, db_options_)),
176
4.85k
      versions_(versions),
177
4.85k
      shutting_down_(shutting_down),
178
4.85k
      manual_compaction_canceled_(manual_compaction_canceled),
179
4.85k
      compaction_aborted_(compaction_aborted),
180
4.85k
      db_directory_(db_directory),
181
4.85k
      blob_output_directory_(blob_output_directory),
182
4.85k
      db_mutex_(db_mutex),
183
4.85k
      db_error_handler_(db_error_handler),
184
      // job_context cannot be nullptr, but we will assert later in the body of
185
      // the constructor.
186
4.85k
      earliest_snapshot_(job_context
187
4.85k
                             ? job_context->GetEarliestSnapshotSequence()
188
4.85k
                             : kMaxSequenceNumber),
189
4.85k
      job_context_(job_context),
190
4.85k
      table_cache_(std::move(table_cache)),
191
4.85k
      event_logger_(event_logger),
192
4.85k
      paranoid_file_checks_(paranoid_file_checks),
193
4.85k
      measure_io_stats_(measure_io_stats),
194
4.85k
      thread_pri_(thread_pri),
195
4.85k
      full_history_ts_low_(std::move(full_history_ts_low)),
196
4.85k
      trim_ts_(std::move(trim_ts)),
197
4.85k
      blob_callback_(blob_callback),
198
4.85k
      extra_num_subcompaction_threads_reserved_(0),
199
4.85k
      bg_compaction_scheduled_(bg_compaction_scheduled),
200
4.85k
      bg_bottom_compaction_scheduled_(bg_bottom_compaction_scheduled) {
201
4.85k
  assert(job_stats_ != nullptr);
202
4.85k
  assert(log_buffer_ != nullptr);
203
4.85k
  assert(job_context);
204
4.85k
  assert(job_context->snapshot_context_initialized);
205
206
4.85k
  const auto* cfd = compact_->compaction->column_family_data();
207
4.85k
  ThreadStatusUtil::SetEnableTracking(db_options_.enable_thread_tracking);
208
4.85k
  ThreadStatusUtil::SetColumnFamily(cfd);
209
4.85k
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
210
4.85k
  ReportStartedCompaction(compaction);
211
4.85k
}
212
213
4.85k
CompactionJob::~CompactionJob() {
214
4.85k
  assert(compact_ == nullptr);
215
4.85k
  ThreadStatusUtil::ResetThreadStatus();
216
4.85k
}
217
218
4.85k
void CompactionJob::ReportStartedCompaction(Compaction* compaction) {
219
4.85k
  ThreadStatusUtil::SetThreadOperationProperty(ThreadStatus::COMPACTION_JOB_ID,
220
4.85k
                                               job_id_);
221
222
4.85k
  ThreadStatusUtil::SetThreadOperationProperty(
223
4.85k
      ThreadStatus::COMPACTION_INPUT_OUTPUT_LEVEL,
224
4.85k
      (static_cast<uint64_t>(compact_->compaction->start_level()) << 32) +
225
4.85k
          compact_->compaction->output_level());
226
227
  // In the current design, a CompactionJob is always created
228
  // for non-trivial compaction.
229
4.85k
  assert(compaction->IsTrivialMove() == false ||
230
4.85k
         compaction->is_manual_compaction() == true);
231
232
4.85k
  ThreadStatusUtil::SetThreadOperationProperty(
233
4.85k
      ThreadStatus::COMPACTION_PROP_FLAGS,
234
4.85k
      compaction->is_manual_compaction() +
235
4.85k
          (compaction->deletion_compaction() << 1));
236
4.85k
  auto total_input_bytes = compaction->CalculateTotalInputSize();
237
4.85k
  ThreadStatusUtil::SetThreadOperationProperty(
238
4.85k
      ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES, total_input_bytes);
239
240
4.85k
  IOSTATS_RESET(bytes_written);
241
4.85k
  IOSTATS_RESET(bytes_read);
242
4.85k
  ThreadStatusUtil::SetThreadOperationProperty(
243
4.85k
      ThreadStatus::COMPACTION_BYTES_WRITTEN, 0);
244
4.85k
  ThreadStatusUtil::SetThreadOperationProperty(
245
4.85k
      ThreadStatus::COMPACTION_BYTES_READ, 0);
246
247
  // Set the thread operation after operation properties
248
  // to ensure GetThreadList() can always show them all together.
249
4.85k
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
250
251
4.85k
  job_stats_->is_manual_compaction = compaction->is_manual_compaction();
252
4.85k
  job_stats_->is_full_compaction = compaction->is_full_compaction();
253
  // populate compaction stats num_input_files and total_num_of_bytes
254
4.85k
  size_t num_input_files = 0;
255
4.85k
  for (int input_level = 0;
256
12.4k
       input_level < static_cast<int>(compaction->num_input_levels());
257
7.57k
       ++input_level) {
258
7.57k
    const LevelFilesBrief* flevel = compaction->input_levels(input_level);
259
7.57k
    num_input_files += flevel->num_files;
260
7.57k
  }
261
4.85k
  job_stats_->CompactionJobStats::num_input_files = num_input_files;
262
4.85k
  job_stats_->total_input_bytes = total_input_bytes;
263
4.85k
}
264
265
void CompactionJob::Prepare(
266
    std::optional<std::pair<std::optional<Slice>, std::optional<Slice>>>
267
        known_single_subcompact,
268
    const CompactionProgress& compaction_progress,
269
4.85k
    log::Writer* compaction_progress_writer) {
270
4.85k
  db_mutex_->AssertHeld();
271
4.85k
  AutoThreadOperationStageUpdater stage_updater(
272
4.85k
      ThreadStatus::STAGE_COMPACTION_PREPARE);
273
274
  // Generate file_levels_ for compaction before making Iterator
275
4.85k
  auto* c = compact_->compaction;
276
4.85k
  [[maybe_unused]] ColumnFamilyData* cfd = c->column_family_data();
277
4.85k
  assert(cfd != nullptr);
278
4.85k
  const VersionStorageInfo* storage_info = c->input_version()->storage_info();
279
4.85k
  assert(storage_info);
280
4.85k
  assert(storage_info->NumLevelFiles(compact_->compaction->level()) > 0);
281
282
4.85k
  write_hint_ = storage_info->CalculateSSTWriteHint(
283
4.85k
      c->output_level(), db_options_.calculate_sst_write_lifetime_hint_set);
284
4.85k
  bottommost_level_ = c->bottommost_level();
285
286
4.85k
  if (!known_single_subcompact.has_value() && c->ShouldFormSubcompactions()) {
287
0
    StopWatch sw(db_options_.clock, stats_, SUBCOMPACTION_SETUP_TIME);
288
0
    GenSubcompactionBoundaries();
289
0
  }
290
4.85k
  if (boundaries_.size() >= 1) {
291
0
    assert(!known_single_subcompact.has_value());
292
0
    for (size_t i = 0; i <= boundaries_.size(); i++) {
293
0
      compact_->sub_compact_states.emplace_back(
294
0
          c, (i != 0) ? std::optional<Slice>(boundaries_[i - 1]) : std::nullopt,
295
0
          (i != boundaries_.size()) ? std::optional<Slice>(boundaries_[i])
296
0
                                    : std::nullopt,
297
0
          static_cast<uint32_t>(i));
298
      // assert to validate that boundaries don't have same user keys (without
299
      // timestamp part).
300
0
      assert(i == 0 || i == boundaries_.size() ||
301
0
             cfd->user_comparator()->CompareWithoutTimestamp(
302
0
                 boundaries_[i - 1], boundaries_[i]) < 0);
303
0
    }
304
0
    RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
305
0
                      compact_->sub_compact_states.size());
306
4.85k
  } else {
307
4.85k
    std::optional<Slice> start_key;
308
4.85k
    std::optional<Slice> end_key;
309
4.85k
    if (known_single_subcompact.has_value()) {
310
0
      start_key = known_single_subcompact.value().first;
311
0
      end_key = known_single_subcompact.value().second;
312
4.85k
    } else {
313
4.85k
      assert(!start_key.has_value() && !end_key.has_value());
314
4.85k
    }
315
4.85k
    compact_->sub_compact_states.emplace_back(c, start_key, end_key,
316
4.85k
                                              /*sub_job_id*/ 0);
317
4.85k
  }
318
319
4.85k
  MaybeAssignCompactionProgressAndWriter(compaction_progress,
320
4.85k
                                         compaction_progress_writer);
321
322
  // collect all seqno->time information from the input files which will be used
323
  // to encode seqno->time to the output files.
324
4.85k
  SequenceNumber preserve_time_min_seqno = kMaxSequenceNumber;
325
4.85k
  SequenceNumber preclude_last_level_min_seqno = kMaxSequenceNumber;
326
4.85k
  uint64_t preserve_time_duration =
327
4.85k
      MinAndMaxPreserveSeconds(c->mutable_cf_options()).max_preserve_seconds;
328
329
4.85k
  if (preserve_time_duration > 0) {
330
0
    const ReadOptions read_options(Env::IOActivity::kCompaction);
331
    // Setup seqno_to_time_mapping_ with relevant time range.
332
0
    seqno_to_time_mapping_.SetMaxTimeSpan(preserve_time_duration);
333
0
    for (const auto& each_level : *c->inputs()) {
334
0
      for (const auto& fmd : each_level.files) {
335
0
        std::shared_ptr<const TableProperties> tp;
336
0
        Status s = c->input_version()->GetTableProperties(read_options, &tp,
337
0
                                                          fmd, nullptr);
338
0
        if (s.ok()) {
339
0
          s = seqno_to_time_mapping_.DecodeFrom(tp->seqno_to_time_mapping);
340
0
        }
341
0
        if (!s.ok()) {
342
0
          ROCKS_LOG_WARN(
343
0
              db_options_.info_log,
344
0
              "Problem reading or processing seqno-to-time mapping: %s",
345
0
              s.ToString().c_str());
346
0
        }
347
0
      }
348
0
    }
349
350
0
    int64_t _current_time = 0;
351
0
    Status s = db_options_.clock->GetCurrentTime(&_current_time);
352
0
    if (!s.ok()) {
353
0
      ROCKS_LOG_WARN(db_options_.info_log,
354
0
                     "Failed to get current time in compaction: Status: %s",
355
0
                     s.ToString().c_str());
356
      // preserve all time information
357
0
      preserve_time_min_seqno = 0;
358
0
      preclude_last_level_min_seqno = 0;
359
0
      seqno_to_time_mapping_.Enforce();
360
0
    } else {
361
0
      seqno_to_time_mapping_.Enforce(_current_time);
362
0
      seqno_to_time_mapping_.GetCurrentTieringCutoffSeqnos(
363
0
          static_cast<uint64_t>(_current_time),
364
0
          c->mutable_cf_options().preserve_internal_time_seconds,
365
0
          c->mutable_cf_options().preclude_last_level_data_seconds,
366
0
          &preserve_time_min_seqno, &preclude_last_level_min_seqno);
367
0
    }
368
    // For accuracy of the GetProximalSeqnoBeforeTime queries above, we only
369
    // limit the capacity after them.
370
    // Here If we set capacity to the per-SST limit, we could be throwing away
371
    // fidelity when a compaction output file has a narrower seqno range than
372
    // all the inputs. If we only limit capacity for each compaction output, we
373
    // could be doing a lot of unnecessary recomputation in a large compaction
374
    // (up to quadratic in number of files). Thus, we do soemthing in the
375
    // middle: enforce a resonably large constant size limit substantially
376
    // larger than kMaxSeqnoTimePairsPerSST.
377
0
    seqno_to_time_mapping_.SetCapacity(kMaxSeqnoToTimeEntries);
378
0
  }
379
#ifndef NDEBUG
380
  assert(preserve_time_min_seqno <= preclude_last_level_min_seqno);
381
  TEST_SYNC_POINT_CALLBACK(
382
      "CompactionJob::PrepareTimes():preclude_last_level_min_seqno",
383
      static_cast<void*>(&preclude_last_level_min_seqno));
384
  // Restore the invariant asserted above, in case it was broken under the
385
  // callback
386
  preserve_time_min_seqno =
387
      std::min(preclude_last_level_min_seqno, preserve_time_min_seqno);
388
#endif
389
390
  // Preserve sequence numbers for preserved write times and snapshots, though
391
  // the specific sequence number of the earliest snapshot can be zeroed.
392
4.85k
  preserve_seqno_after_ =
393
4.85k
      std::max(preserve_time_min_seqno, SequenceNumber{1}) - 1;
394
4.85k
  preserve_seqno_after_ = std::min(preserve_seqno_after_, earliest_snapshot_);
395
  // If using preclude feature, also preclude snapshots from last level, just
396
  // because they are heuristically more likely to be accessed than non-snapshot
397
  // data.
398
4.85k
  if (preclude_last_level_min_seqno < kMaxSequenceNumber &&
399
0
      earliest_snapshot_ < preclude_last_level_min_seqno) {
400
0
    preclude_last_level_min_seqno = earliest_snapshot_;
401
0
  }
402
  // Now combine what we would like to preclude from last level with what we
403
  // can safely support without dangerously moving data back up the LSM tree,
404
  // to get the final seqno threshold for proximal vs. last. In particular,
405
  // when the reserved output key range for the proximal level does not
406
  // include the entire last level input key range, we need to keep entries
407
  // already in the last level there. (Even allowing within-range entries to
408
  // move back up could cause problems with range tombstones. Perhaps it
409
  // would be better in some rare cases to keep entries in the last level
410
  // one-by-one rather than based on sequence number, but that would add extra
411
  // tracking and complexity to CompactionIterator that is probably not
412
  // worthwhile overall. Correctness is also more clear when splitting by
413
  // seqno threshold.)
414
4.85k
  proximal_after_seqno_ = std::max(preclude_last_level_min_seqno,
415
4.85k
                                   c->GetKeepInLastLevelThroughSeqno());
416
417
4.85k
  options_file_number_ = versions_->options_file_number();
418
4.85k
}
419
420
void CompactionJob::MaybeAssignCompactionProgressAndWriter(
421
    const CompactionProgress& compaction_progress,
422
4.85k
    log::Writer* compaction_progress_writer) {
423
  // LIMITATION: Only supports resuming single subcompaction for now
424
4.85k
  if (compact_->sub_compact_states.size() != 1) {
425
0
    return;
426
0
  }
427
428
4.85k
  if (!compaction_progress.empty()) {
429
0
    assert(compaction_progress.size() == 1);
430
0
    SubcompactionState* sub_compact = &compact_->sub_compact_states[0];
431
0
    const SubcompactionProgress& subcompaction_progress =
432
0
        compaction_progress[0];
433
0
    sub_compact->SetSubcompactionProgress(subcompaction_progress);
434
0
  }
435
436
4.85k
  compaction_progress_writer_ = compaction_progress_writer;
437
4.85k
}
438
439
0
uint64_t CompactionJob::GetSubcompactionsLimit() {
440
0
  return extra_num_subcompaction_threads_reserved_ +
441
0
         std::max(
442
0
             std::uint64_t(1),
443
0
             static_cast<uint64_t>(compact_->compaction->max_subcompactions()));
444
0
}
445
446
void CompactionJob::AcquireSubcompactionResources(
447
0
    int num_extra_required_subcompactions) {
448
0
  TEST_SYNC_POINT("CompactionJob::AcquireSubcompactionResources:0");
449
0
  TEST_SYNC_POINT("CompactionJob::AcquireSubcompactionResources:1");
450
0
  int max_db_compactions =
451
0
      DBImpl::GetBGJobLimits(
452
0
          mutable_db_options_copy_.max_background_flushes,
453
0
          mutable_db_options_copy_.max_background_compactions,
454
0
          mutable_db_options_copy_.max_background_jobs,
455
0
          versions_->GetColumnFamilySet()
456
0
              ->write_controller()
457
0
              ->NeedSpeedupCompaction())
458
0
          .max_compactions;
459
0
  InstrumentedMutexLock l(db_mutex_);
460
  // Apply min function first since We need to compute the extra subcompaction
461
  // against compaction limits. And then try to reserve threads for extra
462
  // subcompactions. The actual number of reserved threads could be less than
463
  // the desired number.
464
0
  int available_bg_compactions_against_db_limit =
465
0
      std::max(max_db_compactions - *bg_compaction_scheduled_ -
466
0
                   *bg_bottom_compaction_scheduled_,
467
0
               0);
468
  // Reservation only supports backgrdoun threads of which the priority is
469
  // between BOTTOM and HIGH. Need to degrade the priority to HIGH if the
470
  // origin thread_pri_ is higher than that. Similar to ReleaseThreads().
471
0
  extra_num_subcompaction_threads_reserved_ =
472
0
      env_->ReserveThreads(std::min(num_extra_required_subcompactions,
473
0
                                    available_bg_compactions_against_db_limit),
474
0
                           std::min(thread_pri_, Env::Priority::HIGH));
475
476
  // Update bg_compaction_scheduled_ or bg_bottom_compaction_scheduled_
477
  // depending on if this compaction has the bottommost priority
478
0
  if (thread_pri_ == Env::Priority::BOTTOM) {
479
0
    *bg_bottom_compaction_scheduled_ +=
480
0
        extra_num_subcompaction_threads_reserved_;
481
0
  } else {
482
0
    *bg_compaction_scheduled_ += extra_num_subcompaction_threads_reserved_;
483
0
  }
484
0
}
485
486
0
void CompactionJob::ShrinkSubcompactionResources(uint64_t num_extra_resources) {
487
  // Do nothing when we have zero resources to shrink
488
0
  if (num_extra_resources == 0) {
489
0
    return;
490
0
  }
491
0
  db_mutex_->Lock();
492
  // We cannot release threads more than what we reserved before
493
0
  int extra_num_subcompaction_threads_released = env_->ReleaseThreads(
494
0
      (int)num_extra_resources, std::min(thread_pri_, Env::Priority::HIGH));
495
  // Update the number of reserved threads and the number of background
496
  // scheduled compactions for this compaction job
497
0
  extra_num_subcompaction_threads_reserved_ -=
498
0
      extra_num_subcompaction_threads_released;
499
  // TODO (zichen): design a test case with new subcompaction partitioning
500
  // when the number of actual partitions is less than the number of planned
501
  // partitions
502
0
  assert(extra_num_subcompaction_threads_released == (int)num_extra_resources);
503
  // Update bg_compaction_scheduled_ or bg_bottom_compaction_scheduled_
504
  // depending on if this compaction has the bottommost priority
505
0
  if (thread_pri_ == Env::Priority::BOTTOM) {
506
0
    *bg_bottom_compaction_scheduled_ -=
507
0
        extra_num_subcompaction_threads_released;
508
0
  } else {
509
0
    *bg_compaction_scheduled_ -= extra_num_subcompaction_threads_released;
510
0
  }
511
0
  db_mutex_->Unlock();
512
0
  TEST_SYNC_POINT("CompactionJob::ShrinkSubcompactionResources:0");
513
0
}
514
515
4.85k
void CompactionJob::ReleaseSubcompactionResources() {
516
4.85k
  if (extra_num_subcompaction_threads_reserved_ == 0) {
517
4.85k
    return;
518
4.85k
  }
519
0
  {
520
0
    InstrumentedMutexLock l(db_mutex_);
521
    // The number of reserved threads becomes larger than 0 only if the
522
    // compaction prioity is round robin and there is no sufficient
523
    // sub-compactions available
524
525
    // The scheduled compaction must be no less than 1 + extra number
526
    // subcompactions using acquired resources since this compaction job has not
527
    // finished yet
528
0
    assert(*bg_bottom_compaction_scheduled_ >=
529
0
               1 + extra_num_subcompaction_threads_reserved_ ||
530
0
           *bg_compaction_scheduled_ >=
531
0
               1 + extra_num_subcompaction_threads_reserved_);
532
0
  }
533
0
  ShrinkSubcompactionResources(extra_num_subcompaction_threads_reserved_);
534
0
}
535
536
0
void CompactionJob::GenSubcompactionBoundaries() {
537
  // The goal is to find some boundary keys so that we can evenly partition
538
  // the compaction input data into max_subcompactions ranges.
539
  // For every input file, we ask TableReader to estimate 128 anchor points
540
  // that evenly partition the input file into 128 ranges and the range
541
  // sizes. This can be calculated by scanning index blocks of the file.
542
  // Once we have the anchor points for all the input files, we merge them
543
  // together and try to find keys dividing ranges evenly.
544
  // For example, if we have two input files, and each returns following
545
  // ranges:
546
  //   File1: (a1, 1000), (b1, 1200), (c1, 1100)
547
  //   File2: (a2, 1100), (b2, 1000), (c2, 1000)
548
  // We total sort the keys to following:
549
  //  (a1, 1000), (a2, 1100), (b1, 1200), (b2, 1000), (c1, 1100), (c2, 1000)
550
  // We calculate the total size by adding up all ranges' size, which is 6400.
551
  // If we would like to partition into 2 subcompactions, the target of the
552
  // range size is 3200. Based on the size, we take "b1" as the partition key
553
  // since the first three ranges would hit 3200.
554
  //
555
  // Note that the ranges are actually overlapping. For example, in the example
556
  // above, the range ending with "b1" is overlapping with the range ending with
557
  // "b2". So the size 1000+1100+1200 is an underestimation of data size up to
558
  // "b1". In extreme cases where we only compact N L0 files, a range can
559
  // overlap with N-1 other ranges. Since we requested a relatively large number
560
  // (128) of ranges from each input files, even N range overlapping would
561
  // cause relatively small inaccuracy.
562
0
  ReadOptions read_options(Env::IOActivity::kCompaction);
563
0
  read_options.rate_limiter_priority = GetRateLimiterPriority();
564
0
  auto* c = compact_->compaction;
565
0
  if (c->mutable_cf_options().table_factory->Name() ==
566
0
      TableFactory::kPlainTableName()) {
567
0
    return;
568
0
  }
569
570
0
  if (c->max_subcompactions() <= 1 &&
571
0
      !(c->immutable_options().compaction_pri == kRoundRobin &&
572
0
        c->immutable_options().compaction_style == kCompactionStyleLevel)) {
573
0
    return;
574
0
  }
575
0
  auto* cfd = c->column_family_data();
576
0
  const Comparator* cfd_comparator = cfd->user_comparator();
577
0
  const InternalKeyComparator& icomp = cfd->internal_comparator();
578
579
0
  auto* v = compact_->compaction->input_version();
580
0
  int base_level = v->storage_info()->base_level();
581
0
  InstrumentedMutexUnlock unlock_guard(db_mutex_);
582
583
0
  uint64_t total_size = 0;
584
0
  std::vector<TableReader::Anchor> all_anchors;
585
0
  int start_lvl = c->start_level();
586
0
  int out_lvl = c->output_level();
587
588
0
  for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) {
589
0
    int lvl = c->level(lvl_idx);
590
0
    if (lvl >= start_lvl && lvl <= out_lvl) {
591
0
      const LevelFilesBrief* flevel = c->input_levels(lvl_idx);
592
0
      size_t num_files = flevel->num_files;
593
594
0
      if (num_files == 0) {
595
0
        continue;
596
0
      }
597
598
0
      for (size_t i = 0; i < num_files; i++) {
599
0
        FileMetaData* f = flevel->files[i].file_metadata;
600
0
        std::vector<TableReader::Anchor> my_anchors;
601
0
        Status s = cfd->table_cache()->ApproximateKeyAnchors(
602
0
            read_options, icomp, *f, c->mutable_cf_options(), my_anchors);
603
0
        if (!s.ok() || my_anchors.empty()) {
604
0
          my_anchors.emplace_back(f->largest.user_key(), f->fd.GetFileSize());
605
0
        }
606
0
        for (auto& ac : my_anchors) {
607
          // Can be optimize to avoid this loop.
608
0
          total_size += ac.range_size;
609
0
        }
610
611
0
        all_anchors.insert(all_anchors.end(), my_anchors.begin(),
612
0
                           my_anchors.end());
613
0
      }
614
0
    }
615
0
  }
616
  // Here we total sort all the anchor points across all files and go through
617
  // them in the sorted order to find partitioning boundaries.
618
  // Not the most efficient implementation. A much more efficient algorithm
619
  // probably exists. But they are more complex. If performance turns out to
620
  // be a problem, we can optimize.
621
0
  std::sort(
622
0
      all_anchors.begin(), all_anchors.end(),
623
0
      [cfd_comparator](TableReader::Anchor& a, TableReader::Anchor& b) -> bool {
624
0
        return cfd_comparator->CompareWithoutTimestamp(a.user_key, b.user_key) <
625
0
               0;
626
0
      });
627
628
  // Remove duplicated entries from boundaries.
629
0
  all_anchors.erase(
630
0
      std::unique(all_anchors.begin(), all_anchors.end(),
631
0
                  [cfd_comparator](TableReader::Anchor& a,
632
0
                                   TableReader::Anchor& b) -> bool {
633
0
                    return cfd_comparator->CompareWithoutTimestamp(
634
0
                               a.user_key, b.user_key) == 0;
635
0
                  }),
636
0
      all_anchors.end());
637
638
  // Get the number of planned subcompactions, may update reserve threads
639
  // and update extra_num_subcompaction_threads_reserved_ for round-robin
640
0
  uint64_t num_planned_subcompactions;
641
0
  if (c->immutable_options().compaction_pri == kRoundRobin &&
642
0
      c->immutable_options().compaction_style == kCompactionStyleLevel) {
643
    // For round-robin compaction prioity, we need to employ more
644
    // subcompactions (may exceed the max_subcompaction limit). The extra
645
    // subcompactions will be executed using reserved threads and taken into
646
    // account bg_compaction_scheduled or bg_bottom_compaction_scheduled.
647
648
    // Initialized by the number of input files
649
0
    num_planned_subcompactions = static_cast<uint64_t>(c->num_input_files(0));
650
0
    uint64_t max_subcompactions_limit = GetSubcompactionsLimit();
651
0
    if (max_subcompactions_limit < num_planned_subcompactions) {
652
      // Assert two pointers are not empty so that we can use extra
653
      // subcompactions against db compaction limits
654
0
      assert(bg_bottom_compaction_scheduled_ != nullptr);
655
0
      assert(bg_compaction_scheduled_ != nullptr);
656
      // Reserve resources when max_subcompaction is not sufficient
657
0
      AcquireSubcompactionResources(
658
0
          (int)(num_planned_subcompactions - max_subcompactions_limit));
659
      // Subcompactions limit changes after acquiring additional resources.
660
      // Need to call GetSubcompactionsLimit() again to update the number
661
      // of planned subcompactions
662
0
      num_planned_subcompactions =
663
0
          std::min(num_planned_subcompactions, GetSubcompactionsLimit());
664
0
    } else {
665
0
      num_planned_subcompactions = max_subcompactions_limit;
666
0
    }
667
0
  } else {
668
0
    num_planned_subcompactions = GetSubcompactionsLimit();
669
0
  }
670
671
0
  TEST_SYNC_POINT_CALLBACK("CompactionJob::GenSubcompactionBoundaries:0",
672
0
                           &num_planned_subcompactions);
673
0
  if (num_planned_subcompactions == 1) {
674
0
    return;
675
0
  }
676
677
  // Group the ranges into subcompactions
678
0
  uint64_t target_range_size = std::max(
679
0
      total_size / num_planned_subcompactions,
680
0
      MaxFileSizeForLevel(
681
0
          c->mutable_cf_options(), out_lvl,
682
0
          c->immutable_options().compaction_style, base_level,
683
0
          c->immutable_options().level_compaction_dynamic_level_bytes));
684
685
0
  if (target_range_size >= total_size) {
686
0
    return;
687
0
  }
688
689
0
  uint64_t next_threshold = target_range_size;
690
0
  uint64_t cumulative_size = 0;
691
0
  uint64_t num_actual_subcompactions = 1U;
692
0
  for (TableReader::Anchor& anchor : all_anchors) {
693
0
    cumulative_size += anchor.range_size;
694
0
    if (cumulative_size > next_threshold) {
695
0
      next_threshold += target_range_size;
696
0
      num_actual_subcompactions++;
697
0
      boundaries_.push_back(anchor.user_key);
698
0
    }
699
0
    if (num_actual_subcompactions == num_planned_subcompactions) {
700
0
      break;
701
0
    }
702
0
  }
703
0
  TEST_SYNC_POINT_CALLBACK("CompactionJob::GenSubcompactionBoundaries:1",
704
0
                           &num_actual_subcompactions);
705
  // Shrink extra subcompactions resources when extra resrouces are acquired
706
0
  ShrinkSubcompactionResources(
707
0
      std::min((int)(num_planned_subcompactions - num_actual_subcompactions),
708
0
               extra_num_subcompaction_threads_reserved_));
709
0
}
710
711
4.85k
void CompactionJob::InitializeCompactionRun() {
712
4.85k
  AutoThreadOperationStageUpdater stage_updater(
713
4.85k
      ThreadStatus::STAGE_COMPACTION_RUN);
714
4.85k
  TEST_SYNC_POINT("CompactionJob::Run():Start");
715
4.85k
  log_buffer_->FlushBufferToLog();
716
4.85k
  LogCompaction();
717
4.85k
}
718
719
4.85k
void CompactionJob::RunSubcompactions() {
720
4.85k
  TEST_SYNC_POINT("CompactionJob::RunSubcompactions:BeforeStart");
721
4.85k
  const size_t num_threads = compact_->sub_compact_states.size();
722
4.85k
  assert(num_threads > 0);
723
4.85k
  compact_->compaction->GetOrInitInputTableProperties();
724
725
  // Launch a thread for each of subcompactions 1...num_threads-1
726
4.85k
  std::vector<port::Thread> thread_pool;
727
4.85k
  thread_pool.reserve(num_threads - 1);
728
4.85k
  for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
729
0
    thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this,
730
0
                             &compact_->sub_compact_states[i]);
731
0
  }
732
733
  // Always schedule the first subcompaction (whether or not there are also
734
  // others) in the current thread to be efficient with resources
735
4.85k
  ProcessKeyValueCompaction(compact_->sub_compact_states.data());
736
737
  // Wait for all other threads (if there are any) to finish execution
738
4.85k
  for (auto& thread : thread_pool) {
739
0
    thread.join();
740
0
  }
741
4.85k
  RemoveEmptyOutputs();
742
743
4.85k
  ReleaseSubcompactionResources();
744
4.85k
  TEST_SYNC_POINT("CompactionJob::ReleaseSubcompactionResources");
745
4.85k
}
746
747
4.85k
void CompactionJob::UpdateTimingStats(uint64_t start_micros) {
748
4.85k
  internal_stats_.SetMicros(db_options_.clock->NowMicros() - start_micros);
749
750
4.85k
  for (auto& state : compact_->sub_compact_states) {
751
4.85k
    internal_stats_.AddCpuMicros(state.compaction_job_stats.cpu_micros);
752
4.85k
  }
753
754
4.85k
  RecordTimeToHistogram(stats_, COMPACTION_TIME,
755
4.85k
                        internal_stats_.output_level_stats.micros);
756
4.85k
  RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
757
4.85k
                        internal_stats_.output_level_stats.cpu_micros);
758
4.85k
}
759
760
4.85k
void CompactionJob::RemoveEmptyOutputs() {
761
4.85k
  for (auto& state : compact_->sub_compact_states) {
762
4.85k
    state.RemoveLastEmptyOutput();
763
4.85k
  }
764
4.85k
}
765
766
0
void CompactionJob::CleanupAbortedSubcompactions() {
767
0
  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
768
769
0
  uint64_t total_sst_files_deleted = 0;
770
0
  uint64_t total_blob_files_deleted = 0;
771
772
  // Track the first file deletion error to report at the end
773
0
  Status first_error;
774
0
  int deletion_errors = 0;
775
776
  // Mark all subcompactions as aborted and delete their output files
777
0
  for (auto& sub_compact : compact_->sub_compact_states) {
778
    // Mark this subcompaction as aborted
779
0
    sub_compact.status =
780
0
        Status::Incomplete(Status::SubCode::kCompactionAborted);
781
782
    // Delete all files (SST and blob) tracked during compaction.
783
    // GetOutputFilePaths() contains ALL file paths created, including
784
    // in-progress files that may have been removed from outputs_ or
785
    // blob_file_additions_.
786
0
    for (const bool is_proximal_level : {false, true}) {
787
0
      if (is_proximal_level &&
788
0
          !compact_->compaction->SupportsPerKeyPlacement()) {
789
0
        continue;
790
0
      }
791
0
      for (const std::string& file_path :
792
0
           sub_compact.Outputs(is_proximal_level)->GetOutputFilePaths()) {
793
0
        Status s = env_->DeleteFile(file_path);
794
0
        if (s.ok()) {
795
          // Count SST vs blob files by checking extension
796
0
          if (file_path.find(".sst") != std::string::npos) {
797
0
            total_sst_files_deleted++;
798
0
          } else if (file_path.find(".blob") != std::string::npos) {
799
0
            total_blob_files_deleted++;
800
0
          }
801
0
        } else if (!s.IsNotFound()) {
802
0
          if (first_error.ok()) {
803
0
            first_error = s;
804
0
          }
805
0
          deletion_errors++;
806
0
        }
807
0
      }
808
0
    }
809
0
    sub_compact.CleanupOutputs();
810
0
  }
811
812
0
  if (stats_) {
813
0
    RecordTick(stats_, COMPACTION_ABORTED);
814
0
  }
815
816
0
  ROCKS_LOG_INFO(db_options_.info_log,
817
0
                 "[%s] [JOB %d] Compaction aborted: deleted %" PRIu64
818
0
                 " SST files and %" PRIu64 " blob files",
819
0
                 cfd->GetName().c_str(), job_id_, total_sst_files_deleted,
820
0
                 total_blob_files_deleted);
821
822
0
  if (!first_error.ok()) {
823
0
    ROCKS_LOG_ERROR(db_options_.info_log,
824
0
                    "[%s] [JOB %d] Cleanup completed with %d file deletion "
825
0
                    "errors. First error: %s",
826
0
                    cfd->GetName().c_str(), job_id_, deletion_errors,
827
0
                    first_error.ToString().c_str());
828
0
  }
829
0
}
830
831
2.72k
bool CompactionJob::HasNewBlobFiles() const {
832
2.72k
  for (const auto& state : compact_->sub_compact_states) {
833
2.72k
    if (state.Current().HasBlobFileAdditions()) {
834
0
      return true;
835
0
    }
836
2.72k
  }
837
2.72k
  return false;
838
2.72k
}
839
840
4.85k
Status CompactionJob::CollectSubcompactionErrors() {
841
4.85k
  Status status;
842
4.85k
  IOStatus io_s;
843
844
4.85k
  for (const auto& state : compact_->sub_compact_states) {
845
4.85k
    if (!state.status.ok()) {
846
2.12k
      status = state.status;
847
2.12k
      io_s = state.io_status;
848
2.12k
      break;
849
2.12k
    }
850
4.85k
  }
851
852
4.85k
  if (io_status_.ok()) {
853
4.85k
    io_status_ = io_s;
854
4.85k
  }
855
856
4.85k
  return status;
857
4.85k
}
858
859
2.72k
Status CompactionJob::SyncOutputDirectories() {
860
2.72k
  Status status;
861
2.72k
  IOStatus io_s;
862
2.72k
  constexpr IODebugContext* dbg = nullptr;
863
2.72k
  const bool wrote_new_blob_files = HasNewBlobFiles();
864
2.72k
  if (output_directory_) {
865
2.72k
    io_s = output_directory_->FsyncWithDirOptions(
866
2.72k
        IOOptions(), dbg,
867
2.72k
        DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
868
2.72k
  }
869
870
2.72k
  if (io_s.ok() && wrote_new_blob_files && blob_output_directory_ &&
871
0
      blob_output_directory_ != output_directory_) {
872
0
    io_s = blob_output_directory_->FsyncWithDirOptions(
873
0
        IOOptions(), dbg,
874
0
        DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
875
0
  }
876
877
2.72k
  if (io_status_.ok()) {
878
2.72k
    io_status_ = io_s;
879
2.72k
  }
880
2.72k
  if (status.ok()) {
881
2.72k
    status = io_s;
882
2.72k
  }
883
884
2.72k
  return status;
885
2.72k
}
886
887
2.72k
Status CompactionJob::VerifyOutputFiles() {
888
2.72k
  Status status;
889
2.72k
  std::vector<port::Thread> thread_pool;
890
2.72k
  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
891
2.72k
  VerifyOutputFlags verify_output_flags =
892
2.72k
      compact_->compaction->mutable_cf_options().verify_output_flags;
893
894
  // For backward compatibility
895
2.72k
  if (paranoid_file_checks_) {
896
0
    verify_output_flags |= VerifyOutputFlags::kVerifyIteration;
897
0
    verify_output_flags |= VerifyOutputFlags::kEnableForLocalCompaction;
898
0
    verify_output_flags |= VerifyOutputFlags::kEnableForRemoteCompaction;
899
0
  }
900
901
2.72k
  auto verify_table = [&](SubcompactionState& subcompaction_state) {
902
    // Collect file open metadata during verification when fast_sst_open
903
    // is enabled, keyed by file number.
904
2.72k
    UnorderedMap<uint64_t, std::string> file_open_metadata_map;
905
906
2.72k
    for (const auto& output_file : subcompaction_state.GetOutputs()) {
907
      // Verify that the table is usable
908
      // We set for_compaction to false and don't
909
      // OptimizeForCompactionTableRead here because this is a special case
910
      // after we finish the table building No matter whether
911
      // use_direct_io_for_flush_and_compaction is true, we will regard this
912
      // verification as user reads since the goal is to cache it here for
913
      // further user reads
914
1.57k
      ReadOptions verification_read_options(Env::IOActivity::kCompaction);
915
1.57k
      verification_read_options.verify_checksums = true;
916
1.57k
      verification_read_options.readahead_size =
917
1.57k
          file_options_for_read_.compaction_readahead_size;
918
919
1.57k
      std::unique_ptr<TableReader> table_reader_guard;
920
1.57k
      TableReader* table_reader_ptr = table_reader_guard.get();
921
1.57k
      verification_read_options.rate_limiter_priority =
922
1.57k
          GetRateLimiterPriority();
923
1.57k
      std::string file_open_metadata;
924
1.57k
      std::string* file_open_metadata_ptr =
925
1.57k
          mutable_db_options_copy_.fast_sst_open ? &file_open_metadata
926
1.57k
                                                 : nullptr;
927
1.57k
      InternalIterator* iter = cfd->table_cache()->NewIterator(
928
1.57k
          verification_read_options, file_options_, cfd->internal_comparator(),
929
1.57k
          output_file.meta,
930
1.57k
          /*range_del_agg=*/nullptr, compact_->compaction->mutable_cf_options(),
931
1.57k
          /*table_reader_ptr=*/&table_reader_ptr,
932
1.57k
          cfd->internal_stats()->GetFileReadHist(
933
1.57k
              compact_->compaction->output_level()),
934
1.57k
          TableReaderCaller::kCompactionRefill, /*arena=*/nullptr,
935
1.57k
          /*skip_filters=*/false, compact_->compaction->output_level(),
936
1.57k
          MaxFileSizeForL0MetaPin(compact_->compaction->mutable_cf_options()),
937
1.57k
          /*smallest_compaction_key=*/nullptr,
938
1.57k
          /*largest_compaction_key=*/nullptr,
939
1.57k
          /*allow_unprepared_value=*/false,
940
1.57k
          /*range_del_read_seqno=*/nullptr,
941
1.57k
          /*range_del_iter=*/nullptr,
942
1.57k
          /*maybe_pin_table_handle=*/false, file_open_metadata_ptr);
943
1.57k
      auto s = iter->status();
944
1.57k
      if (s.ok()) {
945
        // Check for remote/local compaction and verify_output_flags flags
946
1.57k
        const bool should_verify =
947
1.57k
            (subcompaction_state.compaction_job_stats.is_remote_compaction &&
948
0
             !!(verify_output_flags &
949
0
                VerifyOutputFlags::kEnableForRemoteCompaction)) ||
950
1.57k
            (!subcompaction_state.compaction_job_stats.is_remote_compaction &&
951
1.57k
             !!(verify_output_flags &
952
1.57k
                VerifyOutputFlags::kEnableForLocalCompaction));
953
954
1.57k
        if (should_verify) {
955
0
          const bool should_verify_block_checksum =
956
0
              !!(verify_output_flags & VerifyOutputFlags::kVerifyBlockChecksum);
957
0
          const bool should_verify_iteration =
958
0
              !!(verify_output_flags & VerifyOutputFlags::kVerifyIteration);
959
0
          const bool should_verify_file_checksum =
960
0
              !!(verify_output_flags &
961
0
                 VerifyOutputFlags::kVerifyFileChecksum) &&
962
0
              db_options_.file_checksum_gen_factory != nullptr &&
963
0
              output_file.meta.file_checksum != kUnknownFileChecksum;
964
0
          if (should_verify_block_checksum) {
965
0
            assert(table_reader_ptr != nullptr);
966
            // If verifying iteration as well, verify meta blocks here only to
967
            // avoid redundant checks on data blocks
968
0
            s = table_reader_ptr->VerifyChecksum(
969
0
                verification_read_options, TableReaderCaller::kCompaction,
970
0
                /*meta_blocks_only=*/should_verify_iteration);
971
0
          }
972
0
          if (s.ok() && should_verify_iteration) {
973
0
            OutputValidator validator(cfd->internal_comparator(),
974
0
                                      /*_enable_hash=*/true);
975
0
            for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
976
0
              s = validator.Add(iter->key(), iter->value());
977
0
              if (!s.ok()) {
978
0
                break;
979
0
              }
980
0
            }
981
0
            if (s.ok()) {
982
0
              s = iter->status();
983
0
            }
984
0
            if (s.ok() && !validator.CompareValidator(output_file.validator)) {
985
0
              s = Status::Corruption(
986
0
                  "Key-value checksum of compaction output doesn't match what "
987
0
                  "was computed when written");
988
0
            }
989
0
          }
990
0
          if (s.ok() && should_verify_file_checksum) {
991
0
            std::string file_checksum;
992
0
            std::string file_checksum_func_name;
993
0
            std::string fname =
994
0
                GetTableFileName(output_file.meta.fd.GetNumber());
995
0
            s = GenerateOneFileChecksum(
996
0
                fs_.get(), fname, db_options_.file_checksum_gen_factory.get(),
997
0
                output_file.meta.file_checksum_func_name, &file_checksum,
998
0
                &file_checksum_func_name,
999
0
                verification_read_options.readahead_size,
1000
0
                db_options_.allow_mmap_reads, io_tracer_,
1001
0
                db_options_.rate_limiter.get(), verification_read_options,
1002
0
                stats_, db_options_.clock, file_options_for_read_);
1003
0
            if (s.ok() && file_checksum != output_file.meta.file_checksum) {
1004
0
              s = Status::Corruption(
1005
0
                  "File checksum mismatch for compaction output file " + fname);
1006
0
            }
1007
0
          }
1008
0
        }
1009
1.57k
      }
1010
1011
1.57k
      delete iter;
1012
1013
1.57k
      if (!s.ok()) {
1014
0
        subcompaction_state.status = s;
1015
0
        break;
1016
0
      }
1017
1018
1.57k
      if (!file_open_metadata.empty()) {
1019
0
        file_open_metadata_map[output_file.meta.fd.GetNumber()] =
1020
0
            std::move(file_open_metadata);
1021
0
      }
1022
1.57k
    }
1023
1024
    // Apply collected file open metadata to mutable outputs
1025
2.72k
    if (!file_open_metadata_map.empty()) {
1026
0
      auto apply_metadata =
1027
0
          [&file_open_metadata_map](
1028
0
              std::vector<CompactionOutputs::Output>& outputs) {
1029
0
            for (auto& output : outputs) {
1030
0
              auto it = file_open_metadata_map.find(output.meta.fd.GetNumber());
1031
0
              if (it != file_open_metadata_map.end()) {
1032
0
                output.meta.file_open_metadata = std::move(it->second);
1033
0
              }
1034
0
            }
1035
0
          };
1036
0
      apply_metadata(subcompaction_state.GetMutableCompactionOutputs());
1037
0
      apply_metadata(subcompaction_state.GetMutableProximalOutputs());
1038
0
    }
1039
2.72k
  };
1040
2.72k
  for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
1041
0
    thread_pool.emplace_back(verify_table,
1042
0
                             std::ref(compact_->sub_compact_states[i]));
1043
0
  }
1044
2.72k
  verify_table(compact_->sub_compact_states[0]);
1045
2.72k
  for (auto& thread : thread_pool) {
1046
0
    thread.join();
1047
0
  }
1048
1049
2.72k
  for (const auto& state : compact_->sub_compact_states) {
1050
2.72k
    if (!state.status.ok()) {
1051
0
      status = state.status;
1052
0
      break;
1053
0
    }
1054
2.72k
  }
1055
1056
2.72k
  return status;
1057
2.72k
}
1058
1059
2.72k
void CompactionJob::SetOutputTableProperties() {
1060
2.72k
  for (const auto& state : compact_->sub_compact_states) {
1061
2.72k
    for (const auto& output : state.GetOutputs()) {
1062
1.57k
      auto fn =
1063
1.57k
          TableFileName(state.compaction->immutable_options().cf_paths,
1064
1.57k
                        output.meta.fd.GetNumber(), output.meta.fd.GetPathId());
1065
1.57k
      compact_->compaction->SetOutputTableProperties(fn,
1066
1.57k
                                                     output.table_properties);
1067
1.57k
    }
1068
2.72k
  }
1069
2.72k
}
1070
1071
4.85k
void CompactionJob::AggregateSubcompactionOutputAndJobStats() {
1072
  // Before the compaction starts, is_remote_compaction was set to true if
1073
  // compaction_service is set. We now know whether each sub_compaction was
1074
  // done remotely or not. Reset is_remote_compaction back to false and allow
1075
  // AggregateCompactionStats() to set the right value.
1076
4.85k
  job_stats_->is_remote_compaction = false;
1077
1078
  // Finish up all bookkeeping to unify the subcompaction results.
1079
4.85k
  compact_->AggregateCompactionStats(internal_stats_, *job_stats_);
1080
4.85k
}
1081
1082
Status CompactionJob::VerifyCompactionRecordCounts(
1083
2.72k
    bool stats_built_from_input_table_prop, uint64_t num_input_range_del) {
1084
2.72k
  Status status;
1085
2.72k
  if (stats_built_from_input_table_prop &&
1086
2.72k
      job_stats_->has_accurate_num_input_records) {
1087
2.72k
    status = VerifyInputRecordCount(num_input_range_del);
1088
2.72k
    if (!status.ok()) {
1089
0
      return status;
1090
0
    }
1091
2.72k
  }
1092
1093
2.72k
  const auto& mutable_cf_options = compact_->compaction->mutable_cf_options();
1094
2.72k
  if ((mutable_cf_options.table_factory->IsInstanceOf(
1095
2.72k
           TableFactory::kBlockBasedTableName()) ||
1096
0
       mutable_cf_options.table_factory->IsInstanceOf(
1097
2.72k
           TableFactory::kPlainTableName()))) {
1098
2.72k
    status = VerifyOutputRecordCount();
1099
2.72k
    if (!status.ok()) {
1100
0
      return status;
1101
0
    }
1102
2.72k
  }
1103
2.72k
  return status;
1104
2.72k
}
1105
1106
void CompactionJob::FinalizeCompactionRun(
1107
    const Status& input_status, bool stats_built_from_input_table_prop,
1108
4.85k
    uint64_t num_input_range_del) {
1109
4.85k
  if (stats_built_from_input_table_prop) {
1110
4.85k
    UpdateCompactionJobInputStatsFromInternalStats(internal_stats_,
1111
4.85k
                                                   num_input_range_del);
1112
4.85k
  }
1113
4.85k
  UpdateCompactionJobOutputStatsFromInternalStats(input_status,
1114
4.85k
                                                  internal_stats_);
1115
4.85k
  RecordCompactionIOStats();
1116
1117
4.85k
  LogFlush(db_options_.info_log);
1118
4.85k
  TEST_SYNC_POINT("CompactionJob::Run():End");
1119
4.85k
  compact_->status = input_status;
1120
4.85k
  TEST_SYNC_POINT_CALLBACK("CompactionJob::Run():EndStatusSet",
1121
4.85k
                           const_cast<Status*>(&input_status));
1122
4.85k
}
1123
1124
4.85k
Status CompactionJob::Run() {
1125
4.85k
  InitializeCompactionRun();
1126
1127
4.85k
  const uint64_t start_micros = db_options_.clock->NowMicros();
1128
1129
4.85k
  RunSubcompactions();
1130
1131
4.85k
  UpdateTimingStats(start_micros);
1132
1133
4.85k
  TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");
1134
1135
4.85k
  Status status = CollectSubcompactionErrors();
1136
1137
  // If compaction was aborted or manually paused, clean up any output files
1138
  // from completed subcompactions to prevent orphaned files on disk.
1139
  // Skip cleanup for resumable compaction (when progress writer is set)
1140
  // because the output files are needed for resumption.
1141
4.85k
  if ((status.IsCompactionAborted() || status.IsManualCompactionPaused()) &&
1142
0
      compaction_progress_writer_ == nullptr) {
1143
0
    CleanupAbortedSubcompactions();
1144
0
  }
1145
1146
4.85k
  if (status.ok()) {
1147
2.72k
    status = SyncOutputDirectories();
1148
2.72k
  }
1149
1150
4.85k
  if (status.ok()) {
1151
2.72k
    status = VerifyOutputFiles();
1152
2.72k
  }
1153
1154
4.85k
  TEST_SYNC_POINT_CALLBACK("CompactionJob::Run():AfterVerifyOutputFiles",
1155
4.85k
                           &status);
1156
1157
4.85k
  if (status.ok()) {
1158
2.72k
    SetOutputTableProperties();
1159
2.72k
  }
1160
1161
4.85k
  AggregateSubcompactionOutputAndJobStats();
1162
1163
4.85k
  uint64_t num_input_range_del = 0;
1164
4.85k
  bool stats_built_from_input_table_prop =
1165
4.85k
      UpdateInternalStatsFromInputFiles(&num_input_range_del);
1166
1167
4.85k
  if (status.ok()) {
1168
2.72k
    status = VerifyCompactionRecordCounts(stats_built_from_input_table_prop,
1169
2.72k
                                          num_input_range_del);
1170
2.72k
  }
1171
1172
4.85k
  FinalizeCompactionRun(status, stats_built_from_input_table_prop,
1173
4.85k
                        num_input_range_del);
1174
1175
4.85k
  return status;
1176
4.85k
}
1177
1178
4.85k
Status CompactionJob::Install(bool* compaction_released) {
1179
4.85k
  assert(compact_);
1180
1181
4.85k
  AutoThreadOperationStageUpdater stage_updater(
1182
4.85k
      ThreadStatus::STAGE_COMPACTION_INSTALL);
1183
4.85k
  db_mutex_->AssertHeld();
1184
4.85k
  Status status = compact_->status;
1185
1186
4.85k
  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
1187
4.85k
  assert(cfd);
1188
1189
4.85k
  int output_level = compact_->compaction->output_level();
1190
4.85k
  cfd->internal_stats()->AddCompactionStats(output_level, thread_pri_,
1191
4.85k
                                            internal_stats_);
1192
1193
4.85k
  if (status.ok()) {
1194
2.72k
    status = InstallCompactionResults(compaction_released);
1195
2.72k
  }
1196
4.85k
  if (!versions_->io_status().ok()) {
1197
0
    io_status_ = versions_->io_status();
1198
0
  }
1199
1200
4.85k
  VersionStorageInfo::LevelSummaryStorage tmp;
1201
4.85k
  auto vstorage = cfd->current()->storage_info();
1202
4.85k
  const auto& stats = internal_stats_.output_level_stats;
1203
1204
4.85k
  double read_write_amp = 0.0;
1205
4.85k
  double write_amp = 0.0;
1206
4.85k
  double bytes_read_per_sec = 0;
1207
4.85k
  double bytes_written_per_sec = 0;
1208
1209
4.85k
  const uint64_t bytes_read_non_output_and_blob =
1210
4.85k
      stats.bytes_read_non_output_levels + stats.bytes_read_blob;
1211
4.85k
  const uint64_t bytes_read_all =
1212
4.85k
      stats.bytes_read_output_level + bytes_read_non_output_and_blob;
1213
4.85k
  const uint64_t bytes_written_all =
1214
4.85k
      stats.bytes_written + stats.bytes_written_blob;
1215
1216
4.85k
  if (bytes_read_non_output_and_blob > 0) {
1217
3.25k
    read_write_amp = (bytes_written_all + bytes_read_all) /
1218
3.25k
                     static_cast<double>(bytes_read_non_output_and_blob);
1219
3.25k
    write_amp =
1220
3.25k
        bytes_written_all / static_cast<double>(bytes_read_non_output_and_blob);
1221
3.25k
  }
1222
4.85k
  if (stats.micros > 0) {
1223
4.85k
    bytes_read_per_sec = bytes_read_all / static_cast<double>(stats.micros);
1224
4.85k
    bytes_written_per_sec =
1225
4.85k
        bytes_written_all / static_cast<double>(stats.micros);
1226
4.85k
  }
1227
1228
4.85k
  const std::string& column_family_name = cfd->GetName();
1229
1230
4.85k
  constexpr double kMB = 1048576.0;
1231
1232
4.85k
  ROCKS_LOG_BUFFER(
1233
4.85k
      log_buffer_,
1234
4.85k
      "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
1235
4.85k
      "files in(%d, %d) filtered(%d, %d) out(%d +%d blob) "
1236
4.85k
      "MB in(%.1f, %.1f +%.1f blob) filtered(%.1f, %.1f) out(%.1f +%.1f blob), "
1237
4.85k
      "read-write-amplify(%.1f) write-amplify(%.1f) %s, records in: %" PRIu64
1238
4.85k
      ", records dropped: %" PRIu64 " output_compression: %s\n",
1239
4.85k
      column_family_name.c_str(), vstorage->LevelSummary(&tmp),
1240
4.85k
      bytes_read_per_sec, bytes_written_per_sec,
1241
4.85k
      compact_->compaction->output_level(),
1242
4.85k
      stats.num_input_files_in_non_output_levels,
1243
4.85k
      stats.num_input_files_in_output_level,
1244
4.85k
      stats.num_filtered_input_files_in_non_output_levels,
1245
4.85k
      stats.num_filtered_input_files_in_output_level, stats.num_output_files,
1246
4.85k
      stats.num_output_files_blob, stats.bytes_read_non_output_levels / kMB,
1247
4.85k
      stats.bytes_read_output_level / kMB, stats.bytes_read_blob / kMB,
1248
4.85k
      stats.bytes_skipped_non_output_levels / kMB,
1249
4.85k
      stats.bytes_skipped_output_level / kMB, stats.bytes_written / kMB,
1250
4.85k
      stats.bytes_written_blob / kMB, read_write_amp, write_amp,
1251
4.85k
      status.ToString().c_str(), stats.num_input_records,
1252
4.85k
      stats.num_dropped_records,
1253
4.85k
      CompressionTypeToString(compact_->compaction->output_compression())
1254
4.85k
          .c_str());
1255
1256
4.85k
  const auto& blob_files = vstorage->GetBlobFiles();
1257
4.85k
  if (!blob_files.empty()) {
1258
0
    assert(blob_files.front());
1259
0
    assert(blob_files.back());
1260
1261
0
    ROCKS_LOG_BUFFER(
1262
0
        log_buffer_,
1263
0
        "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64 "\n",
1264
0
        column_family_name.c_str(), blob_files.front()->GetBlobFileNumber(),
1265
0
        blob_files.back()->GetBlobFileNumber());
1266
0
  }
1267
1268
4.85k
  if (internal_stats_.has_proximal_level_output) {
1269
0
    ROCKS_LOG_BUFFER(log_buffer_,
1270
0
                     "[%s] has Proximal Level output: %" PRIu64
1271
0
                     ", level %d, number of files: %" PRIu64
1272
0
                     ", number of records: %" PRIu64,
1273
0
                     column_family_name.c_str(),
1274
0
                     internal_stats_.proximal_level_stats.bytes_written,
1275
0
                     compact_->compaction->GetProximalLevel(),
1276
0
                     internal_stats_.proximal_level_stats.num_output_files,
1277
0
                     internal_stats_.proximal_level_stats.num_output_records);
1278
0
  }
1279
1280
4.85k
  TEST_SYNC_POINT_CALLBACK(
1281
4.85k
      "CompactionJob::Install:AfterUpdateCompactionJobStats", job_stats_);
1282
1283
4.85k
  auto stream = event_logger_->LogToBuffer(log_buffer_, 8192);
1284
4.85k
  stream << "job" << job_id_ << "event" << "compaction_finished"
1285
4.85k
         << "compaction_time_micros" << stats.micros
1286
4.85k
         << "compaction_time_cpu_micros" << stats.cpu_micros << "output_level"
1287
4.85k
         << compact_->compaction->output_level() << "num_output_files"
1288
4.85k
         << stats.num_output_files << "total_output_size"
1289
4.85k
         << stats.bytes_written;
1290
1291
4.85k
  if (stats.num_output_files_blob > 0) {
1292
0
    stream << "num_blob_output_files" << stats.num_output_files_blob
1293
0
           << "total_blob_output_size" << stats.bytes_written_blob;
1294
0
  }
1295
1296
4.85k
  stream << "num_input_records" << stats.num_input_records
1297
4.85k
         << "num_output_records" << stats.num_output_records
1298
4.85k
         << "num_subcompactions" << compact_->sub_compact_states.size()
1299
4.85k
         << "output_compression"
1300
4.85k
         << CompressionTypeToString(compact_->compaction->output_compression());
1301
1302
4.85k
  stream << "num_single_delete_mismatches"
1303
4.85k
         << job_stats_->num_single_del_mismatch;
1304
4.85k
  stream << "num_single_delete_fallthrough"
1305
4.85k
         << job_stats_->num_single_del_fallthru;
1306
1307
4.85k
  if (measure_io_stats_) {
1308
0
    stream << "file_write_nanos" << job_stats_->file_write_nanos;
1309
0
    stream << "file_range_sync_nanos" << job_stats_->file_range_sync_nanos;
1310
0
    stream << "file_fsync_nanos" << job_stats_->file_fsync_nanos;
1311
0
    stream << "file_prepare_write_nanos"
1312
0
           << job_stats_->file_prepare_write_nanos;
1313
0
  }
1314
1315
4.85k
  stream << "lsm_state";
1316
4.85k
  stream.StartArray();
1317
38.8k
  for (int level = 0; level < vstorage->num_levels(); ++level) {
1318
33.9k
    stream << vstorage->NumLevelFiles(level);
1319
33.9k
  }
1320
4.85k
  stream.EndArray();
1321
1322
4.85k
  if (!blob_files.empty()) {
1323
0
    assert(blob_files.front());
1324
0
    stream << "blob_file_head" << blob_files.front()->GetBlobFileNumber();
1325
1326
0
    assert(blob_files.back());
1327
0
    stream << "blob_file_tail" << blob_files.back()->GetBlobFileNumber();
1328
0
  }
1329
1330
4.85k
  if (internal_stats_.has_proximal_level_output) {
1331
0
    InternalStats::CompactionStats& pl_stats =
1332
0
        internal_stats_.proximal_level_stats;
1333
0
    stream << "proximal_level_num_output_files" << pl_stats.num_output_files;
1334
0
    stream << "proximal_level_bytes_written" << pl_stats.bytes_written;
1335
0
    stream << "proximal_level_num_output_records"
1336
0
           << pl_stats.num_output_records;
1337
0
    stream << "proximal_level_num_output_files_blob"
1338
0
           << pl_stats.num_output_files_blob;
1339
0
    stream << "proximal_level_bytes_written_blob"
1340
0
           << pl_stats.bytes_written_blob;
1341
0
  }
1342
1343
  // Propagate Install failure to compact_->status so that
1344
  // CleanupCompaction() -> SubcompactionState::Cleanup() sees the failure and
1345
  // calls ReleaseObsolete on output files' table cache entries. Without this,
1346
  // if Run() succeeds but InstallCompactionResults() fails, Cleanup would see
1347
  // overall_status = OK and skip ReleaseObsolete, leaking entries for output
1348
  // files that were never installed into any Version.
1349
4.85k
  if (!status.ok() && compact_->status.ok()) {
1350
0
    compact_->status = status;
1351
0
  }
1352
1353
4.85k
  CleanupCompaction();
1354
4.85k
  return status;
1355
4.85k
}
1356
1357
void CompactionJob::NotifyOnSubcompactionBegin(
1358
4.85k
    SubcompactionState* sub_compact) {
1359
4.85k
  Compaction* c = compact_->compaction;
1360
1361
4.85k
  if (db_options_.listeners.empty()) {
1362
4.85k
    return;
1363
4.85k
  }
1364
0
  if (shutting_down_->load(std::memory_order_acquire)) {
1365
0
    return;
1366
0
  }
1367
0
  if (c->is_manual_compaction() &&
1368
0
      manual_compaction_canceled_.load(std::memory_order_acquire)) {
1369
0
    return;
1370
0
  }
1371
1372
0
  sub_compact->notify_on_subcompaction_completion = true;
1373
1374
0
  SubcompactionJobInfo info{};
1375
0
  sub_compact->BuildSubcompactionJobInfo(info);
1376
0
  info.job_id = static_cast<int>(job_id_);
1377
0
  info.thread_id = env_->GetThreadID();
1378
1379
0
  for (const auto& listener : db_options_.listeners) {
1380
0
    listener->OnSubcompactionBegin(info);
1381
0
  }
1382
0
  info.status.PermitUncheckedError();
1383
0
}
1384
1385
void CompactionJob::NotifyOnSubcompactionCompleted(
1386
4.85k
    SubcompactionState* sub_compact) {
1387
4.85k
  if (db_options_.listeners.empty()) {
1388
4.85k
    return;
1389
4.85k
  }
1390
0
  if (shutting_down_->load(std::memory_order_acquire)) {
1391
0
    return;
1392
0
  }
1393
1394
0
  if (sub_compact->notify_on_subcompaction_completion == false) {
1395
0
    return;
1396
0
  }
1397
1398
0
  SubcompactionJobInfo info{};
1399
0
  sub_compact->BuildSubcompactionJobInfo(info);
1400
0
  info.job_id = static_cast<int>(job_id_);
1401
0
  info.thread_id = env_->GetThreadID();
1402
1403
0
  for (const auto& listener : db_options_.listeners) {
1404
0
    listener->OnSubcompactionCompleted(info);
1405
0
  }
1406
0
}
1407
1408
4.85k
bool CompactionJob::ShouldUseLocalCompaction(SubcompactionState* sub_compact) {
1409
4.85k
  if (db_options_.compaction_service) {
1410
0
    CompactionServiceJobStatus comp_status =
1411
0
        ProcessKeyValueCompactionWithCompactionService(sub_compact);
1412
0
    if (comp_status != CompactionServiceJobStatus::kUseLocal) {
1413
0
      return false;
1414
0
    }
1415
    // fallback to local compaction
1416
0
    assert(comp_status == CompactionServiceJobStatus::kUseLocal);
1417
0
    sub_compact->compaction_job_stats.is_remote_compaction = false;
1418
0
  }
1419
4.85k
  return true;
1420
4.85k
}
1421
1422
4.85k
CompactionJob::CompactionIOStatsSnapshot CompactionJob::InitializeIOStats() {
1423
4.85k
  CompactionIOStatsSnapshot io_stats;
1424
1425
4.85k
  if (measure_io_stats_) {
1426
0
    io_stats.prev_perf_level = GetPerfLevel();
1427
0
    SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
1428
0
    io_stats.prev_write_nanos = IOSTATS(write_nanos);
1429
0
    io_stats.prev_fsync_nanos = IOSTATS(fsync_nanos);
1430
0
    io_stats.prev_range_sync_nanos = IOSTATS(range_sync_nanos);
1431
0
    io_stats.prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
1432
0
    io_stats.prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
1433
0
    io_stats.prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
1434
0
  }
1435
1436
4.85k
  return io_stats;
1437
4.85k
}
1438
1439
Status CompactionJob::SetupAndValidateCompactionFilter(
1440
    SubcompactionState* sub_compact,
1441
    const CompactionFilter* configured_compaction_filter,
1442
    const CompactionFilter*& compaction_filter,
1443
4.85k
    std::unique_ptr<CompactionFilter>& compaction_filter_from_factory) {
1444
4.85k
  compaction_filter = configured_compaction_filter;
1445
1446
4.85k
  if (compaction_filter == nullptr) {
1447
4.85k
    compaction_filter_from_factory =
1448
4.85k
        sub_compact->compaction->CreateCompactionFilter();
1449
4.85k
    compaction_filter = compaction_filter_from_factory.get();
1450
4.85k
  }
1451
1452
4.85k
  if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) {
1453
0
    return Status::NotSupported(
1454
0
        "CompactionFilter::IgnoreSnapshots() = false is not supported "
1455
0
        "anymore.");
1456
0
  }
1457
1458
4.85k
  return Status::OK();
1459
4.85k
}
1460
1461
void CompactionJob::InitializeReadOptionsAndBoundaries(
1462
    const size_t ts_sz, ReadOptions& read_options,
1463
4.85k
    SubcompactionKeyBoundaries& boundaries) {
1464
4.85k
  read_options.verify_checksums = true;
1465
4.85k
  read_options.fill_cache = false;
1466
4.85k
  read_options.rate_limiter_priority = GetRateLimiterPriority();
1467
4.85k
  read_options.io_activity = Env::IOActivity::kCompaction;
1468
  // Compaction iterators shouldn't be confined to a single prefix.
1469
  // Compactions use Seek() for
1470
  // (a) concurrent compactions,
1471
  // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
1472
4.85k
  read_options.total_order_seek = true;
1473
1474
  // Remove the timestamps from boundaries because boundaries created in
1475
  // GenSubcompactionBoundaries doesn't strip away the timestamp.
1476
4.85k
  if (boundaries.start.has_value()) {
1477
0
    read_options.iterate_lower_bound = &(*boundaries.start);
1478
0
    if (ts_sz > 0) {
1479
0
      boundaries.start_without_ts =
1480
0
          StripTimestampFromUserKey(*boundaries.start, ts_sz);
1481
0
      read_options.iterate_lower_bound = &(*boundaries.start_without_ts);
1482
0
    }
1483
0
  }
1484
4.85k
  if (boundaries.end.has_value()) {
1485
0
    read_options.iterate_upper_bound = &(*boundaries.end);
1486
0
    if (ts_sz > 0) {
1487
0
      boundaries.end_without_ts =
1488
0
          StripTimestampFromUserKey(*boundaries.end, ts_sz);
1489
0
      read_options.iterate_upper_bound = &(*boundaries.end_without_ts);
1490
0
    }
1491
0
  }
1492
1493
4.85k
  if (ts_sz > 0) {
1494
0
    if (ts_sz <= strlen(boundaries.kMaxTs)) {
1495
0
      boundaries.ts_slice = Slice(boundaries.kMaxTs, ts_sz);
1496
0
    } else {
1497
0
      boundaries.max_ts = std::string(ts_sz, '\xff');
1498
0
      boundaries.ts_slice = Slice(boundaries.max_ts);
1499
0
    }
1500
0
  }
1501
4.85k
  if (boundaries.start.has_value()) {
1502
0
    boundaries.start_ikey.SetInternalKey(*boundaries.start, kMaxSequenceNumber,
1503
0
                                         kValueTypeForSeek);
1504
0
    if (ts_sz > 0) {
1505
0
      boundaries.start_ikey.UpdateInternalKey(
1506
0
          kMaxSequenceNumber, kValueTypeForSeek, &boundaries.ts_slice);
1507
0
    }
1508
0
    boundaries.start_internal_key = boundaries.start_ikey.GetInternalKey();
1509
0
    boundaries.start_user_key = boundaries.start_ikey.GetUserKey();
1510
0
  }
1511
4.85k
  if (boundaries.end.has_value()) {
1512
0
    boundaries.end_ikey.SetInternalKey(*boundaries.end, kMaxSequenceNumber,
1513
0
                                       kValueTypeForSeek);
1514
0
    if (ts_sz > 0) {
1515
0
      boundaries.end_ikey.UpdateInternalKey(
1516
0
          kMaxSequenceNumber, kValueTypeForSeek, &boundaries.ts_slice);
1517
0
    }
1518
0
    boundaries.end_internal_key = boundaries.end_ikey.GetInternalKey();
1519
0
    boundaries.end_user_key = boundaries.end_ikey.GetUserKey();
1520
0
  }
1521
4.85k
}
1522
1523
InternalIterator* CompactionJob::CreateInputIterator(
1524
    SubcompactionState* sub_compact, ColumnFamilyData* cfd,
1525
    SubcompactionInternalIterators& iterators,
1526
4.85k
    SubcompactionKeyBoundaries& boundaries, ReadOptions& read_options) {
1527
4.85k
  const size_t ts_sz = cfd->user_comparator()->timestamp_size();
1528
4.85k
  InitializeReadOptionsAndBoundaries(ts_sz, read_options, boundaries);
1529
1530
  // This is assigned after creation of SubcompactionState to simplify that
1531
  // creation across both CompactionJob and CompactionServiceCompactionJob
1532
4.85k
  sub_compact->AssignRangeDelAggregator(
1533
4.85k
      std::make_unique<CompactionRangeDelAggregator>(
1534
4.85k
          &cfd->internal_comparator(), job_context_->snapshot_seqs,
1535
4.85k
          &full_history_ts_low_, &trim_ts_));
1536
1537
  // Although the v2 aggregator is what the level iterator(s) know about,
1538
  // the AddTombstones calls will be propagated down to the v1 aggregator.
1539
4.85k
  iterators.raw_input =
1540
4.85k
      std::unique_ptr<InternalIterator>(versions_->MakeInputIterator(
1541
4.85k
          read_options, sub_compact->compaction, sub_compact->RangeDelAgg(),
1542
4.85k
          file_options_for_read_, boundaries.start, boundaries.end));
1543
4.85k
  InternalIterator* input = iterators.raw_input.get();
1544
1545
4.85k
  if (boundaries.start.has_value() || boundaries.end.has_value()) {
1546
0
    iterators.clip = std::make_unique<ClippingIterator>(
1547
0
        iterators.raw_input.get(),
1548
0
        boundaries.start.has_value() ? &boundaries.start_internal_key : nullptr,
1549
0
        boundaries.end.has_value() ? &boundaries.end_internal_key : nullptr,
1550
0
        &cfd->internal_comparator());
1551
0
    input = iterators.clip.get();
1552
0
  }
1553
1554
4.85k
  if (sub_compact->compaction->DoesInputReferenceBlobFiles()) {
1555
0
    BlobGarbageMeter* meter = sub_compact->Current().CreateBlobGarbageMeter();
1556
0
    iterators.blob_counter =
1557
0
        std::make_unique<BlobCountingIterator>(input, meter);
1558
0
    input = iterators.blob_counter.get();
1559
0
  }
1560
1561
4.85k
  if (ts_sz > 0 && !trim_ts_.empty()) {
1562
0
    iterators.trim_history_iter = std::make_unique<HistoryTrimmingIterator>(
1563
0
        input, cfd->user_comparator(), trim_ts_);
1564
0
    input = iterators.trim_history_iter.get();
1565
0
  }
1566
1567
4.85k
  return input;
1568
4.85k
}
1569
1570
void CompactionJob::CreateBlobFileBuilder(
1571
    SubcompactionState* sub_compact, ColumnFamilyData* cfd,
1572
    std::unique_ptr<BlobFileBuilder>& blob_file_builder,
1573
4.85k
    const WriteOptions& write_options) {
1574
4.85k
  const auto& mutable_cf_options =
1575
4.85k
      sub_compact->compaction->mutable_cf_options();
1576
1577
  // TODO: BlobDB to support output_to_proximal_level compaction, which needs
1578
  //  2 builders, so may need to move to `CompactionOutputs`
1579
4.85k
  if (mutable_cf_options.enable_blob_files &&
1580
0
      sub_compact->compaction->output_level() >=
1581
0
          mutable_cf_options.blob_file_starting_level) {
1582
0
    blob_file_builder = std::make_unique<BlobFileBuilder>(
1583
0
        versions_, fs_.get(), &sub_compact->compaction->immutable_options(),
1584
0
        &mutable_cf_options, &file_options_, &write_options, db_id_,
1585
0
        db_session_id_, job_id_, cfd->GetID(), cfd->GetName(), write_hint_,
1586
0
        io_tracer_, blob_callback_, BlobFileCreationReason::kCompaction,
1587
0
        sub_compact->Current().GetOutputFilePathsPtr(),
1588
0
        sub_compact->Current().GetBlobFileAdditionsPtr());
1589
4.85k
  } else {
1590
4.85k
    blob_file_builder = nullptr;
1591
4.85k
  }
1592
4.85k
}
1593
1594
std::unique_ptr<CompactionIterator> CompactionJob::CreateCompactionIterator(
1595
    SubcompactionState* sub_compact, ColumnFamilyData* cfd,
1596
    InternalIterator* input, const CompactionFilter* compaction_filter,
1597
    MergeHelper& merge, std::unique_ptr<BlobFileBuilder>& blob_file_builder,
1598
4.85k
    const WriteOptions& write_options) {
1599
4.85k
  CreateBlobFileBuilder(sub_compact, cfd, blob_file_builder, write_options);
1600
1601
4.85k
  const std::string* const full_history_ts_low =
1602
4.85k
      full_history_ts_low_.empty() ? nullptr : &full_history_ts_low_;
1603
4.85k
  assert(job_context_);
1604
1605
4.85k
  return std::make_unique<CompactionIterator>(
1606
4.85k
      input, cfd->user_comparator(), &merge, versions_->LastSequence(),
1607
4.85k
      &(job_context_->snapshot_seqs), earliest_snapshot_,
1608
4.85k
      job_context_->earliest_write_conflict_snapshot,
1609
4.85k
      job_context_->GetJobSnapshotSequence(), job_context_->snapshot_checker,
1610
4.85k
      env_, ShouldReportDetailedTime(env_, stats_), sub_compact->RangeDelAgg(),
1611
4.85k
      blob_file_builder.get(), db_options_.allow_data_in_errors,
1612
4.85k
      db_options_.enforce_single_del_contracts, manual_compaction_canceled_,
1613
4.85k
      sub_compact->compaction
1614
4.85k
          ->DoesInputReferenceBlobFiles() /* must_count_input_entries */,
1615
4.85k
      sub_compact->compaction, compaction_filter, shutting_down_,
1616
4.85k
      db_options_.info_log, full_history_ts_low, preserve_seqno_after_);
1617
4.85k
}
1618
1619
std::pair<CompactionFileOpenFunc, CompactionFileCloseFunc>
1620
CompactionJob::CreateFileHandlers(SubcompactionState* sub_compact,
1621
4.85k
                                  SubcompactionKeyBoundaries& boundaries) {
1622
4.85k
  const CompactionFileOpenFunc open_file_func =
1623
4.85k
      [this, sub_compact](CompactionOutputs& outputs) {
1624
2.28k
        return this->OpenCompactionOutputFile(sub_compact, outputs);
1625
2.28k
      };
1626
1627
4.85k
  const Slice* start_user_key =
1628
4.85k
      sub_compact->start.has_value() ? &boundaries.start_user_key : nullptr;
1629
4.85k
  const Slice* end_user_key =
1630
4.85k
      sub_compact->end.has_value() ? &boundaries.end_user_key : nullptr;
1631
1632
4.85k
  const CompactionFileCloseFunc close_file_func =
1633
4.85k
      [this, sub_compact, start_user_key, end_user_key](
1634
4.85k
          const Status& status,
1635
4.85k
          const ParsedInternalKey& prev_iter_output_internal_key,
1636
4.85k
          const Slice& next_table_min_key, const CompactionIterator* c_iter,
1637
4.85k
          CompactionOutputs& outputs) {
1638
2.28k
        return this->FinishCompactionOutputFile(
1639
2.28k
            status, prev_iter_output_internal_key, next_table_min_key,
1640
2.28k
            start_user_key, end_user_key, c_iter, sub_compact, outputs);
1641
2.28k
      };
1642
1643
4.85k
  return {open_file_func, close_file_func};
1644
4.85k
}
1645
1646
Status CompactionJob::ProcessKeyValue(
1647
    SubcompactionState* sub_compact, ColumnFamilyData* cfd,
1648
    CompactionIterator* c_iter, const CompactionFileOpenFunc& open_file_func,
1649
4.85k
    const CompactionFileCloseFunc& close_file_func, uint64_t& prev_cpu_micros) {
1650
  // Cron interval for periodic operations: stats update, abort check,
1651
  // and sync points. Uses 1024 (power of 2) for efficient bitwise check.
1652
4.85k
  const uint64_t kCronEveryMask = (1 << 10) - 1;
1653
4.85k
  [[maybe_unused]] const std::optional<const Slice> end = sub_compact->end;
1654
1655
  // Check for abort signal before starting key processing
1656
4.85k
  if (compaction_aborted_.load(std::memory_order_acquire) > 0) {
1657
0
    return Status::Incomplete(Status::SubCode::kCompactionAborted);
1658
0
  }
1659
1660
4.85k
  Status status;
1661
4.85k
  IterKey prev_iter_output_key;
1662
4.85k
  ParsedInternalKey prev_iter_output_internal_key;
1663
1664
4.85k
  TEST_SYNC_POINT_CALLBACK(
1665
4.85k
      "CompactionJob::ProcessKeyValueCompaction()::Processing",
1666
4.85k
      static_cast<void*>(const_cast<Compaction*>(sub_compact->compaction)));
1667
1668
7.48k
  while (status.ok() && !cfd->IsDropped() && c_iter->Valid() &&
1669
2.63k
         c_iter->status().ok()) {
1670
2.63k
    assert(!end.has_value() ||
1671
2.63k
           cfd->user_comparator()->Compare(c_iter->user_key(), *end) < 0);
1672
1673
2.63k
    const uint64_t num_records = c_iter->iter_stats().num_input_records;
1674
1675
    // Periodic cron operations: stats update, abort check.
1676
2.63k
    if ((num_records & kCronEveryMask) == kCronEveryMask) {
1677
      // Check for abort signal periodically
1678
0
      if (compaction_aborted_.load(std::memory_order_acquire) > 0) {
1679
0
        status = Status::Incomplete(Status::SubCode::kCompactionAborted);
1680
0
        break;
1681
0
      }
1682
1683
0
      UpdateSubcompactionJobStatsIncrementally(
1684
0
          c_iter, &sub_compact->compaction_job_stats,
1685
0
          db_options_.clock->CPUMicros(), prev_cpu_micros);
1686
0
    }
1687
1688
2.63k
    const auto& ikey = c_iter->ikey();
1689
2.63k
    bool use_proximal_output = ikey.sequence > proximal_after_seqno_;
1690
1691
#ifndef NDEBUG
1692
    if (sub_compact->compaction->SupportsPerKeyPlacement()) {
1693
      PerKeyPlacementContext context(sub_compact->compaction->output_level(),
1694
                                     ikey.user_key, c_iter->value(),
1695
                                     ikey.sequence, use_proximal_output);
1696
      TEST_SYNC_POINT_CALLBACK("CompactionIterator::PrepareOutput.context",
1697
                               &context);
1698
      if (use_proximal_output) {
1699
        // Verify that entries sent to the proximal level are within the
1700
        // allowed range (because the input key range of the last level could
1701
        // be larger than the allowed output key range of the proximal
1702
        // level). This check uses user keys (ignores sequence numbers) because
1703
        // compaction boundaries are a "clean cut" between user keys (see
1704
        // CompactionPicker::ExpandInputsToCleanCut()), which is especially
1705
        // important when preferred sequence numbers has been swapped in for
1706
        // kTypeValuePreferredSeqno / TimedPut.
1707
        sub_compact->compaction->TEST_AssertWithinProximalLevelOutputRange(
1708
            c_iter->user_key());
1709
      }
1710
    } else {
1711
      assert(proximal_after_seqno_ == kMaxSequenceNumber);
1712
      assert(!use_proximal_output);
1713
    }
1714
#endif  // NDEBUG
1715
1716
    // Add current compaction_iterator key to target compaction output, if the
1717
    // output file needs to be close or open, it will call the `open_file_func`
1718
    // and `close_file_func`.
1719
    // TODO: it would be better to have the compaction file open/close moved
1720
    // into `CompactionOutputs` which has the output file information.
1721
2.63k
    status = sub_compact->AddToOutput(*c_iter, use_proximal_output,
1722
2.63k
                                      open_file_func, close_file_func,
1723
2.63k
                                      prev_iter_output_internal_key);
1724
2.63k
    if (!status.ok()) {
1725
0
      break;
1726
0
    }
1727
1728
2.63k
    TEST_SYNC_POINT_CALLBACK("CompactionJob::Run():PausingManualCompaction:2",
1729
2.63k
                             static_cast<void*>(const_cast<std::atomic<bool>*>(
1730
2.63k
                                 &manual_compaction_canceled_)));
1731
1732
2.63k
    prev_iter_output_key.SetInternalKey(c_iter->key(),
1733
2.63k
                                        &prev_iter_output_internal_key);
1734
2.63k
    prev_iter_output_internal_key.sequence = ikey.sequence;
1735
2.63k
    prev_iter_output_internal_key.type = ikey.type;
1736
2.63k
    c_iter->Next();
1737
1738
#ifndef NDEBUG
1739
    bool stop = false;
1740
    TEST_SYNC_POINT_CALLBACK("CompactionJob::ProcessKeyValueCompaction()::stop",
1741
                             static_cast<void*>(&stop));
1742
    if (stop) {
1743
      break;
1744
    }
1745
#endif  // NDEBUG
1746
2.63k
  }
1747
1748
4.85k
  return status;
1749
4.85k
}
1750
1751
void CompactionJob::UpdateSubcompactionJobStatsIncrementally(
1752
    CompactionIterator* c_iter, CompactionJobStats* compaction_job_stats,
1753
4.85k
    uint64_t cur_cpu_micros, uint64_t& prev_cpu_micros) {
1754
4.85k
  RecordDroppedKeys(c_iter->iter_stats(), compaction_job_stats);
1755
4.85k
  c_iter->ResetRecordCounts();
1756
4.85k
  RecordCompactionIOStats();
1757
1758
4.85k
  assert(cur_cpu_micros >= prev_cpu_micros);
1759
4.85k
  RecordTick(stats_, COMPACTION_CPU_TOTAL_TIME,
1760
4.85k
             cur_cpu_micros - prev_cpu_micros);
1761
4.85k
  prev_cpu_micros = cur_cpu_micros;
1762
4.85k
}
1763
1764
void CompactionJob::FinalizeSubcompactionJobStats(
1765
    SubcompactionState* sub_compact, CompactionIterator* c_iter,
1766
    uint64_t start_cpu_micros, uint64_t prev_cpu_micros,
1767
4.85k
    const CompactionIOStatsSnapshot& io_stats) {
1768
4.85k
  const CompactionIterationStats& c_iter_stats = c_iter->iter_stats();
1769
1770
4.85k
  assert(!sub_compact->compaction->DoesInputReferenceBlobFiles() ||
1771
4.85k
         c_iter->HasNumInputEntryScanned());
1772
4.85k
  sub_compact->compaction_job_stats.has_accurate_num_input_records &=
1773
4.85k
      c_iter->HasNumInputEntryScanned();
1774
4.85k
  sub_compact->compaction_job_stats.num_input_records +=
1775
4.85k
      c_iter->NumInputEntryScanned();
1776
4.85k
  sub_compact->compaction_job_stats.num_blobs_read =
1777
4.85k
      c_iter_stats.num_blobs_read;
1778
4.85k
  sub_compact->compaction_job_stats.total_blob_bytes_read =
1779
4.85k
      c_iter_stats.total_blob_bytes_read;
1780
4.85k
  sub_compact->compaction_job_stats.num_input_deletion_records =
1781
4.85k
      c_iter_stats.num_input_deletion_records;
1782
4.85k
  sub_compact->compaction_job_stats.num_corrupt_keys =
1783
4.85k
      c_iter_stats.num_input_corrupt_records;
1784
4.85k
  sub_compact->compaction_job_stats.num_single_del_fallthru =
1785
4.85k
      c_iter_stats.num_single_del_fallthru;
1786
4.85k
  sub_compact->compaction_job_stats.num_single_del_mismatch =
1787
4.85k
      c_iter_stats.num_single_del_mismatch;
1788
4.85k
  sub_compact->compaction_job_stats.total_input_raw_key_bytes +=
1789
4.85k
      c_iter_stats.total_input_raw_key_bytes;
1790
4.85k
  sub_compact->compaction_job_stats.total_input_raw_value_bytes +=
1791
4.85k
      c_iter_stats.total_input_raw_value_bytes;
1792
1793
4.85k
  RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME,
1794
4.85k
             c_iter_stats.total_filter_time);
1795
1796
4.85k
  if (c_iter_stats.num_blobs_relocated > 0) {
1797
0
    RecordTick(stats_, BLOB_DB_GC_NUM_KEYS_RELOCATED,
1798
0
               c_iter_stats.num_blobs_relocated);
1799
0
  }
1800
4.85k
  if (c_iter_stats.total_blob_bytes_relocated > 0) {
1801
0
    RecordTick(stats_, BLOB_DB_GC_BYTES_RELOCATED,
1802
0
               c_iter_stats.total_blob_bytes_relocated);
1803
0
  }
1804
1805
4.85k
  uint64_t cur_cpu_micros = db_options_.clock->CPUMicros();
1806
1807
  // Record final compaction statistics including dropped keys, I/O stats,
1808
  // and CPU time delta from the last periodic measurement
1809
4.85k
  UpdateSubcompactionJobStatsIncrementally(c_iter,
1810
4.85k
                                           &sub_compact->compaction_job_stats,
1811
4.85k
                                           cur_cpu_micros, prev_cpu_micros);
1812
1813
  // Finalize timing and I/O statistics
1814
4.85k
  sub_compact->compaction_job_stats.cpu_micros =
1815
4.85k
      cur_cpu_micros - start_cpu_micros + sub_compact->GetWorkerCPUMicros();
1816
1817
4.85k
  if (measure_io_stats_) {
1818
0
    sub_compact->compaction_job_stats.file_write_nanos +=
1819
0
        IOSTATS(write_nanos) - io_stats.prev_write_nanos;
1820
0
    sub_compact->compaction_job_stats.file_fsync_nanos +=
1821
0
        IOSTATS(fsync_nanos) - io_stats.prev_fsync_nanos;
1822
0
    sub_compact->compaction_job_stats.file_range_sync_nanos +=
1823
0
        IOSTATS(range_sync_nanos) - io_stats.prev_range_sync_nanos;
1824
0
    sub_compact->compaction_job_stats.file_prepare_write_nanos +=
1825
0
        IOSTATS(prepare_write_nanos) - io_stats.prev_prepare_write_nanos;
1826
0
    sub_compact->compaction_job_stats.cpu_micros -=
1827
0
        (IOSTATS(cpu_write_nanos) - io_stats.prev_cpu_write_nanos +
1828
0
         IOSTATS(cpu_read_nanos) - io_stats.prev_cpu_read_nanos) /
1829
0
        1000;
1830
0
    if (io_stats.prev_perf_level !=
1831
0
        PerfLevel::kEnableTimeAndCPUTimeExceptForMutex) {
1832
0
      SetPerfLevel(io_stats.prev_perf_level);
1833
0
    }
1834
0
  }
1835
4.85k
}
1836
1837
Status CompactionJob::FinalizeProcessKeyValueStatus(
1838
    ColumnFamilyData* cfd, InternalIterator* input_iter,
1839
4.85k
    CompactionIterator* c_iter, Status status) {
1840
4.85k
  if (status.ok() && cfd->IsDropped()) {
1841
0
    status =
1842
0
        Status::ColumnFamilyDropped("Column family dropped during compaction");
1843
0
  }
1844
4.85k
  if (status.ok() && shutting_down_->load(std::memory_order_relaxed)) {
1845
2.12k
    status = Status::ShutdownInProgress("Database shutdown");
1846
2.12k
  }
1847
4.85k
  if (status.ok() &&
1848
2.72k
      (manual_compaction_canceled_.load(std::memory_order_relaxed))) {
1849
0
    status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
1850
0
  }
1851
4.85k
  if (status.ok()) {
1852
2.72k
    status = input_iter->status();
1853
2.72k
  }
1854
4.85k
  if (status.ok()) {
1855
2.72k
    status = c_iter->status();
1856
2.72k
  }
1857
1858
4.85k
  return status;
1859
4.85k
}
1860
1861
Status CompactionJob::CleanupCompactionFiles(
1862
    SubcompactionState* sub_compact, Status status,
1863
    const CompactionFileOpenFunc& open_file_func,
1864
4.85k
    const CompactionFileCloseFunc& close_file_func) {
1865
  // Call FinishCompactionOutputFile() even if status is not ok: it needs to
1866
  // close the output files. Open file function is also passed, in case there's
1867
  // only range-dels, no file was opened, to save the range-dels, it need to
1868
  // create a new output file.
1869
4.85k
  return sub_compact->CloseCompactionFiles(status, open_file_func,
1870
4.85k
                                           close_file_func);
1871
4.85k
}
1872
1873
Status CompactionJob::FinalizeBlobFiles(SubcompactionState* sub_compact,
1874
                                        BlobFileBuilder* blob_file_builder,
1875
4.85k
                                        Status status) {
1876
4.85k
  if (blob_file_builder) {
1877
0
    if (status.ok()) {
1878
0
      status = blob_file_builder->Finish();
1879
0
    } else {
1880
0
      blob_file_builder->Abandon(status);
1881
0
    }
1882
0
    sub_compact->Current().UpdateBlobStats();
1883
0
  }
1884
1885
4.85k
  return status;
1886
4.85k
}
1887
1888
4.85k
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
1889
4.85k
  TEST_SYNC_POINT("CompactionJob::ProcessKeyValueCompaction:Start");
1890
4.85k
  assert(sub_compact);
1891
4.85k
  assert(sub_compact->compaction);
1892
1893
4.85k
  if (!ShouldUseLocalCompaction(sub_compact)) {
1894
0
    return;
1895
0
  }
1896
1897
4.85k
  AutoThreadOperationStageUpdater stage_updater(
1898
4.85k
      ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
1899
1900
4.85k
  const uint64_t start_cpu_micros = db_options_.clock->CPUMicros();
1901
4.85k
  uint64_t prev_cpu_micros = start_cpu_micros;
1902
4.85k
  const CompactionIOStatsSnapshot io_stats = InitializeIOStats();
1903
4.85k
  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
1904
4.85k
  const CompactionFilter* compaction_filter;
1905
4.85k
  std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
1906
4.85k
  Status filter_status = SetupAndValidateCompactionFilter(
1907
4.85k
      sub_compact, cfd->ioptions().compaction_filter, compaction_filter,
1908
4.85k
      compaction_filter_from_factory);
1909
4.85k
  if (!filter_status.ok()) {
1910
0
    sub_compact->status = filter_status;
1911
0
    return;
1912
0
  }
1913
1914
4.85k
  NotifyOnSubcompactionBegin(sub_compact);
1915
1916
4.85k
  SubcompactionKeyBoundaries boundaries(sub_compact->start, sub_compact->end);
1917
4.85k
  SubcompactionInternalIterators iterators;
1918
4.85k
  ReadOptions read_options;
1919
4.85k
  const WriteOptions write_options(Env::IOPriority::IO_LOW,
1920
4.85k
                                   Env::IOActivity::kCompaction);
1921
1922
4.85k
  InternalIterator* input_iter = CreateInputIterator(
1923
4.85k
      sub_compact, cfd, iterators, boundaries, read_options);
1924
1925
4.85k
  assert(input_iter);
1926
1927
4.85k
  Status status =
1928
4.85k
      MaybeResumeSubcompactionProgressOnInputIterator(sub_compact, input_iter);
1929
1930
4.85k
  if (status.IsNotFound()) {
1931
4.85k
    input_iter->SeekToFirst();
1932
4.85k
  } else if (!status.ok()) {
1933
0
    sub_compact->status = status;
1934
0
    return;
1935
0
  }
1936
1937
4.85k
  MergeHelper merge(
1938
4.85k
      env_, cfd->user_comparator(), cfd->ioptions().merge_operator.get(),
1939
4.85k
      compaction_filter, db_options_.info_log.get(),
1940
4.85k
      false /* internal key corruption is expected */,
1941
4.85k
      job_context_->GetLatestSnapshotSequence(), job_context_->snapshot_checker,
1942
4.85k
      compact_->compaction->level(), db_options_.stats);
1943
4.85k
  std::unique_ptr<BlobFileBuilder> blob_file_builder;
1944
1945
4.85k
  auto c_iter =
1946
4.85k
      CreateCompactionIterator(sub_compact, cfd, input_iter, compaction_filter,
1947
4.85k
                               merge, blob_file_builder, write_options);
1948
4.85k
  assert(c_iter);
1949
4.85k
  c_iter->SeekToFirst();
1950
1951
4.85k
  TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
1952
4.85k
  TEST_SYNC_POINT_CALLBACK("CompactionJob::Run():PausingManualCompaction:1",
1953
4.85k
                           static_cast<void*>(const_cast<std::atomic<bool>*>(
1954
4.85k
                               &manual_compaction_canceled_)));
1955
1956
4.85k
  auto [open_file_func, close_file_func] =
1957
4.85k
      CreateFileHandlers(sub_compact, boundaries);
1958
1959
4.85k
  status = ProcessKeyValue(sub_compact, cfd, c_iter.get(), open_file_func,
1960
4.85k
                           close_file_func, prev_cpu_micros);
1961
1962
4.85k
  status = FinalizeProcessKeyValueStatus(cfd, input_iter, c_iter.get(), status);
1963
1964
4.85k
  FinalizeSubcompaction(sub_compact, status, open_file_func, close_file_func,
1965
4.85k
                        blob_file_builder.get(), c_iter.get(), input_iter,
1966
4.85k
                        start_cpu_micros, prev_cpu_micros, io_stats);
1967
1968
4.85k
  NotifyOnSubcompactionCompleted(sub_compact);
1969
4.85k
}
1970
1971
void CompactionJob::FinalizeSubcompaction(
1972
    SubcompactionState* sub_compact, Status status,
1973
    const CompactionFileOpenFunc& open_file_func,
1974
    const CompactionFileCloseFunc& close_file_func,
1975
    BlobFileBuilder* blob_file_builder, CompactionIterator* c_iter,
1976
    [[maybe_unused]] InternalIterator* input_iter, uint64_t start_cpu_micros,
1977
4.85k
    uint64_t prev_cpu_micros, const CompactionIOStatsSnapshot& io_stats) {
1978
4.85k
  status = CleanupCompactionFiles(sub_compact, status, open_file_func,
1979
4.85k
                                  close_file_func);
1980
4.85k
  status = FinalizeBlobFiles(sub_compact, blob_file_builder, status);
1981
1982
4.85k
  FinalizeSubcompactionJobStats(sub_compact, c_iter, start_cpu_micros,
1983
4.85k
                                prev_cpu_micros, io_stats);
1984
1985
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
1986
  if (!status.ok()) {
1987
    if (c_iter) {
1988
      c_iter->status().PermitUncheckedError();
1989
    }
1990
    if (input_iter) {
1991
      input_iter->status().PermitUncheckedError();
1992
    }
1993
  }
1994
#endif  // ROCKSDB_ASSERT_STATUS_CHECKED
1995
1996
4.85k
  sub_compact->status = status;
1997
4.85k
}
1998
1999
0
uint64_t CompactionJob::GetCompactionId(SubcompactionState* sub_compact) const {
2000
0
  return (uint64_t)job_id_ << 32 | sub_compact->sub_job_id;
2001
0
}
2002
2003
void CompactionJob::RecordDroppedKeys(
2004
    const CompactionIterationStats& c_iter_stats,
2005
6.42k
    CompactionJobStats* compaction_job_stats) {
2006
6.42k
  if (c_iter_stats.num_record_drop_user > 0) {
2007
0
    RecordTick(stats_, COMPACTION_KEY_DROP_USER,
2008
0
               c_iter_stats.num_record_drop_user);
2009
0
  }
2010
6.42k
  if (c_iter_stats.num_record_drop_hidden > 0) {
2011
2.13k
    RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY,
2012
2.13k
               c_iter_stats.num_record_drop_hidden);
2013
2.13k
    if (compaction_job_stats) {
2014
2.13k
      compaction_job_stats->num_records_replaced +=
2015
2.13k
          c_iter_stats.num_record_drop_hidden;
2016
2.13k
    }
2017
2.13k
  }
2018
6.42k
  if (c_iter_stats.num_record_drop_obsolete > 0) {
2019
1.75k
    RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE,
2020
1.75k
               c_iter_stats.num_record_drop_obsolete);
2021
1.75k
    if (compaction_job_stats) {
2022
1.75k
      compaction_job_stats->num_expired_deletion_records +=
2023
1.75k
          c_iter_stats.num_record_drop_obsolete;
2024
1.75k
    }
2025
1.75k
  }
2026
6.42k
  if (c_iter_stats.num_record_drop_range_del > 0) {
2027
0
    RecordTick(stats_, COMPACTION_KEY_DROP_RANGE_DEL,
2028
0
               c_iter_stats.num_record_drop_range_del);
2029
0
  }
2030
6.42k
  if (c_iter_stats.num_range_del_drop_obsolete > 0) {
2031
0
    RecordTick(stats_, COMPACTION_RANGE_DEL_DROP_OBSOLETE,
2032
0
               c_iter_stats.num_range_del_drop_obsolete);
2033
0
  }
2034
6.42k
  if (c_iter_stats.num_optimized_del_drop_obsolete > 0) {
2035
0
    RecordTick(stats_, COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE,
2036
0
               c_iter_stats.num_optimized_del_drop_obsolete);
2037
0
  }
2038
6.42k
}
2039
2040
Status CompactionJob::FinishCompactionOutputFile(
2041
    const Status& input_status,
2042
    const ParsedInternalKey& prev_iter_output_internal_key,
2043
    const Slice& next_table_min_key, const Slice* comp_start_user_key,
2044
    const Slice* comp_end_user_key, const CompactionIterator* c_iter,
2045
2.28k
    SubcompactionState* sub_compact, CompactionOutputs& outputs) {
2046
2.28k
  AutoThreadOperationStageUpdater stage_updater(
2047
2.28k
      ThreadStatus::STAGE_COMPACTION_SYNC_FILE);
2048
2.28k
  assert(sub_compact != nullptr);
2049
2.28k
  assert(outputs.HasBuilder());
2050
2051
2.28k
  FileMetaData* meta = outputs.GetMetaData();
2052
2.28k
  uint64_t output_number = meta->fd.GetNumber();
2053
2.28k
  assert(output_number != 0);
2054
2055
2.28k
  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
2056
2.28k
  std::string file_checksum = kUnknownFileChecksum;
2057
2.28k
  std::string file_checksum_func_name = kUnknownFileChecksumFuncName;
2058
2059
  // Check for iterator errors
2060
2.28k
  Status s = input_status;
2061
2062
  // Add range tombstones
2063
2.28k
  if (s.ok()) {
2064
    // Inclusive lower bound, exclusive upper bound
2065
1.57k
    std::pair<SequenceNumber, SequenceNumber> keep_seqno_range{
2066
1.57k
        0, kMaxSequenceNumber};
2067
1.57k
    if (sub_compact->compaction->SupportsPerKeyPlacement()) {
2068
      // Point entries are routed to proximal output only when their seqno is
2069
      // strictly greater than `proximal_after_seqno_`. Range tombstones use a
2070
      // [lower, upper) filter, so split them at the next seqno to preserve the
2071
      // same boundary.
2072
0
      SequenceNumber range_del_split_seqno = proximal_after_seqno_;
2073
0
      if (range_del_split_seqno < kMaxSequenceNumber) {
2074
0
        range_del_split_seqno++;
2075
0
      }
2076
0
      if (outputs.IsProximalLevel()) {
2077
0
        keep_seqno_range.first = range_del_split_seqno;
2078
0
      } else {
2079
0
        keep_seqno_range.second = range_del_split_seqno;
2080
0
      }
2081
0
    }
2082
1.57k
    CompactionIterationStats range_del_out_stats;
2083
    // NOTE1: Use `bottommost_level_ = true` for both bottommost and
2084
    // output_to_proximal_level compaction here, as it's only used to decide
2085
    // if range dels could be dropped. (Logically, we are taking a single sorted
2086
    // run returned from CompactionIterator and physically splitting it between
2087
    // two output levels.)
2088
    // NOTE2: with per-key placement, range tombstones will be filtered on
2089
    // each output level based on sequence number (traversed twice). This is
2090
    // CPU-inefficient for a large number of range tombstones, but that would
2091
    // be an unusual work load.
2092
1.57k
    if (sub_compact->HasRangeDel()) {
2093
0
      s = outputs.AddRangeDels(*sub_compact->RangeDelAgg(), comp_start_user_key,
2094
0
                               comp_end_user_key, range_del_out_stats,
2095
0
                               bottommost_level_, cfd->internal_comparator(),
2096
0
                               earliest_snapshot_, keep_seqno_range,
2097
0
                               next_table_min_key, full_history_ts_low_);
2098
0
    }
2099
1.57k
    RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
2100
1.57k
    TEST_SYNC_POINT("CompactionJob::FinishCompactionOutputFile1");
2101
1.57k
  }
2102
2103
2.28k
  const uint64_t current_entries = outputs.NumEntries();
2104
2105
2.28k
  s = outputs.Finish(s, seqno_to_time_mapping_);
2106
2.28k
  TEST_SYNC_POINT_CALLBACK(
2107
2.28k
      "CompactionJob::FinishCompactionOutputFile()::AfterFinish", &s);
2108
2109
2.28k
  if (s.ok()) {
2110
    // With accurate smallest and largest key, we can get a slightly more
2111
    // accurate oldest ancester time.
2112
    // This makes oldest ancester time in manifest more accurate than in
2113
    // table properties. Not sure how to resolve it.
2114
1.57k
    if (meta->smallest.size() > 0 && meta->largest.size() > 0) {
2115
1.57k
      uint64_t refined_oldest_ancester_time;
2116
1.57k
      Slice new_smallest = meta->smallest.user_key();
2117
1.57k
      Slice new_largest = meta->largest.user_key();
2118
1.57k
      if (!new_largest.empty() && !new_smallest.empty()) {
2119
439
        refined_oldest_ancester_time =
2120
439
            sub_compact->compaction->MinInputFileOldestAncesterTime(
2121
439
                &(meta->smallest), &(meta->largest));
2122
439
        if (refined_oldest_ancester_time !=
2123
439
            std::numeric_limits<uint64_t>::max()) {
2124
127
          meta->oldest_ancester_time = refined_oldest_ancester_time;
2125
127
        }
2126
439
      }
2127
1.57k
    }
2128
1.57k
  }
2129
2130
  // Finish and check for file errors
2131
2.28k
  IOStatus io_s = outputs.WriterSyncClose(s, db_options_.clock, stats_,
2132
2.28k
                                          db_options_.use_fsync);
2133
2134
2.28k
  if (s.ok() && io_s.ok()) {
2135
1.57k
    file_checksum = meta->file_checksum;
2136
1.57k
    file_checksum_func_name = meta->file_checksum_func_name;
2137
1.57k
  }
2138
2139
2.28k
  if (s.ok()) {
2140
1.57k
    s = io_s;
2141
1.57k
  }
2142
2.28k
  if (sub_compact->io_status.ok()) {
2143
2.28k
    sub_compact->io_status = io_s;
2144
    // Since this error is really a copy of the
2145
    // "normal" status, it does not also need to be checked
2146
2.28k
    sub_compact->io_status.PermitUncheckedError();
2147
2.28k
  }
2148
2149
2.28k
  TableProperties tp;
2150
2.28k
  if (s.ok()) {
2151
1.57k
    tp = outputs.GetTableProperties();
2152
1.57k
  }
2153
2.28k
  if (s.ok() && current_entries == 0 && tp.num_range_deletions == 0) {
2154
    // If there is nothing to output, no necessary to generate a sst file.
2155
    // This happens when the output level is bottom level, at the same time
2156
    // the sub_compact output nothing.
2157
0
    std::string fname = GetTableFileName(meta->fd.GetNumber());
2158
2159
    // TODO(AR) it is not clear if there are any larger implications if
2160
    // DeleteFile fails here
2161
0
    Status ds = env_->DeleteFile(fname);
2162
0
    if (!ds.ok()) {
2163
0
      ROCKS_LOG_WARN(
2164
0
          db_options_.info_log,
2165
0
          "[%s] [JOB %d] Unable to remove SST file for table #%" PRIu64
2166
0
          " at bottom level%s",
2167
0
          cfd->GetName().c_str(), job_id_, output_number,
2168
0
          meta->marked_for_compaction ? " (need compaction)" : "");
2169
0
    }
2170
2171
    // Also need to remove the file from outputs, or it will be added to the
2172
    // VersionEdit.
2173
0
    outputs.RemoveLastOutput();
2174
0
    meta = nullptr;
2175
0
  }
2176
2177
2.28k
  if (s.ok() && (current_entries > 0 || tp.num_range_deletions > 0)) {
2178
    // Output to event logger and fire events.
2179
1.57k
    outputs.UpdateTableProperties();
2180
1.57k
    ROCKS_LOG_INFO(db_options_.info_log,
2181
1.57k
                   "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
2182
1.57k
                   " keys, %" PRIu64 " bytes%s, temperature: %s",
2183
1.57k
                   cfd->GetName().c_str(), job_id_, output_number,
2184
1.57k
                   current_entries, meta->fd.file_size,
2185
1.57k
                   meta->marked_for_compaction ? " (need compaction)" : "",
2186
1.57k
                   temperature_to_string[meta->temperature].c_str());
2187
1.57k
  }
2188
2.28k
  std::string fname;
2189
2.28k
  FileDescriptor output_fd;
2190
2.28k
  uint64_t oldest_blob_file_number = kInvalidBlobFileNumber;
2191
2.28k
  Status status_for_listener = s;
2192
2.28k
  if (meta != nullptr) {
2193
2.28k
    fname = GetTableFileName(meta->fd.GetNumber());
2194
2.28k
    output_fd = meta->fd;
2195
2.28k
    oldest_blob_file_number = meta->oldest_blob_file_number;
2196
2.28k
  } else {
2197
0
    fname = "(nil)";
2198
0
    if (s.ok()) {
2199
0
      status_for_listener = Status::Aborted("Empty SST file not kept");
2200
0
    }
2201
0
  }
2202
2.28k
  EventHelpers::LogAndNotifyTableFileCreationFinished(
2203
2.28k
      event_logger_, cfd->ioptions().listeners, dbname_, cfd->GetName(), fname,
2204
2.28k
      job_id_, output_fd, oldest_blob_file_number, tp,
2205
2.28k
      TableFileCreationReason::kCompaction, status_for_listener, file_checksum,
2206
2.28k
      file_checksum_func_name);
2207
2208
  // Report new file to SstFileManagerImpl
2209
2.28k
  auto sfm =
2210
2.28k
      static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
2211
2.28k
  if (sfm && meta != nullptr && meta->fd.GetPathId() == 0) {
2212
2.28k
    Status add_s = sfm->OnAddFile(fname);
2213
2.28k
    if (!add_s.ok() && s.ok()) {
2214
0
      s = add_s;
2215
0
    }
2216
2.28k
    if (sfm->IsMaxAllowedSpaceReached()) {
2217
      // TODO(ajkr): should we return OK() if max space was reached by the final
2218
      // compaction output file (similarly to how flush works when full)?
2219
0
      s = Status::SpaceLimit("Max allowed space was reached");
2220
0
      TEST_SYNC_POINT(
2221
0
          "CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached");
2222
0
      InstrumentedMutexLock l(db_mutex_);
2223
0
      db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction);
2224
0
    }
2225
2.28k
  }
2226
2227
2.28k
  if (s.ok() && ShouldUpdateSubcompactionProgress(sub_compact, c_iter,
2228
1.57k
                                                  prev_iter_output_internal_key,
2229
1.57k
                                                  next_table_min_key, meta)) {
2230
0
    UpdateSubcompactionProgress(c_iter, next_table_min_key, sub_compact);
2231
0
    s = PersistSubcompactionProgress(sub_compact);
2232
0
  }
2233
2.28k
  outputs.ResetBuilder();
2234
2.28k
  return s;
2235
2.28k
}
2236
2237
bool CompactionJob::ShouldUpdateSubcompactionProgress(
2238
    const SubcompactionState* sub_compact, const CompactionIterator* c_iter,
2239
    const ParsedInternalKey& prev_iter_output_internal_key,
2240
1.57k
    const Slice& next_table_min_internal_key, const FileMetaData* meta) const {
2241
1.57k
  const auto* cfd = sub_compact->compaction->column_family_data();
2242
  // No need to update when the progress will not get persisted
2243
1.57k
  if (compaction_progress_writer_ == nullptr) {
2244
1.57k
    return false;
2245
1.57k
  }
2246
2247
  // No need to update for a new empty output
2248
0
  if (meta == nullptr) {
2249
0
    return false;
2250
0
  }
2251
2252
  // TODO(hx235): save progress even on the last output file
2253
0
  if (next_table_min_internal_key.empty()) {
2254
0
    return false;
2255
0
  }
2256
2257
  // LIMITATION: Persisting compaction progress with timestamp
2258
  // is not supported since the feature of persisting timestamp of the key in
2259
  // SST files itself is still experimental
2260
0
  size_t ts_sz = cfd->user_comparator()->timestamp_size();
2261
0
  if (ts_sz > 0) {
2262
0
    return false;
2263
0
  }
2264
2265
  // LIMITATION: Compaction progress persistence disabled for file boundaries
2266
  // containing range deletions. Range deletions can span file boundaries,
2267
  // making it difficult to ensure adjacent output tables have different user
2268
  // keys. See the last check for why different users keys of adjacent output
2269
  // tables are needed
2270
0
  const ValueType next_table_min_internal_key_type =
2271
0
      ExtractValueType(next_table_min_internal_key);
2272
0
  const ValueType prev_iter_output_internal_key_type =
2273
0
      prev_iter_output_internal_key.user_key.empty()
2274
0
          ? ValueType::kTypeValue
2275
0
          : prev_iter_output_internal_key.type;
2276
2277
  // Range deletes truncated to align with file boundaries may be output by the
2278
  // compaction iterator with `ValueType::kTypeMaxValid` instead of the original
2279
  // type.
2280
0
  if ((next_table_min_internal_key_type == ValueType::kTypeRangeDeletion ||
2281
0
       next_table_min_internal_key_type == ValueType::kTypeMaxValid) ||
2282
0
      (prev_iter_output_internal_key_type == ValueType::kTypeRangeDeletion ||
2283
0
       prev_iter_output_internal_key_type == ValueType::kTypeMaxValid)) {
2284
0
    return false;
2285
0
  }
2286
2287
  // LIMITATION: Compaction progress persistence disabled when adjacent output
2288
  // tables share the same user key at boundaries. This ensures a simple Seek()
2289
  // of the next key when resuming can process all versions of a user key
2290
0
  const Slice next_table_min_user_key =
2291
0
      ExtractUserKey(next_table_min_internal_key);
2292
0
  const Slice prev_table_last_user_key =
2293
0
      prev_iter_output_internal_key.user_key.empty()
2294
0
          ? Slice()
2295
0
          : prev_iter_output_internal_key.user_key;
2296
2297
0
  if (cfd->user_comparator()->EqualWithoutTimestamp(next_table_min_user_key,
2298
0
                                                    prev_table_last_user_key)) {
2299
0
    return false;
2300
0
  }
2301
2302
  // LIMITATION: Don't save progress if the current key has already been scanned
2303
  // (looked ahead) in the input but not yet output. This can happen with merge
2304
  // operations, single deletes, and deletes at the bottommost level where
2305
  // CompactionIterator needs to look ahead to process multiple entries for the
2306
  // same user key before outputting a result. If we saved progress and resumed
2307
  // at this boundary, the resumed session would see and process the same input
2308
  // key again through Seek(), leading to incorrect double-counting in
2309
  // number of processed input entries and input count verification failure
2310
  //
2311
  // TODO(hx235): Offset num_processed_input_records to avoid double counting
2312
  // instead of disabling progress persistence.
2313
0
  if (c_iter->IsCurrentKeyAlreadyScanned()) {
2314
0
    return false;
2315
0
  }
2316
2317
0
  return true;
2318
0
}
2319
2320
2.72k
Status CompactionJob::InstallCompactionResults(bool* compaction_released) {
2321
2.72k
  assert(compact_);
2322
2323
2.72k
  db_mutex_->AssertHeld();
2324
2325
2.72k
  const ReadOptions read_options(Env::IOActivity::kCompaction);
2326
2.72k
  const WriteOptions write_options(Env::IOActivity::kCompaction);
2327
2328
2.72k
  auto* compaction = compact_->compaction;
2329
2.72k
  assert(compaction);
2330
2331
2.72k
  {
2332
2.72k
    Compaction::InputLevelSummaryBuffer inputs_summary;
2333
2.72k
    if (internal_stats_.has_proximal_level_output) {
2334
0
      ROCKS_LOG_BUFFER(
2335
0
          log_buffer_,
2336
0
          "[%s] [JOB %d] Compacted %s => output_to_proximal_level: %" PRIu64
2337
0
          " bytes + last: %" PRIu64 " bytes. Total: %" PRIu64 " bytes",
2338
0
          compaction->column_family_data()->GetName().c_str(), job_id_,
2339
0
          compaction->InputLevelSummary(&inputs_summary),
2340
0
          internal_stats_.proximal_level_stats.bytes_written,
2341
0
          internal_stats_.output_level_stats.bytes_written,
2342
0
          internal_stats_.TotalBytesWritten());
2343
2.72k
    } else {
2344
2.72k
      ROCKS_LOG_BUFFER(log_buffer_,
2345
2.72k
                       "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
2346
2.72k
                       compaction->column_family_data()->GetName().c_str(),
2347
2.72k
                       job_id_, compaction->InputLevelSummary(&inputs_summary),
2348
2.72k
                       internal_stats_.TotalBytesWritten());
2349
2.72k
    }
2350
2.72k
  }
2351
2352
2.72k
  VersionEdit* const edit = compaction->edit();
2353
2.72k
  assert(edit);
2354
2355
  // Add compaction inputs
2356
2.72k
  compaction->AddInputDeletions(edit);
2357
2358
2.72k
  std::unordered_map<uint64_t, BlobGarbageMeter::BlobStats> blob_total_garbage;
2359
2360
2.72k
  for (const auto& sub_compact : compact_->sub_compact_states) {
2361
2.72k
    sub_compact.AddOutputsEdit(edit);
2362
2363
2.72k
    for (const auto& blob : sub_compact.Current().GetBlobFileAdditions()) {
2364
0
      edit->AddBlobFile(blob);
2365
0
    }
2366
2367
2.72k
    if (sub_compact.Current().GetBlobGarbageMeter()) {
2368
0
      const auto& flows = sub_compact.Current().GetBlobGarbageMeter()->flows();
2369
2370
0
      for (const auto& pair : flows) {
2371
0
        const uint64_t blob_file_number = pair.first;
2372
0
        const BlobGarbageMeter::BlobInOutFlow& flow = pair.second;
2373
2374
0
        assert(flow.IsValid());
2375
0
        if (flow.HasGarbage()) {
2376
0
          blob_total_garbage[blob_file_number].Add(flow.GetGarbageCount(),
2377
0
                                                   flow.GetGarbageBytes());
2378
0
        }
2379
0
      }
2380
0
    }
2381
2.72k
  }
2382
2383
2.72k
  for (const auto& pair : blob_total_garbage) {
2384
0
    const uint64_t blob_file_number = pair.first;
2385
0
    const BlobGarbageMeter::BlobStats& stats = pair.second;
2386
2387
0
    edit->AddBlobFileGarbage(blob_file_number, stats.GetCount(),
2388
0
                             stats.GetBytes());
2389
0
  }
2390
2391
2.72k
  if ((compaction->compaction_reason() ==
2392
2.72k
           CompactionReason::kLevelMaxLevelSize ||
2393
2.72k
       compaction->compaction_reason() == CompactionReason::kRoundRobinTtl) &&
2394
0
      compaction->immutable_options().compaction_pri == kRoundRobin) {
2395
0
    int start_level = compaction->start_level();
2396
0
    if (start_level > 0) {
2397
0
      auto vstorage = compaction->input_version()->storage_info();
2398
0
      edit->AddCompactCursor(start_level,
2399
0
                             vstorage->GetNextCompactCursor(
2400
0
                                 start_level, compaction->num_input_files(0)));
2401
0
    }
2402
0
  }
2403
2404
2.72k
  auto manifest_wcb = [&compaction, &compaction_released](const Status& s) {
2405
2.72k
    compaction->ReleaseCompactionFiles(s);
2406
2.72k
    *compaction_released = true;
2407
2.72k
  };
2408
2409
2.72k
  Status s;
2410
2.72k
  TEST_SYNC_POINT_CALLBACK(
2411
2.72k
      "CompactionJob::InstallCompactionResults:BeforeLogAndApply", &s);
2412
2.72k
  if (s.ok()) {
2413
2.72k
    s = versions_->LogAndApply(compaction->column_family_data(), read_options,
2414
2.72k
                               write_options, edit, db_mutex_, db_directory_,
2415
2.72k
                               /*new_descriptor_log=*/false,
2416
2.72k
                               /*column_family_options=*/nullptr, manifest_wcb);
2417
2.72k
  }
2418
2.72k
  return s;
2419
2.72k
}
2420
2421
9.70k
void CompactionJob::RecordCompactionIOStats() {
2422
9.70k
  RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read));
2423
9.70k
  RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written));
2424
9.70k
  CompactionReason compaction_reason =
2425
9.70k
      compact_->compaction->compaction_reason();
2426
9.70k
  if (compaction_reason == CompactionReason::kFilesMarkedForCompaction) {
2427
0
    RecordTick(stats_, COMPACT_READ_BYTES_MARKED, IOSTATS(bytes_read));
2428
0
    RecordTick(stats_, COMPACT_WRITE_BYTES_MARKED, IOSTATS(bytes_written));
2429
9.70k
  } else if (compaction_reason == CompactionReason::kPeriodicCompaction) {
2430
0
    RecordTick(stats_, COMPACT_READ_BYTES_PERIODIC, IOSTATS(bytes_read));
2431
0
    RecordTick(stats_, COMPACT_WRITE_BYTES_PERIODIC, IOSTATS(bytes_written));
2432
9.70k
  } else if (compaction_reason == CompactionReason::kTtl) {
2433
0
    RecordTick(stats_, COMPACT_READ_BYTES_TTL, IOSTATS(bytes_read));
2434
0
    RecordTick(stats_, COMPACT_WRITE_BYTES_TTL, IOSTATS(bytes_written));
2435
0
  }
2436
9.70k
  ThreadStatusUtil::IncreaseThreadOperationProperty(
2437
9.70k
      ThreadStatus::COMPACTION_BYTES_READ, IOSTATS(bytes_read));
2438
9.70k
  IOSTATS_RESET(bytes_read);
2439
9.70k
  ThreadStatusUtil::IncreaseThreadOperationProperty(
2440
9.70k
      ThreadStatus::COMPACTION_BYTES_WRITTEN, IOSTATS(bytes_written));
2441
9.70k
  IOSTATS_RESET(bytes_written);
2442
9.70k
}
2443
2444
Status CompactionJob::OpenCompactionOutputFile(SubcompactionState* sub_compact,
2445
2.28k
                                               CompactionOutputs& outputs) {
2446
2.28k
  assert(sub_compact != nullptr);
2447
2448
  // no need to lock because VersionSet::next_file_number_ is atomic
2449
2.28k
  uint64_t file_number = versions_->NewFileNumber();
2450
#ifndef NDEBUG
2451
  TEST_SYNC_POINT_CALLBACK(
2452
      "CompactionJob::OpenCompactionOutputFile::NewFileNumber", &file_number);
2453
#endif
2454
2.28k
  std::string fname = GetTableFileName(file_number);
2455
  // Fire events.
2456
2.28k
  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
2457
2.28k
  EventHelpers::NotifyTableFileCreationStarted(
2458
2.28k
      cfd->ioptions().listeners, dbname_, cfd->GetName(), fname, job_id_,
2459
2.28k
      TableFileCreationReason::kCompaction);
2460
  // Make the output file
2461
2.28k
  std::unique_ptr<FSWritableFile> writable_file;
2462
#ifndef NDEBUG
2463
  bool syncpoint_arg = file_options_.use_direct_writes;
2464
  TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
2465
                           &syncpoint_arg);
2466
#endif
2467
2468
  // Pass temperature of the last level files to FileSystem.
2469
2.28k
  FileOptions fo_copy = file_options_;
2470
2.28k
  auto temperature =
2471
2.28k
      sub_compact->compaction->GetOutputTemperature(outputs.IsProximalLevel());
2472
2.28k
  fo_copy.temperature = temperature;
2473
2.28k
  fo_copy.write_hint = write_hint_;
2474
2475
2.28k
  Status s;
2476
2.28k
  IOStatus io_s = NewWritableFile(fs_.get(), fname, &writable_file, fo_copy);
2477
2.28k
  s = io_s;
2478
2.28k
  if (io_s.ok()) {
2479
    // Track the SST file path for cleanup on abort.
2480
2.28k
    outputs.AddOutputFilePath(fname);
2481
2.28k
  }
2482
2.28k
  if (sub_compact->io_status.ok()) {
2483
2.28k
    sub_compact->io_status = io_s;
2484
    // Since this error is really a copy of the io_s that is checked below as s,
2485
    // it does not also need to be checked.
2486
2.28k
    sub_compact->io_status.PermitUncheckedError();
2487
2.28k
  }
2488
2.28k
  if (!s.ok()) {
2489
0
    ROCKS_LOG_ERROR(
2490
0
        db_options_.info_log,
2491
0
        "[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64
2492
0
        " fails at NewWritableFile with status %s",
2493
0
        sub_compact->compaction->column_family_data()->GetName().c_str(),
2494
0
        job_id_, file_number, s.ToString().c_str());
2495
0
    LogFlush(db_options_.info_log);
2496
0
    EventHelpers::LogAndNotifyTableFileCreationFinished(
2497
0
        event_logger_, cfd->ioptions().listeners, dbname_, cfd->GetName(),
2498
0
        fname, job_id_, FileDescriptor(), kInvalidBlobFileNumber,
2499
0
        TableProperties(), TableFileCreationReason::kCompaction, s,
2500
0
        kUnknownFileChecksum, kUnknownFileChecksumFuncName);
2501
0
    return s;
2502
0
  }
2503
2504
  // Try to figure out the output file's oldest ancester time.
2505
2.28k
  int64_t temp_current_time = 0;
2506
2.28k
  auto get_time_status = db_options_.clock->GetCurrentTime(&temp_current_time);
2507
  // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
2508
2.28k
  if (!get_time_status.ok()) {
2509
0
    ROCKS_LOG_WARN(db_options_.info_log,
2510
0
                   "Failed to get current time. Status: %s",
2511
0
                   get_time_status.ToString().c_str());
2512
0
  }
2513
2.28k
  uint64_t current_time = static_cast<uint64_t>(temp_current_time);
2514
2.28k
  InternalKey tmp_start, tmp_end;
2515
2.28k
  if (sub_compact->start.has_value()) {
2516
0
    tmp_start.SetMinPossibleForUserKey(*(sub_compact->start));
2517
0
  }
2518
2.28k
  if (sub_compact->end.has_value()) {
2519
0
    tmp_end.SetMinPossibleForUserKey(*(sub_compact->end));
2520
0
  }
2521
2.28k
  uint64_t oldest_ancester_time =
2522
2.28k
      sub_compact->compaction->MinInputFileOldestAncesterTime(
2523
2.28k
          sub_compact->start.has_value() ? &tmp_start : nullptr,
2524
2.28k
          sub_compact->end.has_value() ? &tmp_end : nullptr);
2525
2.28k
  if (oldest_ancester_time == std::numeric_limits<uint64_t>::max()) {
2526
    // TODO: fix DBSSTTest.GetTotalSstFilesSize and use
2527
    //  kUnknownOldestAncesterTime
2528
0
    oldest_ancester_time = current_time;
2529
0
  }
2530
2531
2.28k
  uint64_t newest_key_time = sub_compact->compaction->MaxInputFileNewestKeyTime(
2532
2.28k
      sub_compact->start.has_value() ? &tmp_start : nullptr,
2533
2.28k
      sub_compact->end.has_value() ? &tmp_end : nullptr);
2534
2535
  // Initialize a SubcompactionState::Output and add it to sub_compact->outputs
2536
2.28k
  uint64_t epoch_number = sub_compact->compaction->MinInputFileEpochNumber();
2537
2.28k
  {
2538
2.28k
    FileMetaData meta;
2539
2.28k
    meta.fd = FileDescriptor(file_number,
2540
2.28k
                             sub_compact->compaction->output_path_id(), 0);
2541
2.28k
    meta.oldest_ancester_time = oldest_ancester_time;
2542
2.28k
    meta.file_creation_time = current_time;
2543
2.28k
    meta.epoch_number = epoch_number;
2544
2.28k
    meta.temperature = temperature;
2545
2.28k
    assert(!db_id_.empty());
2546
2.28k
    assert(!db_session_id_.empty());
2547
2.28k
    s = GetSstInternalUniqueId(db_id_, db_session_id_, meta.fd.GetNumber(),
2548
2.28k
                               &meta.unique_id);
2549
2.28k
    if (!s.ok()) {
2550
0
      ROCKS_LOG_ERROR(db_options_.info_log,
2551
0
                      "[%s] [JOB %d] file #%" PRIu64
2552
0
                      " failed to generate unique id: %s.",
2553
0
                      cfd->GetName().c_str(), job_id_, meta.fd.GetNumber(),
2554
0
                      s.ToString().c_str());
2555
0
      return s;
2556
0
    }
2557
2558
    // Enable hash computation if paranoid_file_checks is on or if
2559
    // verify_output_flags includes kVerifyIteration, so that
2560
    // VerifyOutputFiles() can compare the hash of the written data
2561
    // against a re-read of the output file.
2562
2.28k
    bool enable_output_hash =
2563
2.28k
        paranoid_file_checks_ ||
2564
2.28k
        !!(sub_compact->compaction->mutable_cf_options().verify_output_flags &
2565
2.28k
           VerifyOutputFlags::kVerifyIteration);
2566
2.28k
    outputs.AddOutput(std::move(meta), cfd->internal_comparator(),
2567
2.28k
                      enable_output_hash);
2568
2.28k
  }
2569
2570
0
  writable_file->SetIOPriority(GetRateLimiterPriority());
2571
  // Subsequent attempts to override the hint via SetWriteLifeTimeHint
2572
  // with the very same value will be ignored by the fs.
2573
2.28k
  writable_file->SetWriteLifeTimeHint(fo_copy.write_hint);
2574
2.28k
  FileTypeSet tmp_set = db_options_.checksum_handoff_file_types;
2575
2.28k
  writable_file->SetPreallocationBlockSize(static_cast<size_t>(
2576
2.28k
      sub_compact->compaction->OutputFilePreallocationSize()));
2577
2.28k
  const auto& listeners =
2578
2.28k
      sub_compact->compaction->immutable_options().listeners;
2579
2.28k
  outputs.AssignFileWriter(new WritableFileWriter(
2580
2.28k
      std::move(writable_file), fname, fo_copy, db_options_.clock, io_tracer_,
2581
2.28k
      db_options_.stats, Histograms::SST_WRITE_MICROS, listeners,
2582
2.28k
      db_options_.file_checksum_gen_factory.get(),
2583
2.28k
      tmp_set.Contains(FileType::kTableFile), false));
2584
2585
  // TODO(hx235): pass in the correct `oldest_key_time` instead of `0`
2586
2.28k
  const ReadOptions read_options(Env::IOActivity::kCompaction);
2587
2.28k
  const WriteOptions write_options(Env::IOActivity::kCompaction);
2588
2.28k
  TableBuilderOptions tboptions(
2589
2.28k
      cfd->ioptions(), sub_compact->compaction->mutable_cf_options(),
2590
2.28k
      read_options, write_options, cfd->internal_comparator(),
2591
2.28k
      cfd->internal_tbl_prop_coll_factories(),
2592
2.28k
      sub_compact->compaction->output_compression(),
2593
2.28k
      sub_compact->compaction->output_compression_opts(), cfd->GetID(),
2594
2.28k
      cfd->GetName(), sub_compact->compaction->output_level(), newest_key_time,
2595
2.28k
      bottommost_level_, TableFileCreationReason::kCompaction,
2596
2.28k
      0 /* oldest_key_time */, current_time, db_id_, db_session_id_,
2597
2.28k
      sub_compact->compaction->max_output_file_size(), file_number,
2598
2.28k
      proximal_after_seqno_ /*last_level_inclusive_max_seqno_threshold*/);
2599
2600
2.28k
  outputs.NewBuilder(tboptions);
2601
2602
2.28k
  LogFlush(db_options_.info_log);
2603
2.28k
  return s;
2604
2.28k
}
2605
2606
4.85k
void CompactionJob::CleanupCompaction() {
2607
4.85k
  for (SubcompactionState& sub_compact : compact_->sub_compact_states) {
2608
4.85k
    sub_compact.Cleanup(table_cache_.get(), compact_->status);
2609
4.85k
  }
2610
4.85k
  delete compact_;
2611
4.85k
  compact_ = nullptr;
2612
4.85k
}
2613
2614
namespace {
2615
3.15k
void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) {
2616
3.15k
  assert(prefix_length > 0);
2617
3.15k
  size_t length = src.size() > prefix_length ? prefix_length : src.size();
2618
3.15k
  dst->assign(src.data(), length);
2619
3.15k
}
2620
}  // namespace
2621
2622
bool CompactionJob::UpdateInternalStatsFromInputFiles(
2623
4.85k
    uint64_t* num_input_range_del) {
2624
4.85k
  assert(compact_);
2625
2626
4.85k
  Compaction* compaction = compact_->compaction;
2627
4.85k
  internal_stats_.output_level_stats.num_input_files_in_non_output_levels = 0;
2628
4.85k
  internal_stats_.output_level_stats.num_input_files_in_output_level = 0;
2629
2630
4.85k
  bool has_error = false;
2631
4.85k
  const ReadOptions read_options(Env::IOActivity::kCompaction);
2632
4.85k
  const auto& input_table_properties = compaction->GetInputTableProperties();
2633
2634
  // Check all input files for old block-based SST format_version. Why? Old
2635
  // block-based SST files from roughly version 5.0 to 5.18 could produce
2636
  // inaccurate num_entries counts due to the evolution of its handling along
2637
  // with num_range_deletions. We have to disable some paranoid checks when
2638
  // compacting files from such an old release. However, we don't have great
2639
  // information to identify those files, so we heuristically over-approximate
2640
  // that set of files using
2641
  // (a) format_version < 5, which will be true for any files from RocksDB <
2642
  // 6.6.0 and should not be true for any recent production files
2643
  // (b) to avoid including non-block-based SST files (which still use older
2644
  // format_version markers, and do not support DeleteRange), we also require
2645
  // the presence of the user property "rocksdb.block.based.table.index.type",
2646
  // which was added in RocksDB 2.8 and is always present in block-based tables.
2647
22.4k
  for (const auto& tp_pair : input_table_properties) {
2648
22.4k
    if (tp_pair.second && tp_pair.second->format_version < 5) {
2649
      // Check for block-based table by looking for its index type property
2650
0
      const auto& user_props = tp_pair.second->user_collected_properties;
2651
0
      if (user_props.find(BlockBasedTablePropertyNames::kIndexType) !=
2652
0
          user_props.end()) {
2653
0
        job_stats_->has_accurate_num_input_records = false;
2654
0
        break;
2655
0
      }
2656
0
    }
2657
22.4k
  }
2658
2659
4.85k
  for (int input_level = 0;
2660
12.4k
       input_level < static_cast<int>(compaction->num_input_levels());
2661
7.57k
       ++input_level) {
2662
7.57k
    const LevelFilesBrief* flevel = compaction->input_levels(input_level);
2663
7.57k
    size_t num_input_files = flevel->num_files;
2664
7.57k
    uint64_t* bytes_read;
2665
7.57k
    if (compaction->level(input_level) != compaction->output_level()) {
2666
3.25k
      internal_stats_.output_level_stats.num_input_files_in_non_output_levels +=
2667
3.25k
          static_cast<int>(num_input_files);
2668
3.25k
      bytes_read =
2669
3.25k
          &internal_stats_.output_level_stats.bytes_read_non_output_levels;
2670
4.31k
    } else {
2671
4.31k
      internal_stats_.output_level_stats.num_input_files_in_output_level +=
2672
4.31k
          static_cast<int>(num_input_files);
2673
4.31k
      bytes_read = &internal_stats_.output_level_stats.bytes_read_output_level;
2674
4.31k
    }
2675
30.0k
    for (size_t i = 0; i < num_input_files; ++i) {
2676
22.4k
      const FileMetaData* file_meta = flevel->files[i].file_metadata;
2677
22.4k
      *bytes_read += file_meta->fd.GetFileSize();
2678
22.4k
      uint64_t file_input_entries = file_meta->num_entries;
2679
22.4k
      uint64_t file_num_range_del = file_meta->num_range_deletions;
2680
22.4k
      if (file_input_entries == 0) {
2681
0
        uint64_t file_number = file_meta->fd.GetNumber();
2682
        // Try getting info from table property
2683
0
        std::string fn = TableFileName(compaction->immutable_options().cf_paths,
2684
0
                                       file_number, file_meta->fd.GetPathId());
2685
0
        const auto& tp = input_table_properties.find(fn);
2686
0
        if (tp != input_table_properties.end()) {
2687
0
          file_input_entries = tp->second->num_entries;
2688
0
          file_num_range_del = tp->second->num_range_deletions;
2689
0
        } else {
2690
0
          has_error = true;
2691
0
        }
2692
0
      }
2693
22.4k
      internal_stats_.output_level_stats.num_input_records +=
2694
22.4k
          file_input_entries;
2695
22.4k
      if (num_input_range_del) {
2696
22.4k
        *num_input_range_del += file_num_range_del;
2697
22.4k
      }
2698
22.4k
    }
2699
2700
7.57k
    const std::vector<FileMetaData*>& filtered_flevel =
2701
7.57k
        compaction->filtered_input_levels(input_level);
2702
7.57k
    size_t num_filtered_input_files = filtered_flevel.size();
2703
7.57k
    uint64_t* bytes_skipped;
2704
7.57k
    if (compaction->level(input_level) != compaction->output_level()) {
2705
3.25k
      internal_stats_.output_level_stats
2706
3.25k
          .num_filtered_input_files_in_non_output_levels +=
2707
3.25k
          static_cast<int>(num_filtered_input_files);
2708
3.25k
      bytes_skipped =
2709
3.25k
          &internal_stats_.output_level_stats.bytes_skipped_non_output_levels;
2710
4.31k
    } else {
2711
4.31k
      internal_stats_.output_level_stats
2712
4.31k
          .num_filtered_input_files_in_output_level +=
2713
4.31k
          static_cast<int>(num_filtered_input_files);
2714
4.31k
      bytes_skipped =
2715
4.31k
          &internal_stats_.output_level_stats.bytes_skipped_output_level;
2716
4.31k
    }
2717
7.57k
    for (const FileMetaData* filtered_file_meta : filtered_flevel) {
2718
0
      *bytes_skipped += filtered_file_meta->fd.GetFileSize();
2719
0
    }
2720
7.57k
  }
2721
2722
  // TODO - find a better place to set these two
2723
4.85k
  assert(job_stats_);
2724
4.85k
  internal_stats_.output_level_stats.bytes_read_blob =
2725
4.85k
      job_stats_->total_blob_bytes_read;
2726
4.85k
  internal_stats_.output_level_stats.num_dropped_records =
2727
4.85k
      internal_stats_.DroppedRecords();
2728
4.85k
  return !has_error;
2729
4.85k
}
2730
2731
void CompactionJob::UpdateCompactionJobInputStatsFromInternalStats(
2732
    const InternalStats::CompactionStatsFull& internal_stats,
2733
4.85k
    uint64_t num_input_range_del) const {
2734
4.85k
  assert(job_stats_);
2735
  // input information
2736
4.85k
  job_stats_->total_input_bytes =
2737
4.85k
      internal_stats.output_level_stats.bytes_read_non_output_levels +
2738
4.85k
      internal_stats.output_level_stats.bytes_read_output_level;
2739
4.85k
  job_stats_->num_input_records =
2740
4.85k
      internal_stats.output_level_stats.num_input_records - num_input_range_del;
2741
4.85k
  job_stats_->num_input_files =
2742
4.85k
      internal_stats.output_level_stats.num_input_files_in_non_output_levels +
2743
4.85k
      internal_stats.output_level_stats.num_input_files_in_output_level;
2744
4.85k
  job_stats_->num_input_files_at_output_level =
2745
4.85k
      internal_stats.output_level_stats.num_input_files_in_output_level;
2746
4.85k
  job_stats_->num_filtered_input_files =
2747
4.85k
      internal_stats.output_level_stats
2748
4.85k
          .num_filtered_input_files_in_non_output_levels +
2749
4.85k
      internal_stats.output_level_stats
2750
4.85k
          .num_filtered_input_files_in_output_level;
2751
4.85k
  job_stats_->num_filtered_input_files_at_output_level =
2752
4.85k
      internal_stats.output_level_stats
2753
4.85k
          .num_filtered_input_files_in_output_level;
2754
4.85k
  job_stats_->total_skipped_input_bytes =
2755
4.85k
      internal_stats.output_level_stats.bytes_skipped_non_output_levels +
2756
4.85k
      internal_stats.output_level_stats.bytes_skipped_output_level;
2757
2758
4.85k
  if (internal_stats.has_proximal_level_output) {
2759
0
    job_stats_->total_input_bytes +=
2760
0
        internal_stats.proximal_level_stats.bytes_read_non_output_levels +
2761
0
        internal_stats.proximal_level_stats.bytes_read_output_level;
2762
0
    job_stats_->num_input_records +=
2763
0
        internal_stats.proximal_level_stats.num_input_records;
2764
0
    job_stats_->num_input_files +=
2765
0
        internal_stats.proximal_level_stats
2766
0
            .num_input_files_in_non_output_levels +
2767
0
        internal_stats.proximal_level_stats.num_input_files_in_output_level;
2768
0
    job_stats_->num_input_files_at_output_level +=
2769
0
        internal_stats.proximal_level_stats.num_input_files_in_output_level;
2770
0
    job_stats_->num_filtered_input_files +=
2771
0
        internal_stats.proximal_level_stats
2772
0
            .num_filtered_input_files_in_non_output_levels +
2773
0
        internal_stats.proximal_level_stats
2774
0
            .num_filtered_input_files_in_output_level;
2775
0
    job_stats_->num_filtered_input_files_at_output_level +=
2776
0
        internal_stats.proximal_level_stats
2777
0
            .num_filtered_input_files_in_output_level;
2778
0
    job_stats_->total_skipped_input_bytes +=
2779
0
        internal_stats.proximal_level_stats.bytes_skipped_non_output_levels +
2780
0
        internal_stats.proximal_level_stats.bytes_skipped_output_level;
2781
0
  }
2782
4.85k
}
2783
2784
void CompactionJob::UpdateCompactionJobOutputStatsFromInternalStats(
2785
    const Status& status,
2786
4.85k
    const InternalStats::CompactionStatsFull& internal_stats) const {
2787
4.85k
  assert(job_stats_);
2788
4.85k
  job_stats_->elapsed_micros = internal_stats.output_level_stats.micros;
2789
4.85k
  job_stats_->cpu_micros = internal_stats.output_level_stats.cpu_micros;
2790
2791
  // output information
2792
4.85k
  job_stats_->total_output_bytes =
2793
4.85k
      internal_stats.output_level_stats.bytes_written;
2794
4.85k
  job_stats_->total_output_bytes_blob =
2795
4.85k
      internal_stats.output_level_stats.bytes_written_blob;
2796
4.85k
  job_stats_->num_output_records =
2797
4.85k
      internal_stats.output_level_stats.num_output_records;
2798
4.85k
  job_stats_->num_output_files =
2799
4.85k
      internal_stats.output_level_stats.num_output_files;
2800
4.85k
  job_stats_->num_output_files_blob =
2801
4.85k
      internal_stats.output_level_stats.num_output_files_blob;
2802
2803
4.85k
  if (internal_stats.has_proximal_level_output) {
2804
0
    job_stats_->total_output_bytes +=
2805
0
        internal_stats.proximal_level_stats.bytes_written;
2806
0
    job_stats_->total_output_bytes_blob +=
2807
0
        internal_stats.proximal_level_stats.bytes_written_blob;
2808
0
    job_stats_->num_output_records +=
2809
0
        internal_stats.proximal_level_stats.num_output_records;
2810
0
    job_stats_->num_output_files +=
2811
0
        internal_stats.proximal_level_stats.num_output_files;
2812
0
    job_stats_->num_output_files_blob +=
2813
0
        internal_stats.proximal_level_stats.num_output_files_blob;
2814
0
  }
2815
2816
4.85k
  if (status.ok() && job_stats_->num_output_files > 0) {
2817
1.57k
    CopyPrefix(compact_->SmallestUserKey(),
2818
1.57k
               CompactionJobStats::kMaxPrefixLength,
2819
1.57k
               &job_stats_->smallest_output_key_prefix);
2820
1.57k
    CopyPrefix(compact_->LargestUserKey(), CompactionJobStats::kMaxPrefixLength,
2821
1.57k
               &job_stats_->largest_output_key_prefix);
2822
1.57k
  }
2823
4.85k
}
2824
2825
4.85k
void CompactionJob::LogCompaction() {
2826
4.85k
  Compaction* compaction = compact_->compaction;
2827
4.85k
  ColumnFamilyData* cfd = compaction->column_family_data();
2828
  // Let's check if anything will get logged. Don't prepare all the info if
2829
  // we're not logging
2830
4.85k
  if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) {
2831
4.85k
    Compaction::InputLevelSummaryBuffer inputs_summary;
2832
4.85k
    ROCKS_LOG_INFO(
2833
4.85k
        db_options_.info_log, "[%s] [JOB %d] Compacting %s, score %.2f",
2834
4.85k
        cfd->GetName().c_str(), job_id_,
2835
4.85k
        compaction->InputLevelSummary(&inputs_summary), compaction->score());
2836
4.85k
    char scratch[2345];
2837
4.85k
    compaction->Summary(scratch, sizeof(scratch));
2838
4.85k
    ROCKS_LOG_INFO(db_options_.info_log, "[%s]: Compaction start summary: %s\n",
2839
4.85k
                   cfd->GetName().c_str(), scratch);
2840
    // build event logger report
2841
4.85k
    auto stream = event_logger_->Log();
2842
4.85k
    stream << "job" << job_id_ << "event" << "compaction_started" << "cf_name"
2843
4.85k
           << cfd->GetName() << "compaction_reason"
2844
4.85k
           << GetCompactionReasonString(compaction->compaction_reason());
2845
12.4k
    for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
2846
7.57k
      stream << ("files_L" + std::to_string(compaction->level(i)));
2847
7.57k
      stream.StartArray();
2848
22.4k
      for (auto f : *compaction->inputs(i)) {
2849
22.4k
        stream << f->fd.GetNumber();
2850
22.4k
      }
2851
7.57k
      stream.EndArray();
2852
7.57k
    }
2853
4.85k
    stream << "score" << compaction->score() << "input_data_size"
2854
4.85k
           << compaction->CalculateTotalInputSize() << "oldest_snapshot_seqno"
2855
4.85k
           << (job_context_->snapshot_seqs.empty()
2856
4.85k
                   ? int64_t{-1}  // Use -1 for "none"
2857
4.85k
                   : static_cast<int64_t>(
2858
7
                         job_context_->GetEarliestSnapshotSequence()));
2859
4.85k
    if (compaction->SupportsPerKeyPlacement()) {
2860
0
      stream << "proximal_after_seqno" << proximal_after_seqno_;
2861
0
      stream << "preserve_seqno_after" << preserve_seqno_after_;
2862
0
      stream << "proximal_output_level" << compaction->GetProximalLevel();
2863
0
      stream << "proximal_output_range"
2864
0
             << GetCompactionProximalOutputRangeTypeString(
2865
0
                    compaction->GetProximalOutputRangeType());
2866
2867
0
      if (compaction->GetProximalOutputRangeType() ==
2868
0
          Compaction::ProximalOutputRangeType::kDisabled) {
2869
0
        ROCKS_LOG_WARN(
2870
0
            db_options_.info_log,
2871
0
            "[%s] [JOB %d] Proximal level output is disabled, likely "
2872
0
            "because of the range conflict in the proximal level",
2873
0
            cfd->GetName().c_str(), job_id_);
2874
0
      }
2875
0
    }
2876
4.85k
  }
2877
4.85k
}
2878
2879
4.57k
std::string CompactionJob::GetTableFileName(uint64_t file_number) {
2880
4.57k
  return TableFileName(compact_->compaction->immutable_options().cf_paths,
2881
4.57k
                       file_number, compact_->compaction->output_path_id());
2882
4.57k
}
2883
2884
8.71k
Env::IOPriority CompactionJob::GetRateLimiterPriority() {
2885
8.71k
  if (versions_ && versions_->GetColumnFamilySet() &&
2886
8.71k
      versions_->GetColumnFamilySet()->write_controller()) {
2887
8.71k
    WriteController* write_controller =
2888
8.71k
        versions_->GetColumnFamilySet()->write_controller();
2889
8.71k
    if (write_controller->NeedsDelay() || write_controller->IsStopped()) {
2890
61
      return Env::IO_USER;
2891
61
    }
2892
8.71k
  }
2893
2894
8.65k
  return Env::IO_LOW;
2895
8.71k
}
2896
2897
Status CompactionJob::ReadTablePropertiesDirectly(
2898
    const ImmutableOptions& ioptions, const MutableCFOptions& moptions,
2899
    const FileMetaData* file_meta, const ReadOptions& read_options,
2900
0
    std::shared_ptr<const TableProperties>* tp) {
2901
0
  std::unique_ptr<FSRandomAccessFile> file;
2902
0
  std::string file_name = GetTableFileName(file_meta->fd.GetNumber());
2903
0
  FileOptions fopts = file_options_;
2904
0
  fopts.file_checksum = file_meta->file_checksum;
2905
0
  fopts.file_checksum_func_name = file_meta->file_checksum_func_name;
2906
0
  Status s = ioptions.fs->NewRandomAccessFile(file_name, fopts, &file,
2907
0
                                              nullptr /* dbg */);
2908
0
  if (!s.ok()) {
2909
0
    return s;
2910
0
  }
2911
2912
0
  std::unique_ptr<RandomAccessFileReader> file_reader(
2913
0
      new RandomAccessFileReader(
2914
0
          std::move(file), file_name, ioptions.clock, io_tracer_,
2915
0
          ioptions.stats, Histograms::SST_READ_MICROS /* hist_type */,
2916
0
          nullptr /* file_read_hist */, ioptions.rate_limiter.get(),
2917
0
          ioptions.listeners));
2918
2919
0
  std::unique_ptr<TableProperties> props;
2920
2921
0
  uint64_t magic_number = kBlockBasedTableMagicNumber;
2922
2923
0
  const auto* table_factory = moptions.table_factory.get();
2924
0
  if (table_factory == nullptr) {
2925
0
    return Status::Incomplete("Table factory is not set");
2926
0
  } else {
2927
0
    const auto& table_factory_name = table_factory->Name();
2928
0
    if (table_factory_name == TableFactory::kPlainTableName()) {
2929
0
      magic_number = kPlainTableMagicNumber;
2930
0
    } else if (table_factory_name == TableFactory::kCuckooTableName()) {
2931
0
      magic_number = kCuckooTableMagicNumber;
2932
0
    }
2933
0
  }
2934
2935
0
  s = ReadTableProperties(file_reader.get(), file_meta->fd.GetFileSize(),
2936
0
                          magic_number, ioptions, read_options, &props);
2937
0
  if (!s.ok()) {
2938
0
    return s;
2939
0
  }
2940
2941
0
  *tp = std::move(props);
2942
0
  return s;
2943
0
}
2944
2945
Status CompactionJob::ReadOutputFilesTableProperties(
2946
    const autovector<FileMetaData>& output_files,
2947
    const ReadOptions& read_options,
2948
    std::vector<std::shared_ptr<const TableProperties>>&
2949
        output_files_table_properties,
2950
0
    bool is_proximal_level) {
2951
0
  assert(!output_files.empty());
2952
2953
0
  static const char* level_type =
2954
0
      is_proximal_level ? "proximal output" : "output";
2955
2956
0
  output_files_table_properties.reserve(output_files.size());
2957
2958
0
  Status s;
2959
2960
0
  for (const FileMetaData& metadata : output_files) {
2961
0
    std::shared_ptr<const TableProperties> tp;
2962
0
    s = ReadTablePropertiesDirectly(compact_->compaction->immutable_options(),
2963
0
                                    compact_->compaction->mutable_cf_options(),
2964
0
                                    &metadata, read_options, &tp);
2965
0
    if (!s.ok()) {
2966
0
      ROCKS_LOG_ERROR(
2967
0
          db_options_.info_log,
2968
0
          "Failed to read table properties for %s level output file #%" PRIu64
2969
0
          ": %s",
2970
0
          level_type, metadata.fd.GetNumber(), s.ToString().c_str());
2971
0
      return s;
2972
0
    }
2973
2974
0
    if (tp == nullptr) {
2975
0
      ROCKS_LOG_ERROR(db_options_.info_log,
2976
0
                      "Empty table property for %s level output file #%" PRIu64
2977
0
                      "",
2978
0
                      level_type, metadata.fd.GetNumber());
2979
2980
0
      s = Status::Corruption("Empty table property for " +
2981
0
                             std::string(level_type) +
2982
0
                             " level output files during resuming");
2983
0
      return s;
2984
0
    }
2985
0
    output_files_table_properties.push_back(tp);
2986
0
  }
2987
0
  return s;
2988
0
}
2989
2990
void CompactionJob::RestoreCompactionOutputs(
2991
    const ColumnFamilyData* cfd,
2992
    const std::vector<std::shared_ptr<const TableProperties>>&
2993
        output_files_table_properties,
2994
    SubcompactionProgressPerLevel& subcompaction_progress_per_level,
2995
0
    CompactionOutputs* outputs_to_restore) {
2996
0
  assert(outputs_to_restore->GetOutputs().size() == 0);
2997
2998
0
  const auto& output_files = subcompaction_progress_per_level.GetOutputFiles();
2999
3000
0
  const bool enable_output_hash =
3001
0
      paranoid_file_checks_ ||
3002
0
      !!(compact_->compaction->mutable_cf_options().verify_output_flags &
3003
0
         VerifyOutputFlags::kVerifyIteration);
3004
3005
0
  for (size_t i = 0; i < output_files.size(); i++) {
3006
0
    FileMetaData file_copy = output_files[i];
3007
3008
0
    outputs_to_restore->AddOutput(std::move(file_copy),
3009
0
                                  cfd->internal_comparator(),
3010
0
                                  enable_output_hash, true /* finished */);
3011
3012
0
    outputs_to_restore->UpdateTableProperties(
3013
0
        *output_files_table_properties[i]);
3014
0
  }
3015
3016
0
  outputs_to_restore->SetNumOutputRecords(
3017
0
      subcompaction_progress_per_level.GetNumProcessedOutputRecords());
3018
0
}
3019
3020
// Attempt to resume compaction from a previously persisted compaction progress.
3021
//
3022
// RETURNS:
3023
// - Status::OK():
3024
// * Input iterator positioned at next unprocessed key
3025
// * CompactionOutputs objects fully restored for both output and proximal
3026
// output levels in SubcompactionState
3027
// * Compaction job statistics accurately reflect input and output records
3028
// processed for record count verification
3029
// * File number generation advanced to prevent conflicts with existing outputs
3030
// - Status::NotFound(): No valid progress to resume from
3031
// - Status::Corruption(): Resume key is invalid, beyond input range, or output
3032
// restoration failed
3033
Status CompactionJob::MaybeResumeSubcompactionProgressOnInputIterator(
3034
4.85k
    SubcompactionState* sub_compact, InternalIterator* input_iter) {
3035
4.85k
  const ReadOptions read_options(Env::IOActivity::kCompaction);
3036
4.85k
  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
3037
4.85k
  SubcompactionProgress& subcompaction_progress =
3038
4.85k
      sub_compact->GetSubcompactionProgressRef();
3039
3040
4.85k
  if (subcompaction_progress.output_level_progress
3041
4.85k
              .GetNumProcessedOutputRecords() == 0 &&
3042
4.85k
      subcompaction_progress.proximal_output_level_progress
3043
4.85k
              .GetNumProcessedOutputRecords() == 0) {
3044
4.85k
    return Status::NotFound("No subcompaction progress to resume");
3045
4.85k
  }
3046
3047
0
  ROCKS_LOG_INFO(db_options_.info_log, "[%s] [JOB %d] Resuming compaction : %s",
3048
0
                 cfd->GetName().c_str(), job_id_,
3049
0
                 subcompaction_progress.ToString().c_str());
3050
3051
0
  input_iter->Seek(subcompaction_progress.next_internal_key_to_compact);
3052
3053
0
  if (!input_iter->Valid()) {
3054
0
    ROCKS_LOG_ERROR(db_options_.info_log,
3055
0
                    "[%s] [JOB %d] Iterator is invalid after "
3056
0
                    "seeking to the key to resume. This indicates the key is "
3057
0
                    "incorrectly beyond the input data range.",
3058
0
                    cfd->GetName().c_str(), job_id_);
3059
0
    return Status::Corruption(
3060
0
        "The key to resume is beyond the input data range");
3061
0
  } else if (!input_iter->status().ok()) {
3062
0
    ROCKS_LOG_ERROR(db_options_.info_log,
3063
0
                    "[%s] [JOB %d] Iterator has error after seeking to "
3064
0
                    "the key to resume: %s",
3065
0
                    cfd->GetName().c_str(), job_id_,
3066
0
                    input_iter->status().ToString().c_str());
3067
0
    return Status::Corruption(
3068
0
        "Iterator has error status after seeking to the key: " +
3069
0
        input_iter->status().ToString());
3070
0
  }
3071
3072
0
  sub_compact->compaction_job_stats.has_accurate_num_input_records =
3073
0
      subcompaction_progress.num_processed_input_records != 0;
3074
3075
0
  sub_compact->compaction_job_stats.num_input_records =
3076
0
      subcompaction_progress.num_processed_input_records;
3077
3078
0
  for (const bool& is_proximal_level : {false, true}) {
3079
0
    if (is_proximal_level &&
3080
0
        !sub_compact->compaction->SupportsPerKeyPlacement()) {
3081
0
      continue;
3082
0
    }
3083
3084
0
    Status s;
3085
0
    SubcompactionProgressPerLevel& subcompaction_progress_per_level =
3086
0
        is_proximal_level
3087
0
            ? subcompaction_progress.proximal_output_level_progress
3088
0
            : subcompaction_progress.output_level_progress;
3089
3090
0
    const auto& output_files =
3091
0
        subcompaction_progress_per_level.GetOutputFiles();
3092
3093
0
    std::vector<std::shared_ptr<const TableProperties>>
3094
0
        output_files_table_properties;
3095
3096
    // TODO(hx235): investigate if we can skip reading properties to save read
3097
    // IO
3098
0
    s = ReadOutputFilesTableProperties(output_files, read_options,
3099
0
                                       output_files_table_properties);
3100
0
    if (!s.ok()) {
3101
0
      ROCKS_LOG_ERROR(
3102
0
          db_options_.info_log,
3103
0
          "[%s] [JOB %d] Failed to read table properties for %s output level"
3104
0
          "files "
3105
0
          "during resume: %s.",
3106
0
          cfd->GetName().c_str(), job_id_, is_proximal_level ? "proximal" : "",
3107
0
          s.ToString().c_str());
3108
0
      return Status::Corruption(
3109
0
          "Not able to resume due to table property reading error " +
3110
0
          s.ToString());
3111
0
    }
3112
3113
0
    RestoreCompactionOutputs(cfd, output_files_table_properties,
3114
0
                             subcompaction_progress_per_level,
3115
0
                             sub_compact->Outputs(is_proximal_level));
3116
3117
    // Skip past all the used file numbers to avoid creating new output files
3118
    // after resumption that conflict with the existing output files
3119
0
    for (const auto& file_meta : output_files) {
3120
0
      uint64_t file_number = file_meta.fd.GetNumber();
3121
0
      while (versions_->NewFileNumber() <= file_number) {
3122
0
        versions_->FetchAddFileNumber(1);
3123
0
      }
3124
0
    }
3125
0
  }
3126
3127
0
  return Status::OK();
3128
0
}
3129
3130
void CompactionJob::UpdateSubcompactionProgress(
3131
    const CompactionIterator* c_iter, const Slice next_table_min_key,
3132
0
    SubcompactionState* sub_compact) {
3133
0
  assert(c_iter);
3134
0
  SubcompactionProgress& subcompaction_progress =
3135
0
      sub_compact->GetSubcompactionProgressRef();
3136
3137
0
  IterKey next_ikey_to_compact;
3138
0
  next_ikey_to_compact.SetInternalKey(ExtractUserKey(next_table_min_key),
3139
0
                                      kMaxSequenceNumber, kValueTypeForSeek);
3140
0
  subcompaction_progress.next_internal_key_to_compact =
3141
0
      next_ikey_to_compact.GetInternalKey().ToString();
3142
3143
  // Track total processed input records for progress reporting by combining:
3144
  // - Resumed count: records already processed before compaction was
3145
  // interrupted
3146
  // - Current count: records scanned in the current compaction session
3147
  // Only update when both tracking mechanisms provide accurate counts to ensure
3148
  // reliability.
3149
0
  subcompaction_progress.num_processed_input_records =
3150
0
      c_iter->HasNumInputEntryScanned() &&
3151
0
              sub_compact->compaction_job_stats.has_accurate_num_input_records
3152
0
          ? c_iter->NumInputEntryScanned() +
3153
0
                sub_compact->compaction_job_stats.num_input_records
3154
0
          : 0;
3155
3156
0
  UpdateSubcompactionProgressPerLevel(
3157
0
      sub_compact, false /* is_proximal_level */, subcompaction_progress);
3158
3159
0
  if (sub_compact->compaction->SupportsPerKeyPlacement()) {
3160
0
    UpdateSubcompactionProgressPerLevel(
3161
0
        sub_compact, true /* is_proximal_level */, subcompaction_progress);
3162
0
  }
3163
0
}
3164
3165
void CompactionJob::UpdateSubcompactionProgressPerLevel(
3166
    SubcompactionState* sub_compact, bool is_proximal_level,
3167
0
    SubcompactionProgress& subcompaction_progress) {
3168
0
  SubcompactionProgressPerLevel& subcompaction_progress_per_level =
3169
0
      is_proximal_level ? subcompaction_progress.proximal_output_level_progress
3170
0
                        : subcompaction_progress.output_level_progress;
3171
3172
0
  subcompaction_progress_per_level.SetNumProcessedOutputRecords(
3173
0
      sub_compact->OutputStats(is_proximal_level)->num_output_records);
3174
3175
0
  const auto& prev_output_files =
3176
0
      subcompaction_progress_per_level.GetOutputFiles();
3177
3178
0
  const auto& current_output_files =
3179
0
      sub_compact->Outputs(is_proximal_level)->GetOutputs();
3180
3181
0
  for (size_t i = prev_output_files.size(); i < current_output_files.size();
3182
0
       i++) {
3183
0
    subcompaction_progress_per_level.AddToOutputFiles(
3184
0
        current_output_files[i].meta);
3185
0
  }
3186
0
}
3187
3188
Status CompactionJob::PersistSubcompactionProgress(
3189
0
    SubcompactionState* sub_compact) {
3190
0
  SubcompactionProgress& subcompaction_progress =
3191
0
      sub_compact->GetSubcompactionProgressRef();
3192
3193
0
  assert(compaction_progress_writer_);
3194
3195
0
  VersionEdit edit;
3196
0
  edit.SetSubcompactionProgress(subcompaction_progress);
3197
3198
0
  std::string record;
3199
0
  if (!edit.EncodeTo(&record)) {
3200
0
    ROCKS_LOG_ERROR(
3201
0
        db_options_.info_log,
3202
0
        "[%s] [JOB %d] Failed to encode subcompaction "
3203
0
        "progress",
3204
0
        compact_->compaction->column_family_data()->GetName().c_str(), job_id_);
3205
0
    return Status::Corruption("Failed to encode subcompaction progress");
3206
0
  }
3207
3208
0
  WriteOptions write_options(Env::IOActivity::kCompaction);
3209
0
  Status s = compaction_progress_writer_->AddRecord(write_options, record);
3210
0
  IOOptions opts;
3211
0
  if (s.ok()) {
3212
0
    s = WritableFileWriter::PrepareIOOptions(write_options, opts);
3213
0
  }
3214
0
  if (s.ok()) {
3215
0
    s = compaction_progress_writer_->file()->Sync(opts, db_options_.use_fsync);
3216
0
  }
3217
3218
0
  if (!s.ok()) {
3219
0
    ROCKS_LOG_ERROR(
3220
0
        db_options_.info_log,
3221
0
        "[%s] [JOB %d] Failed to persist subcompaction "
3222
0
        "progress: %s",
3223
0
        compact_->compaction->column_family_data()->GetName().c_str(), job_id_,
3224
0
        s.ToString().c_str());
3225
0
    return s;
3226
0
  }
3227
3228
0
  subcompaction_progress.output_level_progress
3229
0
      .UpdateLastPersistedOutputFilesCount();
3230
3231
0
  subcompaction_progress.proximal_output_level_progress
3232
0
      .UpdateLastPersistedOutputFilesCount();
3233
3234
0
  return Status::OK();
3235
0
}
3236
3237
Status CompactionJob::VerifyInputRecordCount(
3238
2.72k
    uint64_t num_input_range_del) const {
3239
2.72k
  size_t ts_sz = compact_->compaction->column_family_data()
3240
2.72k
                     ->user_comparator()
3241
2.72k
                     ->timestamp_size();
3242
  // When trim_ts_ is non-empty, CompactionIterator takes
3243
  // HistoryTrimmingIterator as input iterator and sees a trimmed view of
3244
  // input keys. So the number of keys it processed is not suitable for
3245
  // verification here.
3246
  // TODO: support verification when trim_ts_ is non-empty.
3247
2.72k
  if (!(ts_sz > 0 && !trim_ts_.empty())) {
3248
2.72k
    assert(internal_stats_.output_level_stats.num_input_records > 0);
3249
    // TODO: verify the number of range deletion entries.
3250
2.72k
    uint64_t expected = internal_stats_.output_level_stats.num_input_records -
3251
2.72k
                        num_input_range_del;
3252
2.72k
    uint64_t actual = job_stats_->num_input_records;
3253
2.72k
    if (expected != actual) {
3254
0
      char scratch[2345];
3255
0
      compact_->compaction->Summary(scratch, sizeof(scratch));
3256
0
      std::string msg =
3257
0
          "Compaction number of input keys does not match "
3258
0
          "number of keys processed. Expected " +
3259
0
          std::to_string(expected) + " but processed " +
3260
0
          std::to_string(actual) + ". Compaction summary: " + scratch;
3261
0
      ROCKS_LOG_WARN(
3262
0
          db_options_.info_log,
3263
0
          "[%s] [JOB %d] VerifyInputRecordCount() Status: %s",
3264
0
          compact_->compaction->column_family_data()->GetName().c_str(),
3265
0
          job_context_->job_id, msg.c_str());
3266
0
      if (db_options_.compaction_verify_record_count) {
3267
0
        return Status::Corruption(msg);
3268
0
      }
3269
0
    }
3270
2.72k
  }
3271
2.72k
  return Status::OK();
3272
2.72k
}
3273
3274
2.72k
Status CompactionJob::VerifyOutputRecordCount() const {
3275
2.72k
  uint64_t total_output_num = 0;
3276
2.72k
  for (const auto& state : compact_->sub_compact_states) {
3277
2.72k
    for (const auto& output : state.GetOutputs()) {
3278
1.57k
      total_output_num += output.table_properties->num_entries -
3279
1.57k
                          output.table_properties->num_range_deletions;
3280
1.57k
    }
3281
2.72k
  }
3282
3283
2.72k
  uint64_t expected = internal_stats_.output_level_stats.num_output_records;
3284
2.72k
  if (internal_stats_.has_proximal_level_output) {
3285
0
    expected += internal_stats_.proximal_level_stats.num_output_records;
3286
0
  }
3287
2.72k
  if (expected != total_output_num) {
3288
0
    char scratch[2345];
3289
0
    compact_->compaction->Summary(scratch, sizeof(scratch));
3290
0
    std::string msg =
3291
0
        "Number of keys in compaction output SST files does not match "
3292
0
        "number of keys added. Expected " +
3293
0
        std::to_string(expected) + " but there are " +
3294
0
        std::to_string(total_output_num) +
3295
0
        " in output SST files. Compaction summary: " + scratch;
3296
0
    ROCKS_LOG_WARN(
3297
0
        db_options_.info_log,
3298
0
        "[%s] [JOB %d] VerifyOutputRecordCount() status: %s",
3299
0
        compact_->compaction->column_family_data()->GetName().c_str(),
3300
0
        job_context_->job_id, msg.c_str());
3301
0
    if (db_options_.compaction_verify_record_count) {
3302
0
      return Status::Corruption(msg);
3303
0
    }
3304
0
  }
3305
2.72k
  return Status::OK();
3306
2.72k
}
3307
}  // namespace ROCKSDB_NAMESPACE