Coverage Report

Created: 2025-07-23 07:17

/src/rocksdb/db/compaction/compaction_job.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10
#include "db/compaction/compaction_job.h"
11
12
#include <algorithm>
13
#include <cinttypes>
14
#include <memory>
15
#include <optional>
16
#include <set>
17
#include <utility>
18
#include <vector>
19
20
#include "db/blob/blob_counting_iterator.h"
21
#include "db/blob/blob_file_addition.h"
22
#include "db/blob/blob_file_builder.h"
23
#include "db/builder.h"
24
#include "db/compaction/clipping_iterator.h"
25
#include "db/compaction/compaction_state.h"
26
#include "db/db_impl/db_impl.h"
27
#include "db/dbformat.h"
28
#include "db/error_handler.h"
29
#include "db/event_helpers.h"
30
#include "db/history_trimming_iterator.h"
31
#include "db/log_writer.h"
32
#include "db/merge_helper.h"
33
#include "db/range_del_aggregator.h"
34
#include "db/version_edit.h"
35
#include "db/version_set.h"
36
#include "file/filename.h"
37
#include "file/read_write_util.h"
38
#include "file/sst_file_manager_impl.h"
39
#include "file/writable_file_writer.h"
40
#include "logging/log_buffer.h"
41
#include "logging/logging.h"
42
#include "monitoring/iostats_context_imp.h"
43
#include "monitoring/thread_status_util.h"
44
#include "options/configurable_helper.h"
45
#include "options/options_helper.h"
46
#include "port/port.h"
47
#include "rocksdb/db.h"
48
#include "rocksdb/env.h"
49
#include "rocksdb/options.h"
50
#include "rocksdb/statistics.h"
51
#include "rocksdb/status.h"
52
#include "rocksdb/table.h"
53
#include "rocksdb/utilities/options_type.h"
54
#include "table/merging_iterator.h"
55
#include "table/table_builder.h"
56
#include "table/unique_id_impl.h"
57
#include "test_util/sync_point.h"
58
#include "util/stop_watch.h"
59
60
namespace ROCKSDB_NAMESPACE {
61
62
5.33k
const char* GetCompactionReasonString(CompactionReason compaction_reason) {
63
5.33k
  switch (compaction_reason) {
64
0
    case CompactionReason::kUnknown:
65
0
      return "Unknown";
66
2.51k
    case CompactionReason::kLevelL0FilesNum:
67
2.51k
      return "LevelL0FilesNum";
68
0
    case CompactionReason::kLevelMaxLevelSize:
69
0
      return "LevelMaxLevelSize";
70
0
    case CompactionReason::kUniversalSizeAmplification:
71
0
      return "UniversalSizeAmplification";
72
0
    case CompactionReason::kUniversalSizeRatio:
73
0
      return "UniversalSizeRatio";
74
0
    case CompactionReason::kUniversalSortedRunNum:
75
0
      return "UniversalSortedRunNum";
76
0
    case CompactionReason::kFIFOMaxSize:
77
0
      return "FIFOMaxSize";
78
0
    case CompactionReason::kFIFOReduceNumFiles:
79
0
      return "FIFOReduceNumFiles";
80
0
    case CompactionReason::kFIFOTtl:
81
0
      return "FIFOTtl";
82
1.14k
    case CompactionReason::kManualCompaction:
83
1.14k
      return "ManualCompaction";
84
0
    case CompactionReason::kFilesMarkedForCompaction:
85
0
      return "FilesMarkedForCompaction";
86
1.68k
    case CompactionReason::kBottommostFiles:
87
1.68k
      return "BottommostFiles";
88
0
    case CompactionReason::kTtl:
89
0
      return "Ttl";
90
0
    case CompactionReason::kFlush:
91
0
      return "Flush";
92
0
    case CompactionReason::kExternalSstIngestion:
93
0
      return "ExternalSstIngestion";
94
0
    case CompactionReason::kPeriodicCompaction:
95
0
      return "PeriodicCompaction";
96
0
    case CompactionReason::kChangeTemperature:
97
0
      return "ChangeTemperature";
98
0
    case CompactionReason::kForcedBlobGC:
99
0
      return "ForcedBlobGC";
100
0
    case CompactionReason::kRoundRobinTtl:
101
0
      return "RoundRobinTtl";
102
0
    case CompactionReason::kRefitLevel:
103
0
      return "RefitLevel";
104
0
    case CompactionReason::kNumOfReasons:
105
      // fall through
106
0
    default:
107
0
      assert(false);
108
0
      return "Invalid";
109
5.33k
  }
110
5.33k
}
111
112
const char* GetCompactionProximalOutputRangeTypeString(
113
0
    Compaction::ProximalOutputRangeType range_type) {
114
0
  switch (range_type) {
115
0
    case Compaction::ProximalOutputRangeType::kNotSupported:
116
0
      return "NotSupported";
117
0
    case Compaction::ProximalOutputRangeType::kFullRange:
118
0
      return "FullRange";
119
0
    case Compaction::ProximalOutputRangeType::kNonLastRange:
120
0
      return "NonLastRange";
121
0
    case Compaction::ProximalOutputRangeType::kDisabled:
122
0
      return "Disabled";
123
0
    default:
124
0
      assert(false);
125
0
      return "Invalid";
126
0
  }
127
0
}
128
129
CompactionJob::CompactionJob(
130
    int job_id, Compaction* compaction, const ImmutableDBOptions& db_options,
131
    const MutableDBOptions& mutable_db_options, const FileOptions& file_options,
132
    VersionSet* versions, const std::atomic<bool>* shutting_down,
133
    LogBuffer* log_buffer, FSDirectory* db_directory,
134
    FSDirectory* output_directory, FSDirectory* blob_output_directory,
135
    Statistics* stats, InstrumentedMutex* db_mutex,
136
    ErrorHandler* db_error_handler, JobContext* job_context,
137
    std::shared_ptr<Cache> table_cache, EventLogger* event_logger,
138
    bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname,
139
    CompactionJobStats* compaction_job_stats, Env::Priority thread_pri,
140
    const std::shared_ptr<IOTracer>& io_tracer,
141
    const std::atomic<bool>& manual_compaction_canceled,
142
    const std::string& db_id, const std::string& db_session_id,
143
    std::string full_history_ts_low, std::string trim_ts,
144
    BlobFileCompletionCallback* blob_callback, int* bg_compaction_scheduled,
145
    int* bg_bottom_compaction_scheduled)
146
5.33k
    : compact_(new CompactionState(compaction)),
147
5.33k
      internal_stats_(compaction->compaction_reason(), 1),
148
5.33k
      db_options_(db_options),
149
5.33k
      mutable_db_options_copy_(mutable_db_options),
150
5.33k
      log_buffer_(log_buffer),
151
5.33k
      output_directory_(output_directory),
152
5.33k
      stats_(stats),
153
5.33k
      bottommost_level_(false),
154
5.33k
      write_hint_(Env::WLTH_NOT_SET),
155
5.33k
      job_stats_(compaction_job_stats),
156
5.33k
      job_id_(job_id),
157
5.33k
      dbname_(dbname),
158
5.33k
      db_id_(db_id),
159
5.33k
      db_session_id_(db_session_id),
160
5.33k
      file_options_(file_options),
161
5.33k
      env_(db_options.env),
162
5.33k
      io_tracer_(io_tracer),
163
5.33k
      fs_(db_options.fs, io_tracer),
164
      file_options_for_read_(
165
5.33k
          fs_->OptimizeForCompactionTableRead(file_options, db_options_)),
166
5.33k
      versions_(versions),
167
5.33k
      shutting_down_(shutting_down),
168
5.33k
      manual_compaction_canceled_(manual_compaction_canceled),
169
5.33k
      db_directory_(db_directory),
170
5.33k
      blob_output_directory_(blob_output_directory),
171
5.33k
      db_mutex_(db_mutex),
172
5.33k
      db_error_handler_(db_error_handler),
173
5.33k
      earliest_snapshot_(job_context->GetEarliestSnapshotSequence()),
174
5.33k
      job_context_(job_context),
175
5.33k
      table_cache_(std::move(table_cache)),
176
5.33k
      event_logger_(event_logger),
177
5.33k
      paranoid_file_checks_(paranoid_file_checks),
178
5.33k
      measure_io_stats_(measure_io_stats),
179
5.33k
      thread_pri_(thread_pri),
180
5.33k
      full_history_ts_low_(std::move(full_history_ts_low)),
181
5.33k
      trim_ts_(std::move(trim_ts)),
182
5.33k
      blob_callback_(blob_callback),
183
5.33k
      extra_num_subcompaction_threads_reserved_(0),
184
5.33k
      bg_compaction_scheduled_(bg_compaction_scheduled),
185
5.33k
      bg_bottom_compaction_scheduled_(bg_bottom_compaction_scheduled) {
186
5.33k
  assert(job_stats_ != nullptr);
187
5.33k
  assert(log_buffer_ != nullptr);
188
5.33k
  assert(job_context->snapshot_context_initialized);
189
190
5.33k
  const auto* cfd = compact_->compaction->column_family_data();
191
5.33k
  ThreadStatusUtil::SetEnableTracking(db_options_.enable_thread_tracking);
192
5.33k
  ThreadStatusUtil::SetColumnFamily(cfd);
193
5.33k
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
194
5.33k
  ReportStartedCompaction(compaction);
195
5.33k
}
196
197
5.33k
CompactionJob::~CompactionJob() {
198
5.33k
  assert(compact_ == nullptr);
199
5.33k
  ThreadStatusUtil::ResetThreadStatus();
200
5.33k
}
201
202
5.33k
void CompactionJob::ReportStartedCompaction(Compaction* compaction) {
203
5.33k
  ThreadStatusUtil::SetThreadOperationProperty(ThreadStatus::COMPACTION_JOB_ID,
204
5.33k
                                               job_id_);
205
206
5.33k
  ThreadStatusUtil::SetThreadOperationProperty(
207
5.33k
      ThreadStatus::COMPACTION_INPUT_OUTPUT_LEVEL,
208
5.33k
      (static_cast<uint64_t>(compact_->compaction->start_level()) << 32) +
209
5.33k
          compact_->compaction->output_level());
210
211
  // In the current design, a CompactionJob is always created
212
  // for non-trivial compaction.
213
5.33k
  assert(compaction->IsTrivialMove() == false ||
214
5.33k
         compaction->is_manual_compaction() == true);
215
216
5.33k
  ThreadStatusUtil::SetThreadOperationProperty(
217
5.33k
      ThreadStatus::COMPACTION_PROP_FLAGS,
218
5.33k
      compaction->is_manual_compaction() +
219
5.33k
          (compaction->deletion_compaction() << 1));
220
5.33k
  auto total_input_bytes = compaction->CalculateTotalInputSize();
221
5.33k
  ThreadStatusUtil::SetThreadOperationProperty(
222
5.33k
      ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES, total_input_bytes);
223
224
5.33k
  IOSTATS_RESET(bytes_written);
225
5.33k
  IOSTATS_RESET(bytes_read);
226
5.33k
  ThreadStatusUtil::SetThreadOperationProperty(
227
5.33k
      ThreadStatus::COMPACTION_BYTES_WRITTEN, 0);
228
5.33k
  ThreadStatusUtil::SetThreadOperationProperty(
229
5.33k
      ThreadStatus::COMPACTION_BYTES_READ, 0);
230
231
  // Set the thread operation after operation properties
232
  // to ensure GetThreadList() can always show them all together.
233
5.33k
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION);
234
235
5.33k
  job_stats_->is_manual_compaction = compaction->is_manual_compaction();
236
5.33k
  job_stats_->is_full_compaction = compaction->is_full_compaction();
237
  // populate compaction stats num_input_files and total_num_of_bytes
238
5.33k
  size_t num_input_files = 0;
239
5.33k
  for (int input_level = 0;
240
13.8k
       input_level < static_cast<int>(compaction->num_input_levels());
241
8.55k
       ++input_level) {
242
8.55k
    const LevelFilesBrief* flevel = compaction->input_levels(input_level);
243
8.55k
    num_input_files += flevel->num_files;
244
8.55k
  }
245
5.33k
  job_stats_->CompactionJobStats::num_input_files = num_input_files;
246
5.33k
  job_stats_->total_input_bytes = total_input_bytes;
247
5.33k
}
248
249
void CompactionJob::Prepare(
250
    std::optional<std::pair<std::optional<Slice>, std::optional<Slice>>>
251
5.33k
        known_single_subcompact) {
252
5.33k
  db_mutex_->AssertHeld();
253
5.33k
  AutoThreadOperationStageUpdater stage_updater(
254
5.33k
      ThreadStatus::STAGE_COMPACTION_PREPARE);
255
256
  // Generate file_levels_ for compaction before making Iterator
257
5.33k
  auto* c = compact_->compaction;
258
5.33k
  [[maybe_unused]] ColumnFamilyData* cfd = c->column_family_data();
259
5.33k
  assert(cfd != nullptr);
260
5.33k
  const VersionStorageInfo* storage_info = c->input_version()->storage_info();
261
5.33k
  assert(storage_info);
262
5.33k
  assert(storage_info->NumLevelFiles(compact_->compaction->level()) > 0);
263
264
5.33k
  write_hint_ = storage_info->CalculateSSTWriteHint(
265
5.33k
      c->output_level(), db_options_.calculate_sst_write_lifetime_hint_set);
266
5.33k
  bottommost_level_ = c->bottommost_level();
267
268
5.33k
  if (!known_single_subcompact.has_value() && c->ShouldFormSubcompactions()) {
269
0
    StopWatch sw(db_options_.clock, stats_, SUBCOMPACTION_SETUP_TIME);
270
0
    GenSubcompactionBoundaries();
271
0
  }
272
5.33k
  if (boundaries_.size() >= 1) {
273
0
    assert(!known_single_subcompact.has_value());
274
0
    for (size_t i = 0; i <= boundaries_.size(); i++) {
275
0
      compact_->sub_compact_states.emplace_back(
276
0
          c, (i != 0) ? std::optional<Slice>(boundaries_[i - 1]) : std::nullopt,
277
0
          (i != boundaries_.size()) ? std::optional<Slice>(boundaries_[i])
278
0
                                    : std::nullopt,
279
0
          static_cast<uint32_t>(i));
280
      // assert to validate that boundaries don't have same user keys (without
281
      // timestamp part).
282
0
      assert(i == 0 || i == boundaries_.size() ||
283
0
             cfd->user_comparator()->CompareWithoutTimestamp(
284
0
                 boundaries_[i - 1], boundaries_[i]) < 0);
285
0
    }
286
0
    RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED,
287
0
                      compact_->sub_compact_states.size());
288
5.33k
  } else {
289
5.33k
    std::optional<Slice> start_key;
290
5.33k
    std::optional<Slice> end_key;
291
5.33k
    if (known_single_subcompact.has_value()) {
292
0
      start_key = known_single_subcompact.value().first;
293
0
      end_key = known_single_subcompact.value().second;
294
5.33k
    } else {
295
5.33k
      assert(!start_key.has_value() && !end_key.has_value());
296
5.33k
    }
297
5.33k
    compact_->sub_compact_states.emplace_back(c, start_key, end_key,
298
5.33k
                                              /*sub_job_id*/ 0);
299
5.33k
  }
