Coverage Report

Created: 2026-05-31 07:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/compaction/compaction_picker_fifo.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10
#include "db/compaction/compaction_picker_fifo.h"
11
12
#include <algorithm>
13
#include <cinttypes>
14
#include <string>
15
#include <vector>
16
17
#include "db/column_family.h"
18
#include "logging/log_buffer.h"
19
#include "logging/logging.h"
20
#include "options/options_helper.h"
21
#include "rocksdb/listener.h"
22
#include "rocksdb/statistics.h"
23
#include "rocksdb/status.h"
24
#include "util/string_util.h"
25
26
namespace ROCKSDB_NAMESPACE {
27
namespace {
28
0
// Returns the sum of `fd.file_size` over every entry of `files`; an empty
// list yields 0. Entries are dereferenced unconditionally.
uint64_t GetTotalFilesSize(const std::vector<FileMetaData*>& files) {
  uint64_t sum = 0;
  for (auto it = files.begin(); it != files.end(); ++it) {
    sum += (*it)->fd.file_size;
  }
  return sum;
}
35
36
// Compute effective data size and capacity limit for FIFO compaction.
37
// When max_data_files_size > 0 (blob-aware mode), the effective size includes
38
// both SST and blob file sizes, and the limit is max_data_files_size.
39
// Otherwise, only SST sizes are used with max_table_files_size as the limit.
40
void GetEffectiveSizeAndLimit(const CompactionOptionsFIFO& fifo_opts,
41
                              uint64_t total_sst_size, uint64_t total_blob_size,
42
                              uint64_t* effective_size,
43
0
                              uint64_t* effective_max) {
44
0
  *effective_size = total_sst_size;
45
0
  *effective_max = fifo_opts.max_table_files_size;
46
0
  if (fifo_opts.max_data_files_size > 0) {
47
0
    *effective_size += total_blob_size;
48
0
    *effective_max = fifo_opts.max_data_files_size;
49
0
  }
50
0
}
51
52
// Return the effective capacity limit for FIFO compaction.
53
// Convenience wrapper when only the limit is needed (e.g., PickTTLCompaction).
54
0
uint64_t GetEffectiveMax(const CompactionOptionsFIFO& fifo_opts) {
55
0
  return fifo_opts.max_data_files_size > 0 ? fifo_opts.max_data_files_size
56
0
                                           : fifo_opts.max_table_files_size;
57
0
}
58
}  // anonymous namespace
59
60
bool FIFOCompactionPicker::NeedsCompaction(
61
0
    const VersionStorageInfo* vstorage) const {
62
0
  const int kLevel0 = 0;
63
0
  return vstorage->CompactionScore(kLevel0) >= 1;
64
0
}
65
66
// Picks a deletion compaction (reason kFIFOTtl) over the L0 files whose
// estimated newest key time is older than `mutable_cf_options.ttl`, which
// must be > 0.
//
// Returns nullptr when:
//   * the current time cannot be read,
//   * an L0 compaction is already in progress,
//   * no file qualifies, or
//   * the estimated data remaining after dropping the qualifying files would
//     still exceed the effective size limit (size-based picking should handle
//     that case instead).
// Otherwise returns a newly allocated Compaction.
Compaction* FIFOCompactionPicker::PickTTLCompaction(
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
    const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
    LogBuffer* log_buffer) {
  assert(mutable_cf_options.ttl > 0);

  const int kLevel0 = 0;
  const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(kLevel0);
  // L0 SST bytes before anything is dropped. Computed once and reused for the
  // blob-aware estimate below.
  const uint64_t original_l0_sst = GetTotalFilesSize(level_files);
  // L0 SST bytes that remain after removing the files selected so far.
  uint64_t total_size = original_l0_sst;

  int64_t _current_time = 0;
  auto status = ioptions_.clock->GetCurrentTime(&_current_time);
  if (!status.ok()) {
    ROCKS_LOG_BUFFER(log_buffer,
                     "[%s] FIFO compaction: Couldn't get current time: %s. "
                     "Not doing compactions based on TTL. ",
                     cf_name.c_str(), status.ToString().c_str());
    return nullptr;
  }
  const uint64_t current_time = static_cast<uint64_t>(_current_time);

  if (!level0_compactions_in_progress_.empty()) {
    ROCKS_LOG_BUFFER(
        log_buffer,
        "[%s] FIFO compaction: Already executing compaction. No need "
        "to run parallel compactions since compactions are very fast",
        cf_name.c_str());
    return nullptr;
  }

  std::vector<CompactionInputFiles> inputs;
  inputs.emplace_back();
  inputs[0].level = 0;

  // Only scan when current_time > ttl so that `current_time - ttl` below
  // cannot underflow.
  if (current_time > mutable_cf_options.ttl) {
    // Walk L0 from the back and stop at the first file that is still within
    // the TTL window (or whose time is unknown).
    for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) {
      FileMetaData* f = *ritr;
      assert(f);
      TableReader* reader = f->fd.pinned_reader.Get();
      // NOTE(review): a file without a pinned reader or table properties
      // skips the time check entirely and is selected -- confirm intended.
      if (reader != nullptr && reader->GetTableProperties() != nullptr) {
        uint64_t newest_key_time = f->TryGetNewestKeyTime();
        uint64_t creation_time = reader->GetTableProperties()->creation_time;
        uint64_t est_newest_key_time = newest_key_time == kUnknownNewestKeyTime
                                           ? creation_time
                                           : newest_key_time;
        if (est_newest_key_time == kUnknownNewestKeyTime ||
            est_newest_key_time >= (current_time - mutable_cf_options.ttl)) {
          break;
        }
      }
      total_size -= f->fd.file_size;
      inputs[0].files.push_back(f);
    }
  }

