Coverage Report

Created: 2024-09-08 07:17

/src/rocksdb/db/compaction/compaction_picker_fifo.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10
#include "db/compaction/compaction_picker_fifo.h"
11
12
#include <cinttypes>
13
#include <string>
14
#include <vector>
15
16
#include "db/column_family.h"
17
#include "logging/log_buffer.h"
18
#include "logging/logging.h"
19
#include "options/options_helper.h"
20
#include "rocksdb/listener.h"
21
#include "rocksdb/statistics.h"
22
#include "rocksdb/status.h"
23
#include "util/string_util.h"
24
25
namespace ROCKSDB_NAMESPACE {
26
namespace {
27
0
// Adds up `fd.file_size` over every entry of `files` and returns the sum.
// An empty list yields 0.
uint64_t GetTotalFilesSize(const std::vector<FileMetaData*>& files) {
  uint64_t sum = 0;
  for (auto it = files.begin(); it != files.end(); ++it) {
    sum += (*it)->fd.file_size;
  }
  return sum;
}
34
}  // anonymous namespace
35
36
bool FIFOCompactionPicker::NeedsCompaction(
37
0
    const VersionStorageInfo* vstorage) const {
38
0
  const int kLevel0 = 0;
39
0
  return vstorage->CompactionScore(kLevel0) >= 1;
40
0
}
41
42
// Builds a deletion compaction over the L0 files whose table-property
// creation time is older than `mutable_cf_options.ttl`.
//
// Returns nullptr when:
//   * the current time cannot be obtained,
//   * an L0 compaction is already in progress,
//   * no file qualifies, or
//   * the data left after dropping the qualifying files would still exceed
//     `compaction_options_fifo.max_table_files_size`.
// Otherwise returns a newly allocated Compaction with reason kFIFOTtl.
Compaction* FIFOCompactionPicker::PickTTLCompaction(
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
    const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
    LogBuffer* log_buffer) {
  const auto ttl = mutable_cf_options.ttl;
  assert(ttl > 0);

  const std::vector<FileMetaData*>& l0_files = vstorage->LevelFiles(0);
  uint64_t remaining_size = GetTotalFilesSize(l0_files);

  int64_t now_signed;
  auto clock_status = ioptions_.clock->GetCurrentTime(&now_signed);
  if (!clock_status.ok()) {
    ROCKS_LOG_BUFFER(log_buffer,
                     "[%s] FIFO compaction: Couldn't get current time: %s. "
                     "Not doing compactions based on TTL. ",
                     cf_name.c_str(), clock_status.ToString().c_str());
    return nullptr;
  }
  const uint64_t now = static_cast<uint64_t>(now_signed);

  if (!level0_compactions_in_progress_.empty()) {
    ROCKS_LOG_BUFFER(
        log_buffer,
        "[%s] FIFO compaction: Already executing compaction. No need "
        "to run parallel compactions since compactions are very fast",
        cf_name.c_str());
    return nullptr;
  }

  std::vector<CompactionInputFiles> inputs;
  inputs.emplace_back();
  inputs[0].level = 0;
  std::vector<FileMetaData*>& expired = inputs[0].files;

  // Only compute the cutoff when `now - ttl` cannot wrap around.
  if (now > ttl) {
    const uint64_t cutoff = now - ttl;
    // Scan from the back of the L0 list towards the front and stop at the
    // first file whose recorded creation time is unknown (0) or not yet past
    // the cutoff.
    for (size_t i = l0_files.size(); i-- > 0;) {
      FileMetaData* const file = l0_files[i];
      assert(file);
      if (file->fd.table_reader) {
        const auto props = file->fd.table_reader->GetTableProperties();
        if (props) {
          const uint64_t created = props->creation_time;
          if (created == 0 || created >= cutoff) {
            break;
          }
        }
      }
      // Files without a table reader or without properties are picked
      // without any creation-time check.
      remaining_size -= file->fd.file_size;
      expired.push_back(file);
    }
  }

  // Give up (so that size-based picking can run instead) if nothing expired,
  // or if dropping the expired files still leaves the column family above
  // the max_table_files_size threshold.
  if (expired.empty() ||
      remaining_size >
          mutable_cf_options.compaction_options_fifo.max_table_files_size) {
    return nullptr;
  }

  for (FileMetaData* file : expired) {
    assert(file);
    uint64_t creation_time = 0;
    if (file->fd.table_reader) {
      const auto props = file->fd.table_reader->GetTableProperties();
      if (props) {
        creation_time = props->creation_time;
      }
    }
    ROCKS_LOG_BUFFER(log_buffer,
                     "[%s] FIFO compaction: picking file %" PRIu64
                     " with creation time %" PRIu64 " for deletion",
                     cf_name.c_str(), file->fd.GetNumber(), creation_time);
  }

  return new Compaction(
      vstorage, ioptions_, mutable_cf_options, mutable_db_options,
      std::move(inputs), 0, 0, 0, 0, kNoCompression,
      mutable_cf_options.compression_opts,
      mutable_cf_options.default_write_temperature,
      /* max_subcompactions */ 0, {}, /* is manual */ false,
      /* trim_ts */ "", vstorage->CompactionScore(0),
      /* is deletion compaction */ true, /* l0_files_might_overlap */ true,
      CompactionReason::kFIFOTtl);
}
127
128
// The size-based compaction picker for FIFO.
129
//
130
// When the entire column family size exceeds max_table_files_size, FIFO will
131
// try to delete the oldest sst file(s) until the resulting column family size
132
// is smaller than max_table_files_size.
133
//
134
// This function also takes care of the case where a DB is migrating from level /
135
// universal compaction to FIFO compaction.  During the migration, the column
136
// family will also have non-L0 files while FIFO can only create L0 files.
137
// In this case, this function will first purge the sst files in the bottom-
138
// most non-empty level, and the DB will eventually converge to the
139
// regular FIFO case where there're only L0 files.  Note that during the
140
// migration case, the purge order will only be an approximation of "FIFO"
141
// as entries inside lower-level files might sometimes be newer than some
142
// entries inside upper-level files.
143
Compaction* FIFOCompactionPicker::PickSizeCompaction(
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
    const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
    LogBuffer* log_buffer) {
  const auto size_limit =
      mutable_cf_options.compaction_options_fifo.max_table_files_size;

  // Sum the size of every level and remember the deepest level that still
  // holds data.
  uint64_t total_size = 0;
  int last_level = 0;
  const int level_count = vstorage->num_levels();
  for (int lvl = 0; lvl < level_count; ++lvl) {
    const uint64_t lvl_bytes = GetTotalFilesSize(vstorage->LevelFiles(lvl));
    total_size += lvl_bytes;
    if (lvl_bytes != 0) {
      last_level = lvl;
    }
  }
  const std::vector<FileMetaData*>& last_level_files =
      vstorage->LevelFiles(last_level);
  // Snapshot taken before any file is picked; `total_size` shrinks later.
  const bool over_limit = total_size > size_limit;

  if (last_level == 0 && !over_limit) {
    // Within budget and L0-only: optionally look for an intra-L0 compaction
    // that reduces the number of files.
    const std::vector<FileMetaData*>& l0_files = vstorage->LevelFiles(0);
    if (mutable_cf_options.compaction_options_fifo.allow_compaction &&
        !l0_files.empty()) {
      // Cap the per-file size at the memtable size inflated by 10% so that
      // already-compacted (large) files are not compacted again and again,
      // which could produce files that never TTL-expire. The 10% accounts
      // for uncompressed L0 files slightly above the memtable size limit.
      const size_t max_compact_bytes_per_del_file =
          static_cast<size_t>(MultiplyCheckOverflow(
              static_cast<uint64_t>(mutable_cf_options.write_buffer_size),
              1.1));
      CompactionInputFiles intra_l0_inputs;
      const bool found = FindIntraL0Compaction(
          l0_files,
          /* min_files_to_compact */
          mutable_cf_options.level0_file_num_compaction_trigger,
          max_compact_bytes_per_del_file,
          mutable_cf_options.max_compaction_bytes, &intra_l0_inputs);
      if (found) {
        return new Compaction(
            vstorage, ioptions_, mutable_cf_options, mutable_db_options,
            {intra_l0_inputs}, 0, 16 * 1024 * 1024 /* output file size limit */,
            0 /* max compaction bytes, not applicable */,
            0 /* output path ID */, mutable_cf_options.compression,
            mutable_cf_options.compression_opts,
            mutable_cf_options.default_write_temperature,
            0 /* max_subcompactions */, {}, /* is manual */ false,
            /* trim_ts */ "", vstorage->CompactionScore(0),
            /* is deletion compaction */ false,
            /* l0_files_might_overlap */ true,
            CompactionReason::kFIFOReduceNumFiles);
      }
    }

    ROCKS_LOG_BUFFER(
        log_buffer,
        "[%s] FIFO compaction: nothing to do. Total size %" PRIu64
        ", max size %" PRIu64 "\n",
        cf_name.c_str(), total_size, size_limit);
    return nullptr;
  }

  if (!level0_compactions_in_progress_.empty()) {
    ROCKS_LOG_BUFFER(
        log_buffer,
        "[%s] FIFO compaction: Already executing compaction. No need "
        "to run parallel compactions since compactions are very fast",
        cf_name.c_str());
    return nullptr;
  }

  // Non-L0 data exists but the column family is within budget.
  if (last_level != 0 && !over_limit) {
    ROCKS_LOG_BUFFER(
        log_buffer,
        "[%s] FIFO compaction: nothing to do. Total size %" PRIu64
        ", max size %" PRIu64 "\n",
        cf_name.c_str(), total_size, size_limit);
    return nullptr;
  }

  std::vector<CompactionInputFiles> inputs;
  inputs.emplace_back();
  inputs[0].level = last_level;
  std::vector<FileMetaData*>& doomed = inputs[0].files;

  if (last_level == 0) {
    // In L0 the right-most files are the oldest, so walk from the back.
    for (size_t i = last_level_files.size(); i-- > 0;) {
      FileMetaData* const victim = last_level_files[i];
      total_size -= victim->fd.file_size;
      doomed.push_back(victim);

      char human_size[16];
      AppendHumanBytes(victim->fd.GetFileSize(), human_size,
                       sizeof(human_size));
      ROCKS_LOG_BUFFER(log_buffer,
                       "[%s] FIFO compaction: picking file %" PRIu64
                       " with size %s for deletion",
                       cf_name.c_str(), victim->fd.GetNumber(), human_size);

      if (total_size <= size_limit) {
        break;
      }
    }
  } else {
    // For a non-L0 last level the file creation time only tells when the file
    // was compacted into this level, not when its entries were written, so
    // the logically oldest file is unknown. Delete from the left instead
    // (smallest keys first), which suits the common FIFO use case where
    // smaller keys belong to older data.
    for (size_t i = 0; i < last_level_files.size(); ++i) {
      FileMetaData* const victim = last_level_files[i];
      total_size -= victim->fd.file_size;
      doomed.push_back(victim);

      char human_size[16];
      AppendHumanBytes(victim->fd.GetFileSize(), human_size,
                       sizeof(human_size));
      ROCKS_LOG_BUFFER(
          log_buffer,
          "[%s] FIFO compaction: picking file %" PRIu64
          " with size %s for deletion under total size %" PRIu64
          " vs max table files size %" PRIu64,
          cf_name.c_str(), victim->fd.GetNumber(), human_size, total_size,
          size_limit);

      if (total_size <= size_limit) {
        break;
      }
    }
  }

  return new Compaction(
      vstorage, ioptions_, mutable_cf_options, mutable_db_options,
      std::move(inputs), last_level,
      /* target_file_size */ 0,
      /* max_compaction_bytes */ 0,
      /* output_path_id */ 0, kNoCompression,
      mutable_cf_options.compression_opts,
      mutable_cf_options.default_write_temperature,
      /* max_subcompactions */ 0, {}, /* is manual */ false,
      /* trim_ts */ "", vstorage->CompactionScore(0),
      /* is deletion compaction */ true,
      /* l0_files_might_overlap */ true, CompactionReason::kFIFOMaxSize);
}
293
294
// Picks a run of L0 files, scanning from the back of the L0 list, whose
// temperature should change according to
// `compaction_options_fifo.file_temperature_age_thresholds`.
//
// Returns nullptr when no thresholds are configured, when the version has
// more than one level, when L0 is empty, when the current time cannot be
// read, when an L0 compaction is already running, or when no file needs a
// temperature change. Otherwise returns a newly allocated Compaction with
// reason kChangeTemperature whose output temperature is the chosen target.
Compaction* FIFOCompactionPicker::PickTemperatureChangeCompaction(
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
    const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
    LogBuffer* log_buffer) {
  const std::vector<FileTemperatureAge>& thresholds =
      mutable_cf_options.compaction_options_fifo
          .file_temperature_age_thresholds;
  if (thresholds.empty()) {
    return nullptr;
  }

  // Does not apply to multi-level FIFO.
  if (vstorage->num_levels() > 1) {
    return nullptr;
  }

  const std::vector<FileMetaData*>& l0_files = vstorage->LevelFiles(0);
  if (l0_files.empty()) {
    return nullptr;
  }

  int64_t now_signed;
  auto clock_status = ioptions_.clock->GetCurrentTime(&now_signed);
  if (!clock_status.ok()) {
    ROCKS_LOG_BUFFER(
        log_buffer,
        "[%s] FIFO compaction: Couldn't get current time: %s. "
        "Not doing compactions based on file temperature-age threshold. ",
        cf_name.c_str(), clock_status.ToString().c_str());
    return nullptr;
  }
  const uint64_t now = static_cast<uint64_t>(now_signed);

  if (!level0_compactions_in_progress_.empty()) {
    ROCKS_LOG_BUFFER(
        log_buffer,
        "[%s] FIFO compaction: Already executing compaction. Parallel "
        "compactions are not supported",
        cf_name.c_str());
    return nullptr;
  }

  // If `now - min_age` would wrap around, nothing can be old enough and no
  // file would be picked.
  const uint64_t min_age = thresholds[0].age;
  if (now <= min_age) {
    return nullptr;
  }
  const uint64_t create_time_threshold = now - min_age;
  const auto max_bytes = mutable_cf_options.max_compaction_bytes;

  std::vector<CompactionInputFiles> inputs;
  inputs.emplace_back();
  inputs[0].level = 0;
  CompactionInputFiles& picked = inputs[0];

  // kLastTemperature means the target temperature is still to be determined.
  Temperature compaction_target_temp = Temperature::kLastTemperature;
  uint64_t picked_bytes = 0;

  // Ideally a file would qualify based on the timestamp of its youngest
  // entry, but that information is not available. It is inferred from the
  // oldest-ancestor time of the neighbouring, just-younger file instead.
  // The loop stops at index 1 because each candidate needs such a neighbour
  // at `idx - 1`; this also keeps the unsigned index from wrapping.
  assert(l0_files.size() >= 1);
  for (size_t idx = l0_files.size() - 1; idx > 0; --idx) {
    FileMetaData* const candidate = l0_files[idx];
    FileMetaData* const younger = l0_files[idx - 1];

    if (candidate->being_compacted) {
      // Should not happen since `level0_compactions_in_progress_` was checked
      // above. Simply do not schedule anything.
      return nullptr;
    }

    const uint64_t oldest_ancestor_time = younger->TryGetOldestAncesterTime();
    if (oldest_ancestor_time == kUnknownOldestAncesterTime) {
      // Older files might not carry enough information. They could be handled
      // by looking at newer files, but the extra logic is not worth it.
      break;
    }
    if (oldest_ancestor_time > create_time_threshold) {
      // candidate is too fresh
      break;
    }

    // Start from the first threshold's temperature and let every later
    // threshold that the file also satisfies override it.
    Temperature cur_target_temp = thresholds[0].temperature;
    for (auto it = thresholds.begin() + 1; it != thresholds.end(); ++it) {
      if (now >= it->age && oldest_ancestor_time <= now - it->age) {
        cur_target_temp = it->temperature;
      }
    }

    if (candidate->temperature == cur_target_temp) {
      // Already at the right temperature: skip it while nothing has been
      // picked yet, otherwise it terminates the run.
      if (picked.empty()) {
        continue;
      }
      break;
    }

    // candidate needs to change temperature
    if (compaction_target_temp == Temperature::kLastTemperature) {
      assert(picked.empty());
      compaction_target_temp = cur_target_temp;
    } else if (cur_target_temp != compaction_target_temp) {
      assert(!picked.empty());
      break;
    }

    // The first file is always accepted; later ones only while the running
    // total stays within max_compaction_bytes.
    const auto candidate_bytes = candidate->fd.GetFileSize();
    if (picked.empty() || picked_bytes + candidate_bytes <= max_bytes) {
      picked.files.push_back(candidate);
      picked_bytes += candidate_bytes;
      ROCKS_LOG_BUFFER(
          log_buffer,
          "[%s] FIFO compaction: picking file %" PRIu64
          " with next file's oldest time %" PRIu64 " for temperature %s.",
          cf_name.c_str(), candidate->fd.GetNumber(), oldest_ancestor_time,
          temperature_to_string[cur_target_temp].c_str());
    }
    if (picked_bytes > max_bytes) {
      break;
    }
  }

  if (picked.files.empty()) {
    return nullptr;
  }
  assert(compaction_target_temp != Temperature::kLastTemperature);

  return new Compaction(
      vstorage, ioptions_, mutable_cf_options, mutable_db_options,
      std::move(inputs), 0, 0 /* output file size limit */,
      0 /* max compaction bytes, not applicable */, 0 /* output path ID */,
      mutable_cf_options.compression, mutable_cf_options.compression_opts,
      compaction_target_temp,
      /* max_subcompactions */ 0, {}, /* is manual */ false, /* trim_ts */ "",
      vstorage->CompactionScore(0),
      /* is deletion compaction */ false, /* l0_files_might_overlap */ true,
      CompactionReason::kChangeTemperature);
}
434
435
// Tries the FIFO pickers in priority order: TTL-based (only when a TTL is
// configured), then size-based, then temperature-change. The first non-null
// result wins. The result, which may be nullptr, is passed to
// RegisterCompaction() before being returned.
Compaction* FIFOCompactionPicker::PickCompaction(
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
    const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
    LogBuffer* log_buffer) {
  Compaction* picked =
      mutable_cf_options.ttl > 0
          ? PickTTLCompaction(cf_name, mutable_cf_options, mutable_db_options,
                              vstorage, log_buffer)
          : nullptr;
  if (!picked) {
    picked = PickSizeCompaction(cf_name, mutable_cf_options,
                                mutable_db_options, vstorage, log_buffer);
  }
  if (!picked) {
    picked = PickTemperatureChangeCompaction(
        cf_name, mutable_cf_options, mutable_db_options, vstorage, log_buffer);
  }
  RegisterCompaction(picked);
  return picked;
}
455
456
// Manual-compaction entry point for FIFO. Both levels are asserted to be 0;
// `*compaction_end` is set to nullptr and the work is delegated to
// PickCompaction(), with its log output buffered locally and flushed before
// returning. The remaining parameters are unused.
Compaction* FIFOCompactionPicker::CompactRange(
    const std::string& cf_name, const MutableCFOptions& mutable_cf_options,
    const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage,
    int input_level, int output_level,
    const CompactRangeOptions& /*compact_range_options*/,
    const InternalKey* /*begin*/, const InternalKey* /*end*/,
    InternalKey** compaction_end, bool* /*manual_conflict*/,
    uint64_t /*max_file_num_to_ignore*/, const std::string& /*trim_ts*/) {
  // The levels are only read by the asserts below; mark them used so builds
  // where assert() compiles away do not warn about unused parameters.
  (void)input_level;
  (void)output_level;
  assert(input_level == 0);
  assert(output_level == 0);

  *compaction_end = nullptr;

  LogBuffer buffered_log(InfoLogLevel::INFO_LEVEL, ioptions_.logger);
  Compaction* const picked = PickCompaction(
      cf_name, mutable_cf_options, mutable_db_options, vstorage, &buffered_log);
  buffered_log.FlushBufferToLog();
  return picked;
}
477
478
}  // namespace ROCKSDB_NAMESPACE