300
301
  // collect all seqno->time information from the input files which will be used
302
  // to encode seqno->time to the output files.
303
5.33k
  SequenceNumber preserve_time_min_seqno = kMaxSequenceNumber;
304
5.33k
  SequenceNumber preclude_last_level_min_seqno = kMaxSequenceNumber;
305
5.33k
  uint64_t preserve_time_duration =
306
5.33k
      MinAndMaxPreserveSeconds(c->mutable_cf_options()).max_preserve_seconds;
307
308
5.33k
  if (preserve_time_duration > 0) {
309
0
    const ReadOptions read_options(Env::IOActivity::kCompaction);
310
    // Setup seqno_to_time_mapping_ with relevant time range.
311
0
    seqno_to_time_mapping_.SetMaxTimeSpan(preserve_time_duration);
312
0
    for (const auto& each_level : *c->inputs()) {
313
0
      for (const auto& fmd : each_level.files) {
314
0
        std::shared_ptr<const TableProperties> tp;
315
0
        Status s = c->input_version()->GetTableProperties(read_options, &tp,
316
0
                                                          fmd, nullptr);
317
0
        if (s.ok()) {
318
0
          s = seqno_to_time_mapping_.DecodeFrom(tp->seqno_to_time_mapping);
319
0
        }
320
0
        if (!s.ok()) {
321
0
          ROCKS_LOG_WARN(
322
0
              db_options_.info_log,
323
0
              "Problem reading or processing seqno-to-time mapping: %s",
324
0
              s.ToString().c_str());
325
0
        }
326
0
      }
327
0
    }
328
329
0
    int64_t _current_time = 0;
330
0
    Status s = db_options_.clock->GetCurrentTime(&_current_time);
331
0
    if (!s.ok()) {
332
0
      ROCKS_LOG_WARN(db_options_.info_log,
333
0
                     "Failed to get current time in compaction: Status: %s",
334
0
                     s.ToString().c_str());
335
      // preserve all time information
336
0
      preserve_time_min_seqno = 0;
337
0
      preclude_last_level_min_seqno = 0;
338
0
      seqno_to_time_mapping_.Enforce();
339
0
    } else {
340
0
      seqno_to_time_mapping_.Enforce(_current_time);
341
0
      seqno_to_time_mapping_.GetCurrentTieringCutoffSeqnos(
342
0
          static_cast<uint64_t>(_current_time),
343
0
          c->mutable_cf_options().preserve_internal_time_seconds,
344
0
          c->mutable_cf_options().preclude_last_level_data_seconds,
345
0
          &preserve_time_min_seqno, &preclude_last_level_min_seqno);
346
0
    }
347
    // For accuracy of the GetProximalSeqnoBeforeTime queries above, we only
348
    // limit the capacity after them.
349
    // Here, if we set capacity to the per-SST limit, we could be throwing away
350
    // fidelity when a compaction output file has a narrower seqno range than
351
    // all the inputs. If we only limit capacity for each compaction output, we
352
    // could be doing a lot of unnecessary recomputation in a large compaction
353
    // (up to quadratic in number of files). Thus, we do something in the
354
    // middle: enforce a reasonably large constant size limit substantially
355
    // larger than kMaxSeqnoTimePairsPerSST.
356
0
    seqno_to_time_mapping_.SetCapacity(kMaxSeqnoToTimeEntries);
357
0
  }
358
#ifndef NDEBUG
359
  assert(preserve_time_min_seqno <= preclude_last_level_min_seqno);
360
  TEST_SYNC_POINT_CALLBACK(
361
      "CompactionJob::PrepareTimes():preclude_last_level_min_seqno",
362
      static_cast<void*>(&preclude_last_level_min_seqno));
363
  // Restore the invariant asserted above, in case it was broken under the
364
  // callback
365
  preserve_time_min_seqno =
366
      std::min(preclude_last_level_min_seqno, preserve_time_min_seqno);
367
#endif
368
369
  // Preserve sequence numbers for preserved write times and snapshots, though
370
  // the specific sequence number of the earliest snapshot can be zeroed.
371
5.33k
  preserve_seqno_after_ =
372
5.33k
      std::max(preserve_time_min_seqno, SequenceNumber{1}) - 1;
373
5.33k
  preserve_seqno_after_ = std::min(preserve_seqno_after_, earliest_snapshot_);
374
  // If using preclude feature, also preclude snapshots from last level, just
375
  // because they are heuristically more likely to be accessed than non-snapshot
376
  // data.
377
5.33k
  if (preclude_last_level_min_seqno < kMaxSequenceNumber &&
378
5.33k
      earliest_snapshot_ < preclude_last_level_min_seqno) {
379
0
    preclude_last_level_min_seqno = earliest_snapshot_;
380
0
  }
381
  // Now combine what we would like to preclude from last level with what we
382
  // can safely support without dangerously moving data back up the LSM tree,
383
  // to get the final seqno threshold for proximal vs. last. In particular,
384
  // when the reserved output key range for the proximal level does not
385
  // include the entire last level input key range, we need to keep entries
386
  // already in the last level there. (Even allowing within-range entries to
387
  // move back up could cause problems with range tombstones. Perhaps it
388
  // would be better in some rare cases to keep entries in the last level
389
  // one-by-one rather than based on sequence number, but that would add extra
390
  // tracking and complexity to CompactionIterator that is probably not
391
  // worthwhile overall. Correctness is also more clear when splitting by
392
  // seqno threshold.)
393
5.33k
  proximal_after_seqno_ = std::max(preclude_last_level_min_seqno,
394
5.33k
                                   c->GetKeepInLastLevelThroughSeqno());
395
396
5.33k
  options_file_number_ = versions_->options_file_number();
397
5.33k
}
398
399
0
uint64_t CompactionJob::GetSubcompactionsLimit() {
400
0
  return extra_num_subcompaction_threads_reserved_ +
401
0
         std::max(
402
0
             std::uint64_t(1),
403
0
             static_cast<uint64_t>(compact_->compaction->max_subcompactions()));
404
0
}
405
406
void CompactionJob::AcquireSubcompactionResources(
407
0
    int num_extra_required_subcompactions) {
408
0
  TEST_SYNC_POINT("CompactionJob::AcquireSubcompactionResources:0");
409
0
  TEST_SYNC_POINT("CompactionJob::AcquireSubcompactionResources:1");
410
0
  int max_db_compactions =
411
0
      DBImpl::GetBGJobLimits(
412
0
          mutable_db_options_copy_.max_background_flushes,
413
0
          mutable_db_options_copy_.max_background_compactions,
414
0
          mutable_db_options_copy_.max_background_jobs,
415
0
          versions_->GetColumnFamilySet()
416
0
              ->write_controller()
417
0
              ->NeedSpeedupCompaction())
418
0
          .max_compactions;
419
0
  InstrumentedMutexLock l(db_mutex_);
420
  // Apply min function first since we need to compute the extra subcompaction
421
  // against compaction limits. And then try to reserve threads for extra
422
  // subcompactions. The actual number of reserved threads could be less than
423
  // the desired number.
424
0
  int available_bg_compactions_against_db_limit =
425
0
      std::max(max_db_compactions - *bg_compaction_scheduled_ -
426
0
                   *bg_bottom_compaction_scheduled_,
427
0
               0);
428
  // Reservation only supports background threads whose priority is
429
  // between BOTTOM and HIGH. Need to degrade the priority to HIGH if the
430
  // origin thread_pri_ is higher than that. Similar to ReleaseThreads().
431
0
  extra_num_subcompaction_threads_reserved_ =
432
0
      env_->ReserveThreads(std::min(num_extra_required_subcompactions,
433
0
                                    available_bg_compactions_against_db_limit),
434
0
                           std::min(thread_pri_, Env::Priority::HIGH));
435
436
  // Update bg_compaction_scheduled_ or bg_bottom_compaction_scheduled_
437
  // depending on if this compaction has the bottommost priority
438
0
  if (thread_pri_ == Env::Priority::BOTTOM) {
439
0
    *bg_bottom_compaction_scheduled_ +=
440
0
        extra_num_subcompaction_threads_reserved_;
441
0
  } else {
442
0
    *bg_compaction_scheduled_ += extra_num_subcompaction_threads_reserved_;
443
0
  }
444
0
}
445
446
0
void CompactionJob::ShrinkSubcompactionResources(uint64_t num_extra_resources) {
447
  // Do nothing when we have zero resources to shrink
448
0
  if (num_extra_resources == 0) {
449
0
    return;
450
0
  }
451
0
  db_mutex_->Lock();
452
  // We cannot release threads more than what we reserved before
453
0
  int extra_num_subcompaction_threads_released = env_->ReleaseThreads(
454
0
      (int)num_extra_resources, std::min(thread_pri_, Env::Priority::HIGH));
455
  // Update the number of reserved threads and the number of background
456
  // scheduled compactions for this compaction job
457
0
  extra_num_subcompaction_threads_reserved_ -=
458
0
      extra_num_subcompaction_threads_released;
459
  // TODO (zichen): design a test case with new subcompaction partitioning
460
  // when the number of actual partitions is less than the number of planned
461
  // partitions
462
0
  assert(extra_num_subcompaction_threads_released == (int)num_extra_resources);
463
  // Update bg_compaction_scheduled_ or bg_bottom_compaction_scheduled_
464
  // depending on if this compaction has the bottommost priority
465
0
  if (thread_pri_ == Env::Priority::BOTTOM) {
466
0
    *bg_bottom_compaction_scheduled_ -=
467
0
        extra_num_subcompaction_threads_released;
468
0
  } else {
469
0
    *bg_compaction_scheduled_ -= extra_num_subcompaction_threads_released;
470
0
  }
471
0
  db_mutex_->Unlock();
472
0
  TEST_SYNC_POINT("CompactionJob::ShrinkSubcompactionResources:0");
473
0
}
474
475
5.33k
void CompactionJob::ReleaseSubcompactionResources() {
476
5.33k
  if (extra_num_subcompaction_threads_reserved_ == 0) {
477
5.33k
    return;
478
5.33k
  }
479
0
  {
480
0
    InstrumentedMutexLock l(db_mutex_);
481
    // The number of reserved threads becomes larger than 0 only if the
482
    // compaction priority is round robin and there are insufficient
483
    // sub-compactions available
484
485
    // The scheduled compaction must be no less than 1 + extra number
486
    // subcompactions using acquired resources since this compaction job has not
487
    // finished yet
488
0
    assert(*bg_bottom_compaction_scheduled_ >=
489
0
               1 + extra_num_subcompaction_threads_reserved_ ||
490
0
           *bg_compaction_scheduled_ >=
491
0
               1 + extra_num_subcompaction_threads_reserved_);
492
0
  }
493
0
  ShrinkSubcompactionResources(extra_num_subcompaction_threads_reserved_);
494
0
}
495
496
0
void CompactionJob::GenSubcompactionBoundaries() {
497
  // The goal is to find some boundary keys so that we can evenly partition
498
  // the compaction input data into max_subcompactions ranges.
499
  // For every input file, we ask TableReader to estimate 128 anchor points
500
  // that evenly partition the input file into 128 ranges and the range
501
  // sizes. This can be calculated by scanning index blocks of the file.
502
  // Once we have the anchor points for all the input files, we merge them
503
  // together and try to find keys dividing ranges evenly.
504
  // For example, if we have two input files, and each returns following
505
  // ranges:
506
  //   File1: (a1, 1000), (b1, 1200), (c1, 1100)
507
  //   File2: (a2, 1100), (b2, 1000), (c2, 1000)
508
  // We total sort the keys to following:
509
  //  (a1, 1000), (a2, 1100), (b1, 1200), (b2, 1000), (c1, 1100), (c2, 1000)
510
  // We calculate the total size by adding up all ranges' size, which is 6400.
511
  // If we would like to partition into 2 subcompactions, the target of the
512
  // range size is 3200. Based on the size, we take "b1" as the partition key
513
  // since the first three ranges would hit 3200.
514
  //
515
  // Note that the ranges are actually overlapping. For example, in the example
516
  // above, the range ending with "b1" is overlapping with the range ending with
517
  // "b2". So the size 1000+1100+1200 is an underestimation of data size up to
518
  // "b1". In extreme cases where we only compact N L0 files, a range can
519
  // overlap with N-1 other ranges. Since we requested a relatively large number
520
  // (128) of ranges from each input files, even N range overlapping would
521
  // cause relatively small inaccuracy.
522
0
  ReadOptions read_options(Env::IOActivity::kCompaction);
523
0
  read_options.rate_limiter_priority = GetRateLimiterPriority();
524
0
  auto* c = compact_->compaction;
525
0
  if (c->mutable_cf_options().table_factory->Name() ==
526
0
      TableFactory::kPlainTableName()) {
527
0
    return;
528
0
  }
529
530
0
  if (c->max_subcompactions() <= 1 &&
531
0
      !(c->immutable_options().compaction_pri == kRoundRobin &&
532
0
        c->immutable_options().compaction_style == kCompactionStyleLevel)) {
533
0
    return;
534
0
  }
535
0
  auto* cfd = c->column_family_data();
536
0
  const Comparator* cfd_comparator = cfd->user_comparator();
537
0
  const InternalKeyComparator& icomp = cfd->internal_comparator();
538
539
0
  auto* v = compact_->compaction->input_version();
540
0
  int base_level = v->storage_info()->base_level();
541
0
  InstrumentedMutexUnlock unlock_guard(db_mutex_);
542
543
0
  uint64_t total_size = 0;
544
0
  std::vector<TableReader::Anchor> all_anchors;
545
0
  int start_lvl = c->start_level();
546
0
  int out_lvl = c->output_level();
547
548
0
  for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) {
549
0
    int lvl = c->level(lvl_idx);
550
0
    if (lvl >= start_lvl && lvl <= out_lvl) {
551
0
      const LevelFilesBrief* flevel = c->input_levels(lvl_idx);
552
0
      size_t num_files = flevel->num_files;
553
554
0
      if (num_files == 0) {
555
0
        continue;
556
0
      }
557
558
0
      for (size_t i = 0; i < num_files; i++) {
559
0
        FileMetaData* f = flevel->files[i].file_metadata;
560
0
        std::vector<TableReader::Anchor> my_anchors;
561
0
        Status s = cfd->table_cache()->ApproximateKeyAnchors(
562
0
            read_options, icomp, *f, c->mutable_cf_options(), my_anchors);
563
0
        if (!s.ok() || my_anchors.empty()) {
564
0
          my_anchors.emplace_back(f->largest.user_key(), f->fd.GetFileSize());
565
0
        }
566
0
        for (auto& ac : my_anchors) {
567
          // Can be optimized to avoid this loop.
568
0
          total_size += ac.range_size;
569
0
        }
570
571
0
        all_anchors.insert(all_anchors.end(), my_anchors.begin(),
572
0
                           my_anchors.end());
573
0
      }
574
0
    }
575
0
  }
576
  // Here we total sort all the anchor points across all files and go through
577
  // them in the sorted order to find partitioning boundaries.
578
  // Not the most efficient implementation. A much more efficient algorithm
579
  // probably exists. But they are more complex. If performance turns out to
580
  // be a problem, we can optimize.