  // Return a nullptr and proceed to size-based FIFO compaction if:
  // 1. there are no files older than ttl OR
  // 2. there are a few files older than ttl, but deleting them will not bring
  //    the total size to be less than the size threshold.
  uint64_t effective_max =
      GetEffectiveMax(mutable_cf_options.compaction_options_fifo);
  // Estimate the effective remaining data after dropping TTL-expired SSTs.
  // Each dropped SST also frees a proportional share of blob data.
  //
  // In multi-level FIFO (migration), we must use total SST across ALL levels
  // as the reference, because total_blob covers all levels. Using only L0
  // SST would inflate the blob estimate.
  uint64_t effective_remaining = total_size;
  if (mutable_cf_options.compaction_options_fifo.max_data_files_size > 0) {
    const uint64_t total_blob = vstorage->GetBlobStats().total_file_size;
    // Total SST across all levels so the reference scope matches
    // total_blob's scope (all levels).
    uint64_t total_sst_all_levels = original_l0_sst;
    for (int level = 1; level < vstorage->num_levels(); ++level) {
      total_sst_all_levels += GetTotalFilesSize(vstorage->LevelFiles(level));
    }
    // dropped = original L0 SST - remaining L0 SST;
    // remaining_sst_all = total_sst_all - dropped.
    const uint64_t dropped_sst = original_l0_sst - total_size;
    const uint64_t remaining_sst_all = total_sst_all_levels - dropped_sst;
    // Proportional blob estimate: each SST byte "owns" a proportional
    // share of blob bytes. Both reference sizes must come from the same
    // scope (all levels) to avoid inflated estimates.
    if (total_sst_all_levels > 0 && total_blob > 0) {
      effective_remaining =
          remaining_sst_all +
          static_cast<uint64_t>(static_cast<double>(remaining_sst_all) /
                                total_sst_all_levels * total_blob);
    } else {
      effective_remaining = remaining_sst_all;
    }
  }
  if (inputs[0].files.empty() || effective_remaining > effective_max) {
    return nullptr;
  }

  // Log every selected file together with the time estimate used for it.
  for (const auto& f : inputs[0].files) {
    assert(f);
    uint64_t newest_key_time = f->TryGetNewestKeyTime();
    uint64_t creation_time = 0;
    TableReader* reader = f->fd.pinned_reader.Get();
    if (reader != nullptr && reader->GetTableProperties() != nullptr) {
      creation_time = reader->GetTableProperties()->creation_time;
    }
    uint64_t est_newest_key_time = newest_key_time == kUnknownNewestKeyTime
                                       ? creation_time
                                       : newest_key_time;
    ROCKS_LOG_BUFFER(log_buffer,
                     "[%s] FIFO compaction: picking file %" PRIu64
                     " with estimated newest key time %" PRIu64 " for deletion",
                     cf_name.c_str(), f->fd.GetNumber(), est_newest_key_time);
  }

