Coverage Report

Created: 2026-04-10 07:52

/src/rocksdb/db/compaction/compaction_outputs.cc
Line | Count | Source
   1 |       | //  Copyright (c) Meta Platforms, Inc. and affiliates.
   2 |       | //
   3 |       | //  This source code is licensed under both the GPLv2 (found in the
   4 |       | //  COPYING file in the root directory) and Apache 2.0 License
   5 |       | //  (found in the LICENSE.Apache file in the root directory).
   6 |       | //
   7 |       | // Copyright (c) 2011 The LevelDB Authors. All rights reserved.
   8 |       | // Use of this source code is governed by a BSD-style license that can be
   9 |       | // found in the LICENSE file. See the AUTHORS file for names of contributors.
  10 |       |
  11 |       | #include "db/compaction/compaction_outputs.h"
  12 |       |
  13 |       | #include "db/builder.h"
  14 |       |
  15 |       | namespace ROCKSDB_NAMESPACE {
  16 |       |
  17 | 1.84k | void CompactionOutputs::NewBuilder(const TableBuilderOptions& tboptions) {
  18 | 1.84k |   builder_.reset(NewTableBuilder(tboptions, file_writer_.get()));
  19 | 1.84k | }
  20 |       |
  21 |       | Status CompactionOutputs::Finish(
  22 |       |     const Status& input_status,
  23 | 1.84k |     const SeqnoToTimeMapping& seqno_to_time_mapping) {
  24 | 1.84k |   FileMetaData* meta = GetMetaData();
  25 | 1.84k |   assert(meta != nullptr);
  26 | 1.84k |   Status s = input_status;
  27 | 1.84k |   if (s.ok()) {
  28 | 1.27k |     SeqnoToTimeMapping relevant_mapping;
  29 | 1.27k |     relevant_mapping.CopyFromSeqnoRange(
  30 | 1.27k |         seqno_to_time_mapping,
  31 | 1.27k |         std::min(smallest_preferred_seqno_, meta->fd.smallest_seqno),
  32 | 1.27k |         meta->fd.largest_seqno);
  33 | 1.27k |     relevant_mapping.SetCapacity(kMaxSeqnoTimePairsPerSST);
  34 | 1.27k |     builder_->SetSeqnoTimeTableProperties(relevant_mapping,
  35 | 1.27k |                                           meta->oldest_ancester_time);
  36 | 1.27k |     s = builder_->Finish();
  37 |       |
  38 | 1.27k |   } else {
  39 |   573 |     builder_->Abandon();
  40 |   573 |   }
  41 | 1.84k |   Status io_s = builder_->io_status();
  42 | 1.84k |   if (s.ok()) {
  43 | 1.27k |     s = io_s;
  44 | 1.27k |   } else {
  45 |   573 |     io_s.PermitUncheckedError();
  46 |   573 |   }
  47 | 1.84k |   const uint64_t current_bytes = builder_->FileSize();
  48 | 1.84k |   if (s.ok()) {
  49 | 1.27k |     meta->fd.file_size = current_bytes;
  50 | 1.27k |     meta->tail_size = builder_->GetTailSize();
  51 | 1.27k |     meta->marked_for_compaction = builder_->NeedCompact();
  52 | 1.27k |     const TableProperties& tp = builder_->GetTableProperties();
  53 | 1.27k |     meta->user_defined_timestamps_persisted =
  54 | 1.27k |         static_cast<bool>(tp.user_defined_timestamps_persisted);
  55 | 1.27k |     ExtractTimestampFromTableProperties(tp, meta);
  56 | 1.27k |   }
  57 | 1.84k |   current_output().finished = true;
  58 | 1.84k |   stats_.bytes_written += current_bytes;
  59 | 1.84k |   stats_.bytes_written_pre_comp += builder_->PreCompressionSize();
  60 | 1.84k |   stats_.num_output_files = static_cast<int>(outputs_.size());
  61 | 1.84k |   worker_cpu_micros_ += builder_->GetWorkerCPUMicros();
  62 |       |
  63 | 1.84k |   return s;
  64 | 1.84k | }
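
The trimming at lines 28-33 narrows the compaction-wide seqno-to-time mapping to just the seqno range covered by this output file (line 31 widens the lower end with smallest_preferred_seqno_ so that entries needed by kTypeValuePreferredSeqno values, recorded at lines 437-441, are kept). A rough standalone sketch of that shape, using a plain std::map stand-in; the alias, CopyRelevant, and the front-eviction cap are assumptions for illustration, not the SeqnoToTimeMapping API:

    #include <cstddef>
    #include <cstdint>
    #include <map>

    // Hypothetical stand-in: seqno -> unix time, ordered by seqno.
    using SeqnoTimeMap = std::map<uint64_t, uint64_t>;

    // Keep only the pairs relevant to [smallest_seqno, largest_seqno], then
    // cap the pair count (RocksDB caps at kMaxSeqnoTimePairsPerSST).
    SeqnoTimeMap CopyRelevant(const SeqnoTimeMap& all, uint64_t smallest_seqno,
                              uint64_t largest_seqno, size_t max_pairs) {
      SeqnoTimeMap out;
      for (auto it = all.lower_bound(smallest_seqno);
           it != all.end() && it->first <= largest_seqno; ++it) {
        out.insert(*it);
      }
      while (out.size() > max_pairs) {
        out.erase(out.begin());  // crude cap; the real eviction policy differs
      }
      return out;
    }
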
  65 |       |
  66 |       | IOStatus CompactionOutputs::WriterSyncClose(const Status& input_status,
  67 |       |                                             SystemClock* clock,
  68 |       |                                             Statistics* statistics,
  69 | 1.84k |                                             bool use_fsync) {
  70 | 1.84k |   IOStatus io_s;
  71 | 1.84k |   IOOptions opts;
  72 | 1.84k |   io_s = WritableFileWriter::PrepareIOOptions(
  73 | 1.84k |       WriteOptions(Env::IOActivity::kCompaction), opts);
  74 | 1.84k |   if (input_status.ok() && io_s.ok()) {
  75 | 1.27k |     StopWatch sw(clock, statistics, COMPACTION_OUTFILE_SYNC_MICROS);
  76 | 1.27k |     io_s = file_writer_->Sync(opts, use_fsync);
  77 | 1.27k |   }
  78 | 1.84k |   if (input_status.ok() && io_s.ok()) {
  79 | 1.27k |     io_s = file_writer_->Close(opts);
  80 | 1.27k |   }
  81 |       |
  82 | 1.84k |   if (input_status.ok() && io_s.ok()) {
  83 | 1.27k |     FileMetaData* meta = GetMetaData();
  84 | 1.27k |     meta->file_checksum = file_writer_->GetFileChecksum();
  85 | 1.27k |     meta->file_checksum_func_name = file_writer_->GetFileChecksumFuncName();
  86 | 1.27k |   }
  87 |       |
  88 | 1.84k |   file_writer_.reset();
  89 |       |
  90 | 1.84k |   return io_s;
  91 | 1.84k | }
  92 |       |
  93 |       | bool CompactionOutputs::UpdateFilesToCutForTTLStates(
  94 | 1.83k |     const Slice& internal_key) {
  95 | 1.83k |   if (!files_to_cut_for_ttl_.empty()) {
  96 |     0 |     const InternalKeyComparator* icmp =
  97 |     0 |         &compaction_->column_family_data()->internal_comparator();
  98 |     0 |     if (cur_files_to_cut_for_ttl_ != -1) {
  99 |       |       // Previous key is inside the range of a file
 100 |     0 |       if (icmp->Compare(internal_key,
 101 |     0 |                         files_to_cut_for_ttl_[cur_files_to_cut_for_ttl_]
 102 |     0 |                             ->largest.Encode()) > 0) {
 103 |     0 |         next_files_to_cut_for_ttl_ = cur_files_to_cut_for_ttl_ + 1;
 104 |     0 |         cur_files_to_cut_for_ttl_ = -1;
 105 |     0 |         return true;
 106 |     0 |       }
 107 |     0 |     } else {
 108 |       |       // Look for the key position
 109 |     0 |       while (next_files_to_cut_for_ttl_ <
 110 |     0 |              static_cast<int>(files_to_cut_for_ttl_.size())) {
 111 |     0 |         if (icmp->Compare(internal_key,
 112 |     0 |                           files_to_cut_for_ttl_[next_files_to_cut_for_ttl_]
 113 |     0 |                               ->smallest.Encode()) >= 0) {
 114 |     0 |           if (icmp->Compare(internal_key,
 115 |     0 |                             files_to_cut_for_ttl_[next_files_to_cut_for_ttl_]
 116 |     0 |                                 ->largest.Encode()) <= 0) {
 117 |       |             // Within the current file
 118 |     0 |             cur_files_to_cut_for_ttl_ = next_files_to_cut_for_ttl_;
 119 |     0 |             return true;
 120 |     0 |           }
 121 |       |           // Beyond the current file
 122 |     0 |           next_files_to_cut_for_ttl_++;
 123 |     0 |         } else {
 124 |       |           // Still falls into the gap
 125 |     0 |           break;
 126 |     0 |         }
 127 |     0 |       }
 128 |     0 |     }
 129 |     0 |   }
 130 | 1.83k |   return false;
 131 | 1.83k | }
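
UpdateFilesToCutForTTLStates() above is a cursor over the sorted, disjoint key ranges of files_to_cut_for_ttl_: it reports true the first time the key enters one of those ranges, and again the first time it moves past the range it was in; both are file-cut opportunities. A minimal sketch of the same state machine over plain strings (Range and AdvanceCursor are hypothetical stand-ins; lexicographic comparison stands in for the InternalKeyComparator):

    #include <string>
    #include <vector>

    struct Range {  // hypothetical flat stand-in for a file's key range
      std::string smallest, largest;
    };

    // `cur` is the index of the range the previous key was in (-1 if none);
    // `next` is the first range not yet passed. Returns true on a state
    // change that should cut the output file.
    bool AdvanceCursor(const std::vector<Range>& ranges, const std::string& key,
                       int& cur, int& next) {
      if (cur != -1) {
        if (key > ranges[cur].largest) {  // moved past the current range
          next = cur + 1;
          cur = -1;
          return true;
        }
      } else {
        while (next < static_cast<int>(ranges.size())) {
          if (key >= ranges[next].smallest) {
            if (key <= ranges[next].largest) {  // entered a range
              cur = next;
              return true;
            }
            ++next;  // already beyond this range, keep scanning
          } else {
            break;  // still in the gap before ranges[next]
          }
        }
      }
      return false;
    }
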
 132 |       |
 133 |       | size_t CompactionOutputs::UpdateGrandparentBoundaryInfo(
 134 | 1.83k |     const Slice& internal_key) {
 135 | 1.83k |   size_t curr_key_boundary_switched_num = 0;
 136 | 1.83k |   const std::vector<FileMetaData*>& grandparents = compaction_->grandparents();
 137 |       |
 138 | 1.83k |   if (grandparents.empty()) {
 139 | 1.83k |     return curr_key_boundary_switched_num;
 140 | 1.83k |   }
 141 |     0 |   const Comparator* ucmp = compaction_->column_family_data()->user_comparator();
 142 |       |
 143 |       |   // Move the grandparent_index_ to the file containing the current user_key.
 144 |       |   // If there are multiple files containing the same user_key, make sure the
 145 |       |   // index points to the last file containing the key.
 146 |     0 |   while (grandparent_index_ < grandparents.size()) {
 147 |     0 |     if (being_grandparent_gap_) {
 148 |     0 |       if (sstableKeyCompare(ucmp, internal_key,
 149 |     0 |                             grandparents[grandparent_index_]->smallest) < 0) {
 150 |     0 |         break;
 151 |     0 |       }
 152 |     0 |       if (seen_key_) {
 153 |     0 |         curr_key_boundary_switched_num++;
 154 |     0 |         grandparent_overlapped_bytes_ +=
 155 |     0 |             grandparents[grandparent_index_]->fd.GetFileSize();
 156 |     0 |         grandparent_boundary_switched_num_++;
 157 |     0 |       }
 158 |     0 |       being_grandparent_gap_ = false;
 159 |     0 |     } else {
 160 |     0 |       int cmp_result = sstableKeyCompare(
 161 |     0 |           ucmp, internal_key, grandparents[grandparent_index_]->largest);
 162 |       |       // If it's the same key, make sure grandparent_index_ points to the
 163 |       |       // last one.
 164 |     0 |       if (cmp_result < 0 ||
 165 |     0 |           (cmp_result == 0 &&
 166 |     0 |            (grandparent_index_ == grandparents.size() - 1 ||
 167 |     0 |             sstableKeyCompare(ucmp, internal_key,
 168 |     0 |                               grandparents[grandparent_index_ + 1]->smallest) <
 169 |     0 |                 0))) {
 170 |     0 |         break;
 171 |     0 |       }
 172 |     0 |       if (seen_key_) {
 173 |     0 |         curr_key_boundary_switched_num++;
 174 |     0 |         grandparent_boundary_switched_num_++;
 175 |     0 |       }
 176 |     0 |       being_grandparent_gap_ = true;
 177 |     0 |       grandparent_index_++;
 178 |     0 |     }
 179 |     0 |   }
 180 |       |
 181 |       |   // If the first key is in the middle of a grandparent file, add it to the
 182 |       |   // overlap.
 183 |     0 |   if (!seen_key_ && !being_grandparent_gap_) {
 184 |     0 |     assert(grandparent_overlapped_bytes_ == 0);
 185 |     0 |     grandparent_overlapped_bytes_ =
 186 |     0 |         GetCurrentKeyGrandparentOverlappedBytes(internal_key);
 187 |     0 |   }
 188 |       |
 189 |     0 |   seen_key_ = true;
 190 |     0 |   return curr_key_boundary_switched_num;
 191 | 1.83k | }
 192 |       |
 193 |       | uint64_t CompactionOutputs::GetCurrentKeyGrandparentOverlappedBytes(
 194 |     0 |     const Slice& internal_key) const {
 195 |       |   // no overlap with any grandparent file
 196 |     0 |   if (being_grandparent_gap_) {
 197 |     0 |     return 0;
 198 |     0 |   }
 199 |     0 |   uint64_t overlapped_bytes = 0;
 200 |       |
 201 |     0 |   const std::vector<FileMetaData*>& grandparents = compaction_->grandparents();
 202 |     0 |   const Comparator* ucmp = compaction_->column_family_data()->user_comparator();
 203 |     0 |   InternalKey ikey;
 204 |     0 |   ikey.DecodeFrom(internal_key);
 205 |       | #ifndef NDEBUG
 206 |       |   // make sure the grandparent_index_ is pointing to the last file containing
 207 |       |   // the current key.
 208 |       |   int cmp_result =
 209 |       |       sstableKeyCompare(ucmp, ikey, grandparents[grandparent_index_]->largest);
 210 |       |   assert(
 211 |       |       cmp_result < 0 ||
 212 |       |       (cmp_result == 0 &&
 213 |       |        (grandparent_index_ == grandparents.size() - 1 ||
 214 |       |         sstableKeyCompare(
 215 |       |             ucmp, ikey, grandparents[grandparent_index_ + 1]->smallest) < 0)));
 216 |       |   assert(sstableKeyCompare(ucmp, ikey,
 217 |       |                            grandparents[grandparent_index_]->smallest) >= 0);
 218 |       | #endif
 219 |     0 |   overlapped_bytes += grandparents[grandparent_index_]->fd.GetFileSize();
 220 |       |
 221 |       |   // go backwards to find all overlapped files; one key can overlap multiple
 222 |       |   // files. In the following example, if the current output key is `c`, and one
 223 |       |   // compaction file was cut before `c`, the current `c` can overlap with 3 files:
 224 |       |   //  [a b]               [c...
 225 |       |   // [b, b] [c, c] [c, c] [c, d]
 226 |     0 |   for (int64_t i = static_cast<int64_t>(grandparent_index_) - 1;
 227 |     0 |        i >= 0 && sstableKeyCompare(ucmp, ikey, grandparents[i]->largest) == 0;
 228 |     0 |        i--) {
 229 |     0 |     overlapped_bytes += grandparents[i]->fd.GetFileSize();
 230 |     0 |   }
 231 |       |
 232 |     0 |   return overlapped_bytes;
 233 |     0 | }
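
The backward scan at lines 226-230 covers the case drawn at lines 224-225: when output files were previously cut in the middle of a user key, the same key can end several consecutive grandparent files, and all of them count toward the overlap. A minimal sketch with flat stand-in types (FileRange and OverlappedBytes are hypothetical; string equality stands in for sstableKeyCompare() == 0):

    #include <cstddef>
    #include <cstdint>
    #include <string>
    #include <vector>

    struct FileRange {  // hypothetical flat stand-in for FileMetaData
      std::string smallest, largest;
      uint64_t size;
    };

    // `index` is assumed to point at the last file containing `key` (the
    // invariant asserted in the NDEBUG block above). Walk backwards while
    // earlier files still end exactly at `key`, accumulating their sizes.
    uint64_t OverlappedBytes(const std::vector<FileRange>& files, size_t index,
                             const std::string& key) {
      uint64_t bytes = files[index].size;
      for (int64_t i = static_cast<int64_t>(index) - 1;
           i >= 0 && files[i].largest == key; --i) {
        bytes += files[i].size;
      }
      return bytes;
    }

With the comment's example and key `c` at index 3 ([c, d]), the loop also picks up the two [c, c] files, so three file sizes are summed.
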
 234 |       |
 235 | 2.17k | bool CompactionOutputs::ShouldStopBefore(const CompactionIterator& c_iter) {
 236 | 2.17k |   assert(c_iter.Valid());
 237 | 2.17k |   const Slice& internal_key = c_iter.key();
 238 |       | #ifndef NDEBUG
 239 |       |   bool should_stop = false;
 240 |       |   std::pair<bool*, const Slice> p{&should_stop, internal_key};
 241 |       |   TEST_SYNC_POINT_CALLBACK(
 242 |       |       "CompactionOutputs::ShouldStopBefore::manual_decision", (void*)&p);
 243 |       |   if (should_stop) {
 244 |       |     return true;
 245 |       |   }
 246 |       | #endif  // NDEBUG
 247 | 2.17k |   const uint64_t previous_overlapped_bytes = grandparent_overlapped_bytes_;
 248 | 2.17k |   const InternalKeyComparator* icmp =
 249 | 2.17k |       &compaction_->column_family_data()->internal_comparator();
 250 | 2.17k |   size_t num_grandparent_boundaries_crossed = 0;
 251 | 2.17k |   bool should_stop_for_ttl = false;
 252 |       |   // Always update grandparent information like overlapped file number, size,
 253 |       |   // etc., and TTL states.
 254 |       |   // If compaction_->output_level() == 0, there is no need to update grandparent
 255 |       |   // info, and `grandparents` should be empty.
 256 | 2.17k |   if (compaction_->output_level() > 0) {
 257 | 1.83k |     num_grandparent_boundaries_crossed =
 258 | 1.83k |         UpdateGrandparentBoundaryInfo(internal_key);
 259 | 1.83k |     should_stop_for_ttl = UpdateFilesToCutForTTLStates(internal_key);
 260 | 1.83k |   }
 261 |       |
 262 | 2.17k |   if (!HasBuilder()) {
 263 | 1.84k |     return false;
 264 | 1.84k |   }
 265 |       |
 266 |   331 |   if (should_stop_for_ttl) {
 267 |     0 |     return true;
 268 |     0 |   }
 269 |       |
 270 |       |   // If there's a user-defined partitioner, check that first
 271 |   331 |   if (partitioner_ && partitioner_->ShouldPartition(PartitionerRequest(
 272 |     0 |                           last_key_for_partitioner_, c_iter.user_key(),
 273 |     0 |                           current_output_file_size_)) == kRequired) {
 274 |     0 |     return true;
 275 |     0 |   }
 276 |       |
 277 |       |   // files output to Level 0 won't be split
 278 |   331 |   if (compaction_->output_level() == 0) {
 279 |    28 |     return false;
 280 |    28 |   }
 281 |       |
 282 |       |   // reached the max file size
 283 |   303 |   uint64_t estimated_file_size = current_output_file_size_;
 284 |   303 |   if (compaction_->mutable_cf_options().target_file_size_is_upper_bound) {
 285 |     0 |     estimated_file_size += builder_->EstimatedTailSize();
 286 |     0 |   }
 287 |   303 |   if (estimated_file_size >= compaction_->max_output_file_size()) {
 288 |     0 |     return true;
 289 |     0 |   }
 290 |       |
 291 |       |   // Check if it needs to split for RoundRobin
 292 |       |   // An invalid local_output_split_key indicates that we do not need to split
 293 |   303 |   if (local_output_split_key_ != nullptr && !is_split_) {
 294 |       |     // Split occurs when the next key is larger than/equal to the cursor
 295 |     0 |     if (icmp->Compare(internal_key, local_output_split_key_->Encode()) >= 0) {
 296 |     0 |       is_split_ = true;
 297 |     0 |       return true;
 298 |     0 |     }
 299 |     0 |   }
 300 |       |
 301 |       |   // only check if the current key is going to cross the grandparent file
 302 |       |   // boundary (either the file beginning or ending).
 303 |   303 |   if (num_grandparent_boundaries_crossed > 0) {
 304 |       |     // Cut the file before the current key if the size of the current output
 305 |       |     // file plus its overlapped grandparent files is bigger than
 306 |       |     // max_compaction_bytes. This prevents a future compaction from the
 307 |       |     // current output level from exceeding max_compaction_bytes.
 308 |     0 |     if (grandparent_overlapped_bytes_ + current_output_file_size_ >
 309 |     0 |         compaction_->max_compaction_bytes()) {
 310 |     0 |       return true;
 311 |     0 |     }
 312 |       |
 313 |       |     // Cut the file if including the key is going to add a skippable file on
 314 |       |     // the grandparent level AND its size is reasonably big (1/8 of target file
 315 |       |     // size). For example, if it's compacting the files L0 + L1:
 316 |       |     //  L0:  [1,   21]
 317 |       |     //  L1:    [3,   23]
 318 |       |     //  L2: [2, 4] [11, 15] [22, 24]
 319 |       |     // Without this break, it will output as:
 320 |       |     //  L1: [1,3, 21,23]
 321 |       |     // With this break, it will output as (assuming [11, 15] at L2 is bigger
 322 |       |     // than 1/8 of target size):
 323 |       |     //  L1: [1,3] [21,23]
 324 |       |     // Then for future compactions, [11,15] won't be included.
 325 |       |     // For random datasets (either evenly distributed or skewed), this condition
 326 |       |     // is rarely triggered, but if the user is adding 2 different datasets
 327 |       |     // without any overlap, it is likely to happen.
 328 |       |     // For more details, check PR #1963
 329 |     0 |     const size_t num_skippable_boundaries_crossed =
 330 |     0 |         being_grandparent_gap_ ? 2 : 3;
 331 |     0 |     if (compaction_->immutable_options().compaction_style ==
 332 |     0 |             kCompactionStyleLevel &&
 333 |     0 |         num_grandparent_boundaries_crossed >=
 334 |     0 |             num_skippable_boundaries_crossed &&
 335 |     0 |         grandparent_overlapped_bytes_ - previous_overlapped_bytes >
 336 |     0 |             compaction_->target_output_file_size() / 8) {
 337 |     0 |       return true;
 338 |     0 |     }
 339 |       |
 340 |       |     // Pre-cut the output file if it's reaching a certain size AND it's at the
 341 |       |     // boundary of a grandparent file. It can reduce the future compaction size,
 342 |       |     // at the cost of smaller files.
 343 |       |     // The pre-cut size threshold is based on how many grandparent boundaries
 344 |       |     // it has seen before. Basically, if it has seen no boundary at all, it
 345 |       |     // will pre-cut at 50% of the target file size. Every boundary it has seen
 346 |       |     // increases the threshold by 5%, capped at 90%, at which it will always cut.
 347 |       |     // The idea is that if it has seen more boundaries before, it is more
 348 |       |     // likely to see another boundary (file cutting opportunity) before the
 349 |       |     // target file size. Tests show it can generate larger files than a
 350 |       |     // static threshold like 75% and has a similar write amplification
 351 |       |     // improvement.
 352 |     0 |     if (compaction_->immutable_options().compaction_style ==
 353 |     0 |             kCompactionStyleLevel &&
 354 |     0 |         current_output_file_size_ >=
 355 |     0 |             ((compaction_->target_output_file_size() + 99) / 100) *
 356 |     0 |                 (50 + std::min(grandparent_boundary_switched_num_ * 5,
 357 |     0 |                                size_t{40}))) {
 358 |     0 |       return true;
 359 |     0 |     }
 360 |     0 |   }
 361 |       |
 362 |   303 |   return false;
 363 |   303 | }
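
The pre-cut rule at lines 352-357 reduces to integer arithmetic: cut once the file reaches (50 + 5 * boundaries_seen) percent of the target size, capped at 90%. A standalone sketch of just that math (PreCutThreshold is a hypothetical name, not a RocksDB API):

    #include <algorithm>
    #include <cstddef>
    #include <cstdint>
    #include <cstdio>

    // Mirrors the threshold on lines 354-357. (target + 99) / 100 is 1% of
    // the target, rounded up so the product is never zero for small targets.
    uint64_t PreCutThreshold(uint64_t target_file_size, size_t boundaries_seen) {
      uint64_t one_percent = (target_file_size + 99) / 100;
      return one_percent * (50 + std::min(boundaries_seen * 5, size_t{40}));
    }

    int main() {
      // With a 64 MB target: 0 boundaries cut at ~32 MB (50%), 8 or more at
      // ~57.6 MB (the 90% cap).
      for (size_t n : {0u, 4u, 8u, 12u}) {
        std::printf("boundaries=%zu threshold=%llu\n", n,
                    static_cast<unsigned long long>(
                        PreCutThreshold(64ull << 20, n)));
      }
      return 0;
    }
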
 364 |       |
 365 |       | Status CompactionOutputs::AddToOutput(
 366 |       |     const CompactionIterator& c_iter,
 367 |       |     const CompactionFileOpenFunc& open_file_func,
 368 |       |     const CompactionFileCloseFunc& close_file_func,
 369 | 2.17k |     const ParsedInternalKey& prev_iter_output_internal_key) {
 370 | 2.17k |   Status s;
 371 | 2.17k |   bool is_range_del = c_iter.IsDeleteRangeSentinelKey();
 372 | 2.17k |   if (is_range_del && compaction_->bottommost_level()) {
 373 |       |     // We don't consider range tombstones for the bottommost level since:
 374 |       |     // 1. there is no grandparent and hence no overlap to consider
 375 |       |     // 2. range tombstones may be dropped at the bottommost level.
 376 |     0 |     return s;
 377 |     0 |   }
 378 | 2.17k |   const Slice& key = c_iter.key();
 379 | 2.17k |   if (ShouldStopBefore(c_iter) && HasBuilder()) {
 380 |     0 |     s = close_file_func(c_iter.InputStatus(), prev_iter_output_internal_key,
 381 |     0 |                         key, &c_iter, *this);
 382 |     0 |     if (!s.ok()) {
 383 |     0 |       return s;
 384 |     0 |     }
 385 |       |     // reset grandparent information
 386 |     0 |     grandparent_boundary_switched_num_ = 0;
 387 |     0 |     grandparent_overlapped_bytes_ =
 388 |     0 |         GetCurrentKeyGrandparentOverlappedBytes(key);
 389 |     0 |     if (UNLIKELY(is_range_del)) {
 390 |       |       // lower bound for this new output file; this is needed as the lower bound
 391 |       |       // does not come from the smallest point key in this case.
 392 |     0 |       range_tombstone_lower_bound_.DecodeFrom(key);
 393 |     0 |     } else {
 394 |     0 |       range_tombstone_lower_bound_.Clear();
 395 |     0 |     }
 396 |     0 |   }
 397 |       |
 398 |       |   // Open output file if necessary
 399 | 2.17k |   if (!HasBuilder()) {
 400 | 1.84k |     s = open_file_func(*this);
 401 | 1.84k |     if (!s.ok()) {
 402 |     0 |       return s;
 403 |     0 |     }
 404 | 1.84k |   }
 405 |       |
 406 |       |   // c_iter may emit range deletion keys, so update `last_key_for_partitioner_`
 407 |       |   // here before returning below when `is_range_del` is true
 408 | 2.17k |   if (partitioner_) {
 409 |     0 |     last_key_for_partitioner_.assign(c_iter.user_key().data_,
 410 |     0 |                                      c_iter.user_key().size_);
 411 |     0 |   }
 412 |       |
 413 | 2.17k |   if (UNLIKELY(is_range_del)) {
 414 |     0 |     return s;
 415 |     0 |   }
 416 |       |
 417 | 2.17k |   assert(builder_ != nullptr);
 418 | 2.17k |   const Slice& value = c_iter.value();
 419 | 2.17k |   s = current_output().validator.Add(key, value);
 420 | 2.17k |   if (!s.ok()) {
 421 |     0 |     return s;
 422 |     0 |   }
 423 | 2.17k |   builder_->Add(key, value);
 424 |       |
 425 | 2.17k |   stats_.num_output_records++;
 426 | 2.17k |   current_output_file_size_ = builder_->EstimatedFileSize();
 427 |       |
 428 | 2.17k |   if (blob_garbage_meter_) {
 429 |     0 |     s = blob_garbage_meter_->ProcessOutFlow(key, value);
 430 |     0 |   }
 431 |       |
 432 | 2.17k |   if (!s.ok()) {
 433 |     0 |     return s;
 434 |     0 |   }
 435 |       |
 436 | 2.17k |   const ParsedInternalKey& ikey = c_iter.ikey();
 437 | 2.17k |   if (ikey.type == kTypeValuePreferredSeqno) {
 438 |     0 |     SequenceNumber preferred_seqno = ParsePackedValueForSeqno(value);
 439 |     0 |     smallest_preferred_seqno_ =
 440 |     0 |         std::min(smallest_preferred_seqno_, preferred_seqno);
 441 |     0 |   }
 442 | 2.17k |   s = current_output().meta.UpdateBoundaries(key, value, ikey.sequence,
 443 | 2.17k |                                              ikey.type);
 444 |       |
 445 | 2.17k |   return s;
 446 | 2.17k | }
 447 |       |
 448 |       | namespace {
 449 |       | void SetMaxSeqAndTs(InternalKey& internal_key, const Slice& user_key,
 450 |     0 |                     const size_t ts_sz) {
 451 |     0 |   if (ts_sz) {
 452 |     0 |     static constexpr char kTsMax[] = "\xff\xff\xff\xff\xff\xff\xff\xff\xff";
 453 |     0 |     if (ts_sz <= strlen(kTsMax)) {
 454 |     0 |       internal_key = InternalKey(user_key, kMaxSequenceNumber,
 455 |     0 |                                  kTypeRangeDeletion, Slice(kTsMax, ts_sz));
 456 |     0 |     } else {
 457 |     0 |       internal_key =
 458 |     0 |           InternalKey(user_key, kMaxSequenceNumber, kTypeRangeDeletion,
 459 |     0 |                       std::string(ts_sz, '\xff'));
 460 |     0 |     }
 461 |     0 |   } else {
 462 |     0 |     internal_key.Set(user_key, kMaxSequenceNumber, kTypeRangeDeletion);
 463 |     0 |   }
 464 |     0 | }
 465 |       | }  // namespace
 466 |       |
 467 |       | Status CompactionOutputs::AddRangeDels(
 468 |       |     CompactionRangeDelAggregator& range_del_agg,
 469 |       |     const Slice* comp_start_user_key, const Slice* comp_end_user_key,
 470 |       |     CompactionIterationStats& range_del_out_stats, bool bottommost_level,
 471 |       |     const InternalKeyComparator& icmp, SequenceNumber earliest_snapshot,
 472 |       |     std::pair<SequenceNumber, SequenceNumber> keep_seqno_range,
 473 |     0 |     const Slice& next_table_min_key, const std::string& full_history_ts_low) {
 474 |       |   // The following example does not happen since
 475 |       |   // CompactionOutputs::ShouldStopBefore() always returns false for the first
 476 |       |   // point key. But we should consider removing this dependency. Suppose for the
 477 |       |   // first compaction output file,
 478 |       |   //  - next_table_min_key.user_key == comp_start_user_key
 479 |       |   //  - no point key is in the output file
 480 |       |   //  - there is a range tombstone @seqno to be added that covers
 481 |       |   //  comp_start_user_key
 482 |       |   // Then meta.smallest will be set to comp_start_user_key@seqno
 483 |       |   // and meta.largest will be set to comp_start_user_key@kMaxSequenceNumber
 484 |       |   // which violates the assumption that meta.smallest should be <= meta.largest.
 485 |     0 |   assert(!range_del_agg.IsEmpty());
 486 |     0 |   FileMetaData& meta = current_output().meta;
 487 |     0 |   const Comparator* ucmp = icmp.user_comparator();
 488 |     0 |   InternalKey lower_bound_buf, upper_bound_buf;
 489 |     0 |   Slice lower_bound_guard, upper_bound_guard;
 490 |     0 |   std::string smallest_user_key;
 491 |     0 |   const Slice *lower_bound, *upper_bound;
 492 |       |
 493 |       |   // We first determine the internal key lower_bound and upper_bound for
 494 |       |   // this output file. All and only range tombstones that overlap with
 495 |       |   // [lower_bound, upper_bound] should be added to this file. File
 496 |       |   // boundaries (meta.smallest/largest) should be updated accordingly when
 497 |       |   // extended by range tombstones.
 498 |     0 |   size_t output_size = outputs_.size();
 499 |     0 |   if (output_size == 1) {
 500 |       |     // This is the first file in the subcompaction.
 501 |       |     //
 502 |       |     // When outputting a range tombstone that spans a subcompaction boundary,
 503 |       |     // the files on either side of that boundary need to include that
 504 |       |     // boundary's user key. Otherwise, the spanning range tombstone would lose
 505 |       |     // coverage.
 506 |       |     //
 507 |       |     // To achieve this while preventing files from overlapping in internal key
 508 |       |     // (an LSM invariant violation), we allow the earlier file to include the
 509 |       |     // boundary user key up to `kMaxSequenceNumber,kTypeRangeDeletion`. The
 510 |       |     // later file can begin at the boundary user key at the newest key version
 511 |       |     // it contains. At this point that version number is unknown since we have
 512 |       |     // not processed the range tombstones yet, so permit any version. The same
 513 |       |     // applies to timestamps, and a non-nullptr `comp_start_user_key` should have
 514 |       |     // `kMaxTs` here, which similarly permits any timestamp.
 515 |     0 |     if (comp_start_user_key) {
 516 |     0 |       lower_bound_buf.Set(*comp_start_user_key, kMaxSequenceNumber,
 517 |     0 |                           kTypeRangeDeletion);
 518 |     0 |       lower_bound_guard = lower_bound_buf.Encode();
 519 |     0 |       lower_bound = &lower_bound_guard;
 520 |     0 |     } else {
 521 |     0 |       lower_bound = nullptr;
 522 |     0 |     }
 523 |     0 |   } else {
 524 |       |     // For subsequent output tables, only include range tombstones from min
 525 |       |     // key onwards since the previous file was extended to contain range
 526 |       |     // tombstones falling before min key.
 527 |     0 |     if (range_tombstone_lower_bound_.size() > 0) {
 528 |     0 |       assert(meta.smallest.size() == 0 ||
 529 |     0 |              icmp.Compare(range_tombstone_lower_bound_, meta.smallest) < 0);
 530 |     0 |       lower_bound_guard = range_tombstone_lower_bound_.Encode();
 531 |     0 |     } else {
 532 |     0 |       assert(meta.smallest.size() > 0);
 533 |     0 |       lower_bound_guard = meta.smallest.Encode();
 534 |     0 |     }
 535 |     0 |     lower_bound = &lower_bound_guard;
 536 |     0 |   }
 537 |       |
 538 |     0 |   const size_t ts_sz = ucmp->timestamp_size();
 539 |     0 |   if (next_table_min_key.empty()) {
 540 |       |     // Last file of the subcompaction.
 541 |     0 |     if (comp_end_user_key) {
 542 |     0 |       upper_bound_buf.Set(*comp_end_user_key, kMaxSequenceNumber,
 543 |     0 |                           kTypeRangeDeletion);
 544 |     0 |       upper_bound_guard = upper_bound_buf.Encode();
 545 |     0 |       upper_bound = &upper_bound_guard;
 546 |     0 |     } else {
 547 |     0 |       upper_bound = nullptr;
 548 |     0 |     }
 549 |     0 |   } else {
 550 |       |     // There is another file coming whose coverage will begin at
 551 |       |     // `next_table_min_key`. The current file needs to extend range tombstone
 552 |       |     // coverage through its own keys (through `meta.largest`) and through user
 553 |       |     // keys preceding `next_table_min_key`'s user key.
 554 |     0 |     ParsedInternalKey next_table_min_key_parsed;
 555 |     0 |     ParseInternalKey(next_table_min_key, &next_table_min_key_parsed,
 556 |     0 |                      false /* log_err_key */)
 557 |     0 |         .PermitUncheckedError();
 558 |     0 |     assert(next_table_min_key_parsed.sequence < kMaxSequenceNumber);
 559 |     0 |     assert(meta.largest.size() == 0 ||
 560 |     0 |            icmp.Compare(meta.largest.Encode(), next_table_min_key) < 0);
 561 |     0 |     assert(!lower_bound || icmp.Compare(*lower_bound, next_table_min_key) <= 0);
 562 |     0 |     if (meta.largest.size() > 0 &&
 563 |     0 |         ucmp->EqualWithoutTimestamp(meta.largest.user_key(),
 564 |     0 |                                     next_table_min_key_parsed.user_key)) {
 565 |       |       // Caution: this assumes meta.largest.Encode() lives longer than
 566 |       |       // upper_bound, which is only true if meta.largest is never updated.
 567 |       |       // This just happens to be the case here since meta.largest serves
 568 |       |       // as the upper_bound.
 569 |     0 |       upper_bound_guard = meta.largest.Encode();
 570 |     0 |     } else {
 571 |     0 |       SetMaxSeqAndTs(upper_bound_buf, next_table_min_key_parsed.user_key,
 572 |     0 |                      ts_sz);
 573 |     0 |       upper_bound_guard = upper_bound_buf.Encode();
 574 |     0 |     }
 575 |     0 |     upper_bound = &upper_bound_guard;
 576 |     0 |   }
 577 |     0 |   if (lower_bound && upper_bound &&
 578 |     0 |       icmp.Compare(*lower_bound, *upper_bound) > 0) {
 579 |     0 |     assert(meta.smallest.size() == 0 &&
 580 |     0 |            ucmp->EqualWithoutTimestamp(ExtractUserKey(*lower_bound),
 581 |     0 |                                        ExtractUserKey(*upper_bound)));
 582 |       |     // This can only happen when lower_bound has the same user key as
 583 |       |     // next_table_min_key and there is no point key in the current
 584 |       |     // compaction output file.
 585 |     0 |     return Status::OK();
 586 |     0 |   }
 587 |       |   // The end key of the subcompaction must be bigger than or equal to the upper
 588 |       |   // bound. If the end of the subcompaction is null or the upper bound is null,
 589 |       |   // it means that this file is the last file in the compaction, so there
 590 |       |   // will be no overlap between this file and others.
 591 |     0 |   assert(comp_end_user_key == nullptr || upper_bound == nullptr ||
 592 |     0 |          ucmp->CompareWithoutTimestamp(ExtractUserKey(*upper_bound),
 593 |     0 |                                        *comp_end_user_key) <= 0);
 594 |     0 |   auto it = range_del_agg.NewIterator(lower_bound, upper_bound);
 595 |     0 |   Slice last_tombstone_start_user_key{};
 596 |     0 |   bool reached_lower_bound = false;
 597 |     0 |   const ReadOptions read_options(Env::IOActivity::kCompaction);
 598 |     0 |   for (it->SeekToFirst(); it->Valid(); it->Next()) {
 599 |     0 |     auto tombstone = it->Tombstone();
 600 |     0 |     auto kv = tombstone.Serialize();
 601 |       |     // Filter out by seqno for per-key placement
 602 |     0 |     if (tombstone.seq_ < keep_seqno_range.first ||
 603 |     0 |         tombstone.seq_ >= keep_seqno_range.second) {
 604 |     0 |       continue;
 605 |     0 |     }
 606 |       |
 607 |     0 |     InternalKey tombstone_end = tombstone.SerializeEndKey();
 608 |       |     // TODO: the underlying iterator should support clamping the bounds.
 609 |       |     // tombstone_end.Encode() is of the form user_key@kMaxSeqno;
 610 |       |     // if it is equal to lower_bound, there is no need to include
 611 |       |     // such a range tombstone.
 612 |     0 |     if (!reached_lower_bound && lower_bound &&
 613 |     0 |         icmp.Compare(tombstone_end.Encode(), *lower_bound) <= 0) {
 614 |     0 |       continue;
 615 |     0 |     }
 616 |     0 |     assert(!lower_bound ||
 617 |     0 |            icmp.Compare(*lower_bound, tombstone_end.Encode()) <= 0);
 618 |     0 |     reached_lower_bound = true;
 619 |       |
 620 |       |     // Garbage collection for range tombstones.
 621 |       |     // If user-defined timestamps are enabled, range tombstones are dropped if
 622 |       |     // they are at bottommost_level, below full_history_ts_low and not visible
 623 |       |     // in any snapshot. trim_ts_ is passed to the constructor for
 624 |       |     // range_del_agg_, and range_del_agg_ internally drops tombstones above
 625 |       |     // trim_ts_.
 626 |     0 |     bool consider_drop =
 627 |     0 |         tombstone.seq_ <= earliest_snapshot &&
 628 |     0 |         (ts_sz == 0 ||
 629 |     0 |          (!full_history_ts_low.empty() &&
 630 |     0 |           ucmp->CompareTimestamp(tombstone.ts_, full_history_ts_low) < 0));
 631 |     0 |     if (consider_drop && bottommost_level) {
 632 |       |       // TODO(andrewkr): tombstones that span multiple output files are
 633 |       |       // counted for each compaction output file, so there is a lot of double
 634 |       |       // counting.
 635 |     0 |       range_del_out_stats.num_range_del_drop_obsolete++;
 636 |     0 |       range_del_out_stats.num_record_drop_obsolete++;
 637 |     0 |       continue;
 638 |     0 |     }
 639 |       |
 640 |     0 |     assert(lower_bound == nullptr ||
 641 |     0 |            ucmp->CompareWithoutTimestamp(ExtractUserKey(*lower_bound),
 642 |     0 |                                          kv.second) < 0);
 643 |     0 |     InternalKey tombstone_start = kv.first;
 644 |     0 |     if (lower_bound &&
 645 |     0 |         ucmp->CompareWithoutTimestamp(tombstone_start.user_key(),
 646 |     0 |                                       ExtractUserKey(*lower_bound)) < 0) {
 647 |       |       // This just updates the non-timestamp portion of `tombstone_start`'s user
 648 |       |       // key. Ideally there would be a simpler API usage.
 649 |     0 |       ParsedInternalKey tombstone_start_parsed;
 650 |     0 |       ParseInternalKey(tombstone_start.Encode(), &tombstone_start_parsed,
 651 |     0 |                        false /* log_err_key */)
 652 |     0 |           .PermitUncheckedError();
 653 |       |       // The timestamp should come from where the sequence number comes from,
 654 |       |       // which is from the tombstone in this case.
 655 |     0 |       std::string ts =
 656 |     0 |           tombstone_start_parsed.GetTimestamp(ucmp->timestamp_size())
 657 |     0 |               .ToString();
 658 |     0 |       tombstone_start_parsed.user_key = ExtractUserKey(*lower_bound);
 659 |     0 |       tombstone_start.SetFrom(tombstone_start_parsed, ts);
 660 |     0 |     }
 661 |     0 |     if (upper_bound != nullptr &&
 662 |     0 |         icmp.Compare(*upper_bound, tombstone_start.Encode()) < 0) {
 663 |     0 |       break;
 664 |     0 |     }
 665 |     0 |     if (lower_bound &&
 666 |     0 |         icmp.Compare(tombstone_start.Encode(), *lower_bound) < 0) {
 667 |     0 |       tombstone_start.DecodeFrom(*lower_bound);
 668 |     0 |     }
 669 |     0 |     if (upper_bound && icmp.Compare(*upper_bound, tombstone_end.Encode()) < 0) {
 670 |     0 |       tombstone_end.DecodeFrom(*upper_bound);
 671 |     0 |     }
 672 |     0 |     if (consider_drop && compaction_->KeyRangeNotExistsBeyondOutputLevel(
 673 |     0 |                              tombstone_start.user_key(),
 674 |     0 |                              tombstone_end.user_key(), &level_ptrs_)) {
 675 |     0 |       range_del_out_stats.num_range_del_drop_obsolete++;
 676 |     0 |       range_del_out_stats.num_record_drop_obsolete++;
 677 |     0 |       continue;
 678 |     0 |     }
 679 |       |     // Here we show that *only* range tombstones that overlap with
 680 |       |     // [lower_bound, upper_bound] are added to the current file, and
 681 |       |     // sanity-check invariants that should hold:
 682 |       |     // - [tombstone_start, tombstone_end] overlaps with [lower_bound,
 683 |       |     // upper_bound]
 684 |       |     // - meta.smallest <= meta.largest
 685 |       |     // Corresponding assertions are made; the proof is broken if any of them
 686 |       |     // fails.
 687 |       |     // TODO: show that *all* range tombstones that overlap with
 688 |       |     //  [lower_bound, upper_bound] are added.
 689 |       |     // TODO: add some invariant that boundaries are correctly updated.
 690 |       |     //
 691 |       |     // Note that `tombstone_start` is updated in the if condition above; we use
 692 |       |     // tombstone_start to refer to its initial value, i.e.,
 693 |       |     // it->Tombstone().first, and use tombstone_start* to refer to its value
 694 |       |     // after the update.
 695 |       |     //
 696 |       |     // To show [lower_bound, upper_bound] overlaps with [tombstone_start,
 697 |       |     // tombstone_end]:
 698 |       |     // lower_bound <= upper_bound from the if condition right after all
 699 |       |     // bounds are initialized. We assume each tombstone fragment has
 700 |       |     // start_key.user_key < end_key.user_key, so
 701 |       |     // tombstone_start < tombstone_end by
 702 |       |     // FragmentedTombstoneIterator::Tombstone(). So these two ranges are both
 703 |       |     // non-empty. The flag `reached_lower_bound` and the if logic before it
 704 |       |     // ensure lower_bound <= tombstone_end. tombstone_start is only updated
 705 |       |     // if it has a smaller user_key than lower_bound's user_key, so
 706 |       |     // tombstone_start <= tombstone_start*. The above if condition implies
 707 |       |     // tombstone_start* <= upper_bound. So we have
 708 |       |     // tombstone_start <= upper_bound and lower_bound <= tombstone_end
 709 |       |     // and the two ranges overlap.
 710 |       |     //
 711 |       |     // To show meta.smallest <= meta.largest:
 712 |       |     // From the implementation of UpdateBoundariesForRange(), it suffices to
 713 |       |     // prove that when it is first called in this function, its parameters
 714 |       |     // satisfy `start <= end`, where start = max(tombstone_start*, lower_bound)
 715 |       |     // and end = min(tombstone_end, upper_bound). From the above proof we have
 716 |       |     // lower_bound <= tombstone_end and lower_bound <= upper_bound. We only need
 717 |       |     // to show that tombstone_start* <= min(tombstone_end, upper_bound).
 718 |       |     // Note that tombstone_start*.user_key = max(tombstone_start.user_key,
 719 |       |     // lower_bound.user_key). Assume tombstone_end always has
 720 |       |     // kMaxSequenceNumber and lower_bound.seqno < kMaxSequenceNumber.
 721 |       |     // Since lower_bound <= tombstone_end and lower_bound.seqno <
 722 |       |     // tombstone_end.seqno (in absolute number order, not internal key order),
 723 |       |     // lower_bound.user_key < tombstone_end.user_key.
 724 |       |     // Since lower_bound.user_key < tombstone_end.user_key and
 725 |       |     // tombstone_start.user_key < tombstone_end.user_key, tombstone_start* <
 726 |       |     // tombstone_end. Since tombstone_start* <= upper_bound from the above proof
 727 |       |     // and tombstone_start* < tombstone_end, tombstone_start* <=
 728 |       |     // min(tombstone_end, upper_bound), and hence start <= end.
 729 |       |
 730 |       |     // Range tombstones are not supported by the output validator yet.
 731 |     0 |     builder_->Add(kv.first.Encode(), kv.second);
 732 |     0 |     assert(icmp.Compare(tombstone_start, tombstone_end) <= 0);
 733 |     0 |     meta.UpdateBoundariesForRange(tombstone_start, tombstone_end,
 734 |     0 |                                   tombstone.seq_, icmp);
 735 |     0 |     if (!bottommost_level) {
 736 |     0 |       bool start_user_key_changed =
 737 |     0 |           last_tombstone_start_user_key.empty() ||
 738 |     0 |           ucmp->CompareWithoutTimestamp(last_tombstone_start_user_key,
 739 |     0 |                                         it->start_key()) < 0;
 740 |     0 |       last_tombstone_start_user_key = it->start_key();
 741 |     0 |       if (start_user_key_changed) {
 742 |       |         // If tombstone_start >= tombstone_end, then either no key range is
 743 |       |         // covered, or they have the same user key. If they have the same
 744 |       |         // user key, then the internal key range should only be within this
 745 |       |         // level, and no keys from older levels are covered.
 746 |     0 |         if (ucmp->CompareWithoutTimestamp(tombstone_start.user_key(),
 747 |     0 |                                           tombstone_end.user_key()) < 0) {
 748 |     0 |           SizeApproximationOptions approx_opts;
 749 |     0 |           approx_opts.files_size_error_margin = 0.1;
 750 |     0 |           auto approximate_covered_size =
 751 |     0 |               compaction_->input_version()->version_set()->ApproximateSize(
 752 |     0 |                   approx_opts, read_options, compaction_->input_version(),
 753 |     0 |                   tombstone_start.Encode(), tombstone_end.Encode(),
 754 |     0 |                   compaction_->output_level() + 1 /* start_level */,
 755 |     0 |                   -1 /* end_level */, kCompaction);
 756 |     0 |           meta.compensated_range_deletion_size += approximate_covered_size;
 757 |     0 |         }
 758 |     0 |       }
 759 |     0 |     }
 760 |     0 |   }
 761 |     0 |   return Status::OK();
 762 |     0 | }
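
The subtle part of AddRangeDels() is the bound selection at lines 498-576: the first output file of a subcompaction extends its tombstone coverage down to the subcompaction start key (at kMaxSequenceNumber, so it sorts before every real version of that user key), while each later file starts where the previous file's coverage ended. A condensed sketch of the lower-bound choice using plain strings for encoded internal keys (ChooseLowerBound and its parameters are hypothetical stand-ins, not the RocksDB types):

    #include <optional>
    #include <string>

    // first_file:    is this the first output file of the subcompaction?
    // comp_start:    comp_start_user_key, if any (nullopt = unbounded below)
    // tombstone_lb:  range_tombstone_lower_bound_ (may be empty)
    // meta_smallest: the file's smallest point key, encoded
    std::optional<std::string> ChooseLowerBound(
        bool first_file, const std::optional<std::string>& comp_start,
        const std::string& tombstone_lb, const std::string& meta_smallest) {
      if (first_file) {
        // Extend down to the subcompaction boundary so a spanning tombstone
        // keeps coverage on this side of it.
        return comp_start;
      }
      // Later files: resume exactly where the previous file stopped, falling
      // back to the smallest point key when no tombstone bound was recorded.
      return tombstone_lb.empty() ? meta_smallest : tombstone_lb;
    }

The upper bound is chosen symmetrically at lines 538-576: the subcompaction end key for the last file, otherwise the next table's minimum user key at kMaxSequenceNumber, which sorts before every real version of that key and so covers exactly the user keys preceding it.
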
 763 |       |
 764 | 5.57k | void CompactionOutputs::FillFilesToCutForTtl() {
 765 | 5.57k |   if (compaction_->immutable_options().compaction_style !=
 766 | 5.57k |           kCompactionStyleLevel ||
 767 | 5.57k |       compaction_->immutable_options().compaction_pri != kMinOverlappingRatio ||
 768 | 5.57k |       compaction_->mutable_cf_options().ttl == 0 ||
 769 | 5.57k |       compaction_->num_input_levels() < 2 || compaction_->bottommost_level()) {
 770 | 5.57k |     return;
 771 | 5.57k |   }
 772 |       |
 773 |       |   // We define a new file as one whose oldest ancestor time is younger than
 774 |       |   // 1/4 of the TTL, and an old one as older than 1/2 of the TTL.
 775 |     0 |   int64_t temp_current_time;
 776 |     0 |   auto get_time_status = compaction_->immutable_options().clock->GetCurrentTime(
 777 |     0 |       &temp_current_time);
 778 |     0 |   if (!get_time_status.ok()) {
 779 |     0 |     return;
 780 |     0 |   }
 781 |       |
 782 |     0 |   auto current_time = static_cast<uint64_t>(temp_current_time);
 783 |     0 |   if (current_time < compaction_->mutable_cf_options().ttl) {
 784 |     0 |     return;
 785 |     0 |   }
 786 |       |
 787 |     0 |   uint64_t old_age_thres =
 788 |     0 |       current_time - compaction_->mutable_cf_options().ttl / 2;
 789 |     0 |   const std::vector<FileMetaData*>& olevel =
 790 |     0 |       *(compaction_->inputs(compaction_->num_input_levels() - 1));
 791 |     0 |   for (FileMetaData* file : olevel) {
 792 |       |     // Worth filtering out by start and end?
 793 |     0 |     uint64_t oldest_ancester_time = file->TryGetOldestAncesterTime();
 794 |       |     // We only include old files that are not too small, to prevent a flood
 795 |       |     // of small files.
 796 |     0 |     if (oldest_ancester_time < old_age_thres &&
 797 |     0 |         file->fd.GetFileSize() >
 798 |     0 |             compaction_->mutable_cf_options().target_file_size_base / 2) {
 799 |     0 |       files_to_cut_for_ttl_.push_back(file);
 800 |     0 |     }
 801 |     0 |   }
 802 |     0 | }
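
The filter at lines 787-800 marks output-level files for cutting when they are both old (oldest ancestor time past half the TTL) and at least half the base target file size, the latter to avoid producing a flood of small files. A standalone check of the same arithmetic (QualifiesForTtlCut is a hypothetical name):

    #include <cstdint>
    #include <cstdio>

    bool QualifiesForTtlCut(uint64_t now, uint64_t ttl,
                            uint64_t oldest_ancester_time, uint64_t file_size,
                            uint64_t target_file_size_base) {
      uint64_t old_age_thres = now - ttl / 2;  // as on lines 787-788
      return oldest_ancester_time < old_age_thres &&
             file_size > target_file_size_base / 2;
    }

    int main() {
      // ttl = 30 days: a file last rewritten 20 days ago qualifies (20 > 15),
      // provided it is over half of a 64 MB base target. Prints 1.
      const uint64_t kDay = 86400;
      std::printf("%d\n",
                  QualifiesForTtlCut(/*now=*/100 * kDay, /*ttl=*/30 * kDay,
                                     /*oldest_ancester_time=*/80 * kDay,
                                     /*file_size=*/48ull << 20,
                                     /*target_file_size_base=*/64ull << 20));
      return 0;
    }
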
 803 |       |
 804 |       | CompactionOutputs::CompactionOutputs(const Compaction* compaction,
 805 |       |                                      const bool is_proximal_level)
 806 | 6.85k |     : compaction_(compaction), is_proximal_level_(is_proximal_level) {
 807 | 6.85k |   partitioner_ = compaction->output_level() == 0
 808 | 6.85k |                      ? nullptr
 809 | 6.85k |                      : compaction->CreateSstPartitioner();
 810 |       |
 811 | 6.85k |   if (compaction->output_level() != 0) {
 812 | 5.57k |     FillFilesToCutForTtl();
 813 | 5.57k |   }
 814 |       |
 815 | 6.85k |   level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0);
 816 | 6.85k | }
 817 |       |
 818 |       | }  // namespace ROCKSDB_NAMESPACE