581
0
  std::sort(
582
0
      all_anchors.begin(), all_anchors.end(),
583
0
      [cfd_comparator](TableReader::Anchor& a, TableReader::Anchor& b) -> bool {
584
0
        return cfd_comparator->CompareWithoutTimestamp(a.user_key, b.user_key) <
585
0
               0;
586
0
      });
587
588
  // Remove duplicated entries from boundaries.
589
0
  all_anchors.erase(
590
0
      std::unique(all_anchors.begin(), all_anchors.end(),
591
0
                  [cfd_comparator](TableReader::Anchor& a,
592
0
                                   TableReader::Anchor& b) -> bool {
593
0
                    return cfd_comparator->CompareWithoutTimestamp(
594
0
                               a.user_key, b.user_key) == 0;
595
0
                  }),
596
0
      all_anchors.end());
597
598
  // Get the number of planned subcompactions, may update reserve threads
599
  // and update extra_num_subcompaction_threads_reserved_ for round-robin
600
0
  uint64_t num_planned_subcompactions;
601
0
  if (c->immutable_options().compaction_pri == kRoundRobin &&
602
0
      c->immutable_options().compaction_style == kCompactionStyleLevel) {
603
    // For round-robin compaction priority, we need to employ more
604
    // subcompactions (may exceed the max_subcompaction limit). The extra
605
    // subcompactions will be executed using reserved threads and taken into
606
    // account bg_compaction_scheduled or bg_bottom_compaction_scheduled.
607
608
    // Initialized by the number of input files
609
0
    num_planned_subcompactions = static_cast<uint64_t>(c->num_input_files(0));
610
0
    uint64_t max_subcompactions_limit = GetSubcompactionsLimit();
611
0
    if (max_subcompactions_limit < num_planned_subcompactions) {
612
      // Assert two pointers are not empty so that we can use extra
613
      // subcompactions against db compaction limits
614
0
      assert(bg_bottom_compaction_scheduled_ != nullptr);
615
0
      assert(bg_compaction_scheduled_ != nullptr);
616
      // Reserve resources when max_subcompaction is not sufficient
617
0
      AcquireSubcompactionResources(
618
0
          (int)(num_planned_subcompactions - max_subcompactions_limit));
619
      // Subcompactions limit changes after acquiring additional resources.
620
      // Need to call GetSubcompactionsLimit() again to update the number
621
      // of planned subcompactions
622
0
      num_planned_subcompactions =
623
0
          std::min(num_planned_subcompactions, GetSubcompactionsLimit());
624
0
    } else {
625
0
      num_planned_subcompactions = max_subcompactions_limit;
626
0
    }
627
0
  } else {
628
0
    num_planned_subcompactions = GetSubcompactionsLimit();
629
0
  }
630
631
0
  TEST_SYNC_POINT_CALLBACK("CompactionJob::GenSubcompactionBoundaries:0",
632
0
                           &num_planned_subcompactions);
633
0
  if (num_planned_subcompactions == 1) {
634
0
    return;
635
0
  }
636
637
  // Group the ranges into subcompactions
638
0
  uint64_t target_range_size = std::max(
639
0
      total_size / num_planned_subcompactions,
640
0
      MaxFileSizeForLevel(
641
0
          c->mutable_cf_options(), out_lvl,
642
0
          c->immutable_options().compaction_style, base_level,
643
0
          c->immutable_options().level_compaction_dynamic_level_bytes));
644
645
0
  if (target_range_size >= total_size) {
646
0
    return;
647
0
  }
648
649
0
  uint64_t next_threshold = target_range_size;
650
0
  uint64_t cumulative_size = 0;
651
0
  uint64_t num_actual_subcompactions = 1U;
652
0
  for (TableReader::Anchor& anchor : all_anchors) {
653
0
    cumulative_size += anchor.range_size;
654
0
    if (cumulative_size > next_threshold) {
655
0
      next_threshold += target_range_size;
656
0
      num_actual_subcompactions++;
657
0
      boundaries_.push_back(anchor.user_key);
658
0
    }
659
0
    if (num_actual_subcompactions == num_planned_subcompactions) {
660
0
      break;
661
0
    }
662
0
  }
663
0
  TEST_SYNC_POINT_CALLBACK("CompactionJob::GenSubcompactionBoundaries:1",
664
0
                           &num_actual_subcompactions);
665
  // Shrink extra subcompactions resources when extra resrouces are acquired
666
0
  ShrinkSubcompactionResources(
667
0
      std::min((int)(num_planned_subcompactions - num_actual_subcompactions),
668
0
               extra_num_subcompaction_threads_reserved_));
669
0
}
670
671
5.33k
// Executes the compaction: runs all subcompactions (possibly in parallel),
// fsyncs the output directories, verifies the produced SST files, and
// aggregates per-subcompaction stats into job-level stats.
// Returns the first error encountered by any phase; also records it in
// compact_->status. Runs WITHOUT the DB mutex held (contrast Install()).
Status CompactionJob::Run() {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_RUN);
  TEST_SYNC_POINT("CompactionJob::Run():Start");
  log_buffer_->FlushBufferToLog();
  LogCompaction();

  const size_t num_threads = compact_->sub_compact_states.size();
  assert(num_threads > 0);
  const uint64_t start_micros = db_options_.clock->NowMicros();
  compact_->compaction->GetOrInitInputTableProperties();

  // Launch a thread for each of subcompactions 1...num_threads-1
  std::vector<port::Thread> thread_pool;
  thread_pool.reserve(num_threads - 1);
  for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
    thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this,
                             &compact_->sub_compact_states[i]);
  }

  // Always schedule the first subcompaction (whether or not there are also
  // others) in the current thread to be efficient with resources
  ProcessKeyValueCompaction(compact_->sub_compact_states.data());

  // Wait for all other threads (if there are any) to finish execution
  for (auto& thread : thread_pool) {
    thread.join();
  }

  internal_stats_.SetMicros(db_options_.clock->NowMicros() - start_micros);

  // Fold per-subcompaction CPU time into the job stats and drop any
  // trailing empty output file each subcompaction may have opened.
  for (auto& state : compact_->sub_compact_states) {
    internal_stats_.AddCpuMicros(state.compaction_job_stats.cpu_micros);
    state.RemoveLastEmptyOutput();
  }

  RecordTimeToHistogram(stats_, COMPACTION_TIME,
                        internal_stats_.output_level_stats.micros);
  RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME,
                        internal_stats_.output_level_stats.cpu_micros);

  TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify");

  // Check if any thread encountered an error during execution
  Status status;
  IOStatus io_s;
  bool wrote_new_blob_files = false;

  // First subcompaction error wins; otherwise note whether any blob files
  // were added (decides whether the blob dir must be fsynced below).
  for (const auto& state : compact_->sub_compact_states) {
    if (!state.status.ok()) {
      status = state.status;
      io_s = state.io_status;
      break;
    }

    if (state.Current().HasBlobFileAdditions()) {
      wrote_new_blob_files = true;
    }
  }

  // Preserve the first IO error in io_status_ (never overwrite an earlier
  // failure with a later status).
  if (io_status_.ok()) {
    io_status_ = io_s;
  }
  if (status.ok()) {
    constexpr IODebugContext* dbg = nullptr;

    // Durably persist the new output files' directory entries.
    if (output_directory_) {
      io_s = output_directory_->FsyncWithDirOptions(
          IOOptions(), dbg,
          DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
    }

    // Blob directory only needs a separate fsync when it is distinct from
    // the SST output directory and new blob files were actually written.
    if (io_s.ok() && wrote_new_blob_files && blob_output_directory_ &&
        blob_output_directory_ != output_directory_) {
      io_s = blob_output_directory_->FsyncWithDirOptions(
          IOOptions(), dbg,
          DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced));
    }
  }
  if (io_status_.ok()) {
    io_status_ = io_s;
  }
  if (status.ok()) {
    status = io_s;
  }
  if (status.ok()) {
    // Verify every output table, reusing the thread pool: worker threads
    // pull file indices from the shared atomic counter until exhausted.
    thread_pool.clear();
    std::vector<const CompactionOutputs::Output*> files_output;
    for (const auto& state : compact_->sub_compact_states) {
      for (const auto& output : state.GetOutputs()) {
        files_output.emplace_back(&output);
      }
    }
    ColumnFamilyData* cfd = compact_->compaction->column_family_data();
    std::atomic<size_t> next_file_idx(0);
    auto verify_table = [&](Status& output_status) {
      while (true) {
        size_t file_idx = next_file_idx.fetch_add(1);
        if (file_idx >= files_output.size()) {
          break;
        }
        // Verify that the table is usable
        // We set for_compaction to false and don't
        // OptimizeForCompactionTableRead here because this is a special case
        // after we finish the table building No matter whether
        // use_direct_io_for_flush_and_compaction is true, we will regard this
        // verification as user reads since the goal is to cache it here for
        // further user reads
        ReadOptions verify_table_read_options(Env::IOActivity::kCompaction);
        verify_table_read_options.rate_limiter_priority =
            GetRateLimiterPriority();
        InternalIterator* iter = cfd->table_cache()->NewIterator(
            verify_table_read_options, file_options_,
            cfd->internal_comparator(), files_output[file_idx]->meta,
            /*range_del_agg=*/nullptr,
            compact_->compaction->mutable_cf_options(),
            /*table_reader_ptr=*/nullptr,
            cfd->internal_stats()->GetFileReadHist(
                compact_->compaction->output_level()),
            TableReaderCaller::kCompactionRefill, /*arena=*/nullptr,
            /*skip_filters=*/false, compact_->compaction->output_level(),
            MaxFileSizeForL0MetaPin(compact_->compaction->mutable_cf_options()),
            /*smallest_compaction_key=*/nullptr,
            /*largest_compaction_key=*/nullptr,
            /*allow_unprepared_value=*/false);
        auto s = iter->status();

        // With paranoid_file_checks, additionally re-read every key/value
        // and compare the rolling hash against the one computed at write
        // time.
        if (s.ok() && paranoid_file_checks_) {
          OutputValidator validator(cfd->internal_comparator(),
                                    /*_enable_hash=*/true);
          for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
            s = validator.Add(iter->key(), iter->value());
            if (!s.ok()) {
              break;
            }
          }
          if (s.ok()) {
            s = iter->status();
          }
          if (s.ok() &&
              !validator.CompareValidator(files_output[file_idx]->validator)) {
            s = Status::Corruption("Paranoid checksums do not match");
          }
        }

        delete iter;

        if (!s.ok()) {
          output_status = s;
          break;
        }
      }
    };
    for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) {
      thread_pool.emplace_back(
          verify_table, std::ref(compact_->sub_compact_states[i].status));
    }
    verify_table(compact_->sub_compact_states[0].status);
    for (auto& thread : thread_pool) {
      thread.join();
    }

    // Surface the first verification failure, if any.
    for (const auto& state : compact_->sub_compact_states) {
      if (!state.status.ok()) {
        status = state.status;
        break;
      }
    }
  }

  ReleaseSubcompactionResources();
  TEST_SYNC_POINT("CompactionJob::ReleaseSubcompactionResources");

  // Register each output file's table properties under its full file name.
  for (const auto& state : compact_->sub_compact_states) {
    for (const auto& output : state.GetOutputs()) {
      auto fn =
          TableFileName(state.compaction->immutable_options().cf_paths,
                        output.meta.fd.GetNumber(), output.meta.fd.GetPathId());
      compact_->compaction->SetOutputTableProperties(fn,
                                                     output.table_properties);
    }
  }

  // Before the compaction starts, is_remote_compaction was set to true if
  // compaction_service is set. We now know whether each sub_compaction was
  // done remotely or not. Reset is_remote_compaction back to false and allow
  // AggregateCompactionStats() to set the right value.
  job_stats_->is_remote_compaction = false;

  // Finish up all bookkeeping to unify the subcompaction results.
  compact_->AggregateCompactionStats(internal_stats_, *job_stats_);

  uint64_t num_input_range_del = 0;
  bool ok = BuildStatsFromInputTableProperties(&num_input_range_del);
  // (Sub)compactions returned ok, do sanity check on the number of input
  // keys.
  if (status.ok() && ok) {
    if (job_stats_->has_num_input_records) {
      status = VerifyInputRecordCount(num_input_range_del);
      if (!status.ok()) {
        ROCKS_LOG_WARN(
            db_options_.info_log, "[%s] [JOB %d] Compaction with status: %s",
            compact_->compaction->column_family_data()->GetName().c_str(),
            job_context_->job_id, status.ToString().c_str());
      }
    }
    UpdateCompactionJobInputStats(internal_stats_, num_input_range_del);
  }
  UpdateCompactionJobOutputStats(internal_stats_);

  // Verify number of output records
  // Only verify on table with format collects table properties
  const auto& mutable_cf_options = compact_->compaction->mutable_cf_options();
  if (status.ok() &&
      (mutable_cf_options.table_factory->IsInstanceOf(
           TableFactory::kBlockBasedTableName()) ||
       mutable_cf_options.table_factory->IsInstanceOf(
           TableFactory::kPlainTableName())) &&
      db_options_.compaction_verify_record_count) {
    uint64_t total_output_num = 0;
    for (const auto& state : compact_->sub_compact_states) {
      for (const auto& output : state.GetOutputs()) {
        // Range deletions are stored in the table but are not point
        // records, so exclude them from the count.
        total_output_num += output.table_properties->num_entries -
                            output.table_properties->num_range_deletions;
      }
    }

    uint64_t expected = internal_stats_.output_level_stats.num_output_records;
    if (internal_stats_.has_proximal_level_output) {
      expected += internal_stats_.proximal_level_stats.num_output_records;
    }
    if (expected != total_output_num) {
      char scratch[2345];
      compact_->compaction->Summary(scratch, sizeof(scratch));
      std::string msg =
          "Number of keys in compaction output SST files does not match "
          "number of keys added. Expected " +
          std::to_string(expected) + " but there are " +
          std::to_string(total_output_num) +
          " in output SST files. Compaction summary: " + scratch;
      ROCKS_LOG_WARN(
          db_options_.info_log, "[%s] [JOB %d] Compaction with status: %s",
          compact_->compaction->column_family_data()->GetName().c_str(),
          job_context_->job_id, msg.c_str());
      status = Status::Corruption(msg);
    }
  }

  RecordCompactionIOStats();
  LogFlush(db_options_.info_log);
  TEST_SYNC_POINT("CompactionJob::Run():End");
  compact_->status = status;
  TEST_SYNC_POINT_CALLBACK("CompactionJob::Run():EndStatusSet", &status);
  return status;
}
926
927
5.33k
// Installs the compaction results into the current Version: applies the
// version edit (when Run() succeeded), records per-level compaction stats,
// and emits the human-readable log line plus the structured
// "compaction_finished" event. Requires the DB mutex to be held (asserted
// below). Returns the job status (compact_->status, possibly updated by
// InstallCompactionResults()).
Status CompactionJob::Install(bool* compaction_released) {
  assert(compact_);

  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_INSTALL);
  db_mutex_->AssertHeld();
  Status status = compact_->status;

  ColumnFamilyData* cfd = compact_->compaction->column_family_data();
  assert(cfd);

  // Stats are recorded even when the compaction failed.
  int output_level = compact_->compaction->output_level();
  cfd->internal_stats()->AddCompactionStats(output_level, thread_pri_,
                                            internal_stats_);

  if (status.ok()) {
    status = InstallCompactionResults(compaction_released);
  }
  // Preserve any IO error raised while applying the version edit.
  if (!versions_->io_status().ok()) {
    io_status_ = versions_->io_status();
  }

  VersionStorageInfo::LevelSummaryStorage tmp;
  auto vstorage = cfd->current()->storage_info();
  const auto& stats = internal_stats_.output_level_stats;

  // Derived metrics for the log line; stay 0 when the corresponding
  // denominator is 0 (nothing read / zero elapsed time).
  double read_write_amp = 0.0;
  double write_amp = 0.0;
  double bytes_read_per_sec = 0;
  double bytes_written_per_sec = 0;

  const uint64_t bytes_read_non_output_and_blob =
      stats.bytes_read_non_output_levels + stats.bytes_read_blob;
  const uint64_t bytes_read_all =
      stats.bytes_read_output_level + bytes_read_non_output_and_blob;
  const uint64_t bytes_written_all =
      stats.bytes_written + stats.bytes_written_blob;

  if (bytes_read_non_output_and_blob > 0) {
    read_write_amp = (bytes_written_all + bytes_read_all) /
                     static_cast<double>(bytes_read_non_output_and_blob);
    write_amp =
        bytes_written_all / static_cast<double>(bytes_read_non_output_and_blob);
  }
  if (stats.micros > 0) {
    bytes_read_per_sec = bytes_read_all / static_cast<double>(stats.micros);
    bytes_written_per_sec =
        bytes_written_all / static_cast<double>(stats.micros);
  }

  const std::string& column_family_name = cfd->GetName();

  constexpr double kMB = 1048576.0;

  // One-line human-readable summary of the compaction outcome.
  ROCKS_LOG_BUFFER(
      log_buffer_,
      "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, "
      "files in(%d, %d) filtered(%d, %d) out(%d +%d blob) "
      "MB in(%.1f, %.1f +%.1f blob) filtered(%.1f, %.1f) out(%.1f +%.1f blob), "
      "read-write-amplify(%.1f) write-amplify(%.1f) %s, records in: %" PRIu64
      ", records dropped: %" PRIu64 " output_compression: %s\n",
      column_family_name.c_str(), vstorage->LevelSummary(&tmp),
      bytes_read_per_sec, bytes_written_per_sec,
      compact_->compaction->output_level(),
      stats.num_input_files_in_non_output_levels,
      stats.num_input_files_in_output_level,
      stats.num_filtered_input_files_in_non_output_levels,
      stats.num_filtered_input_files_in_output_level, stats.num_output_files,
      stats.num_output_files_blob, stats.bytes_read_non_output_levels / kMB,
      stats.bytes_read_output_level / kMB, stats.bytes_read_blob / kMB,
      stats.bytes_skipped_non_output_levels / kMB,
      stats.bytes_skipped_output_level / kMB, stats.bytes_written / kMB,
      stats.bytes_written_blob / kMB, read_write_amp, write_amp,
      status.ToString().c_str(), stats.num_input_records,
      stats.num_dropped_records,
      CompressionTypeToString(compact_->compaction->output_compression())
          .c_str());

  const auto& blob_files = vstorage->GetBlobFiles();
  if (!blob_files.empty()) {
    assert(blob_files.front());
    assert(blob_files.back());

    ROCKS_LOG_BUFFER(
        log_buffer_,
        "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64 "\n",
        column_family_name.c_str(), blob_files.front()->GetBlobFileNumber(),
        blob_files.back()->GetBlobFileNumber());
  }

  if (internal_stats_.has_proximal_level_output) {
    ROCKS_LOG_BUFFER(log_buffer_,
                     "[%s] has Proximal Level output: %" PRIu64
                     ", level %d, number of files: %" PRIu64
                     ", number of records: %" PRIu64,
                     column_family_name.c_str(),
                     internal_stats_.proximal_level_stats.bytes_written,
                     compact_->compaction->GetProximalLevel(),
                     internal_stats_.proximal_level_stats.num_output_files,
                     internal_stats_.proximal_level_stats.num_output_records);
  }

  TEST_SYNC_POINT_CALLBACK(
      "CompactionJob::Install:AfterUpdateCompactionJobStats", job_stats_);

  // Structured "compaction_finished" event for the event logger. Field
  // order is part of the emitted record; do not reorder.
  auto stream = event_logger_->LogToBuffer(log_buffer_, 8192);
  stream << "job" << job_id_ << "event" << "compaction_finished"
         << "compaction_time_micros" << stats.micros
         << "compaction_time_cpu_micros" << stats.cpu_micros << "output_level"
         << compact_->compaction->output_level() << "num_output_files"
         << stats.num_output_files << "total_output_size"
         << stats.bytes_written;

  if (stats.num_output_files_blob > 0) {
    stream << "num_blob_output_files" << stats.num_output_files_blob
           << "total_blob_output_size" << stats.bytes_written_blob;
  }

  stream << "num_input_records" << stats.num_input_records
         << "num_output_records" << stats.num_output_records
         << "num_subcompactions" << compact_->sub_compact_states.size()
         << "output_compression"
         << CompressionTypeToString(compact_->compaction->output_compression());

  stream << "num_single_delete_mismatches"
         << job_stats_->num_single_del_mismatch;
  stream << "num_single_delete_fallthrough"
         << job_stats_->num_single_del_fallthru;

  if (measure_io_stats_) {
    stream << "file_write_nanos" << job_stats_->file_write_nanos;
    stream << "file_range_sync_nanos" << job_stats_->file_range_sync_nanos;
    stream << "file_fsync_nanos" << job_stats_->file_fsync_nanos;
    stream << "file_prepare_write_nanos"
           << job_stats_->file_prepare_write_nanos;
  }

  // Per-level file counts after the install, as a JSON array.
  stream << "lsm_state";
  stream.StartArray();
  for (int level = 0; level < vstorage->num_levels(); ++level) {
    stream << vstorage->NumLevelFiles(level);
  }
  stream.EndArray();

  if (!blob_files.empty()) {
    assert(blob_files.front());
    stream << "blob_file_head" << blob_files.front()->GetBlobFileNumber();

    assert(blob_files.back());
    stream << "blob_file_tail" << blob_files.back()->GetBlobFileNumber();
  }

  if (internal_stats_.has_proximal_level_output) {
    InternalStats::CompactionStats& pl_stats =
        internal_stats_.proximal_level_stats;
    stream << "proximal_level_num_output_files" << pl_stats.num_output_files;
    stream << "proximal_level_bytes_written" << pl_stats.bytes_written;
    stream << "proximal_level_num_output_records"
           << pl_stats.num_output_records;
    stream << "proximal_level_num_output_files_blob"
           << pl_stats.num_output_files_blob;
    stream << "proximal_level_bytes_written_blob"
           << pl_stats.bytes_written_blob;
  }

  CleanupCompaction();
  return status;
}
1095
1096
void CompactionJob::NotifyOnSubcompactionBegin(
1097
5.33k
    SubcompactionState* sub_compact) {
1098
5.33k
  Compaction* c = compact_->compaction;
1099
1100
5.33k
  if (db_options_.listeners.empty()) {
1101
5.33k
    return;
1102
5.33k
  }
1103
0
  if (shutting_down_->load(std::memory_order_acquire)) {
1104
0
    return;
1105
0
  }
1106
0
  if (c->is_manual_compaction() &&
1107
0
      manual_compaction_canceled_.load(std::memory_order_acquire)) {
1108
0
    return;
1109
0
  }
1110
1111
0
  sub_compact->notify_on_subcompaction_completion = true;
1112
1113
0
  SubcompactionJobInfo info{};
1114
0
  sub_compact->BuildSubcompactionJobInfo(info);
1115
0
  info.job_id = static_cast<int>(job_id_);
1116
0
  info.thread_id = env_->GetThreadID();
1117
1118
0
  for (const auto& listener : db_options_.listeners) {
1119
0
    listener->OnSubcompactionBegin(info);
1120
0
  }
1121
0
  info.status.PermitUncheckedError();
1122
0
}
1123
1124
void CompactionJob::NotifyOnSubcompactionCompleted(
1125
5.33k
    SubcompactionState* sub_compact) {
1126
5.33k
  if (db_options_.listeners.empty()) {
1127
5.33k
    return;
1128
5.33k
  }
1129
0
  if (shutting_down_->load(std::memory_order_acquire)) {
1130
0
    return;
1131
0
  }
1132
1133
0
  if (sub_compact->notify_on_subcompaction_completion == false) {
1134
0
    return;
1135
0
  }
1136
1137
0
  SubcompactionJobInfo info{};
1138
0
  sub_compact->BuildSubcompactionJobInfo(info);
1139
0
  info.job_id = static_cast<int>(job_id_);
1140
0
  info.thread_id = env_->GetThreadID();
1141
1142
0
  for (const auto& listener : db_options_.listeners) {
1143
0
    listener->OnSubcompactionCompleted(info);
1144
0
  }
1145
0
}
1146
1147
5.33k
void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
1148
5.33k
  assert(sub_compact);