  Compaction* c = new Compaction(
      vstorage, ioptions_, mutable_cf_options, mutable_db_options,
      std::move(inputs), 0, 0, 0, 0, kNoCompression,
      mutable_cf_options.compression_opts, Temperature::kUnknown,
      /* max_subcompactions */ 0, {}, /* earliest_snapshot */ std::nullopt,
      /* snapshot_checker */ nullptr, CompactionReason::kFIFOTtl,
      /* trim_ts */ "", vstorage->CompactionScore(0),
      /* l0_files_might_overlap */ true);
  return c;
}
192
193
// The size-based compaction picker for FIFO.
194
//
195
// When the entire column family size exceeds max_table_files_size, FIFO will
196
// try to delete the oldest sst file(s) until the resulting column family size
197
// is smaller than max_table_files_size.
198
//
199
// This function also takes care the case where a DB is migrating from level /
200
// universal compaction to FIFO compaction.  During the migration, the column
201
// family will also have non-L0 files while FIFO can only create L0 files.
202
// In this case, this function will first purge the sst files in the bottom-
203
// most non-empty level first, and the DB will eventually converge to the
204
// regular FIFO case where there're only L0 files.  Note that during the
205
// migration case, the purge order will only be an approximation of "FIFO"
206
// as entries inside lower-level files might sometimes be newer than some
207
// entries inside upper-level files.
208
Compaction* FIFOCompactionPicker::PickSizeCompaction(
209
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
210
    const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
211
0
    LogBuffer* log_buffer) {
212
0
  const auto& fifo_opts = mutable_cf_options.compaction_options_fifo;
213
214
  // compute the total SST size and identify the last non-empty level
215
0
  int last_level = 0;
216
0
  uint64_t total_size = 0;
217
0
  for (int level = 0; level < vstorage->num_levels(); ++level) {
218
0
    auto level_size = GetTotalFilesSize(vstorage->LevelFiles(level));
219
0
    total_size += level_size;
220
0
    if (level_size > 0) {
221
0
      last_level = level;
222
0
    }
223
0
  }
224
0
  const std::vector<FileMetaData*>& last_level_files =
225
0
      vstorage->LevelFiles(last_level);
226
227
  // Compute effective size and limit for comparison.
228
0
  uint64_t effective_size, effective_max;
229
0
  GetEffectiveSizeAndLimit(fifo_opts, total_size,
230
0
                           vstorage->GetBlobStats().total_file_size,
231
0
                           &effective_size, &effective_max);
232
233
0
  if (last_level == 0 && effective_size <= effective_max) {
234
0
    return nullptr;
235
0
  }
236
237
0
  if (!level0_compactions_in_progress_.empty()) {
238
0
    ROCKS_LOG_BUFFER(
239
0
        log_buffer,
240
0
        "[%s] FIFO compaction: Already executing compaction. No need "
241
0
        "to run parallel compactions since compactions are very fast",
242
0
        cf_name.c_str());
243
0
    return nullptr;
244
0
  }
245
246
0
  std::vector<CompactionInputFiles> inputs;
247
0
  inputs.emplace_back();
248
0
  inputs[0].level = last_level;
249
250
0
  if (last_level == 0) {
251
    // When using blob-aware sizing, use proportional estimation (same
252
    // principle as EstimateTotalDataForSST): each SST "owns"
253
    // effective_size / num_files of total data. This is an approximation
254
    // -- individual SSTs may reference different amounts of blob data,
255
    // but uniform distribution is a reasonable estimate for FIFO dropping.
256
0
    uint64_t remaining_size = effective_size;
257
0
    const uint64_t num_files = last_level_files.size();
258
    // Proportional estimate of data per file (SST + blob).
259
    // Use max(1) to prevent stalling when effective_size < num_files.
260
0
    const uint64_t data_per_file =
261
0
        (fifo_opts.max_data_files_size > 0 && num_files > 0)
262
0
            ? std::max(effective_size / num_files, uint64_t{1})
263
0
            : 0;
264
265
    // In L0, right-most files are the oldest files.
266
0
    for (auto ritr = last_level_files.rbegin(); ritr != last_level_files.rend();
267
0
         ++ritr) {
268
0
      auto f = *ritr;
269
0
      if (fifo_opts.max_data_files_size > 0) {
270
0
        remaining_size -= std::min(remaining_size, data_per_file);
271
0
      } else {
272
0
        remaining_size -= std::min(remaining_size, f->fd.file_size);
273
0
      }
274
0
      inputs[0].files.push_back(f);
275
0
      char tmp_fsize[16];
276
0
      AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize));
277
0
      ROCKS_LOG_BUFFER(log_buffer,
278
0
                       "[%s] FIFO compaction: picking file %" PRIu64
279
0
                       " with size %s for deletion",
280
0
                       cf_name.c_str(), f->fd.GetNumber(), tmp_fsize);
281
0
      if (remaining_size <= effective_max) {
282
0
        break;
283
0
      }
284
0
    }
285
0
  } else if (effective_size > effective_max) {
286
    // If the last level is non-L0, we actually don't know which file is
287
    // logically the oldest since the file creation time only represents
288
    // when this file was compacted to this level, which is independent
289
    // to when the entries in this file were first inserted.
290
    //
291
    // As a result, we delete files from the left instead.  This means the sst
292
    // file with the smallest key will be deleted first.  This design decision
293
    // better serves a major type of FIFO use cases where smaller keys are
294
    // associated with older data.
295
0
    const uint64_t num_files = last_level_files.size();
296
    // Proportional estimate of data per file (SST + blob), same as L0 path.
297
0
    const uint64_t data_per_file =
298
0
        (fifo_opts.max_data_files_size > 0 && num_files > 0)
299
0
            ? std::max(effective_size / num_files, uint64_t{1})
300
0
            : 0;
301
0
    for (const auto& f : last_level_files) {
302
0
      if (f->being_compacted) {
303
0
        continue;
304
0
      }
305
0
      if (fifo_opts.max_data_files_size > 0) {
306
0
        effective_size -= std::min(effective_size, data_per_file);
307
0
      } else {
308
0
        effective_size -= std::min(effective_size, f->fd.file_size);
309
0
      }
310
0
      inputs[0].files.push_back(f);
311
0
      char tmp_fsize[16];
312
0
      AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize));
