Coverage Report

Created: 2026-05-16 07:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/compaction/compaction_picker.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10
#include "db/compaction/compaction_picker.h"
11
12
#include <cinttypes>
13
#include <limits>
14
#include <queue>
15
#include <string>
16
#include <utility>
17
#include <vector>
18
19
#include "db/column_family.h"
20
#include "file/filename.h"
21
#include "logging/log_buffer.h"
22
#include "logging/logging.h"
23
#include "monitoring/statistics_impl.h"
24
#include "test_util/sync_point.h"
25
#include "util/random.h"
26
#include "util/string_util.h"
27
28
namespace ROCKSDB_NAMESPACE {
29
30
#ifndef NDEBUG
31
static void AssertCleanCut(const InternalKeyComparator* icmp,
32
                           VersionStorageInfo* vstorage,
33
                           CompactionInputFiles* inputs, int level,
34
                           Logger* logger) {
35
  const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(level);
36
  if (inputs->files.empty() || level_files.empty()) {
37
    return;
38
  }
39
40
  const Comparator* ucmp = icmp->user_comparator();
41
42
  // Find first and last input file indices in level
43
  int first_input_idx = -1;
44
  int last_input_idx = -1;
45
  for (size_t i = 0; i < level_files.size(); i++) {
46
    if (level_files[i] == inputs->files.front()) {
47
      first_input_idx = static_cast<int>(i);
48
    }
49
    if (level_files[i] == inputs->files.back()) {
50
      last_input_idx = static_cast<int>(i);
51
    }
52
  }
53
54
  // Check file before first input
55
  if (first_input_idx > 0) {
56
    const FileMetaData* prev_file = level_files[first_input_idx - 1];
57
    const FileMetaData* first_file = inputs->files.front();
58
    int cmp = sstableKeyCompare(ucmp, prev_file->largest, first_file->smallest);
59
    if (cmp == 0) {
60
      ROCKS_LOG_ERROR(logger,
61
                      "Clean cut violated: L%d unselected file %" PRIu64
62
                      " adjacent to first selected file %" PRIu64,
63
                      level, prev_file->fd.GetNumber(),
64
                      first_file->fd.GetNumber());
65
      assert(false);
66
    }
67
  }
68
69
  // Check file after last input
70
  if (last_input_idx >= 0 &&
71
      static_cast<size_t>(last_input_idx) < level_files.size() - 1) {
72
    const FileMetaData* last_file = inputs->files.back();
73
    const FileMetaData* next_file = level_files[last_input_idx + 1];
74
    int cmp = sstableKeyCompare(ucmp, last_file->largest, next_file->smallest);
75
    if (cmp == 0) {
76
      ROCKS_LOG_ERROR(logger,
77
                      "Clean cut violated: L%d unselected file %" PRIu64
78
                      " adjacent to last selected file %" PRIu64,
79
                      level, next_file->fd.GetNumber(),
80
                      last_file->fd.GetNumber());
81
      assert(false);
82
    }
83
  }
84
}
85
#endif  // NDEBUG
86
87
bool PickCostBasedIntraL0Compaction(
88
    const std::vector<FileMetaData*>& level_files, size_t min_files_to_compact,
89
    uint64_t max_compact_bytes_per_del_file, uint64_t max_compaction_bytes,
90
0
    CompactionInputFiles* comp_inputs) {
91
0
  TEST_SYNC_POINT("PickCostBasedIntraL0Compaction");
92
93
0
  size_t start = 0;
94
95
0
  if (level_files.size() == 0 || level_files[start]->being_compacted) {
96
0
    return false;
97
0
  }
98
99
0
  size_t compact_bytes = static_cast<size_t>(level_files[start]->fd.file_size);
100
0
  size_t compact_bytes_per_del_file = std::numeric_limits<size_t>::max();
101
  // Compaction range will be [start, limit).
102
0
  size_t limit;
103
  // Pull in files until the amount of compaction work per deleted file begins
104
  // increasing or maximum total compaction size is reached.
105
0
  size_t new_compact_bytes_per_del_file = 0;
106
0
  for (limit = start + 1; limit < level_files.size(); ++limit) {
107
0
    compact_bytes += static_cast<size_t>(level_files[limit]->fd.file_size);
108
0
    new_compact_bytes_per_del_file = compact_bytes / (limit - start);
109
0
    if (level_files[limit]->being_compacted ||
110
0
        new_compact_bytes_per_del_file > compact_bytes_per_del_file ||
111
0
        compact_bytes > max_compaction_bytes) {
112
0
      break;
113
0
    }
114
0
    compact_bytes_per_del_file = new_compact_bytes_per_del_file;
115
0
  }
116
117
0
  if ((limit - start) >= min_files_to_compact &&
118
0
      compact_bytes_per_del_file < max_compact_bytes_per_del_file) {
119
0
    assert(comp_inputs != nullptr);
120
0
    comp_inputs->level = 0;
121
0
    for (size_t i = start; i < limit; ++i) {
122
0
      comp_inputs->files.push_back(level_files[i]);
123
0
    }
124
0
    return true;
125
0
  }
126
0
  return false;
127
0
}
128
129
// Determine compression type, based on user options, level of the output
130
// file and whether compression is disabled.
131
// If enable_compression is false, then compression is always disabled no
132
// matter what the values of the other two parameters are.
133
// Otherwise, the compression type is determined based on options and level.
134
CompressionType GetCompressionType(const VersionStorageInfo* vstorage,
135
                                   const MutableCFOptions& mutable_cf_options,
136
                                   int level, int base_level,
137
11.4k
                                   const bool enable_compression) {
138
11.4k
  if (!enable_compression) {
139
    // disable compression
140
0
    return kNoCompression;
141
0
  }
142
143
  // If bottommost_compression is set and we are compacting to the
144
  // bottommost level then we should use it.
145
11.4k
  if (mutable_cf_options.bottommost_compression != kDisableCompressionOption &&
146
0
      level >= (vstorage->num_non_empty_levels() - 1)) {
147
0
    return mutable_cf_options.bottommost_compression;
148
0
  }
149
  // If the user has specified a different compression level for each level,
150
  // then pick the compression for that level.
151
11.4k
  if (!mutable_cf_options.compression_per_level.empty()) {
152
0
    assert(level == 0 || level >= base_level);
153
0
    int idx = (level == 0) ? 0 : level - base_level + 1;
154
155
0
    const int n =
156
0
        static_cast<int>(mutable_cf_options.compression_per_level.size()) - 1;
157
    // It is possible for level_ to be -1; in that case, we use level
158
    // 0's compression.  This occurs mostly in backwards compatibility
159
    // situations when the builder doesn't know what level the file
160
    // belongs to.  Likewise, if level is beyond the end of the
161
    // specified compression levels, use the last value.
162
0
    return mutable_cf_options
163
0
        .compression_per_level[std::max(0, std::min(idx, n))];
164
11.4k
  } else {
165
11.4k
    return mutable_cf_options.compression;
166
11.4k
  }
167
11.4k
}
168
169
CompressionOptions GetCompressionOptions(const MutableCFOptions& cf_options,
170
                                         const VersionStorageInfo* vstorage,
171
                                         int level,
172
7.82k
                                         const bool enable_compression) {
173
7.82k
  if (!enable_compression) {
174
0
    return cf_options.compression_opts;
175
0
  }
176
  // If bottommost_compression_opts is enabled and we are compacting to the
177
  // bottommost level then we should use the specified compression options.
178
7.82k
  if (level >= (vstorage->num_non_empty_levels() - 1) &&
179
7.82k
      cf_options.bottommost_compression_opts.enabled) {
180
0
    return cf_options.bottommost_compression_opts;
181
0
  }
182
7.82k
  return cf_options.compression_opts;
183
7.82k
}
184
185
CompactionPicker::CompactionPicker(const ImmutableOptions& ioptions,
186
                                   const InternalKeyComparator* icmp)
187
100k
    : ioptions_(ioptions), icmp_(icmp) {}
188
189
100k
CompactionPicker::~CompactionPicker() = default;
190
191
// Delete this compaction from the list of running compactions.
192
void CompactionPicker::ReleaseCompactionFiles(Compaction* c,
193
7.82k
                                              const Status& status) {
194
7.82k
  UnregisterCompaction(c);
195
7.82k
  if (!status.ok()) {
196
1.35k
    c->ResetNextCompactionIndex();
197
1.35k
  }
198
7.82k
}
199
200
void CompactionPicker::GetRange(const CompactionInputFiles& inputs,
201
                                InternalKey* smallest,
202
44.3k
                                InternalKey* largest) const {
203
44.3k
  const int level = inputs.level;
204
44.3k
  assert(!inputs.empty());
205
44.3k
  smallest->Clear();
206
44.3k
  largest->Clear();
207
208
44.3k
  if (level == 0) {
209
99.1k
    for (size_t i = 0; i < inputs.size(); i++) {
210
68.3k
      FileMetaData* f = inputs[i];
211
68.3k
      if (i == 0) {
212
30.7k
        *smallest = f->smallest;
213
30.7k
        *largest = f->largest;
214
37.5k
      } else {
215
37.5k
        if (icmp_->Compare(f->smallest, *smallest) < 0) {
216
334
          *smallest = f->smallest;
217
334
        }
218
37.5k
        if (icmp_->Compare(f->largest, *largest) > 0) {
219
35.3k
          *largest = f->largest;
220
35.3k
        }
221
37.5k
      }
222
68.3k
    }
223
30.7k
  } else {
224
13.5k
    *smallest = inputs[0]->smallest;
225
13.5k
    *largest = inputs[inputs.size() - 1]->largest;
226
13.5k
  }
227
44.3k
}
228
229
void CompactionPicker::GetRange(const CompactionInputFiles& inputs1,
230
                                const CompactionInputFiles& inputs2,
231
                                InternalKey* smallest,
232
8.57k
                                InternalKey* largest) const {
233
8.57k
  assert(!inputs1.empty() || !inputs2.empty());
234
8.57k
  if (inputs1.empty()) {
235
0
    GetRange(inputs2, smallest, largest);
236
8.57k
  } else if (inputs2.empty()) {
237
3.86k
    GetRange(inputs1, smallest, largest);
238
4.70k
  } else {
239
4.70k
    InternalKey smallest1, smallest2, largest1, largest2;
240
4.70k
    GetRange(inputs1, &smallest1, &largest1);
241
4.70k
    GetRange(inputs2, &smallest2, &largest2);
242
4.70k
    *smallest =
243
4.70k
        icmp_->Compare(smallest1, smallest2) < 0 ? smallest1 : smallest2;
244
4.70k
    *largest = icmp_->Compare(largest1, largest2) < 0 ? largest2 : largest1;
245
4.70k
  }
246
8.57k
}
247
248
void CompactionPicker::GetRange(const std::vector<CompactionInputFiles>& inputs,
249
                                InternalKey* smallest, InternalKey* largest,
250
9.95k
                                int exclude_level) const {
251
9.95k
  InternalKey current_smallest;
252
9.95k
  InternalKey current_largest;
253
9.95k
  bool initialized = false;
254
12.3k
  for (const auto& in : inputs) {
255
12.3k
    if (in.empty() || in.level == exclude_level) {
256
0
      continue;
257
0
    }
258
12.3k
    GetRange(in, &current_smallest, &current_largest);
259
12.3k
    if (!initialized) {
260
9.95k
      *smallest = current_smallest;
261
9.95k
      *largest = current_largest;
262
9.95k
      initialized = true;
263
9.95k
    } else {
264
2.35k
      if (icmp_->Compare(current_smallest, *smallest) < 0) {
265
17
        *smallest = current_smallest;
266
17
      }
267
2.35k
      if (icmp_->Compare(current_largest, *largest) > 0) {
268
2.26k
        *largest = current_largest;
269
2.26k
      }
270
2.35k
    }
271
12.3k
  }
272
9.95k
  assert(initialized);
273
9.95k
}
274
275
bool CompactionPicker::ExpandInputsToCleanCut(const std::string& /*cf_name*/,
276
                                              VersionStorageInfo* vstorage,
277
                                              CompactionInputFiles* inputs,
278
12.9k
                                              InternalKey** next_smallest) {
279
  // This isn't good compaction
280
12.9k
  assert(!inputs->empty());
281
282
12.9k
  const int level = inputs->level;
283
  // GetOverlappingInputs will always do the right thing for level-0.
284
  // So we don't need to do any expansion if level == 0.
285
12.9k
  if (level == 0) {
286
8.47k
    return true;
287
8.47k
  }
288
289
4.45k
  InternalKey smallest, largest;
290
291
  // Keep expanding inputs until we are sure that there is a "clean cut"
292
  // boundary between the files in input and the surrounding files.
293
  // This will ensure that no parts of a key are lost during compaction.
294
4.45k
  int hint_index = -1;
295
4.45k
  size_t old_size;
296
4.79k
  do {
297
4.79k
    old_size = inputs->size();
298
4.79k
    GetRange(*inputs, &smallest, &largest);
299
4.79k
    inputs->clear();
300
4.79k
    vstorage->GetOverlappingInputs(level, &smallest, &largest, &inputs->files,
301
4.79k
                                   hint_index, &hint_index, true, nullptr,
302
4.79k
                                   next_smallest);
303
4.79k
  } while (inputs->size() > old_size);
304
305
  // we started off with inputs non-empty and the previous loop only grew
306
  // inputs. thus, inputs should be non-empty here
307
4.45k
  assert(!inputs->empty());
308
309
#ifndef NDEBUG
310
  AssertCleanCut(icmp_, vstorage, inputs, level, ioptions_.logger);
311
#endif  // NDEBUG
312
313
  // If, after the expansion, there are files that are already under
314
  // compaction, then we must drop/cancel this compaction.
315
4.45k
  if (AreFilesInCompaction(inputs->files)) {
316
0
    return false;
317
0
  }
318
4.45k
  return true;
319
4.45k
}
320
321
bool CompactionPicker::RangeOverlapWithCompaction(
322
    const Slice& smallest_user_key, const Slice& largest_user_key,
323
9.95k
    int level) const {
324
9.95k
  const Comparator* ucmp = icmp_->user_comparator();
325
9.95k
  for (Compaction* c : compactions_in_progress_) {
326
52
    if (c->output_level() == level &&
327
52
        ucmp->CompareWithoutTimestamp(smallest_user_key,
328
52
                                      c->GetLargestUserKey()) <= 0 &&
329
30
        ucmp->CompareWithoutTimestamp(largest_user_key,
330
30
                                      c->GetSmallestUserKey()) >= 0) {
331
      // Overlap
332
0
      return true;
333
0
    }
334
52
    if (c->SupportsPerKeyPlacement()) {
335
0
      if (c->OverlapProximalLevelOutputRange(smallest_user_key,
336
0
                                             largest_user_key)) {
337
0
        return true;
338
0
      }
339
0
    }
340
52
  }
341
  // Did not overlap with any running compaction in level `level`
342
9.95k
  return false;
343
9.95k
}
344
345
bool CompactionPicker::FilesRangeOverlapWithCompaction(
346
    const std::vector<CompactionInputFiles>& inputs, int level,
347
9.95k
    int proximal_level) const {
348
9.95k
  bool is_empty = true;
349
9.95k
  for (auto& in : inputs) {
350
9.95k
    if (!in.empty()) {
351
9.95k
      is_empty = false;
352
9.95k
      break;
353
9.95k
    }
354
9.95k
  }
355
9.95k
  if (is_empty) {
356
    // No files in inputs
357
0
    return false;
358
0
  }
359
360
  // TODO: Intra L0 compactions can have the ranges overlapped, but the input
361
  //  files cannot be overlapped in the order of L0 files.
362
9.95k
  InternalKey smallest, largest;
363
9.95k
  GetRange(inputs, &smallest, &largest, Compaction::kInvalidLevel);
364
9.95k
  if (proximal_level != Compaction::kInvalidLevel) {
365
0
    if (ioptions_.compaction_style == kCompactionStyleUniversal) {
366
0
      if (RangeOverlapWithCompaction(smallest.user_key(), largest.user_key(),
367
0
                                     proximal_level)) {
368
0
        return true;
369
0
      }
370
0
    } else {
371
0
      InternalKey proximal_smallest, proximal_largest;
372
0
      GetRange(inputs, &proximal_smallest, &proximal_largest, level);
373
0
      if (RangeOverlapWithCompaction(proximal_smallest.user_key(),
374
0
                                     proximal_largest.user_key(),
375
0
                                     proximal_level)) {
376
0
        return true;
377
0
      }
378
0
    }
379
0
  }
380
381
9.95k
  return RangeOverlapWithCompaction(smallest.user_key(), largest.user_key(),
382
9.95k
                                    level);
383
9.95k
}
384
385
// Returns true if any one of specified files are being compacted
386
bool CompactionPicker::AreFilesInCompaction(
387
16.3k
    const std::vector<FileMetaData*>& files) {
388
36.0k
  for (size_t i = 0; i < files.size(); i++) {
389
19.7k
    if (files[i]->being_compacted) {
390
22
      return true;
391
22
    }
392
19.7k
  }
393
16.3k
  return false;
394
16.3k
}
395
396
Compaction* CompactionPicker::PickCompactionForCompactFiles(
397
    const CompactionOptions& compact_options,
398
    const std::vector<CompactionInputFiles>& input_files, int output_level,
399
    VersionStorageInfo* vstorage, const MutableCFOptions& mutable_cf_options,
400
    const MutableDBOptions& mutable_db_options, uint32_t output_path_id,
401
    std::optional<SequenceNumber> earliest_snapshot,
402
0
    const SnapshotChecker* snapshot_checker) {
403
#ifndef NDEBUG
404
  assert(input_files.size());
405
  // This compaction output should not overlap with a running compaction as
406
  // `SanitizeAndConvertCompactionInputFiles` should've checked earlier and db
407
  // mutex shouldn't have been released since.
408
  int start_level = Compaction::kInvalidLevel;
409
  for (const auto& in : input_files) {
410
    // input_files should already be sorted by level
411
    if (!in.empty()) {
412
      start_level = in.level;
413
      break;
414
    }
415
  }
416
  assert(output_level == 0 || !FilesRangeOverlapWithCompaction(
417
                                  input_files, output_level,
418
                                  Compaction::EvaluateProximalLevel(
419
                                      vstorage, mutable_cf_options, ioptions_,
420
                                      start_level, output_level)));
421
#endif /* !NDEBUG */
422
423
0
  CompressionType compression_type;
424
0
  if (compact_options.compression == kDisableCompressionOption) {
425
0
    int base_level;
426
0
    if (ioptions_.compaction_style == kCompactionStyleLevel) {
427
0
      base_level = vstorage->base_level();
428
0
    } else {
429
0
      base_level = 1;
430
0
    }
431
0
    compression_type = GetCompressionType(vstorage, mutable_cf_options,
432
0
                                          output_level, base_level);
433
0
  } else {
434
    // TODO(ajkr): `CompactionOptions` offers configurable `CompressionType`
435
    // without configurable `CompressionOptions`, which is inconsistent.
436
0
    compression_type = compact_options.compression;
437
0
  }
438
439
0
  auto c = new Compaction(
440
0
      vstorage, ioptions_, mutable_cf_options, mutable_db_options, input_files,
441
0
      output_level, compact_options.output_file_size_limit,
442
0
      mutable_cf_options.max_compaction_bytes, output_path_id, compression_type,
443
0
      GetCompressionOptions(mutable_cf_options, vstorage, output_level),
444
0
      compact_options.output_temperature_override,
445
0
      compact_options.max_subcompactions,
446
0
      /* grandparents */ {}, earliest_snapshot, snapshot_checker,
447
0
      CompactionReason::kManualCompaction);
448
0
  RegisterCompaction(c);
449
0
  return c;
450
0
}
451
452
Status CompactionPicker::GetCompactionInputsFromFileNumbers(
453
    std::vector<CompactionInputFiles>* input_files,
454
    std::unordered_set<uint64_t>* input_set, const VersionStorageInfo* vstorage,
455
0
    const CompactionOptions& /*compact_options*/) const {
456
0
  if (input_set->size() == 0U) {
457
0
    return Status::InvalidArgument(
458
0
        "Compaction must include at least one file.");
459
0
  }
460
0
  assert(input_files);
461
462
0
  std::vector<CompactionInputFiles> matched_input_files;
463
0
  matched_input_files.resize(vstorage->num_levels());
464
0
  int first_non_empty_level = -1;
465
0
  int last_non_empty_level = -1;
466
  // TODO(yhchiang): use a lazy-initialized mapping from
467
  //                 file_number to FileMetaData in Version.
468
0
  for (int level = 0; level < vstorage->num_levels(); ++level) {
469
0
    for (auto file : vstorage->LevelFiles(level)) {
470
0
      auto iter = input_set->find(file->fd.GetNumber());
471
0
      if (iter != input_set->end()) {
472
0
        matched_input_files[level].files.push_back(file);
473
0
        input_set->erase(iter);
474
0
        last_non_empty_level = level;
475
0
        if (first_non_empty_level == -1) {
476
0
          first_non_empty_level = level;
477
0
        }
478
0
      }
479
0
    }
480
0
  }
481
482
0
  if (!input_set->empty()) {
483
0
    std::string message(
484
0
        "Cannot find matched SST files for the following file numbers:");
485
0
    for (auto fn : *input_set) {
486
0
      message += " ";
487
0
      message += std::to_string(fn);
488
0
    }
489
0
    return Status::InvalidArgument(message);
490
0
  }
491
492
0
  for (int level = first_non_empty_level; level <= last_non_empty_level;
493
0
       ++level) {
494
0
    matched_input_files[level].level = level;
495
0
    input_files->emplace_back(std::move(matched_input_files[level]));
496
0
  }
497
498
0
  return Status::OK();
499
0
}
500
501
// Returns true if any one of the parent files are being compacted
502
bool CompactionPicker::IsRangeInCompaction(VersionStorageInfo* vstorage,
503
                                           const InternalKey* smallest,
504
                                           const InternalKey* largest,
505
2.84k
                                           int level, int* level_index) {
506
2.84k
  std::vector<FileMetaData*> inputs;
507
2.84k
  assert(level < NumberLevels());
508
509
2.84k
  vstorage->GetOverlappingInputs(level, smallest, largest, &inputs,
510
2.84k
                                 level_index ? *level_index : 0, level_index);
511
2.84k
  return AreFilesInCompaction(inputs);
512
2.84k
}
513
514
// Populates the set of inputs of all other levels that overlap with the
515
// start level.
516
// Now we assume all levels except start level and output level are empty.
517
// Will also attempt to expand "start level" if that doesn't expand
518
// "output level" or cause "level" to include a file for compaction that has an
519
// overlapping user-key with another file.
520
// REQUIRES: input_level and output_level are different
521
// REQUIRES: inputs->empty() == false
522
// Returns false if files on parent level are currently in compaction, which
523
// means that we can't compact them
524
bool CompactionPicker::SetupOtherInputs(
525
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
526
    VersionStorageInfo* vstorage, CompactionInputFiles* inputs,
527
    CompactionInputFiles* output_level_inputs, int* parent_index,
528
    int base_index, bool only_expand_towards_right,
529
6.24k
    const FileMetaData* starting_l0_file) {
530
6.24k
  assert(!inputs->empty());
531
6.24k
  assert(output_level_inputs->empty());
532
6.24k
  const int input_level = inputs->level;
533
6.24k
  const int output_level = output_level_inputs->level;
534
6.24k
  if (input_level == output_level) {
535
    // no possibility of conflict
536
832
    return true;
537
832
  }
538
539
  // For now, we only support merging two levels, start level and output level.
540
  // We need to assert other levels are empty.
541
32.4k
  for (int l = input_level + 1; l < output_level; l++) {
542
27.0k
    assert(vstorage->NumLevelFiles(l) == 0);
543
27.0k
  }
544
545
5.40k
  InternalKey smallest, largest;
546
547
  // Get the range one last time.
548
5.40k
  GetRange(*inputs, &smallest, &largest);
549
550
  // Populate the set of next-level files (inputs_GetOutputLevelInputs()) to
551
  // include in compaction
552
5.40k
  vstorage->GetOverlappingInputs(output_level, &smallest, &largest,
553
5.40k
                                 &output_level_inputs->files, *parent_index,
554
5.40k
                                 parent_index);
555
5.40k
  if (AreFilesInCompaction(output_level_inputs->files)) {
556
22
    return false;
557
22
  }
558
5.38k
  if (!output_level_inputs->empty()) {
559
2.35k
    if (!ExpandInputsToCleanCut(cf_name, vstorage, output_level_inputs)) {
560
0
      return false;
561
0
    }
562
2.35k
  }
563
564
  // See if we can further grow the number of inputs in "level" without
565
  // changing the number of "level+1" files we pick up. We also choose NOT
566
  // to expand if this would cause "level" to include some entries for some
567
  // user key, while excluding other entries for the same user key. This
568
  // can happen when one user key spans multiple files.
569
5.38k
  if (!output_level_inputs->empty()) {
570
2.35k
    const uint64_t output_level_inputs_size =
571
2.35k
        TotalFileSize(output_level_inputs->files);
572
2.35k
    const uint64_t inputs_size = TotalFileSize(inputs->files);
573
2.35k
    bool expand_inputs = false;
574
575
2.35k
    CompactionInputFiles expanded_inputs;
576
2.35k
    expanded_inputs.level = input_level;
577
    // Get closed interval of output level
578
2.35k
    InternalKey all_start, all_limit;
579
2.35k
    GetRange(*inputs, *output_level_inputs, &all_start, &all_limit);
580
2.35k
    bool try_overlapping_inputs = true;
581
2.35k
    if (only_expand_towards_right) {
582
      // Round-robin compaction only allows expansion towards the larger side.
583
0
      vstorage->GetOverlappingInputs(input_level, &smallest, &all_limit,
584
0
                                     &expanded_inputs.files, base_index,
585
0
                                     nullptr, true, starting_l0_file);
586
2.35k
    } else {
587
2.35k
      vstorage->GetOverlappingInputs(input_level, &all_start, &all_limit,
588
2.35k
                                     &expanded_inputs.files, base_index,
589
2.35k
                                     nullptr, true, starting_l0_file);
590
2.35k
    }
591
2.35k
    uint64_t expanded_inputs_size = TotalFileSize(expanded_inputs.files);
592
2.35k
    if (!ExpandInputsToCleanCut(cf_name, vstorage, &expanded_inputs)) {
593
0
      try_overlapping_inputs = false;
594
0
    }
595
    // It helps to reduce write amp and avoid a further separate compaction
596
    // to include more input level files without expanding output level files.
597
    // So we apply a softer limit. We still need a limit to avoid overly large
598
    // compactions and potential high space amp spikes.
599
2.35k
    const uint64_t limit =
600
2.35k
        MultiplyCheckOverflow(mutable_cf_options.max_compaction_bytes, 2.0);
601
2.35k
    if (try_overlapping_inputs && expanded_inputs.size() > inputs->size() &&
602
11
        !AreFilesInCompaction(expanded_inputs.files) &&
603
11
        output_level_inputs_size + expanded_inputs_size < limit) {
604
11
      InternalKey new_start, new_limit;
605
11
      GetRange(expanded_inputs, &new_start, &new_limit);
606
11
      CompactionInputFiles expanded_output_level_inputs;
607
11
      expanded_output_level_inputs.level = output_level;
608
11
      vstorage->GetOverlappingInputs(output_level, &new_start, &new_limit,
609
11
                                     &expanded_output_level_inputs.files,
610
11
                                     *parent_index, parent_index);
611
11
      assert(!expanded_output_level_inputs.empty());
612
11
      if (!AreFilesInCompaction(expanded_output_level_inputs.files) &&
613
11
          ExpandInputsToCleanCut(cf_name, vstorage,
614
11
                                 &expanded_output_level_inputs) &&
615
11
          expanded_output_level_inputs.size() == output_level_inputs->size()) {
616
11
        expand_inputs = true;
617
11
      }
618
11
    }
619
2.35k
    if (!expand_inputs) {
620
2.34k
      vstorage->GetCleanInputsWithinInterval(input_level, &all_start,
621
2.34k
                                             &all_limit, &expanded_inputs.files,
622
2.34k
                                             base_index, nullptr);
623
2.34k
      expanded_inputs_size = TotalFileSize(expanded_inputs.files);
624
2.34k
      if (expanded_inputs.size() > inputs->size() &&
625
0
          !AreFilesInCompaction(expanded_inputs.files) &&
626
0
          (output_level_inputs_size + expanded_inputs_size) < limit) {
627
0
        expand_inputs = true;
628
0
      }
629
2.34k
    }
630
2.35k
    if (expand_inputs) {
631
11
      ROCKS_LOG_INFO(ioptions_.logger,
632
11
                     "[%s] Expanding@%d %" ROCKSDB_PRIszt "+%" ROCKSDB_PRIszt
633
11
                     "(%" PRIu64 "+%" PRIu64 " bytes) to %" ROCKSDB_PRIszt
634
11
                     "+%" ROCKSDB_PRIszt " (%" PRIu64 "+%" PRIu64 " bytes)\n",
635
11
                     cf_name.c_str(), input_level, inputs->size(),
636
11
                     output_level_inputs->size(), inputs_size,
637
11
                     output_level_inputs_size, expanded_inputs.size(),
638
11
                     output_level_inputs->size(), expanded_inputs_size,
639
11
                     output_level_inputs_size);
640
11
      inputs->files = expanded_inputs.files;
641
11
    }
642
3.03k
  } else {
643
    // Likely to be trivial move. Expand files if they are still trivial moves,
644
    // but limit to mutable_cf_options.max_compaction_bytes or 8 files so that
645
    // we don't create too much compaction pressure for the next level.
646
3.03k
  }
647
5.38k
  return true;
648
5.38k
}
649
650
void CompactionPicker::GetGrandparents(
651
    VersionStorageInfo* vstorage, const CompactionInputFiles& inputs,
652
    const CompactionInputFiles& output_level_inputs,
653
6.21k
    std::vector<FileMetaData*>* grandparents) {
654
6.21k
  InternalKey start, limit;
655
6.21k
  GetRange(inputs, output_level_inputs, &start, &limit);
656
  // Compute the set of grandparent files that overlap this compaction
657
  // (parent == level+1; grandparent == level+2 or the first
658
  // level after that has overlapping files)
659
6.21k
  for (int level = output_level_inputs.level + 1; level < NumberLevels();
660
6.21k
       level++) {
661
0
    vstorage->GetOverlappingInputs(level, &start, &limit, grandparents);
662
0
    if (!grandparents->empty()) {
663
0
      break;
664
0
    }
665
0
  }
666
6.21k
}
667
668
Compaction* CompactionPicker::PickCompactionForCompactRange(
669
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
670
    const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
671
    int input_level, int output_level,
672
    const CompactRangeOptions& compact_range_options, const InternalKey* begin,
673
    const InternalKey* end, InternalKey** compaction_end, bool* manual_conflict,
674
    uint64_t max_file_num_to_ignore, const std::string& trim_ts,
675
2.98k
    const std::string& full_history_ts_low) {
676
  // CompactionPickerFIFO has its own implementation of compact range
677
2.98k
  assert(ioptions_.compaction_style != kCompactionStyleFIFO);
678
679
2.98k
  if (input_level == ColumnFamilyData::kCompactAllLevels) {
680
0
    assert(ioptions_.compaction_style == kCompactionStyleUniversal);
681
682
    // Universal compaction with more than one level always compacts all the
683
    // files together to the last level.
684
0
    assert(vstorage->num_levels() > 1);
685
0
    int max_output_level = vstorage->MaxOutputLevel(
686
0
        ioptions_.cf_allow_ingest_behind || ioptions_.allow_ingest_behind);
687
    // DBImpl::CompactRange() set output level to be the last level
688
0
    assert(output_level == max_output_level);
689
    // DBImpl::RunManualCompaction will make full range for universal compaction
690
0
    assert(begin == nullptr);
691
0
    assert(end == nullptr);
692
0
    *compaction_end = nullptr;
693
694
0
    int start_level = 0;
695
0
    for (; start_level <= max_output_level &&
696
0
           vstorage->NumLevelFiles(start_level) == 0;
697
0
         start_level++) {
698
0
    }
699
0
    if (start_level > max_output_level) {
700
0
      return nullptr;
701
0
    }
702
703
0
    if ((start_level == 0) && (!level0_compactions_in_progress_.empty())) {
704
0
      *manual_conflict = true;
705
      // Only one level 0 compaction allowed
706
0
      return nullptr;
707
0
    }
708
709
0
    std::vector<CompactionInputFiles> inputs(max_output_level + 1 -
710
0
                                             start_level);
711
0
    for (int level = start_level; level <= max_output_level; level++) {
712
0
      inputs[level - start_level].level = level;
713
0
      auto& files = inputs[level - start_level].files;
714
0
      for (FileMetaData* f : vstorage->LevelFiles(level)) {
715
0
        files.push_back(f);
716
0
      }
717
0
      if (AreFilesInCompaction(files)) {
718
0
        *manual_conflict = true;
719
0
        return nullptr;
720
0
      }
721
0
    }
722
723
    // 2 non-exclusive manual compactions could run at the same time producing
724
    // overlaping outputs in the same level.
725
0
    if (FilesRangeOverlapWithCompaction(
726
0
            inputs, output_level,
727
0
            Compaction::EvaluateProximalLevel(vstorage, mutable_cf_options,
728
0
                                              ioptions_, start_level,
729
0
                                              output_level))) {
730
      // This compaction output could potentially conflict with the output
731
      // of a currently running compaction, we cannot run it.
732
0
      *manual_conflict = true;
733
0
      return nullptr;
734
0
    }
735
736
0
    Compaction* c = new Compaction(
737
0
        vstorage, ioptions_, mutable_cf_options, mutable_db_options,
738
0
        std::move(inputs), output_level,
739
0
        MaxFileSizeForLevel(mutable_cf_options, output_level,
740
0
                            ioptions_.compaction_style),
741
0
        /* max_compaction_bytes */ LLONG_MAX,
742
0
        compact_range_options.target_path_id,
743
0
        GetCompressionType(vstorage, mutable_cf_options, output_level, 1),
744
0
        GetCompressionOptions(mutable_cf_options, vstorage, output_level),
745
0
        Temperature::kUnknown, compact_range_options.max_subcompactions,
746
0
        /* grandparents */ {}, /* earliest_snapshot */ std::nullopt,
747
0
        /* snapshot_checker */ nullptr, CompactionReason::kManualCompaction,
748
0
        trim_ts, /* score */ -1,
749
0
        /* l0_files_might_overlap */ true,
750
0
        compact_range_options.blob_garbage_collection_policy,
751
0
        compact_range_options.blob_garbage_collection_age_cutoff);
752
753
0
    RegisterCompaction(c);
754
0
    vstorage->ComputeCompactionScore(ioptions_, mutable_cf_options,
755
0
                                     full_history_ts_low);
756
0
    return c;
757
0
  }
758
759
2.98k
  CompactionInputFiles inputs;
760
2.98k
  inputs.level = input_level;
761
2.98k
  bool covering_the_whole_range = true;
762
763
  // All files are 'overlapping' in universal style compaction.
764
  // We have to compact the entire range in one shot.
765
2.98k
  if (ioptions_.compaction_style == kCompactionStyleUniversal) {
766
0
    begin = nullptr;
767
0
    end = nullptr;
768
0
  }
769
770
2.98k
  vstorage->GetOverlappingInputs(input_level, begin, end, &inputs.files);
771
2.98k
  if (inputs.empty()) {
772
149
    return nullptr;
773
149
  }
774
775
2.83k
  if ((input_level == 0) && (!level0_compactions_in_progress_.empty())) {
776
    // Only one level 0 compaction allowed
777
266
    TEST_SYNC_POINT("CompactionPicker::CompactRange:Conflict");
778
266
    *manual_conflict = true;
779
266
    return nullptr;
780
266
  }
781
782
  // Avoid compacting too much in one shot in case the range is large.
783
  // But we cannot do this for level-0 since level-0 files can overlap
784
  // and we must not pick one file and drop another older file if the
785
  // two files overlap.
786
2.56k
  if (input_level > 0) {
787
0
    const uint64_t limit = mutable_cf_options.max_compaction_bytes;
788
0
    int hint_index = -1;
789
0
    assert(!inputs.empty());
790
    // Always include first file for progress.
791
0
    uint64_t input_level_total = inputs[0]->fd.GetFileSize();
792
0
    InternalKey* smallest = &(inputs[0]->smallest);
793
0
    InternalKey* largest = nullptr;
794
0
    for (size_t i = 1; i < inputs.size(); ++i) {
795
      // Consider whether to include inputs[i]
796
0
      largest = &inputs[i]->largest;
797
798
0
      uint64_t input_file_size = inputs[i]->fd.GetFileSize();
799
0
      uint64_t output_level_total = 0;
800
0
      if (output_level < vstorage->num_non_empty_levels()) {
801
0
        std::vector<FileMetaData*> files;
802
0
        vstorage->GetOverlappingInputsRangeBinarySearch(
803
0
            output_level, smallest, largest, &files, hint_index, &hint_index);
804
0
        for (const auto& file : files) {
805
0
          output_level_total += file->fd.GetFileSize();
806
0
        }
807
0
      }
808
809
0
      input_level_total += input_file_size;
810
811
0
      if (input_level_total + output_level_total > limit) {
812
        // To ensure compaction size is <= limit, leave out inputs from
813
        // index i onwards.
814
0
        covering_the_whole_range = false;
815
0
        inputs.files.resize(i);
816
0
        break;
817
0
      }
818
0
    }
819
0
  }
820
821
2.56k
  assert(compact_range_options.target_path_id <
822
2.56k
         static_cast<uint32_t>(ioptions_.cf_paths.size()));
823
824
  // for BOTTOM LEVEL compaction only, use max_file_num_to_ignore to filter out
825
  // files that are created during the current compaction.
826
2.56k
  if ((compact_range_options.bottommost_level_compaction ==
827
2.56k
           BottommostLevelCompaction::kForceOptimized ||
828
2.56k
       compact_range_options.bottommost_level_compaction ==
829
2.56k
           BottommostLevelCompaction::kIfHaveCompactionFilter) &&
830
2.56k
      max_file_num_to_ignore != std::numeric_limits<uint64_t>::max()) {
831
0
    assert(input_level == output_level);
832
    // inputs_shrunk holds a continuous subset of input files which were all
833
    // created before the current manual compaction
834
0
    std::vector<FileMetaData*> inputs_shrunk;
835
0
    size_t skip_input_index = inputs.size();
836
0
    for (size_t i = 0; i < inputs.size(); ++i) {
837
0
      if (inputs[i]->fd.GetNumber() < max_file_num_to_ignore) {
838
0
        inputs_shrunk.push_back(inputs[i]);
839
0
      } else if (!inputs_shrunk.empty()) {
840
        // inputs[i] was created during the current manual compaction and
841
        // need to be skipped
842
0
        skip_input_index = i;
843
0
        break;
844
0
      }
845
0
    }
846
0
    if (inputs_shrunk.empty()) {
847
0
      return nullptr;
848
0
    }
849
0
    if (inputs.size() != inputs_shrunk.size()) {
850
0
      inputs.files.swap(inputs_shrunk);
851
0
    }
852
    // set covering_the_whole_range to false if there is any file that need to
853
    // be compacted in the range of inputs[skip_input_index+1, inputs.size())
854
0
    for (size_t i = skip_input_index + 1; i < inputs.size(); ++i) {
855
0
      if (inputs[i]->fd.GetNumber() < max_file_num_to_ignore) {
856
0
        covering_the_whole_range = false;
857
0
      }
858
0
    }
859
0
  }
860
861
2.56k
  InternalKey key_storage;
862
2.56k
  InternalKey* next_smallest = &key_storage;
863
2.56k
  if (ExpandInputsToCleanCut(cf_name, vstorage, &inputs, &next_smallest) ==
864
2.56k
      false) {
865
    // manual compaction is now multi-threaded, so it can
866
    // happen that ExpandWhileOverlapping fails
867
    // we handle it higher in RunManualCompaction
868
0
    *manual_conflict = true;
869
0
    return nullptr;
870
0
  }
871
872
2.56k
  if (covering_the_whole_range || !next_smallest) {
873
2.56k
    *compaction_end = nullptr;
874
2.56k
  } else {
875
0
    **compaction_end = *next_smallest;
876
0
  }
877
878
2.56k
  CompactionInputFiles output_level_inputs;
879
2.56k
  if (output_level == ColumnFamilyData::kCompactToBaseLevel) {
880
2.56k
    assert(input_level == 0);
881
2.56k
    output_level = vstorage->base_level();
882
2.56k
    assert(output_level > 0);
883
2.56k
  }
884
15.4k
  for (int i = input_level + 1; i < output_level; i++) {
885
12.8k
    assert(vstorage->NumLevelFiles(i) == 0);
886
12.8k
  }
887
2.56k
  output_level_inputs.level = output_level;
888
2.56k
  if (input_level != output_level) {
889
2.56k
    int parent_index = -1;
890
2.56k
    if (!SetupOtherInputs(cf_name, mutable_cf_options, vstorage, &inputs,
891
2.56k
                          &output_level_inputs, &parent_index, -1)) {
892
      // manual compaction is now multi-threaded, so it can
893
      // happen that SetupOtherInputs fails
894
      // we handle it higher in RunManualCompaction
895
22
      *manual_conflict = true;
896
22
      return nullptr;
897
22
    }
898
2.56k
  }
899
900
2.54k
  std::vector<CompactionInputFiles> compaction_inputs({inputs});
901
2.54k
  if (!output_level_inputs.empty()) {
902
1.06k
    compaction_inputs.push_back(output_level_inputs);
903
1.06k
  }
904
6.15k
  for (size_t i = 0; i < compaction_inputs.size(); i++) {
905
3.61k
    if (AreFilesInCompaction(compaction_inputs[i].files)) {
906
0
      *manual_conflict = true;
907
0
      return nullptr;
908
0
    }
909
3.61k
  }
910
911
  // 2 non-exclusive manual compactions could run at the same time producing
912
  // overlaping outputs in the same level.
913
2.54k
  if (FilesRangeOverlapWithCompaction(
914
2.54k
          compaction_inputs, output_level,
915
2.54k
          Compaction::EvaluateProximalLevel(vstorage, mutable_cf_options,
916
2.54k
                                            ioptions_, input_level,
917
2.54k
                                            output_level))) {
918
    // This compaction output could potentially conflict with the output
919
    // of a currently running compaction, we cannot run it.
920
0
    *manual_conflict = true;
921
0
    return nullptr;
922
0
  }
923
924
2.54k
  std::vector<FileMetaData*> grandparents;
925
2.54k
  GetGrandparents(vstorage, inputs, output_level_inputs, &grandparents);
926
2.54k
  Compaction* compaction = new Compaction(
927
2.54k
      vstorage, ioptions_, mutable_cf_options, mutable_db_options,
928
2.54k
      std::move(compaction_inputs), output_level,
929
2.54k
      MaxFileSizeForLevel(mutable_cf_options, output_level,
930
2.54k
                          ioptions_.compaction_style, vstorage->base_level(),
931
2.54k
                          ioptions_.level_compaction_dynamic_level_bytes),
932
2.54k
      mutable_cf_options.max_compaction_bytes,
933
2.54k
      compact_range_options.target_path_id,
934
2.54k
      GetCompressionType(vstorage, mutable_cf_options, output_level,
935
2.54k
                         vstorage->base_level()),
936
2.54k
      GetCompressionOptions(mutable_cf_options, vstorage, output_level),
937
2.54k
      Temperature::kUnknown, compact_range_options.max_subcompactions,
938
2.54k
      std::move(grandparents),
939
2.54k
      /* earliest_snapshot */ std::nullopt, /* snapshot_checker */ nullptr,
940
2.54k
      CompactionReason::kManualCompaction, trim_ts, /* score */ -1,
941
2.54k
      /* l0_files_might_overlap */ true,
942
2.54k
      compact_range_options.blob_garbage_collection_policy,
943
2.54k
      compact_range_options.blob_garbage_collection_age_cutoff);
944
945
2.54k
  TEST_SYNC_POINT_CALLBACK("CompactionPicker::CompactRange:Return", compaction);
946
2.54k
  RegisterCompaction(compaction);
947
948
  // Creating a compaction influences the compaction score because the score
949
  // takes running compactions into account (by skipping files that are already
950
  // being compacted). Since we just changed compaction score, we recalculate it
951
  // here
952
2.54k
  vstorage->ComputeCompactionScore(ioptions_, mutable_cf_options,
953
2.54k
                                   full_history_ts_low);
954
955
2.54k
  return compaction;
956
2.54k
}
957
958
namespace {
959
// Test whether two files have overlapping key-ranges.
960
bool HaveOverlappingKeyRanges(const Comparator* c, const SstFileMetaData& a,
961
0
                              const SstFileMetaData& b) {
962
0
  if (c->CompareWithoutTimestamp(a.smallestkey, b.smallestkey) >= 0) {
963
0
    if (c->CompareWithoutTimestamp(a.smallestkey, b.largestkey) <= 0) {
964
      // b.smallestkey <= a.smallestkey <= b.largestkey
965
0
      return true;
966
0
    }
967
0
  } else if (c->CompareWithoutTimestamp(a.largestkey, b.smallestkey) >= 0) {
968
    // a.smallestkey < b.smallestkey <= a.largestkey
969
0
    return true;
970
0
  }
971
0
  if (c->CompareWithoutTimestamp(a.largestkey, b.largestkey) <= 0) {
972
0
    if (c->CompareWithoutTimestamp(a.largestkey, b.smallestkey) >= 0) {
973
      // b.smallestkey <= a.largestkey <= b.largestkey
974
0
      return true;
975
0
    }
976
0
  } else if (c->CompareWithoutTimestamp(a.smallestkey, b.largestkey) <= 0) {
977
    // a.smallestkey <= b.largestkey < a.largestkey
978
0
    return true;
979
0
  }
980
0
  return false;
981
0
}
982
}  // namespace
983
984
Status CompactionPicker::SanitizeCompactionInputFilesForAllLevels(
985
    std::unordered_set<uint64_t>* input_files,
986
0
    const ColumnFamilyMetaData& cf_meta, const int output_level) const {
987
0
  auto& levels = cf_meta.levels;
988
0
  auto comparator = icmp_->user_comparator();
989
990
  // TODO(yhchiang): add is_adjustable to CompactionOptions
991
992
  // the smallest and largest key of the current compaction input
993
0
  std::string smallestkey;
994
0
  std::string largestkey;
995
  // a flag for initializing smallest and largest key
996
0
  bool is_first = false;
997
0
  const int kNotFound = -1;
998
999
  // For each level, it does the following things:
1000
  // 1. Find the first and the last compaction input files
1001
  //    in the current level.
1002
  // 2. Include all files between the first and the last
1003
  //    compaction input files.
1004
  // 3. Update the compaction key-range.
1005
  // 4. For all remaining levels, include files that have
1006
  //    overlapping key-range with the compaction key-range.
1007
0
  for (int l = 0; l <= output_level; ++l) {
1008
0
    auto& current_files = levels[l].files;
1009
0
    int first_included = static_cast<int>(current_files.size());
1010
0
    int last_included = kNotFound;
1011
1012
    // identify the first and the last compaction input files
1013
    // in the current level.
1014
0
    for (size_t f = 0; f < current_files.size(); ++f) {
1015
0
      const uint64_t file_number = TableFileNameToNumber(current_files[f].name);
1016
0
      if (input_files->find(file_number) == input_files->end()) {
1017
0
        continue;
1018
0
      }
1019
0
      first_included = std::min(first_included, static_cast<int>(f));
1020
0
      last_included = std::max(last_included, static_cast<int>(f));
1021
0
      if (is_first == false) {
1022
0
        smallestkey = current_files[f].smallestkey;
1023
0
        largestkey = current_files[f].largestkey;
1024
0
        is_first = true;
1025
0
      }
1026
0
    }
1027
0
    if (last_included == kNotFound) {
1028
0
      continue;
1029
0
    }
1030
1031
0
    if (l != 0) {
1032
      // expand the compaction input of the current level if it
1033
      // has overlapping key-range with other non-compaction input
1034
      // files in the same level.
1035
0
      while (first_included > 0) {
1036
0
        if (comparator->CompareWithoutTimestamp(
1037
0
                current_files[first_included - 1].largestkey,
1038
0
                current_files[first_included].smallestkey) < 0) {
1039
0
          break;
1040
0
        }
1041
0
        first_included--;
1042
0
      }
1043
1044
0
      while (last_included < static_cast<int>(current_files.size()) - 1) {
1045
0
        if (comparator->CompareWithoutTimestamp(
1046
0
                current_files[last_included + 1].smallestkey,
1047
0
                current_files[last_included].largestkey) > 0) {
1048
0
          break;
1049
0
        }
1050
0
        last_included++;
1051
0
      }
1052
0
    } else if (output_level > 0) {
1053
0
      last_included = static_cast<int>(current_files.size() - 1);
1054
0
    }
1055
1056
    // include all files between the first and the last compaction input files.
1057
0
    for (int f = first_included; f <= last_included; ++f) {
1058
0
      if (current_files[f].being_compacted) {
1059
0
        return Status::Aborted("Necessary compaction input file " +
1060
0
                               current_files[f].name +
1061
0
                               " is currently being compacted.");
1062
0
      }
1063
1064
0
      input_files->insert(TableFileNameToNumber(current_files[f].name));
1065
0
    }
1066
1067
    // update smallest and largest key
1068
0
    if (l == 0) {
1069
0
      for (int f = first_included; f <= last_included; ++f) {
1070
0
        if (comparator->CompareWithoutTimestamp(
1071
0
                smallestkey, current_files[f].smallestkey) > 0) {
1072
0
          smallestkey = current_files[f].smallestkey;
1073
0
        }
1074
0
        if (comparator->CompareWithoutTimestamp(
1075
0
                largestkey, current_files[f].largestkey) < 0) {
1076
0
          largestkey = current_files[f].largestkey;
1077
0
        }
1078
0
      }
1079
0
    } else {
1080
0
      if (comparator->CompareWithoutTimestamp(
1081
0
              smallestkey, current_files[first_included].smallestkey) > 0) {
1082
0
        smallestkey = current_files[first_included].smallestkey;
1083
0
      }
1084
0
      if (comparator->CompareWithoutTimestamp(
1085
0
              largestkey, current_files[last_included].largestkey) < 0) {
1086
0
        largestkey = current_files[last_included].largestkey;
1087
0
      }
1088
0
    }
1089
1090
0
    SstFileMetaData aggregated_file_meta;
1091
0
    aggregated_file_meta.smallestkey = smallestkey;
1092
0
    aggregated_file_meta.largestkey = largestkey;
1093
1094
    // For all lower levels, include all overlapping files.
1095
    // We need to add overlapping files from the current level too because even
1096
    // if there no input_files in level l, we would still need to add files
1097
    // which overlap with the range containing the input_files in levels 0 to l
1098
    // Level 0 doesn't need to be handled this way because files are sorted by
1099
    // time and not by key
1100
0
    for (int m = std::max(l, 1); m <= output_level; ++m) {
1101
0
      for (auto& next_lv_file : levels[m].files) {
1102
0
        if (HaveOverlappingKeyRanges(comparator, aggregated_file_meta,
1103
0
                                     next_lv_file)) {
1104
0
          if (next_lv_file.being_compacted) {
1105
0
            return Status::Aborted(
1106
0
                "File " + next_lv_file.name +
1107
0
                " that has overlapping key range with one of the compaction "
1108
0
                " input file is currently being compacted.");
1109
0
          }
1110
0
          input_files->insert(TableFileNameToNumber(next_lv_file.name));
1111
0
        }
1112
0
      }
1113
0
    }
1114
0
  }
1115
0
  return Status::OK();
1116
0
}
1117
1118
Status CompactionPicker::SanitizeAndConvertCompactionInputFiles(
1119
    std::unordered_set<uint64_t>* input_files, const int output_level,
1120
    Version* version,
1121
0
    std::vector<CompactionInputFiles>* converted_input_files) const {
1122
0
  ColumnFamilyMetaData cf_meta;
1123
0
  version->GetColumnFamilyMetaData(&cf_meta);
1124
1125
0
  assert(static_cast<int>(cf_meta.levels.size()) - 1 ==
1126
0
         cf_meta.levels[cf_meta.levels.size() - 1].level);
1127
0
  assert(converted_input_files);
1128
1129
0
  if (output_level >= static_cast<int>(cf_meta.levels.size())) {
1130
0
    return Status::InvalidArgument(
1131
0
        "Output level for column family " + cf_meta.name +
1132
0
        " must between [0, " +
1133
0
        std::to_string(cf_meta.levels[cf_meta.levels.size() - 1].level) + "].");
1134
0
  }
1135
1136
0
  if (output_level > MaxOutputLevel()) {
1137
0
    return Status::InvalidArgument(
1138
0
        "Exceed the maximum output level defined by "
1139
0
        "the current compaction algorithm --- " +
1140
0
        std::to_string(MaxOutputLevel()));
1141
0
  }
1142
1143
0
  if (output_level < 0) {
1144
0
    return Status::InvalidArgument("Output level cannot be negative.");
1145
0
  }
1146
1147
0
  if (input_files->size() == 0) {
1148
0
    return Status::InvalidArgument(
1149
0
        "A compaction must contain at least one file.");
1150
0
  }
1151
1152
0
  Status s = SanitizeCompactionInputFilesForAllLevels(input_files, cf_meta,
1153
0
                                                      output_level);
1154
0
  if (!s.ok()) {
1155
0
    return s;
1156
0
  }
1157
1158
  // for all input files, check whether the file number matches
1159
  // any currently-existing files.
1160
0
  for (auto file_num : *input_files) {
1161
0
    bool found = false;
1162
0
    int input_file_level = -1;
1163
0
    for (const auto& level_meta : cf_meta.levels) {
1164
0
      for (const auto& file_meta : level_meta.files) {
1165
0
        if (file_num == TableFileNameToNumber(file_meta.name)) {
1166
0
          if (file_meta.being_compacted) {
1167
0
            return Status::Aborted("Specified compaction input file " +
1168
0
                                   MakeTableFileName("", file_num) +
1169
0
                                   " is already being compacted.");
1170
0
          }
1171
0
          found = true;
1172
0
          input_file_level = level_meta.level;
1173
0
          break;
1174
0
        }
1175
0
      }
1176
0
      if (found) {
1177
0
        break;
1178
0
      }
1179
0
    }
1180
0
    if (!found) {
1181
0
      return Status::InvalidArgument(
1182
0
          "Specified compaction input file " + MakeTableFileName("", file_num) +
1183
0
          " does not exist in column family " + cf_meta.name + ".");
1184
0
    }
1185
0
    if (input_file_level > output_level) {
1186
0
      return Status::InvalidArgument(
1187
0
          "Cannot compact file to up level, input file: " +
1188
0
          MakeTableFileName("", file_num) + " level " +
1189
0
          std::to_string(input_file_level) + " > output level " +
1190
0
          std::to_string(output_level));
1191
0
    }
1192
0
  }
1193
1194
0
  s = GetCompactionInputsFromFileNumbers(converted_input_files, input_files,
1195
0
                                         version->storage_info(),
1196
0
                                         CompactionOptions());
1197
0
  if (!s.ok()) {
1198
0
    return s;
1199
0
  }
1200
0
  assert(converted_input_files->size() > 0);
1201
0
  if (output_level != 0 &&
1202
0
      FilesRangeOverlapWithCompaction(
1203
0
          *converted_input_files, output_level,
1204
0
          Compaction::EvaluateProximalLevel(
1205
0
              version->storage_info(), version->GetMutableCFOptions(),
1206
0
              ioptions_, (*converted_input_files)[0].level, output_level))) {
1207
0
    return Status::Aborted(
1208
0
        "A running compaction is writing to the same output level(s) in an "
1209
0
        "overlapping key range");
1210
0
  }
1211
0
  return Status::OK();
1212
0
}
1213
1214
7.82k
void CompactionPicker::RegisterCompaction(Compaction* c) {
1215
7.82k
  if (c == nullptr) {
1216
0
    return;
1217
0
  }
1218
7.82k
  assert(ioptions_.compaction_style != kCompactionStyleLevel ||
1219
7.82k
         c->output_level() == 0 ||
1220
7.82k
         !FilesRangeOverlapWithCompaction(*c->inputs(), c->output_level(),
1221
7.82k
                                          c->GetProximalLevel()));
1222
  // CompactionReason::kExternalSstIngestion's start level is just a placeholder
1223
  // number without actual meaning as file ingestion technically does not have
1224
  // an input level like other compactions
1225
7.82k
  if ((c->start_level() == 0 &&
1226
6.99k
       c->compaction_reason() != CompactionReason::kExternalSstIngestion) ||
1227
6.99k
      ioptions_.compaction_style == kCompactionStyleUniversal) {
1228
6.99k
    level0_compactions_in_progress_.insert(c);
1229
6.99k
  }
1230
7.82k
  compactions_in_progress_.insert(c);
1231
7.82k
  TEST_SYNC_POINT_CALLBACK("CompactionPicker::RegisterCompaction:Registered",
1232
7.82k
                           c);
1233
7.82k
}
1234
1235
7.82k
void CompactionPicker::UnregisterCompaction(Compaction* c) {
1236
7.82k
  if (c == nullptr) {
1237
0
    return;
1238
0
  }
1239
7.82k
  if (c->start_level() == 0 ||
1240
6.99k
      ioptions_.compaction_style == kCompactionStyleUniversal) {
1241
6.99k
    level0_compactions_in_progress_.erase(c);
1242
6.99k
  }
1243
7.82k
  compactions_in_progress_.erase(c);
1244
7.82k
}
1245
1246
void CompactionPicker::PickFilesMarkedForCompaction(
1247
    const std::string& cf_name, VersionStorageInfo* vstorage, int* start_level,
1248
    int* output_level, CompactionInputFiles* start_level_inputs,
1249
1.62k
    std::function<bool(const FileMetaData*)> skip_marked_file) {
1250
1.62k
  if (vstorage->FilesMarkedForCompaction().empty()) {
1251
1.62k
    return;
1252
1.62k
  }
1253
1254
0
  auto continuation = [&, cf_name](std::pair<int, FileMetaData*> level_file) {
1255
    // If it's being compacted it has nothing to do here.
1256
    // If this assert() fails that means that some function marked some
1257
    // files as being_compacted, but didn't call ComputeCompactionScore()
1258
0
    assert(!level_file.second->being_compacted);
1259
0
    if (skip_marked_file(level_file.second)) {
1260
0
      return false;
1261
0
    }
1262
0
    *start_level = level_file.first;
1263
0
    *output_level =
1264
0
        (*start_level == 0) ? vstorage->base_level() : *start_level + 1;
1265
1266
0
    if (*start_level == 0 && !level0_compactions_in_progress()->empty()) {
1267
0
      return false;
1268
0
    }
1269
1270
0
    start_level_inputs->files = {level_file.second};
1271
0
    start_level_inputs->level = *start_level;
1272
0
    return ExpandInputsToCleanCut(cf_name, vstorage, start_level_inputs);
1273
0
  };
1274
1275
  // take a chance on a random file first
1276
0
  Random64 rnd(/* seed */ reinterpret_cast<uint64_t>(vstorage));
1277
0
  size_t random_file_index = static_cast<size_t>(rnd.Uniform(
1278
0
      static_cast<uint64_t>(vstorage->FilesMarkedForCompaction().size())));
1279
0
  TEST_SYNC_POINT_CALLBACK("CompactionPicker::PickFilesMarkedForCompaction",
1280
0
                           &random_file_index);
1281
1282
0
  if (continuation(vstorage->FilesMarkedForCompaction()[random_file_index])) {
1283
    // found the compaction!
1284
0
    return;
1285
0
  }
1286
1287
0
  for (auto& level_file : vstorage->FilesMarkedForCompaction()) {
1288
0
    if (continuation(level_file)) {
1289
      // found the compaction!
1290
0
      return;
1291
0
    }
1292
0
  }
1293
0
  start_level_inputs->files.clear();
1294
0
}
1295
1296
bool CompactionPicker::GetOverlappingL0Files(
1297
    VersionStorageInfo* vstorage, CompactionInputFiles* start_level_inputs,
1298
2.84k
    int output_level, int* parent_index, const FileMetaData* starting_l0_file) {
1299
  // Two level 0 compaction won't run at the same time, so don't need to worry
1300
  // about files on level 0 being compacted.
1301
2.84k
  assert(level0_compactions_in_progress()->empty());
1302
2.84k
  InternalKey smallest, largest;
1303
2.84k
  GetRange(*start_level_inputs, &smallest, &largest);
1304
  // Note that the next call will discard the file we placed in
1305
  // c->inputs_[0] earlier and replace it with an overlapping set
1306
  // which will include the picked file.
1307
2.84k
  start_level_inputs->files.clear();
1308
2.84k
  vstorage->GetOverlappingInputs(0, &smallest, &largest,
1309
2.84k
                                 &(start_level_inputs->files),
1310
2.84k
                                 /*hint_index=*/-1,
1311
2.84k
                                 /*file_index=*/nullptr,
1312
2.84k
                                 /*expand_range=*/true,
1313
2.84k
                                 /*starting_l0_file=*/starting_l0_file);
1314
1315
  // If we include more L0 files in the same compaction run it can
1316
  // cause the 'smallest' and 'largest' key to get extended to a
1317
  // larger range. So, re-invoke GetRange to get the new key range
1318
2.84k
  GetRange(*start_level_inputs, &smallest, &largest);
1319
2.84k
  if (IsRangeInCompaction(vstorage, &smallest, &largest, output_level,
1320
2.84k
                          parent_index)) {
1321
0
    return false;
1322
0
  }
1323
2.84k
  assert(!start_level_inputs->files.empty());
1324
1325
2.84k
  return true;
1326
2.84k
}
1327
1328
}  // namespace ROCKSDB_NAMESPACE