1149
5.33k
  assert(sub_compact->compaction);
1150
5.33k
  if (db_options_.compaction_service) {
1151
0
    CompactionServiceJobStatus comp_status =
1152
0
        ProcessKeyValueCompactionWithCompactionService(sub_compact);
1153
0
    if (comp_status != CompactionServiceJobStatus::kUseLocal) {
1154
0
      return;
1155
0
    }
1156
    // fallback to local compaction
1157
0
    assert(comp_status == CompactionServiceJobStatus::kUseLocal);
1158
0
    sub_compact->compaction_job_stats.is_remote_compaction = false;
1159
0
  }
1160
1161
5.33k
  uint64_t prev_cpu_micros = db_options_.clock->CPUMicros();
1162
1163
5.33k
  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
1164
1165
  // Create compaction filter and fail the compaction if
1166
  // IgnoreSnapshots() = false because it is not supported anymore
1167
5.33k
  const CompactionFilter* compaction_filter = cfd->ioptions().compaction_filter;
1168
5.33k
  std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr;
1169
5.33k
  if (compaction_filter == nullptr) {
1170
5.33k
    compaction_filter_from_factory =
1171
5.33k
        sub_compact->compaction->CreateCompactionFilter();
1172
5.33k
    compaction_filter = compaction_filter_from_factory.get();
1173
5.33k
  }
1174
5.33k
  if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) {
1175
0
    sub_compact->status = Status::NotSupported(
1176
0
        "CompactionFilter::IgnoreSnapshots() = false is not supported "
1177
0
        "anymore.");
1178
0
    return;
1179
0
  }
1180
1181
5.33k
  NotifyOnSubcompactionBegin(sub_compact);
1182
1183
  // This is assigned after creation of SubcompactionState to simplify that
1184
  // creation across both CompactionJob and CompactionServiceCompactionJob
1185
5.33k
  sub_compact->AssignRangeDelAggregator(
1186
5.33k
      std::make_unique<CompactionRangeDelAggregator>(
1187
5.33k
          &cfd->internal_comparator(), job_context_->snapshot_seqs,
1188
5.33k
          &full_history_ts_low_, &trim_ts_));
1189
1190
  // TODO: since we already use C++17, should use
1191
  // std::optional<const Slice> instead.
1192
5.33k
  const std::optional<Slice> start = sub_compact->start;
1193
5.33k
  const std::optional<Slice> end = sub_compact->end;
1194
1195
5.33k
  std::optional<Slice> start_without_ts;
1196
5.33k
  std::optional<Slice> end_without_ts;
1197
1198
5.33k
  ReadOptions read_options;
1199
5.33k
  read_options.verify_checksums = true;
1200
5.33k
  read_options.fill_cache = false;
1201
5.33k
  read_options.rate_limiter_priority = GetRateLimiterPriority();
1202
5.33k
  read_options.io_activity = Env::IOActivity::kCompaction;
1203
  // Compaction iterators shouldn't be confined to a single prefix.
1204
  // Compactions use Seek() for
1205
  // (a) concurrent compactions,
1206
  // (b) CompactionFilter::Decision::kRemoveAndSkipUntil.
1207
5.33k
  read_options.total_order_seek = true;
1208
1209
5.33k
  const WriteOptions write_options(Env::IOPriority::IO_LOW,
1210
5.33k
                                   Env::IOActivity::kCompaction);
1211
1212
  // Remove the timestamps from boundaries because boundaries created in
1213
  // GenSubcompactionBoundaries doesn't strip away the timestamp.
1214
5.33k
  size_t ts_sz = cfd->user_comparator()->timestamp_size();
1215
5.33k
  if (start.has_value()) {
1216
0
    read_options.iterate_lower_bound = &(*start);
1217
0
    if (ts_sz > 0) {
1218
0
      start_without_ts = StripTimestampFromUserKey(*start, ts_sz);
1219
0
      read_options.iterate_lower_bound = &(*start_without_ts);
1220
0
    }
1221
0
  }
1222
5.33k
  if (end.has_value()) {
1223
0
    read_options.iterate_upper_bound = &(*end);
1224
0
    if (ts_sz > 0) {
1225
0
      end_without_ts = StripTimestampFromUserKey(*end, ts_sz);
1226
0
      read_options.iterate_upper_bound = &(*end_without_ts);
1227
0
    }
1228
0
  }
1229
1230
  // Although the v2 aggregator is what the level iterator(s) know about,
1231
  // the AddTombstones calls will be propagated down to the v1 aggregator.
1232
5.33k
  std::unique_ptr<InternalIterator> raw_input(versions_->MakeInputIterator(
1233
5.33k
      read_options, sub_compact->compaction, sub_compact->RangeDelAgg(),
1234
5.33k
      file_options_for_read_, start, end));
1235
5.33k
  InternalIterator* input = raw_input.get();
1236
1237
5.33k
  IterKey start_ikey;
1238
5.33k
  IterKey end_ikey;
1239
5.33k
  Slice start_slice;
1240
5.33k
  Slice end_slice;
1241
5.33k
  Slice start_user_key{};
1242
5.33k
  Slice end_user_key{};
1243
1244
5.33k
  static constexpr char kMaxTs[] =
1245
5.33k
      "\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff";
1246
5.33k
  Slice ts_slice;
1247
5.33k
  std::string max_ts;
1248
5.33k
  if (ts_sz > 0) {
1249
0
    if (ts_sz <= strlen(kMaxTs)) {
1250
0
      ts_slice = Slice(kMaxTs, ts_sz);
1251
0
    } else {
1252
0
      max_ts = std::string(ts_sz, '\xff');
1253
0
      ts_slice = Slice(max_ts);
1254
0
    }
1255
0
  }
1256
1257
5.33k
  if (start.has_value()) {
1258
0
    start_ikey.SetInternalKey(*start, kMaxSequenceNumber, kValueTypeForSeek);
1259
0
    if (ts_sz > 0) {
1260
0
      start_ikey.UpdateInternalKey(kMaxSequenceNumber, kValueTypeForSeek,
1261
0
                                   &ts_slice);
1262
0
    }
1263
0
    start_slice = start_ikey.GetInternalKey();
1264
0
    start_user_key = start_ikey.GetUserKey();
1265
0
  }