313
0
      ROCKS_LOG_BUFFER(log_buffer,
314
0
                       "[%s] FIFO compaction: picking file %" PRIu64
315
0
                       " with size %s for deletion under total size %" PRIu64
316
0
                       " vs max size %" PRIu64,
317
0
                       cf_name.c_str(), f->fd.GetNumber(), tmp_fsize,
318
0
                       effective_size, effective_max);
319
320
0
      if (effective_size <= effective_max) {
321
0
        break;
322
0
      }
323
0
    }
324
0
  } else {
325
0
    return nullptr;
326
0
  }
327
328
0
  Compaction* c = new Compaction(
329
0
      vstorage, ioptions_, mutable_cf_options, mutable_db_options,
330
0
      std::move(inputs), last_level,
331
0
      /* target_file_size */ 0,
332
0
      /* max_compaction_bytes */ 0,
333
0
      /* output_path_id */ 0, kNoCompression,
334
0
      mutable_cf_options.compression_opts, Temperature::kUnknown,
335
0
      /* max_subcompactions */ 0, {}, /* earliest_snapshot */ std::nullopt,
336
0
      /* snapshot_checker */ nullptr, CompactionReason::kFIFOMaxSize,
337
0
      /* trim_ts */ "", vstorage->CompactionScore(0),
338
0
      /* l0_files_might_overlap */ true);
339
0
  return c;