1266
5.33k
  if (end.has_value()) {
1267
0
    end_ikey.SetInternalKey(*end, kMaxSequenceNumber, kValueTypeForSeek);
1268
0
    if (ts_sz > 0) {
1269
0
      end_ikey.UpdateInternalKey(kMaxSequenceNumber, kValueTypeForSeek,
1270
0
                                 &ts_slice);
1271
0
    }
1272
0
    end_slice = end_ikey.GetInternalKey();
1273
0
    end_user_key = end_ikey.GetUserKey();
1274
0
  }
1275
1276
5.33k
  std::unique_ptr<InternalIterator> clip;
1277
5.33k
  if (start.has_value() || end.has_value()) {
1278
0
    clip = std::make_unique<ClippingIterator>(
1279
0
        raw_input.get(), start.has_value() ? &start_slice : nullptr,
1280
0
        end.has_value() ? &end_slice : nullptr, &cfd->internal_comparator());
1281
0
    input = clip.get();
1282
0
  }
1283
1284
5.33k
  std::unique_ptr<InternalIterator> blob_counter;
1285
1286
5.33k
  if (sub_compact->compaction->DoesInputReferenceBlobFiles()) {
1287
0
    BlobGarbageMeter* meter = sub_compact->Current().CreateBlobGarbageMeter();
1288
0
    blob_counter = std::make_unique<BlobCountingIterator>(input, meter);
1289
0
    input = blob_counter.get();
1290
0
  }
1291
1292
5.33k
  std::unique_ptr<InternalIterator> trim_history_iter;
1293
5.33k
  if (ts_sz > 0 && !trim_ts_.empty()) {
1294
0
    trim_history_iter = std::make_unique<HistoryTrimmingIterator>(
1295
0
        input, cfd->user_comparator(), trim_ts_);
1296
0
    input = trim_history_iter.get();
1297
0
  }
1298
1299
5.33k
  input->SeekToFirst();
1300
1301
5.33k
  AutoThreadOperationStageUpdater stage_updater(
1302
5.33k
      ThreadStatus::STAGE_COMPACTION_PROCESS_KV);
1303
1304
  // I/O measurement variables
1305
5.33k
  PerfLevel prev_perf_level = PerfLevel::kEnableTime;
1306
5.33k
  const uint64_t kRecordStatsEvery = 1000;
1307
5.33k
  uint64_t prev_write_nanos = 0;
1308
5.33k
  uint64_t prev_fsync_nanos = 0;
1309
5.33k
  uint64_t prev_range_sync_nanos = 0;
1310
5.33k
  uint64_t prev_prepare_write_nanos = 0;
1311
5.33k
  uint64_t prev_cpu_write_nanos = 0;
1312
5.33k
  uint64_t prev_cpu_read_nanos = 0;
1313
5.33k
  if (measure_io_stats_) {
1314
0
    prev_perf_level = GetPerfLevel();
1315
0
    SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex);
1316
0
    prev_write_nanos = IOSTATS(write_nanos);
1317
0
    prev_fsync_nanos = IOSTATS(fsync_nanos);
1318
0
    prev_range_sync_nanos = IOSTATS(range_sync_nanos);
1319
0
    prev_prepare_write_nanos = IOSTATS(prepare_write_nanos);
1320
0
    prev_cpu_write_nanos = IOSTATS(cpu_write_nanos);
1321
0
    prev_cpu_read_nanos = IOSTATS(cpu_read_nanos);
1322
0
  }
1323
1324
5.33k
  MergeHelper merge(
1325
5.33k
      env_, cfd->user_comparator(), cfd->ioptions().merge_operator.get(),
1326
5.33k
      compaction_filter, db_options_.info_log.get(),
1327
5.33k
      false /* internal key corruption is expected */,
1328
5.33k
      job_context_->GetLatestSnapshotSequence(), job_context_->snapshot_checker,
1329
5.33k
      compact_->compaction->level(), db_options_.stats);
1330
1331
5.33k
  const auto& mutable_cf_options =
1332
5.33k
      sub_compact->compaction->mutable_cf_options();
1333
1334
5.33k
  std::vector<std::string> blob_file_paths;
1335
1336
  // TODO: BlobDB to support output_to_proximal_level compaction, which needs
1337
  //  2 builders, so may need to move to `CompactionOutputs`
1338
5.33k
  std::unique_ptr<BlobFileBuilder> blob_file_builder(
1339
5.33k
      (mutable_cf_options.enable_blob_files &&
1340
5.33k
       sub_compact->compaction->output_level() >=
1341
0
           mutable_cf_options.blob_file_starting_level)
1342
5.33k
          ? new BlobFileBuilder(
1343
0
                versions_, fs_.get(),
1344
0
                &sub_compact->compaction->immutable_options(),
1345
0
                &mutable_cf_options, &file_options_, &write_options, db_id_,
1346
0
                db_session_id_, job_id_, cfd->GetID(), cfd->GetName(),
1347
0
                write_hint_, io_tracer_, blob_callback_,
1348
0
                BlobFileCreationReason::kCompaction, &blob_file_paths,
1349
0
                sub_compact->Current().GetBlobFileAdditionsPtr())
1350
5.33k
          : nullptr);
1351
1352
5.33k
  TEST_SYNC_POINT("CompactionJob::Run():Inprogress");
1353
5.33k
  TEST_SYNC_POINT_CALLBACK("CompactionJob::Run():PausingManualCompaction:1",
1354
5.33k
                           static_cast<void*>(const_cast<std::atomic<bool>*>(
1355
5.33k
                               &manual_compaction_canceled_)));
1356
1357
5.33k
  const std::string* const full_history_ts_low =
1358
5.33k
      full_history_ts_low_.empty() ? nullptr : &full_history_ts_low_;
1359
5.33k
  const SequenceNumber job_snapshot_seq =
1360
5.33k
      job_context_ ? job_context_->GetJobSnapshotSequence()
1361
5.33k
                   : kMaxSequenceNumber;
1362
1363
5.33k
  auto c_iter = std::make_unique<CompactionIterator>(
1364
5.33k
      input, cfd->user_comparator(), &merge, versions_->LastSequence(),
1365
5.33k
      &(job_context_->snapshot_seqs), earliest_snapshot_,
1366
5.33k
      job_context_->earliest_write_conflict_snapshot, job_snapshot_seq,
1367
5.33k
      job_context_->snapshot_checker, env_,
1368
5.33k
      ShouldReportDetailedTime(env_, stats_),
1369
5.33k
      /*expect_valid_internal_key=*/true, sub_compact->RangeDelAgg(),
1370
5.33k
      blob_file_builder.get(), db_options_.allow_data_in_errors,
1371
5.33k
      db_options_.enforce_single_del_contracts, manual_compaction_canceled_,
1372
5.33k
      sub_compact->compaction
1373
5.33k
          ->DoesInputReferenceBlobFiles() /* must_count_input_entries */,
1374
5.33k
      sub_compact->compaction, compaction_filter, shutting_down_,
1375
5.33k
      db_options_.info_log, full_history_ts_low, preserve_seqno_after_);
1376
5.33k
  c_iter->SeekToFirst();
1377
1378
5.33k
  const auto& c_iter_stats = c_iter->iter_stats();
1379
1380
  // define the open and close functions for the compaction files, which will be
1381
  // used open/close output files when needed.
1382
5.33k
  const CompactionFileOpenFunc open_file_func =
1383
5.33k
      [this, sub_compact](CompactionOutputs& outputs) {
1384
2.33k
        return this->OpenCompactionOutputFile(sub_compact, outputs);
1385
2.33k
      };
1386
1387
5.33k
  const CompactionFileCloseFunc close_file_func =
1388
5.33k
      [this, sub_compact, start_user_key, end_user_key](
1389
5.33k
          CompactionOutputs& outputs, const Status& status,
1390
5.33k
          const Slice& next_table_min_key) {
1391
2.33k
        return this->FinishCompactionOutputFile(
1392
2.33k
            status, sub_compact, outputs, next_table_min_key,
1393
2.33k
            sub_compact->start.has_value() ? &start_user_key : nullptr,
1394
2.33k
            sub_compact->end.has_value() ? &end_user_key : nullptr);
1395
2.33k
      };
1396
1397
5.33k
  Status status;
1398
5.33k
  TEST_SYNC_POINT_CALLBACK(
1399
5.33k
      "CompactionJob::ProcessKeyValueCompaction()::Processing",
1400
5.33k
      static_cast<void*>(const_cast<Compaction*>(sub_compact->compaction)));
1401
5.33k
  uint64_t last_cpu_micros = prev_cpu_micros;
1402
8.01k
  while (status.ok() && !cfd->IsDropped() && c_iter->Valid()) {
1403
    // Invariant: c_iter.status() is guaranteed to be OK if c_iter->Valid()
1404
    // returns true.
1405
2.68k
    assert(!end.has_value() ||
1406
2.68k
           cfd->user_comparator()->Compare(c_iter->user_key(), *end) < 0);
1407
1408
2.68k
    if (c_iter_stats.num_input_records % kRecordStatsEvery ==
1409
2.68k
        kRecordStatsEvery - 1) {
1410
0
      RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
1411
0
      c_iter->ResetRecordCounts();
1412
0
      RecordCompactionIOStats();
1413
1414
0
      uint64_t cur_cpu_micros = db_options_.clock->CPUMicros();
1415
0
      assert(cur_cpu_micros >= last_cpu_micros);
1416
0
      RecordTick(stats_, COMPACTION_CPU_TOTAL_TIME,
1417
0
                 cur_cpu_micros - last_cpu_micros);
1418
0
      last_cpu_micros = cur_cpu_micros;
1419
0
    }
1420
1421
2.68k
    const auto& ikey = c_iter->ikey();
1422
2.68k
    bool use_proximal_output = ikey.sequence > proximal_after_seqno_;
1423
#ifndef NDEBUG
1424
    if (sub_compact->compaction->SupportsPerKeyPlacement()) {
1425
      // Could be overridden by unittest
1426
      PerKeyPlacementContext context(sub_compact->compaction->output_level(),
1427
                                     ikey.user_key, c_iter->value(),
1428
                                     ikey.sequence, use_proximal_output);
1429
      TEST_SYNC_POINT_CALLBACK("CompactionIterator::PrepareOutput.context",
1430
                               &context);
1431
      if (use_proximal_output) {
1432
        // Verify that entries sent to the proximal level are within the
1433
        // allowed range (because the input key range of the last level could
1434
        // be larger than the allowed output key range of the proximal
1435
        // level). This check uses user keys (ignores sequence numbers) because
1436
        // compaction boundaries are a "clean cut" between user keys (see
1437
        // CompactionPicker::ExpandInputsToCleanCut()), which is especially
1438
        // important when preferred sequence numbers has been swapped in for
1439
        // kTypeValuePreferredSeqno / TimedPut.
1440
        sub_compact->compaction->TEST_AssertWithinProximalLevelOutputRange(
1441
            c_iter->user_key());
1442
      }
1443
    } else {
1444
      assert(proximal_after_seqno_ == kMaxSequenceNumber);
1445
      assert(!use_proximal_output);
1446
    }
1447
#endif  // NDEBUG
1448
1449
    // Add current compaction_iterator key to target compaction output, if the
1450
    // output file needs to be close or open, it will call the `open_file_func`
1451
    // and `close_file_func`.
1452
    // TODO: it would be better to have the compaction file open/close moved
1453
    // into `CompactionOutputs` which has the output file information.
1454
2.68k
    status = sub_compact->AddToOutput(*c_iter, use_proximal_output,
1455
2.68k
                                      open_file_func, close_file_func);
1456
2.68k
    if (!status.ok()) {
1457
0
      break;
1458
0
    }
1459
1460
2.68k
    TEST_SYNC_POINT_CALLBACK("CompactionJob::Run():PausingManualCompaction:2",
1461
2.68k
                             static_cast<void*>(const_cast<std::atomic<bool>*>(
1462
2.68k
                                 &manual_compaction_canceled_)));
1463
2.68k
    c_iter->Next();
1464
2.68k
    if (c_iter->status().IsManualCompactionPaused()) {
1465
0
      break;
1466
0
    }
1467
1468
#ifndef NDEBUG
1469
    bool stop = false;
1470
    TEST_SYNC_POINT_CALLBACK("CompactionJob::ProcessKeyValueCompaction()::stop",
1471
                             static_cast<void*>(&stop));
1472
    if (stop) {
1473
      break;
1474
    }
1475
#endif  // NDEBUG
1476
2.68k
  }
1477
1478
  // This number may not be accurate when CompactionIterator was created
1479
  // with `must_count_input_entries=false`.
1480
5.33k
  assert(!sub_compact->compaction->DoesInputReferenceBlobFiles() ||
1481
5.33k
         c_iter->HasNumInputEntryScanned());
1482
5.33k
  sub_compact->compaction_job_stats.has_num_input_records =
1483
5.33k
      c_iter->HasNumInputEntryScanned();
1484
5.33k
  sub_compact->compaction_job_stats.num_input_records =
1485
5.33k
      c_iter->NumInputEntryScanned();
1486
5.33k
  sub_compact->compaction_job_stats.num_blobs_read =
1487
5.33k
      c_iter_stats.num_blobs_read;
1488
5.33k
  sub_compact->compaction_job_stats.total_blob_bytes_read =
1489
5.33k
      c_iter_stats.total_blob_bytes_read;
1490
5.33k
  sub_compact->compaction_job_stats.num_input_deletion_records =
1491
5.33k
      c_iter_stats.num_input_deletion_records;
1492
5.33k
  sub_compact->compaction_job_stats.num_corrupt_keys =
1493
5.33k
      c_iter_stats.num_input_corrupt_records;
1494
5.33k
  sub_compact->compaction_job_stats.num_single_del_fallthru =
1495
5.33k
      c_iter_stats.num_single_del_fallthru;
1496
5.33k
  sub_compact->compaction_job_stats.num_single_del_mismatch =
1497
5.33k
      c_iter_stats.num_single_del_mismatch;
1498
5.33k
  sub_compact->compaction_job_stats.total_input_raw_key_bytes +=
1499
5.33k
      c_iter_stats.total_input_raw_key_bytes;
1500
5.33k
  sub_compact->compaction_job_stats.total_input_raw_value_bytes +=
1501
5.33k
      c_iter_stats.total_input_raw_value_bytes;
1502
1503
5.33k
  RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME,
1504
5.33k
             c_iter_stats.total_filter_time);
1505
1506
5.33k
  if (c_iter_stats.num_blobs_relocated > 0) {
1507
0
    RecordTick(stats_, BLOB_DB_GC_NUM_KEYS_RELOCATED,
1508
0
               c_iter_stats.num_blobs_relocated);
1509
0
  }
1510
5.33k
  if (c_iter_stats.total_blob_bytes_relocated > 0) {
1511
0
    RecordTick(stats_, BLOB_DB_GC_BYTES_RELOCATED,
1512
0
               c_iter_stats.total_blob_bytes_relocated);
1513
0
  }
1514
1515
5.33k
  RecordDroppedKeys(c_iter_stats, &sub_compact->compaction_job_stats);
1516
5.33k
  RecordCompactionIOStats();
1517
1518
5.33k
  if (status.ok() && cfd->IsDropped()) {
1519
0
    status =
1520
0
        Status::ColumnFamilyDropped("Column family dropped during compaction");
1521
0
  }
1522
5.33k
  if ((status.ok() || status.IsColumnFamilyDropped()) &&
1523
5.33k
      shutting_down_->load(std::memory_order_relaxed)) {
1524
2.62k
    status = Status::ShutdownInProgress("Database shutdown");
1525
2.62k
  }
1526
5.33k
  if ((status.ok() || status.IsColumnFamilyDropped()) &&
1527
5.33k
      (manual_compaction_canceled_.load(std::memory_order_relaxed))) {
1528
0
    status = Status::Incomplete(Status::SubCode::kManualCompactionPaused);
1529
0
  }
1530
5.33k
  if (status.ok()) {
1531
2.71k
    status = input->status();
1532
2.71k
  }
1533
5.33k
  if (status.ok()) {
1534
2.71k
    status = c_iter->status();
1535
2.71k
  }
1536
1537
  // Call FinishCompactionOutputFile() even if status is not ok: it needs to
1538
  // close the output files. Open file function is also passed, in case there's
1539
  // only range-dels, no file was opened, to save the range-dels, it need to
1540
  // create a new output file.
1541
5.33k
  status = sub_compact->CloseCompactionFiles(status, open_file_func,
1542
5.33k
                                             close_file_func);
1543
1544
5.33k
  if (blob_file_builder) {
1545
0
    if (status.ok()) {
1546
0
      status = blob_file_builder->Finish();
1547
0
    } else {
1548
0
      blob_file_builder->Abandon(status);
1549
0
    }
1550
0
    blob_file_builder.reset();
1551
0
    sub_compact->Current().UpdateBlobStats();
1552
0
  }
1553
1554
5.33k
  uint64_t cur_cpu_micros = db_options_.clock->CPUMicros();
1555
5.33k
  sub_compact->compaction_job_stats.cpu_micros =
1556
5.33k
      cur_cpu_micros - prev_cpu_micros;
1557
5.33k
  RecordTick(stats_, COMPACTION_CPU_TOTAL_TIME,
1558
5.33k
             cur_cpu_micros - last_cpu_micros);
1559
1560
5.33k
  if (measure_io_stats_) {
1561
0
    sub_compact->compaction_job_stats.file_write_nanos +=
1562
0
        IOSTATS(write_nanos) - prev_write_nanos;
1563
0
    sub_compact->compaction_job_stats.file_fsync_nanos +=
1564
0
        IOSTATS(fsync_nanos) - prev_fsync_nanos;
1565
0
    sub_compact->compaction_job_stats.file_range_sync_nanos +=
1566
0
        IOSTATS(range_sync_nanos) - prev_range_sync_nanos;
1567
0
    sub_compact->compaction_job_stats.file_prepare_write_nanos +=
1568
0
        IOSTATS(prepare_write_nanos) - prev_prepare_write_nanos;
1569
0
    sub_compact->compaction_job_stats.cpu_micros -=
1570
0
        (IOSTATS(cpu_write_nanos) - prev_cpu_write_nanos +
1571
0
         IOSTATS(cpu_read_nanos) - prev_cpu_read_nanos) /
1572
0
        1000;
1573
0
    if (prev_perf_level != PerfLevel::kEnableTimeAndCPUTimeExceptForMutex) {
1574
0
      SetPerfLevel(prev_perf_level);
1575
0
    }
1576
0
  }
1577
#ifdef ROCKSDB_ASSERT_STATUS_CHECKED
1578
  if (!status.ok()) {
1579
    if (c_iter) {
1580
      c_iter->status().PermitUncheckedError();
1581
    }
1582
    if (input) {
1583
      input->status().PermitUncheckedError();
1584
    }
1585
  }
1586
#endif  // ROCKSDB_ASSERT_STATUS_CHECKED
1587
1588
5.33k
  blob_counter.reset();
1589
5.33k
  clip.reset();
1590
5.33k
  raw_input.reset();
1591
5.33k
  sub_compact->status = status;
1592
5.33k
  NotifyOnSubcompactionCompleted(sub_compact);
1593
5.33k
}
1594
1595
0
// Builds a unique 64-bit id for a subcompaction by packing the 32-bit job id
// into the high word and the subcompaction's sub_job_id into the low word.
uint64_t CompactionJob::GetCompactionId(SubcompactionState* sub_compact) const {
  // static_cast (not a C-style cast) widens job_id_ to 64 bits before the
  // shift so the high bits are not lost; parentheses make precedence explicit.
  return (static_cast<uint64_t>(job_id_) << 32) | sub_compact->sub_job_id;
}
1598
1599
void CompactionJob::RecordDroppedKeys(
1600
    const CompactionIterationStats& c_iter_stats,
1601
7.01k
    CompactionJobStats* compaction_job_stats) {
1602
7.01k
  if (c_iter_stats.num_record_drop_user > 0) {
1603
0
    RecordTick(stats_, COMPACTION_KEY_DROP_USER,
1604
0
               c_iter_stats.num_record_drop_user);
1605
0
  }
1606
7.01k
  if (c_iter_stats.num_record_drop_hidden > 0) {
1607
2.04k
    RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY,
1608
2.04k
               c_iter_stats.num_record_drop_hidden);
1609
2.04k
    if (compaction_job_stats) {
1610
2.04k
      compaction_job_stats->num_records_replaced +=
1611
2.04k
          c_iter_stats.num_record_drop_hidden;
1612
2.04k
    }
1613
2.04k
  }
1614
7.01k
  if (c_iter_stats.num_record_drop_obsolete > 0) {
1615
1.55k
    RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE,
1616
1.55k
               c_iter_stats.num_record_drop_obsolete);
1617
1.55k
    if (compaction_job_stats) {
1618
1.55k
      compaction_job_stats->num_expired_deletion_records +=
1619
1.55k
          c_iter_stats.num_record_drop_obsolete;
1620
1.55k
    }
1621
1.55k
  }
1622
7.01k
  if (c_iter_stats.num_record_drop_range_del > 0) {
1623
0
    RecordTick(stats_, COMPACTION_KEY_DROP_RANGE_DEL,
1624
0
               c_iter_stats.num_record_drop_range_del);
1625
0
  }
1626
7.01k
  if (c_iter_stats.num_range_del_drop_obsolete > 0) {
1627
0
    RecordTick(stats_, COMPACTION_RANGE_DEL_DROP_OBSOLETE,
1628
0
               c_iter_stats.num_range_del_drop_obsolete);
1629
0
  }
1630
7.01k
  if (c_iter_stats.num_optimized_del_drop_obsolete > 0) {
1631
0
    RecordTick(stats_, COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE,
1632
0
               c_iter_stats.num_optimized_del_drop_obsolete);
1633
0
  }
1634
7.01k
}
1635
1636
// Finalizes one compaction output table: flushes remaining range tombstones
// into it, finishes and syncs the file, refines the file's metadata, deletes
// the file again if it turned out empty, and fires table-creation events.
// Returns the first error encountered (iterator status, range-del insertion,
// Finish, sync/close, or SstFileManager), preserving the original priority
// order of those checks.
Status CompactionJob::FinishCompactionOutputFile(
    const Status& input_status, SubcompactionState* sub_compact,
    CompactionOutputs& outputs, const Slice& next_table_min_key,
    const Slice* comp_start_user_key, const Slice* comp_end_user_key) {
  AutoThreadOperationStageUpdater stage_updater(
      ThreadStatus::STAGE_COMPACTION_SYNC_FILE);
  assert(sub_compact != nullptr);
  assert(outputs.HasBuilder());

  FileMetaData* meta = outputs.GetMetaData();
  uint64_t output_number = meta->fd.GetNumber();
  assert(output_number != 0);

  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
  std::string file_checksum = kUnknownFileChecksum;
  std::string file_checksum_func_name = kUnknownFileChecksumFuncName;

  // Check for iterator errors
  Status s = input_status;

  // Add range tombstones
  if (s.ok()) {
    // Inclusive lower bound, exclusive upper bound
    std::pair<SequenceNumber, SequenceNumber> keep_seqno_range{
        0, kMaxSequenceNumber};
    if (sub_compact->compaction->SupportsPerKeyPlacement()) {
      // With per-key placement, a tombstone belongs to exactly one of the two
      // output levels, selected by its sequence number relative to
      // proximal_after_seqno_.
      if (outputs.IsProximalLevel()) {
        keep_seqno_range.first = proximal_after_seqno_;
      } else {
        keep_seqno_range.second = proximal_after_seqno_;
      }
    }
    CompactionIterationStats range_del_out_stats;
    // NOTE1: Use `bottommost_level_ = true` for both bottommost and
    // output_to_proximal_level compaction here, as it's only used to decide
    // if range dels could be dropped. (Logically, we are taking a single sorted
    // run returned from CompactionIterator and physically splitting it between
    // two output levels.)
    // NOTE2: with per-key placement, range tombstones will be filtered on
    // each output level based on sequence number (traversed twice). This is
    // CPU-inefficient for a large number of range tombstones, but that would
    // be an unusual work load.
    if (sub_compact->HasRangeDel()) {
      s = outputs.AddRangeDels(*sub_compact->RangeDelAgg(), comp_start_user_key,
                               comp_end_user_key, range_del_out_stats,
                               bottommost_level_, cfd->internal_comparator(),
                               earliest_snapshot_, keep_seqno_range,
                               next_table_min_key, full_history_ts_low_);
    }
    // Range-del insertion can itself drop obsolete tombstones; record those.
    RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats);
    TEST_SYNC_POINT("CompactionJob::FinishCompactionOutputFile1");
  }

  // Captured before Finish() so the empty-file check below reflects point
  // entries only (range deletions are counted separately via tp).
  const uint64_t current_entries = outputs.NumEntries();

  s = outputs.Finish(s, seqno_to_time_mapping_);
  TEST_SYNC_POINT_CALLBACK(
      "CompactionJob::FinishCompactionOutputFile()::AfterFinish", &s);

  if (s.ok()) {
    // With accurate smallest and largest key, we can get a slightly more
    // accurate oldest ancester time.
    // This makes oldest ancester time in manifest more accurate than in
    // table properties. Not sure how to resolve it.
    if (meta->smallest.size() > 0 && meta->largest.size() > 0) {
      uint64_t refined_oldest_ancester_time;
      Slice new_smallest = meta->smallest.user_key();
      Slice new_largest = meta->largest.user_key();
      if (!new_largest.empty() && !new_smallest.empty()) {
        refined_oldest_ancester_time =
            sub_compact->compaction->MinInputFileOldestAncesterTime(
                &(meta->smallest), &(meta->largest));
        // max() is the sentinel for "no input file covers this key range".
        if (refined_oldest_ancester_time !=
            std::numeric_limits<uint64_t>::max()) {
          meta->oldest_ancester_time = refined_oldest_ancester_time;
        }
      }
    }
  }

  // Finish and check for file errors
  IOStatus io_s = outputs.WriterSyncClose(s, db_options_.clock, stats_,
                                          db_options_.use_fsync);

  // Checksums are only meaningful when both build and sync/close succeeded.
  if (s.ok() && io_s.ok()) {
    file_checksum = meta->file_checksum;
    file_checksum_func_name = meta->file_checksum_func_name;
  }

  // Promote the IO error into the primary status (build errors take priority).
  if (s.ok()) {
    s = io_s;
  }
  if (sub_compact->io_status.ok()) {
    sub_compact->io_status = io_s;
    // Since this error is really a copy of the
    // "normal" status, it does not also need to be checked
    sub_compact->io_status.PermitUncheckedError();
  }

  TableProperties tp;
  if (s.ok()) {
    tp = outputs.GetTableProperties();
  }

  if (s.ok() && current_entries == 0 && tp.num_range_deletions == 0) {
    // If there is nothing to output, no necessary to generate a sst file.
    // This happens when the output level is bottom level, at the same time
    // the sub_compact output nothing.
    std::string fname =
        TableFileName(sub_compact->compaction->immutable_options().cf_paths,
                      meta->fd.GetNumber(), meta->fd.GetPathId());

    // TODO(AR) it is not clear if there are any larger implications if
    // DeleteFile fails here
    Status ds = env_->DeleteFile(fname);
    if (!ds.ok()) {
      ROCKS_LOG_WARN(
          db_options_.info_log,
          "[%s] [JOB %d] Unable to remove SST file for table #%" PRIu64
          " at bottom level%s",
          cfd->GetName().c_str(), job_id_, output_number,
          meta->marked_for_compaction ? " (need compaction)" : "");
    }

    // Also need to remove the file from outputs, or it will be added to the
    // VersionEdit.
    outputs.RemoveLastOutput();
    // meta becomes dangling after RemoveLastOutput(); null it so the
    // notification code below takes the "(nil)" path.
    meta = nullptr;
  }

  if (s.ok() && (current_entries > 0 || tp.num_range_deletions > 0)) {
    // Output to event logger and fire events.
    outputs.UpdateTableProperties();
    ROCKS_LOG_INFO(db_options_.info_log,
                   "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64
                   " keys, %" PRIu64 " bytes%s, temperature: %s",
                   cfd->GetName().c_str(), job_id_, output_number,
                   current_entries, meta->fd.file_size,
                   meta->marked_for_compaction ? " (need compaction)" : "",
                   temperature_to_string[meta->temperature].c_str());
  }
  std::string fname;
  FileDescriptor output_fd;
  uint64_t oldest_blob_file_number = kInvalidBlobFileNumber;
  Status status_for_listener = s;
  if (meta != nullptr) {
    fname = GetTableFileName(meta->fd.GetNumber());
    output_fd = meta->fd;
    oldest_blob_file_number = meta->oldest_blob_file_number;
  } else {
    // The empty output was deleted above; report that to listeners as an
    // aborted creation rather than a success with no file.
    fname = "(nil)";
    if (s.ok()) {
      status_for_listener = Status::Aborted("Empty SST file not kept");
    }
  }
  EventHelpers::LogAndNotifyTableFileCreationFinished(
      event_logger_, cfd->ioptions().listeners, dbname_, cfd->GetName(), fname,
      job_id_, output_fd, oldest_blob_file_number, tp,
      TableFileCreationReason::kCompaction, status_for_listener, file_checksum,
      file_checksum_func_name);

  // Report new file to SstFileManagerImpl
  auto sfm =
      static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get());
  if (sfm && meta != nullptr && meta->fd.GetPathId() == 0) {
    Status add_s = sfm->OnAddFile(fname);
    if (!add_s.ok() && s.ok()) {
      s = add_s;
    }
    if (sfm->IsMaxAllowedSpaceReached()) {
      // TODO(ajkr): should we return OK() if max space was reached by the final
      // compaction output file (similarly to how flush works when full)?
      s = Status::SpaceLimit("Max allowed space was reached");
      TEST_SYNC_POINT(
          "CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached");
      // SetBGError requires the DB mutex; take it only for this cold path.
      InstrumentedMutexLock l(db_mutex_);
      db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction);
    }
  }

  outputs.ResetBuilder();
  return s;
}
1819
1820
2.71k
// Applies the finished compaction to the LSM tree: logs a summary, collects
// all subcompaction outputs (tables, blob files, blob garbage) into the
// compaction's VersionEdit, updates the round-robin compact cursor when
// applicable, and commits the edit to the MANIFEST via LogAndApply.
// *compaction_released is set to true once the manifest write callback has
// released the compaction's input files. Caller must hold db_mutex_.
Status CompactionJob::InstallCompactionResults(bool* compaction_released) {
  assert(compact_);

  db_mutex_->AssertHeld();

  const ReadOptions read_options(Env::IOActivity::kCompaction);
  const WriteOptions write_options(Env::IOActivity::kCompaction);

  auto* compaction = compact_->compaction;
  assert(compaction);

  {
    // Log a one-line summary; the proximal-level variant breaks the written
    // bytes down per output level.
    Compaction::InputLevelSummaryBuffer inputs_summary;
    if (internal_stats_.has_proximal_level_output) {
      ROCKS_LOG_BUFFER(
          log_buffer_,
          "[%s] [JOB %d] Compacted %s => output_to_proximal_level: %" PRIu64
          " bytes + last: %" PRIu64 " bytes. Total: %" PRIu64 " bytes",
          compaction->column_family_data()->GetName().c_str(), job_id_,
          compaction->InputLevelSummary(&inputs_summary),
          internal_stats_.proximal_level_stats.bytes_written,
          internal_stats_.output_level_stats.bytes_written,
          internal_stats_.TotalBytesWritten());
    } else {
      ROCKS_LOG_BUFFER(log_buffer_,
                       "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes",
                       compaction->column_family_data()->GetName().c_str(),
                       job_id_, compaction->InputLevelSummary(&inputs_summary),
                       internal_stats_.TotalBytesWritten());
    }
  }

  VersionEdit* const edit = compaction->edit();
  assert(edit);

  // Add compaction inputs
  compaction->AddInputDeletions(edit);

  // Garbage is aggregated per blob file across all subcompactions before
  // being recorded in the edit.
  std::unordered_map<uint64_t, BlobGarbageMeter::BlobStats> blob_total_garbage;

  for (const auto& sub_compact : compact_->sub_compact_states) {
    sub_compact.AddOutputsEdit(edit);

    for (const auto& blob : sub_compact.Current().GetBlobFileAdditions()) {
      edit->AddBlobFile(blob);
    }

    if (sub_compact.Current().GetBlobGarbageMeter()) {
      const auto& flows = sub_compact.Current().GetBlobGarbageMeter()->flows();

      for (const auto& pair : flows) {
        const uint64_t blob_file_number = pair.first;
        const BlobGarbageMeter::BlobInOutFlow& flow = pair.second;

        assert(flow.IsValid());
        if (flow.HasGarbage()) {
          blob_total_garbage[blob_file_number].Add(flow.GetGarbageCount(),
                                                   flow.GetGarbageBytes());
        }
      }
    }
  }

  for (const auto& pair : blob_total_garbage) {
    const uint64_t blob_file_number = pair.first;
    const BlobGarbageMeter::BlobStats& stats = pair.second;

    edit->AddBlobFileGarbage(blob_file_number, stats.GetCount(),
                             stats.GetBytes());
  }

  // Round-robin compaction priority: advance the per-level compact cursor
  // past the input files just consumed so the next pick continues from there.
  if ((compaction->compaction_reason() ==
           CompactionReason::kLevelMaxLevelSize ||
       compaction->compaction_reason() == CompactionReason::kRoundRobinTtl) &&
      compaction->immutable_options().compaction_pri == kRoundRobin) {
    int start_level = compaction->start_level();
    if (start_level > 0) {
      auto vstorage = compaction->input_version()->storage_info();
      edit->AddCompactCursor(start_level,
                             vstorage->GetNextCompactCursor(
                                 start_level, compaction->num_input_files(0)));
    }
  }

  // Invoked by LogAndApply after the manifest write; releases the input
  // files and tells the caller (via the out-param) that release happened.
  auto manifest_wcb = [&compaction, &compaction_released](const Status& s) {
    compaction->ReleaseCompactionFiles(s);
    *compaction_released = true;
  };

  return versions_->LogAndApply(compaction->column_family_data(), read_options,
                                write_options, edit, db_mutex_, db_directory_,
                                /*new_descriptor_log=*/false,
                                /*column_family_options=*/nullptr,
                                manifest_wcb);
}
1915
1916
10.6k
void CompactionJob::RecordCompactionIOStats() {
1917
10.6k
  RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read));