340
0
}
341
342
// Picks at most one L0 file whose temperature no longer matches the target
// implied by `file_temperature_age_thresholds` and returns a compaction
// (reason kChangeTemperature) that rewrites it at the target temperature.
//
// Returns nullptr when no thresholds are configured, when the version has
// more than one level, when L0 is empty, when the current time cannot be
// read, when an L0 compaction is in progress, or when no file needs a change.
Compaction* FIFOCompactionPicker::PickTemperatureChangeCompaction(
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
    const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
    LogBuffer* log_buffer) const {
  const std::vector<FileTemperatureAge>& thresholds =
      mutable_cf_options.compaction_options_fifo
          .file_temperature_age_thresholds;
  if (thresholds.empty()) {
    return nullptr;
  }

  // Does not apply to multi-level FIFO.
  if (vstorage->num_levels() > 1) {
    return nullptr;
  }

  const std::vector<FileMetaData*>& l0_files = vstorage->LevelFiles(0);
  if (l0_files.empty()) {
    return nullptr;
  }

  int64_t clock_now;
  auto clock_status = ioptions_.clock->GetCurrentTime(&clock_now);
  if (!clock_status.ok()) {
    ROCKS_LOG_BUFFER(
        log_buffer,
        "[%s] FIFO compaction: Couldn't get current time: %s. "
        "Not doing compactions based on file temperature-age threshold. ",
        cf_name.c_str(), clock_status.ToString().c_str());
    return nullptr;
  }
  const uint64_t now = static_cast<uint64_t>(clock_now);

  if (!level0_compactions_in_progress_.empty()) {
    ROCKS_LOG_BUFFER(
        log_buffer,
        "[%s] FIFO compaction: Already executing compaction. Parallel "
        "compactions are not supported",
        cf_name.c_str());
    return nullptr;
  }

  std::vector<CompactionInputFiles> inputs;
  inputs.emplace_back();
  inputs[0].level = 0;

  const uint64_t youngest_threshold_age = thresholds[0].age;
  // kLastTemperature means target temperature is to be determined.
  Temperature chosen_temp = Temperature::kLastTemperature;
  // Scanning only when now > age keeps `now - age` from underflowing.
  if (now > youngest_threshold_age) {
    const uint64_t newest_key_time_cutoff = now - youngest_threshold_age;
    assert(l0_files.size() >= 1);
    // Walk L0 from its last entry towards the first.
    for (size_t remaining = l0_files.size(); remaining > 0; --remaining) {
      const size_t idx = remaining - 1;
      // Try to add `candidate` to compaction inputs.
      FileMetaData* candidate = l0_files[idx];
      FileMetaData* preceding = (idx == 0) ? nullptr : l0_files[idx - 1];
      if (candidate->being_compacted) {
        // Should not happen since we check for
        // `level0_compactions_in_progress_` above. Here we simply just don't
        // schedule anything.
        return nullptr;
      }
      const uint64_t est_newest_key_time =
          candidate->TryGetNewestKeyTime(preceding);
      // Newer file could have newest_key_time populated
      if (est_newest_key_time == kUnknownNewestKeyTime) {
        continue;
      }
      if (est_newest_key_time > newest_key_time_cutoff) {
        break;
      }

      // Start from the first threshold's temperature; every later threshold
      // that the file also satisfies overrides the choice.
      Temperature wanted_temp = thresholds[0].temperature;
      for (size_t t = 1; t < thresholds.size(); ++t) {
        const uint64_t age = thresholds[t].age;
        if (now >= age && est_newest_key_time <= now - age) {
          wanted_temp = thresholds[t].temperature;
        }
      }
      if (candidate->temperature == wanted_temp) {
        continue;
      }

      // `candidate` needs to change temperature
      assert(chosen_temp == Temperature::kLastTemperature);
      chosen_temp = wanted_temp;
      inputs[0].files.push_back(candidate);
      ROCKS_LOG_BUFFER(log_buffer,
                       "[%s] FIFO compaction: picking file %" PRIu64
                       " with estimated newest key time %" PRIu64
                       " and temperature %s for temperature %s.",
                       cf_name.c_str(), candidate->fd.GetNumber(),
                       est_newest_key_time,
                       temperature_to_string[candidate->temperature].c_str(),
                       temperature_to_string[wanted_temp].c_str());
      break;
    }
  }

  if (inputs[0].files.empty()) {
    return nullptr;
  }
  assert(chosen_temp != Temperature::kLastTemperature);
  // Only compact one file at a time.
  assert(inputs.size() == 1);
  assert(inputs[0].size() == 1);
  return new Compaction(
      vstorage, ioptions_, mutable_cf_options, mutable_db_options,
      std::move(inputs), 0, 0 /* output file size limit */,
      0 /* max compaction bytes, not applicable */, 0 /* output path ID */,
      mutable_cf_options.compression, mutable_cf_options.compression_opts,
      chosen_temp,
      /* max_subcompactions */ 0, {}, /* earliest_snapshot */ std::nullopt,
      /* snapshot_checker */ nullptr, CompactionReason::kChangeTemperature,
      /* trim_ts */ "", vstorage->CompactionScore(0),
      /* l0_files_might_overlap */ true);
}
460
461
// Picks an intra-L0 compaction that merges small L0 files to reduce the file
// count. Requires `compaction_options_fifo.allow_compaction`.
//
// With `use_kv_ratio_compaction` set and a usable `max_data_files_size`
// (non-zero and not below `max_table_files_size`), the decision is delegated
// to PickRatioBasedIntraL0Compaction. Otherwise the cost-based path is used.
// Returns nullptr when nothing is picked.
Compaction* FIFOCompactionPicker::PickIntraL0Compaction(
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
    const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
    LogBuffer* log_buffer) {
  const auto& fifo_opts = mutable_cf_options.compaction_options_fifo;
  if (!fifo_opts.allow_compaction) {
    return nullptr;
  }

  const std::vector<FileMetaData*>& l0_files = vstorage->LevelFiles(0);
  if (l0_files.empty()) {
    return nullptr;
  }

  if (fifo_opts.use_kv_ratio_compaction) {
    const bool limit_unset = fifo_opts.max_data_files_size == 0;
    const bool limit_below_table_limit =
        !limit_unset &&
        fifo_opts.max_data_files_size < fifo_opts.max_table_files_size;

    if (!limit_unset && !limit_below_table_limit) {
      return PickRatioBasedIntraL0Compaction(cf_name, mutable_cf_options,
                                             mutable_db_options, vstorage,
                                             log_buffer);
    }

    // The ratio-based picker cannot run; record why before falling back.
    if (limit_unset) {
      ROCKS_LOG_BUFFER(
          log_buffer,
          "[%s] FIFO kv-ratio compaction: skipping -- "
          "max_data_files_size is 0, cannot compute target file size. ",
          cf_name.c_str());
    } else {
      ROCKS_LOG_BUFFER(log_buffer,
                       "[%s] FIFO kv-ratio compaction: skipping -- "
                       "max_data_files_size (%" PRIu64
                       ") < max_table_files_size "
                       "(%" PRIu64 ").",
                       cf_name.c_str(), fifo_opts.max_data_files_size,
                       fifo_opts.max_table_files_size);
    }
    ROCKS_LOG_BUFFER(
        log_buffer, "[%s] FIFO: falling back to cost-based intra-L0 compaction",
        cf_name.c_str());
  }

  // Old intra-L0 path: merge small files using PickCostBasedIntraL0Compaction.
  // Minimum files to compact follows level0_file_num_compaction_trigger.
  // Try to prevent same files from being compacted multiple times, which
  // could produce large files that may never TTL-expire. Achieve this by
  // disallowing compactions with files larger than memtable (inflate its
  // size by 10% to account for uncompressed L0 files that may have size
  // slightly greater than memtable size limit).
  const uint64_t memtable_bytes =
      static_cast<uint64_t>(mutable_cf_options.write_buffer_size);
  size_t max_compact_bytes_per_del_file =
      static_cast<size_t>(MultiplyCheckOverflow(memtable_bytes, 1.1));

  CompactionInputFiles picked;
  const bool found = PickCostBasedIntraL0Compaction(
      l0_files,
      mutable_cf_options
          .level0_file_num_compaction_trigger /* min_files_to_compact */,
      max_compact_bytes_per_del_file, mutable_cf_options.max_compaction_bytes,
      &picked);
  if (!found) {
    return nullptr;
  }

  return new Compaction(
      vstorage, ioptions_, mutable_cf_options, mutable_db_options, {picked}, 0,
      16 * 1024 * 1024 /* output file size limit */,
      0 /* max compaction bytes, not applicable */, 0 /* output path ID */,
      mutable_cf_options.compression, mutable_cf_options.compression_opts,
      Temperature::kUnknown, 0 /* max_subcompactions */, {},
      /* earliest_snapshot */ std::nullopt,
      /* snapshot_checker */ nullptr, CompactionReason::kFIFOReduceNumFiles,
      /* trim_ts */ "", vstorage->CompactionScore(0),
      /* l0_files_might_overlap */ true);
}
534
535
// Tiered, size-based intra-L0 picker used when `use_kv_ratio_compaction` is
// set and `max_data_files_size` > 0 (both asserted).
//
// Computes a target compacted-file size, builds a geometric sequence of tier
// boundaries below it, and returns a compaction (reason kFIFOReduceNumFiles)
// for the first contiguous run of L0 files that are each smaller than a
// boundary and together reach it. Returns nullptr when any non-L0 level still
// holds files, an L0 compaction is in progress,
// `level0_file_num_compaction_trigger` <= 1, L0 has fewer files than the
// trigger, the target is 0, or no run qualifies.
Compaction* FIFOCompactionPicker::PickRatioBasedIntraL0Compaction(
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
    const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
    LogBuffer* log_buffer) {
  const auto& fifo_opts = mutable_cf_options.compaction_options_fifo;
  assert(fifo_opts.use_kv_ratio_compaction);
  assert(fifo_opts.max_data_files_size > 0);

  // During migration from level/universal compaction to FIFO, non-L0 levels
  // may still contain files. The ratio-based algorithm only operates on L0,
  // so skip it until PickSizeCompaction has drained all non-L0 levels.
  // Once levels collapse to L0-only, this algorithm will kick in.
  for (int level = 1; level < vstorage->num_levels(); ++level) {
    if (!vstorage->LevelFiles(level).empty()) {
      ROCKS_LOG_BUFFER(log_buffer,
                       "[%s] FIFO kv-ratio compaction: skipping -- non-L0 "
                       "level %d still has %" ROCKSDB_PRIszt
                       " files (migration in progress)",
                       cf_name.c_str(), level,
                       vstorage->LevelFiles(level).size());
      return nullptr;
    }
  }

  if (!level0_compactions_in_progress_.empty()) {
    return nullptr;
  }

  const std::vector<FileMetaData*>& level0_files = vstorage->LevelFiles(0);
  if (mutable_cf_options.level0_file_num_compaction_trigger <= 1) {
    // trigger <= 0 is invalid; trigger == 1 means compact after every flush,
    // which doesn't make sense for tiered merging (the tier boundary loop
    // divides by trigger, so trigger == 1 would cause an infinite loop).
    return nullptr;
  }
  const size_t trigger = static_cast<size_t>(
      mutable_cf_options.level0_file_num_compaction_trigger);
  if (level0_files.size() < trigger) {
    return nullptr;
  }

  // Determine the target compacted file size.
  //
  // When max_compaction_bytes > 0 (explicitly set by user), use it directly
  // as the target. This allows users to override the auto-calculated value.
  //
  // When max_compaction_bytes == 0 (default), auto-calculate from the data
  // capacity and observed SST/blob ratio:
  //   target = max_data_files_size * sst_ratio / trigger
  //
  // This is recomputed on every PickCompaction call. The computation is
  // trivial (sum file sizes + arithmetic) and PickCompaction is only called
  // once per flush or compaction completion, so no caching is needed.
  uint64_t target = 0;
  if (mutable_cf_options.max_compaction_bytes > 0) {
    // User explicitly set max_compaction_bytes -- use it as target
    target = mutable_cf_options.max_compaction_bytes;
  } else {
    // Auto-calculate from capacity and observed SST/blob ratio
    uint64_t total_sst = GetTotalFilesSize(level0_files);
    uint64_t total_blob = vstorage->GetBlobStats().total_file_size;
    uint64_t total_data = total_sst + total_blob;

    if (total_data == 0 || total_sst == 0) {
      return nullptr;
    }

    // Compute sst_ratio (inverse of EstimateTotalDataForSST's proportion):
    // when no blob files exist, sst_ratio is 1.0 and the target becomes
    // max_data_files_size / trigger, which is large. The algorithm will
    // naturally not find small enough files to compact.
    double sst_ratio =
        (total_blob > 0) ? static_cast<double>(total_sst) / total_data : 1.0;

    // With a ratio of 1.0 use the integer limit as-is. Going through double
    // would round a limit close to UINT64_MAX up to 2^64, and converting that
    // back to uint64_t is undefined behavior. For any ratio below 1.0 the
    // product is strictly below 2^64, so the conversion is well defined.
    uint64_t total_sst_at_cap =
        (sst_ratio >= 1.0)
            ? fifo_opts.max_data_files_size
            : static_cast<uint64_t>(
                  static_cast<double>(fifo_opts.max_data_files_size) *
                  sst_ratio);
    target = total_sst_at_cap / trigger;

    ROCKS_LOG_BUFFER(log_buffer,
                     "[%s] FIFO ratio-based compaction: sst_ratio=%.4f, "
                     "target_file_size=%" PRIu64,
                     cf_name.c_str(), sst_ratio, target);
  }
  if (target == 0) {
    return nullptr;
  }

  // Tiered size-based file selection.
  //
  // Tier boundaries form a geometric sequence descending from target:
  //   ..., target/trigger^2, target/trigger, target
  // For each boundary (smallest first), find contiguous L0 files with
  // size < boundary. If their accumulated bytes >= boundary, merge them.
  // The output (~boundary bytes) advances to the next tier. Files that
  // reach target are "graduated" and never compacted again.
  //
  // Trade-off: write amplification vs L0 file count.
  //
  // Write amp: O(log(target/flush) / log(trigger)) per byte, instead of
  //   O(target / (trigger * flush)) from flat merging. Each byte is
  //   rewritten once per tier crossing.
  //
  // L0 file count: trigger + k * (trigger - 1) at steady state, where
  //   k = ceil(log(target/flush) / log(trigger)). This is higher than
  //   the original trigger target because intermediate tier files
  //   accumulate while waiting for the next tier merge. The trade-off
  //   is explicit: more L0 files in exchange for logarithmic (instead
  //   of linear) write amplification.

  // Build tier boundaries from smallest to largest.
  // Stop at 10KB minimum -- SST files of most workloads are larger than
  // this, so lower boundaries would only waste CPU scanning L0 files.
  // Files smaller than the lowest boundary simply merge at that boundary.
  static constexpr uint64_t kMinTierBoundary = 10 * 1024;  // 10KB
  std::vector<uint64_t> boundaries;
  for (uint64_t b = target; b >= kMinTierBoundary; b /= trigger) {
    boundaries.push_back(b);
  }
  if (boundaries.empty()) {
    // target itself is below kMinTierBoundary -- use target as the
    // sole boundary so we can still compact at the target size.
    boundaries.push_back(target);
  }
  std::reverse(boundaries.begin(), boundaries.end());

  // For each tier boundary (smallest first), scan L0 for mergeable batches.
  // L0 files are stored newest-first; oldest is at the end.
  for (const uint64_t boundary : boundaries) {
    for (size_t scan = level0_files.size(); scan > 0;) {
      // Skip files >= boundary (they belong to higher tiers) or in-progress
      if (level0_files[scan - 1]->fd.file_size >= boundary ||
          level0_files[scan - 1]->being_compacted) {
        --scan;
        continue;
      }

      // Found a file < boundary -- collect contiguous batch. The file at
      // `scan - 1` always joins the batch, so `pos` ends below `scan` and
      // the outer loop makes progress.
      std::vector<FileMetaData*> batch;
      uint64_t accumulated = 0;
      size_t pos = scan;
      while (pos > 0 && level0_files[pos - 1]->fd.file_size < boundary &&
             !level0_files[pos - 1]->being_compacted) {
        // Don't let output exceed 2x boundary (prevent tier-skipping)
        if (accumulated >= boundary &&
            accumulated + level0_files[pos - 1]->fd.file_size > boundary * 2) {
          break;
        }
        batch.push_back(level0_files[pos - 1]);
        accumulated += level0_files[pos - 1]->fd.file_size;
        --pos;
      }

      // Viable: >= 2 files and accumulated >= boundary
      if (batch.size() >= 2 && accumulated >= boundary) {
        CompactionInputFiles comp_inputs;
        comp_inputs.level = 0;
        comp_inputs.files = std::move(batch);

        ROCKS_LOG_BUFFER(
            log_buffer,
            "[%s] FIFO kv-ratio compaction: picking %" ROCKSDB_PRIszt
            " files (%" PRIu64 " bytes) at tier boundary %" PRIu64
            " for intra-L0 compaction, target=%" PRIu64,
            cf_name.c_str(), comp_inputs.files.size(), accumulated, boundary,
            target);

        Compaction* c = new Compaction(
            vstorage, ioptions_, mutable_cf_options, mutable_db_options,
            {comp_inputs}, 0, boundary /* output file size limit */,
            0 /* max compaction bytes, not applicable */,
            0 /* output path ID */, mutable_cf_options.compression,
            mutable_cf_options.compression_opts, Temperature::kUnknown,
            0 /* max_subcompactions */, {},
            /* earliest_snapshot */ std::nullopt,
            /* snapshot_checker */ nullptr,
            CompactionReason::kFIFOReduceNumFiles,
            /* trim_ts */ "", vstorage->CompactionScore(0),
            /* l0_files_might_overlap */ true);
        return c;
      }

      // This batch wasn't enough -- advance past it
      scan = pos;
    }
  }

  return nullptr;
}
723
724
// The full_history_ts_low parameter is used to control bottommost file marking
725
// for compaction when user-defined timestamps (UDT) are enabled.
726
727
// TODO leverage full_history_ts_low for FIFO compaction, by triggering
728
// compaction early for data that has already expired to achieve the goal of TTL
729
// enforced compliance.
730
Compaction* FIFOCompactionPicker::PickCompaction(
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
    const MutableDBOptions& mutable_db_options,
    const std::vector<SequenceNumber>& /* existing_snapshots */,
    const SnapshotChecker* /* snapshot_checker */, VersionStorageInfo* vstorage,
    LogBuffer* log_buffer, const std::string& /* full_history_ts_low */,
    bool /* require_max_output_level*/) {
  // The individual pickers are consulted in a fixed priority order; the first
  // one that returns a non-null compaction wins and the rest are not called.

  // 1. TTL-based picking, attempted only when a positive TTL is configured.
  Compaction* picked =
      mutable_cf_options.ttl > 0
          ? PickTTLCompaction(cf_name, mutable_cf_options, mutable_db_options,
                              vstorage, log_buffer)
          : nullptr;

  // 2. Size-based picking.
  if (!picked) {
    picked = PickSizeCompaction(cf_name, mutable_cf_options, mutable_db_options,
                                vstorage, log_buffer);
  }

  // 3. Intra-L0 compaction, which merges small files to bring the L0 file
  //    count down. It is deliberately ordered after the size-based step: if
  //    that step picked something, `picked` is already non-null and this
  //    step is skipped.
  if (!picked) {
    picked = PickIntraL0Compaction(cf_name, mutable_cf_options,
                                   mutable_db_options, vstorage, log_buffer);
  }

  // 4. Temperature-change compaction as the last resort.
  if (!picked) {
    picked = PickTemperatureChangeCompaction(
        cf_name, mutable_cf_options, mutable_db_options, vstorage, log_buffer);
  }

  if (!picked) {
    ROCKS_LOG_BUFFER(log_buffer, "[%s] FIFO compaction: no compaction picked",
                     cf_name.c_str());
  }

  // Called unconditionally, i.e. also when nothing was picked.
  RegisterCompaction(picked);
  return picked;
}
765
766
Compaction* FIFOCompactionPicker::PickCompactionForCompactRange(
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
    const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
    int input_level, int output_level,
    const CompactRangeOptions& /*compact_range_options*/,
    const InternalKey* /*begin*/, const InternalKey* /*end*/,
    InternalKey** compaction_end, bool* /*manual_conflict*/,
    uint64_t /*max_file_num_to_ignore*/, const std::string& /*trim_ts*/,
    const std::string& full_history_ts_low) {
  // The level arguments are only inspected by the asserts below; the casts
  // keep them "used" in builds where assert() compiles to nothing.
  (void)input_level;
  (void)output_level;
  assert(input_level == 0);
  assert(output_level == 0);

  // No end key is reported back to the caller.
  *compaction_end = nullptr;

  // Delegate to the regular picker, collecting its log output in a local
  // buffer that is flushed to the logger before returning.
  LogBuffer manual_log(InfoLogLevel::INFO_LEVEL, ioptions_.logger);
  Compaction* const picked = PickCompaction(
      cf_name, mutable_cf_options, mutable_db_options,
      /*existing_snapshots*/ {}, /*snapshot_checker*/ nullptr, vstorage,
      &manual_log, full_history_ts_low, /* require_max_output_level */ false);
  manual_log.FlushBufferToLog();
  return picked;
}
790
791
}  // namespace ROCKSDB_NAMESPACE