1918
10.6k
  RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written));
1919
10.6k
  CompactionReason compaction_reason =
1920
10.6k
      compact_->compaction->compaction_reason();
1921
10.6k
  if (compaction_reason == CompactionReason::kFilesMarkedForCompaction) {
1922
0
    RecordTick(stats_, COMPACT_READ_BYTES_MARKED, IOSTATS(bytes_read));
1923
0
    RecordTick(stats_, COMPACT_WRITE_BYTES_MARKED, IOSTATS(bytes_written));
1924
10.6k
  } else if (compaction_reason == CompactionReason::kPeriodicCompaction) {
1925
0
    RecordTick(stats_, COMPACT_READ_BYTES_PERIODIC, IOSTATS(bytes_read));
1926
0
    RecordTick(stats_, COMPACT_WRITE_BYTES_PERIODIC, IOSTATS(bytes_written));
1927
10.6k
  } else if (compaction_reason == CompactionReason::kTtl) {
1928
0
    RecordTick(stats_, COMPACT_READ_BYTES_TTL, IOSTATS(bytes_read));
1929
0
    RecordTick(stats_, COMPACT_WRITE_BYTES_TTL, IOSTATS(bytes_written));
1930
0
  }
1931
10.6k
  ThreadStatusUtil::IncreaseThreadOperationProperty(
1932
10.6k
      ThreadStatus::COMPACTION_BYTES_READ, IOSTATS(bytes_read));
1933
10.6k
  IOSTATS_RESET(bytes_read);
1934
10.6k
  ThreadStatusUtil::IncreaseThreadOperationProperty(
1935
10.6k
      ThreadStatus::COMPACTION_BYTES_WRITTEN, IOSTATS(bytes_written));
1936
10.6k
  IOSTATS_RESET(bytes_written);
1937
10.6k
}
1938
1939
// Creates and registers a new SST output file for `sub_compact` and wires it
// into `outputs`: allocates a file number, opens a writable file (honoring
// temperature and write-hint options), records a FileMetaData entry, attaches
// a WritableFileWriter, and starts a fresh table builder.
//
// Returns non-OK if the file cannot be created or a unique file id cannot be
// generated; callers must not use `outputs`'s builder in that case.
Status CompactionJob::OpenCompactionOutputFile(SubcompactionState* sub_compact,
                                               CompactionOutputs& outputs) {
  assert(sub_compact != nullptr);

  // no need to lock because VersionSet::next_file_number_ is atomic
  uint64_t file_number = versions_->NewFileNumber();
  std::string fname = GetTableFileName(file_number);
  // Fire events.
  ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
  EventHelpers::NotifyTableFileCreationStarted(
      cfd->ioptions().listeners, dbname_, cfd->GetName(), fname, job_id_,
      TableFileCreationReason::kCompaction);
  // Make the output file
  std::unique_ptr<FSWritableFile> writable_file;
#ifndef NDEBUG
  bool syncpoint_arg = file_options_.use_direct_writes;
  TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile",
                           &syncpoint_arg);
#endif

  // Pass temperature of the last level files to FileSystem.
  FileOptions fo_copy = file_options_;
  Temperature temperature = sub_compact->compaction->output_temperature();
  Temperature last_level_temp =
      sub_compact->compaction->mutable_cf_options().last_level_temperature;
  // Here last_level_temperature supersedes default_write_temperature, when
  // enabled and applicable
  if (last_level_temp != Temperature::kUnknown &&
      sub_compact->compaction->is_last_level() && !outputs.IsProximalLevel()) {
    temperature = last_level_temp;
  }
  fo_copy.temperature = temperature;
  fo_copy.write_hint = write_hint_;

  Status s;
  IOStatus io_s = NewWritableFile(fs_.get(), fname, &writable_file, fo_copy);
  s = io_s;
  // Record the first IO error on the subcompaction; later successes must not
  // overwrite an earlier failure.
  if (sub_compact->io_status.ok()) {
    sub_compact->io_status = io_s;
    // Since this error is really a copy of the io_s that is checked below as s,
    // it does not also need to be checked.
    sub_compact->io_status.PermitUncheckedError();
  }
  if (!s.ok()) {
    ROCKS_LOG_ERROR(
        db_options_.info_log,
        "[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64
        " fails at NewWritableFile with status %s",
        sub_compact->compaction->column_family_data()->GetName().c_str(),
        job_id_, file_number, s.ToString().c_str());
    LogFlush(db_options_.info_log);
    // A "creation started" event was already fired above, so listeners must
    // also see the matching "creation finished" (failed) event.
    EventHelpers::LogAndNotifyTableFileCreationFinished(
        event_logger_, cfd->ioptions().listeners, dbname_, cfd->GetName(),
        fname, job_id_, FileDescriptor(), kInvalidBlobFileNumber,
        TableProperties(), TableFileCreationReason::kCompaction, s,
        kUnknownFileChecksum, kUnknownFileChecksumFuncName);
    return s;
  }

  // Try to figure out the output file's oldest ancester time.
  int64_t temp_current_time = 0;
  auto get_time_status = db_options_.clock->GetCurrentTime(&temp_current_time);
  // Safe to proceed even if GetCurrentTime fails. So, log and proceed.
  if (!get_time_status.ok()) {
    ROCKS_LOG_WARN(db_options_.info_log,
                   "Failed to get current time. Status: %s",
                   get_time_status.ToString().c_str());
  }
  uint64_t current_time = static_cast<uint64_t>(temp_current_time);
  // Bound the ancestor-time / newest-key-time queries to this subcompaction's
  // key range when it has explicit start/end boundaries.
  InternalKey tmp_start, tmp_end;
  if (sub_compact->start.has_value()) {
    tmp_start.SetMinPossibleForUserKey(*(sub_compact->start));
  }
  if (sub_compact->end.has_value()) {
    tmp_end.SetMinPossibleForUserKey(*(sub_compact->end));
  }
  uint64_t oldest_ancester_time =
      sub_compact->compaction->MinInputFileOldestAncesterTime(
          sub_compact->start.has_value() ? &tmp_start : nullptr,
          sub_compact->end.has_value() ? &tmp_end : nullptr);
  // max() is the sentinel for "no input file reported an ancestor time".
  if (oldest_ancester_time == std::numeric_limits<uint64_t>::max()) {
    // TODO: fix DBSSTTest.GetTotalSstFilesSize and use
    //  kUnknownOldestAncesterTime
    oldest_ancester_time = current_time;
  }

  uint64_t newest_key_time = sub_compact->compaction->MaxInputFileNewestKeyTime(
      sub_compact->start.has_value() ? &tmp_start : nullptr,
      sub_compact->end.has_value() ? &tmp_end : nullptr);

  // Initialize a SubcompactionState::Output and add it to sub_compact->outputs
  uint64_t epoch_number = sub_compact->compaction->MinInputFileEpochNumber();
  {
    FileMetaData meta;
    meta.fd = FileDescriptor(file_number,
                             sub_compact->compaction->output_path_id(), 0);
    meta.oldest_ancester_time = oldest_ancester_time;
    meta.file_creation_time = current_time;
    meta.epoch_number = epoch_number;
    meta.temperature = temperature;
    assert(!db_id_.empty());
    assert(!db_session_id_.empty());
    s = GetSstInternalUniqueId(db_id_, db_session_id_, meta.fd.GetNumber(),
                               &meta.unique_id);
    if (!s.ok()) {
      ROCKS_LOG_ERROR(db_options_.info_log,
                      "[%s] [JOB %d] file #%" PRIu64
                      " failed to generate unique id: %s.",
                      cfd->GetName().c_str(), job_id_, meta.fd.GetNumber(),
                      s.ToString().c_str());
      return s;
    }

    outputs.AddOutput(std::move(meta), cfd->internal_comparator(),
                      paranoid_file_checks_);
  }

  writable_file->SetIOPriority(GetRateLimiterPriority());
  // Subsequent attempts to override the hint via SetWriteLifeTimeHint
  // with the very same value will be ignored by the fs.
  writable_file->SetWriteLifeTimeHint(fo_copy.write_hint);
  FileTypeSet tmp_set = db_options_.checksum_handoff_file_types;
  writable_file->SetPreallocationBlockSize(static_cast<size_t>(
      sub_compact->compaction->OutputFilePreallocationSize()));
  const auto& listeners =
      sub_compact->compaction->immutable_options().listeners;
  // The WritableFileWriter takes ownership of writable_file; checksum handoff
  // is enabled only if table files are in the configured handoff set.
  outputs.AssignFileWriter(new WritableFileWriter(
      std::move(writable_file), fname, fo_copy, db_options_.clock, io_tracer_,
      db_options_.stats, Histograms::SST_WRITE_MICROS, listeners,
      db_options_.file_checksum_gen_factory.get(),
      tmp_set.Contains(FileType::kTableFile), false));

  // TODO(hx235): pass in the correct `oldest_key_time` instead of `0`
  const ReadOptions read_options(Env::IOActivity::kCompaction);
  const WriteOptions write_options(Env::IOActivity::kCompaction);
  TableBuilderOptions tboptions(
      cfd->ioptions(), sub_compact->compaction->mutable_cf_options(),
      read_options, write_options, cfd->internal_comparator(),
      cfd->internal_tbl_prop_coll_factories(),
      sub_compact->compaction->output_compression(),
      sub_compact->compaction->output_compression_opts(), cfd->GetID(),
      cfd->GetName(), sub_compact->compaction->output_level(), newest_key_time,
      bottommost_level_, TableFileCreationReason::kCompaction,
      0 /* oldest_key_time */, current_time, db_id_, db_session_id_,
      sub_compact->compaction->max_output_file_size(), file_number,
      proximal_after_seqno_ /*last_level_inclusive_max_seqno_threshold*/);

  outputs.NewBuilder(tboptions);

  LogFlush(db_options_.info_log);
  return s;
}
2091
2092
5.33k
void CompactionJob::CleanupCompaction() {
2093
5.33k
  for (SubcompactionState& sub_compact : compact_->sub_compact_states) {
2094
5.33k
    sub_compact.Cleanup(table_cache_.get());
2095
5.33k
  }
2096
5.33k
  delete compact_;
2097
5.33k
  compact_ = nullptr;
2098
5.33k
}
2099
2100
namespace {
2101
4.67k
void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) {
2102
4.67k
  assert(prefix_length > 0);
2103
4.67k
  size_t length = src.size() > prefix_length ? prefix_length : src.size();
2104
4.67k
  dst->assign(src.data(), length);
2105
4.67k
}
2106
}  // namespace
2107
2108
bool CompactionJob::BuildStatsFromInputTableProperties(
2109
5.33k
    uint64_t* num_input_range_del) {
2110
5.33k
  assert(compact_);
2111
2112
5.33k
  Compaction* compaction = compact_->compaction;
2113
5.33k
  internal_stats_.output_level_stats.num_input_files_in_non_output_levels = 0;
2114
5.33k
  internal_stats_.output_level_stats.num_input_files_in_output_level = 0;
2115
2116
5.33k
  bool has_error = false;
2117
5.33k
  const ReadOptions read_options(Env::IOActivity::kCompaction);
2118
5.33k
  const auto& input_table_properties = compaction->GetInputTableProperties();
2119
5.33k
  for (int input_level = 0;
2120
13.8k
       input_level < static_cast<int>(compaction->num_input_levels());
2121
8.55k
       ++input_level) {
2122
8.55k
    const LevelFilesBrief* flevel = compaction->input_levels(input_level);
2123
8.55k
    size_t num_input_files = flevel->num_files;
2124
8.55k
    uint64_t* bytes_read;
2125
8.55k
    if (compaction->level(input_level) != compaction->output_level()) {
2126
3.65k
      internal_stats_.output_level_stats.num_input_files_in_non_output_levels +=
2127
3.65k
          static_cast<int>(num_input_files);
2128
3.65k
      bytes_read =
2129
3.65k
          &internal_stats_.output_level_stats.bytes_read_non_output_levels;
2130
4.89k
    } else {
2131
4.89k
      internal_stats_.output_level_stats.num_input_files_in_output_level +=
2132
4.89k
          static_cast<int>(num_input_files);
2133
4.89k
      bytes_read = &internal_stats_.output_level_stats.bytes_read_output_level;
2134
4.89k
    }
2135
36.0k
    for (size_t i = 0; i < num_input_files; ++i) {
2136
27.4k
      const FileMetaData* file_meta = flevel->files[i].file_metadata;
2137
27.4k
      *bytes_read += file_meta->fd.GetFileSize();
2138
27.4k
      uint64_t file_input_entries = file_meta->num_entries;
2139
27.4k
      uint64_t file_num_range_del = file_meta->num_range_deletions;
2140
27.4k
      if (file_input_entries == 0) {
2141
0
        uint64_t file_number = file_meta->fd.GetNumber();
2142
        // Try getting info from table property
2143
0
        std::string fn = TableFileName(compaction->immutable_options().cf_paths,
2144
0
                                       file_number, file_meta->fd.GetPathId());
2145
0
        const auto& tp = input_table_properties.find(fn);
2146
0
        if (tp != input_table_properties.end()) {
2147
0
          file_input_entries = tp->second->num_entries;
2148
0
          file_num_range_del = tp->second->num_range_deletions;
2149
0
        } else {
2150
0
          has_error = true;
2151
0
        }
2152
0
      }
2153
27.4k
      internal_stats_.output_level_stats.num_input_records +=
2154
27.4k
          file_input_entries;
2155
27.4k
      if (num_input_range_del) {
2156
27.4k
        *num_input_range_del += file_num_range_del;
2157
27.4k
      }
2158
27.4k
    }
2159
2160
8.55k
    const std::vector<FileMetaData*>& filtered_flevel =
2161
8.55k
        compaction->filtered_input_levels(input_level);
2162
8.55k
    size_t num_filtered_input_files = filtered_flevel.size();
2163
8.55k
    uint64_t* bytes_skipped;
2164
8.55k
    if (compaction->level(input_level) != compaction->output_level()) {
2165
3.65k
      internal_stats_.output_level_stats
2166
3.65k
          .num_filtered_input_files_in_non_output_levels +=
2167
3.65k
          static_cast<int>(num_filtered_input_files);
2168
3.65k
      bytes_skipped =
2169
3.65k
          &internal_stats_.output_level_stats.bytes_skipped_non_output_levels;
2170
4.89k
    } else {
2171
4.89k
      internal_stats_.output_level_stats
2172
4.89k
          .num_filtered_input_files_in_output_level +=
2173
4.89k
          static_cast<int>(num_filtered_input_files);
2174
4.89k
      bytes_skipped =
2175
4.89k
          &internal_stats_.output_level_stats.bytes_skipped_output_level;
2176
4.89k
    }
2177
8.55k
    for (const FileMetaData* filtered_file_meta : filtered_flevel) {
2178
0
      *bytes_skipped += filtered_file_meta->fd.GetFileSize();
2179
0
    }
2180
8.55k
  }
2181
2182
  // TODO - find a better place to set these two
2183
5.33k
  assert(job_stats_);
2184
5.33k
  internal_stats_.output_level_stats.bytes_read_blob =
2185
5.33k
      job_stats_->total_blob_bytes_read;
2186
5.33k
  internal_stats_.output_level_stats.num_dropped_records =
2187
5.33k
      internal_stats_.DroppedRecords();
2188
5.33k
  return !has_error;
2189
5.33k
}
2190
2191
void CompactionJob::UpdateCompactionJobInputStats(
2192
    const InternalStats::CompactionStatsFull& internal_stats,
2193
2.71k
    uint64_t num_input_range_del) const {
2194
2.71k
  assert(job_stats_);
2195
  // input information
2196
2.71k
  job_stats_->total_input_bytes =
2197
2.71k
      internal_stats.output_level_stats.bytes_read_non_output_levels +
2198
2.71k
      internal_stats.output_level_stats.bytes_read_output_level;
2199
2.71k
  job_stats_->num_input_records =
2200
2.71k
      internal_stats.output_level_stats.num_input_records - num_input_range_del;
2201
2.71k
  job_stats_->num_input_files =
2202
2.71k
      internal_stats.output_level_stats.num_input_files_in_non_output_levels +
2203
2.71k
      internal_stats.output_level_stats.num_input_files_in_output_level;
2204
2.71k
  job_stats_->num_input_files_at_output_level =
2205
2.71k
      internal_stats.output_level_stats.num_input_files_in_output_level;
2206
2.71k
  job_stats_->num_filtered_input_files =
2207
2.71k
      internal_stats.output_level_stats
2208
2.71k
          .num_filtered_input_files_in_non_output_levels +
2209
2.71k
      internal_stats.output_level_stats
2210
2.71k
          .num_filtered_input_files_in_output_level;
2211
2.71k
  job_stats_->num_filtered_input_files_at_output_level =
2212
2.71k
      internal_stats.output_level_stats
2213
2.71k
          .num_filtered_input_files_in_output_level;
2214
2.71k
  job_stats_->total_skipped_input_bytes =
2215
2.71k
      internal_stats.output_level_stats.bytes_skipped_non_output_levels +
2216
2.71k
      internal_stats.output_level_stats.bytes_skipped_output_level;
2217
2218
2.71k
  if (internal_stats.has_proximal_level_output) {
2219
0
    job_stats_->total_input_bytes +=
2220
0
        internal_stats.proximal_level_stats.bytes_read_non_output_levels +
2221
0
        internal_stats.proximal_level_stats.bytes_read_output_level;
2222
0
    job_stats_->num_input_records +=
2223
0
        internal_stats.proximal_level_stats.num_input_records;
2224
0
    job_stats_->num_input_files +=
2225
0
        internal_stats.proximal_level_stats
2226
0
            .num_input_files_in_non_output_levels +
2227
0
        internal_stats.proximal_level_stats.num_input_files_in_output_level;
2228
0
    job_stats_->num_input_files_at_output_level +=
2229
0
        internal_stats.proximal_level_stats.num_input_files_in_output_level;
2230
0
    job_stats_->num_filtered_input_files +=
2231
0
        internal_stats.proximal_level_stats
2232
0
            .num_filtered_input_files_in_non_output_levels +
2233
0
        internal_stats.proximal_level_stats
2234
0
            .num_filtered_input_files_in_output_level;
2235
0
    job_stats_->num_filtered_input_files_at_output_level +=
2236
0
        internal_stats.proximal_level_stats
2237
0
            .num_filtered_input_files_in_output_level;
2238
0
    job_stats_->total_skipped_input_bytes +=
2239
0
        internal_stats.proximal_level_stats.bytes_skipped_non_output_levels +
2240
0
        internal_stats.proximal_level_stats.bytes_skipped_output_level;
2241
0
  }
2242
2.71k
}
2243
2244
void CompactionJob::UpdateCompactionJobOutputStats(
2245
5.33k
    const InternalStats::CompactionStatsFull& internal_stats) const {
2246
5.33k
  assert(job_stats_);
2247
5.33k
  job_stats_->elapsed_micros = internal_stats.output_level_stats.micros;
2248
5.33k
  job_stats_->cpu_micros = internal_stats.output_level_stats.cpu_micros;
2249
2250
  // output information
2251
5.33k
  job_stats_->total_output_bytes =
2252
5.33k
      internal_stats.output_level_stats.bytes_written;
2253
5.33k
  job_stats_->total_output_bytes_blob =
2254
5.33k
      internal_stats.output_level_stats.bytes_written_blob;
2255
5.33k
  job_stats_->num_output_records =
2256
5.33k
      internal_stats.output_level_stats.num_output_records;
2257
5.33k
  job_stats_->num_output_files =
2258
5.33k
      internal_stats.output_level_stats.num_output_files;
2259
5.33k
  job_stats_->num_output_files_blob =
2260
5.33k
      internal_stats.output_level_stats.num_output_files_blob;
2261
2262
5.33k
  if (internal_stats.has_proximal_level_output) {
2263
0
    job_stats_->total_output_bytes +=
2264
0
        internal_stats.proximal_level_stats.bytes_written;
2265
0
    job_stats_->total_output_bytes_blob +=
2266
0
        internal_stats.proximal_level_stats.bytes_written_blob;
2267
0
    job_stats_->num_output_records +=
2268
0
        internal_stats.proximal_level_stats.num_output_records;
2269
0
    job_stats_->num_output_files +=
2270
0
        internal_stats.proximal_level_stats.num_output_files;
2271
0
    job_stats_->num_output_files_blob +=
2272
0
        internal_stats.proximal_level_stats.num_output_files_blob;
2273
0
  }
2274
2275
5.33k
  if (job_stats_->num_output_files > 0) {
2276
2.33k
    CopyPrefix(compact_->SmallestUserKey(),
2277
2.33k
               CompactionJobStats::kMaxPrefixLength,
2278
2.33k
               &job_stats_->smallest_output_key_prefix);
2279
2.33k
    CopyPrefix(compact_->LargestUserKey(), CompactionJobStats::kMaxPrefixLength,
2280
2.33k
               &job_stats_->largest_output_key_prefix);
2281
2.33k
  }
2282
5.33k
}
2283
2284
5.33k
// Logs the start of this compaction: human-readable lines to the info log
// plus a structured "compaction_started" record through the event logger.
// All work is skipped when the configured log level is above INFO.
void CompactionJob::LogCompaction() {
  Compaction* compaction = compact_->compaction;
  ColumnFamilyData* cfd = compaction->column_family_data();
  // Let's check if anything will get logged. Don't prepare all the info if
  // we're not logging
  if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) {
    Compaction::InputLevelSummaryBuffer inputs_summary;
    ROCKS_LOG_INFO(
        db_options_.info_log, "[%s] [JOB %d] Compacting %s, score %.2f",
        cfd->GetName().c_str(), job_id_,
        compaction->InputLevelSummary(&inputs_summary), compaction->score());
    char scratch[2345];
    compaction->Summary(scratch, sizeof(scratch));
    ROCKS_LOG_INFO(db_options_.info_log, "[%s]: Compaction start summary: %s\n",
                   cfd->GetName().c_str(), scratch);
    // build event logger report
    auto stream = event_logger_->Log();
    stream << "job" << job_id_ << "event" << "compaction_started" << "cf_name"
           << cfd->GetName() << "compaction_reason"
           << GetCompactionReasonString(compaction->compaction_reason());
    // Emit one file-number array per input level, keyed "files_L<level>".
    for (size_t i = 0; i < compaction->num_input_levels(); ++i) {
      stream << ("files_L" + std::to_string(compaction->level(i)));
      stream.StartArray();
      for (auto f : *compaction->inputs(i)) {
        stream << f->fd.GetNumber();
      }
      stream.EndArray();
    }
    stream << "score" << compaction->score() << "input_data_size"
           << compaction->CalculateTotalInputSize() << "oldest_snapshot_seqno"
           << (job_context_->snapshot_seqs.empty()
                   ? int64_t{-1}  // Use -1 for "none"
                   : static_cast<int64_t>(
                         job_context_->GetEarliestSnapshotSequence()));
    // Extra fields only apply when per-key placement (proximal level
    // output) is supported by this compaction.
    if (compaction->SupportsPerKeyPlacement()) {
      stream << "proximal_after_seqno" << proximal_after_seqno_;
      stream << "preserve_seqno_after" << preserve_seqno_after_;
      stream << "proximal_output_level" << compaction->GetProximalLevel();
      stream << "proximal_output_range"
             << GetCompactionProximalOutputRangeTypeString(
                    compaction->GetProximalOutputRangeType());

      if (compaction->GetProximalOutputRangeType() ==
          Compaction::ProximalOutputRangeType::kDisabled) {
        ROCKS_LOG_WARN(
            db_options_.info_log,
            "[%s] [JOB %d] Proximal level output is disabled, likely "
            "because of the range conflict in the proximal level",
            cfd->GetName().c_str(), job_id_);
      }
    }
  }
}
2337
2338
4.67k
std::string CompactionJob::GetTableFileName(uint64_t file_number) {
2339
4.67k
  return TableFileName(compact_->compaction->immutable_options().cf_paths,
2340
4.67k
                       file_number, compact_->compaction->output_path_id());
2341
4.67k
}
2342
2343
9.34k
// Compaction IO normally runs at low priority, but is boosted to user
// priority while the write controller is delaying or stopping foreground
// writes, so the compaction can relieve the stall sooner.
Env::IOPriority CompactionJob::GetRateLimiterPriority() {
  if (versions_ == nullptr) {
    return Env::IO_LOW;
  }
  auto* cf_set = versions_->GetColumnFamilySet();
  if (cf_set == nullptr) {
    return Env::IO_LOW;
  }
  WriteController* controller = cf_set->write_controller();
  if (controller != nullptr &&
      (controller->NeedsDelay() || controller->IsStopped())) {
    return Env::IO_USER;
  }
  return Env::IO_LOW;
}
2355
2356
// Cross-checks the input record count derived from table properties against
// the number of keys the compaction actually processed. Returns Corruption
// (when compaction_verify_record_count is set) on a mismatch, OK otherwise.
Status CompactionJob::VerifyInputRecordCount(
    uint64_t num_input_range_del) const {
  const size_t ts_sz = compact_->compaction->column_family_data()
                           ->user_comparator()
                           ->timestamp_size();
  // When trim_ts_ is non-empty, CompactionIterator takes
  // HistoryTrimmingIterator as input iterator and sees a trimmed view of
  // input keys. So the number of keys it processed is not suitable for
  // verification here.
  // TODO: support verification when trim_ts_ is non-empty.
  if (ts_sz > 0 && !trim_ts_.empty()) {
    return Status::OK();
  }
  assert(internal_stats_.output_level_stats.num_input_records > 0);
  // TODO: verify the number of range deletion entries.
  const uint64_t expected =
      internal_stats_.output_level_stats.num_input_records -
      num_input_range_del;
  const uint64_t actual = job_stats_->num_input_records;
  if (expected == actual) {
    return Status::OK();
  }
  char summary[2345];
  compact_->compaction->Summary(summary, sizeof(summary));
  std::string msg =
      "Compaction number of input keys does not match "
      "number of keys processed. Expected " +
      std::to_string(expected) + " but processed " + std::to_string(actual) +
      ". Compaction summary: " + summary;
  if (db_options_.compaction_verify_record_count) {
    return Status::Corruption(msg);
  }
  return Status::OK();
}
2387
2388
}  // namespace ROCKSDB_NAMESPACE