Coverage Report

Created: 2026-03-31 07:51

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/compaction/compaction_outputs.h
Line
Count
Source
1
//  Copyright (c) Meta Platforms, Inc. and affiliates.
2
//
3
//  This source code is licensed under both the GPLv2 (found in the
4
//  COPYING file in the root directory) and Apache 2.0 License
5
//  (found in the LICENSE.Apache file in the root directory).
6
//
7
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
8
// Use of this source code is governed by a BSD-style license that can be
9
// found in the LICENSE file. See the AUTHORS file for names of contributors.
10
11
#pragma once
12
13
#include "db/blob/blob_garbage_meter.h"
14
#include "db/compaction/compaction.h"
15
#include "db/compaction/compaction_iterator.h"
16
#include "db/internal_stats.h"
17
#include "db/output_validator.h"
18
19
namespace ROCKSDB_NAMESPACE {
20
21
class CompactionOutputs;
22
using CompactionFileOpenFunc = std::function<Status(CompactionOutputs&)>;
23
using CompactionFileCloseFunc =
24
    std::function<Status(const Status&, const ParsedInternalKey&, const Slice&,
25
                         const CompactionIterator*, CompactionOutputs&)>;
26
27
// Files produced by subcompaction, most of the functions are used by
28
// compaction_job Open/Close compaction file functions.
29
class CompactionOutputs {
30
 public:
31
  // compaction output file
32
  struct Output {
33
    Output(FileMetaData&& _meta, const InternalKeyComparator& _icmp,
34
           bool _enable_hash, bool _finished, uint64_t precalculated_hash,
35
           bool _is_proximal_level)
36
1.63k
        : meta(std::move(_meta)),
37
1.63k
          validator(_icmp, _enable_hash, precalculated_hash),
38
1.63k
          finished(_finished),
39
1.63k
          is_proximal_level(_is_proximal_level) {}
40
    FileMetaData meta;
41
    OutputValidator validator;
42
    bool finished;
43
    bool is_proximal_level;
44
    std::shared_ptr<const TableProperties> table_properties;
45
  };
46
47
  CompactionOutputs() = delete;
48
49
  explicit CompactionOutputs(const Compaction* compaction,
50
                             const bool is_proximal_level);
51
52
1.63k
  bool IsProximalLevel() const { return is_proximal_level_; }
53
54
  // Add generated output to the list
55
  void AddOutput(FileMetaData&& meta, const InternalKeyComparator& icmp,
56
                 bool enable_hash, bool finished = false,
57
1.63k
                 uint64_t precalculated_hash = 0) {
58
1.63k
    outputs_.emplace_back(std::move(meta), icmp, enable_hash, finished,
59
1.63k
                          precalculated_hash, is_proximal_level_);
60
1.63k
  }
61
62
0
  const std::vector<Output>& GetOutputs() const { return outputs_; }
63
64
  // Set new table builder for the current output
65
  void NewBuilder(const TableBuilderOptions& tboptions);
66
67
  // Assign a new WritableFileWriter to the current output
68
1.63k
  void AssignFileWriter(WritableFileWriter* writer) {
69
1.63k
    file_writer_.reset(writer);
70
1.63k
  }
71
72
  // TODO: Move the BlobDB builder into CompactionOutputs
73
2.03k
  const std::vector<BlobFileAddition>& GetBlobFileAdditions() const {
74
2.03k
    if (is_proximal_level_) {
75
0
      assert(blob_file_additions_.empty());
76
0
    }
77
2.03k
    return blob_file_additions_;
78
2.03k
  }
79
80
0
  std::vector<BlobFileAddition>* GetBlobFileAdditionsPtr() {
81
0
    assert(!is_proximal_level_);
82
0
    return &blob_file_additions_;
83
0
  }
84
85
2.03k
  bool HasBlobFileAdditions() const { return !blob_file_additions_.empty(); }
86
87
  // Get all file paths (SST and blob) created during compaction.
88
0
  const std::vector<std::string>& GetOutputFilePaths() const {
89
0
    return output_file_paths_;
90
0
  }
91
92
0
  std::vector<std::string>* GetOutputFilePathsPtr() {
93
0
    return &output_file_paths_;
94
0
  }
95
96
1.63k
  void AddOutputFilePath(const std::string& path) {
97
1.63k
    output_file_paths_.push_back(path);
98
1.63k
  }
99
100
0
  BlobGarbageMeter* CreateBlobGarbageMeter() {
101
0
    assert(!is_proximal_level_);
102
0
    blob_garbage_meter_ = std::make_unique<BlobGarbageMeter>();
103
0
    return blob_garbage_meter_.get();
104
0
  }
105
106
2.03k
  BlobGarbageMeter* GetBlobGarbageMeter() const {
107
2.03k
    if (is_proximal_level_) {
108
      // blobdb doesn't support per_key_placement yet
109
0
      assert(blob_garbage_meter_ == nullptr);
110
0
      return nullptr;
111
0
    }
112
2.03k
    return blob_garbage_meter_.get();
113
2.03k
  }
114
115
0
  void UpdateBlobStats() {
116
0
    assert(!is_proximal_level_);
117
0
    stats_.num_output_files_blob =
118
0
        static_cast<int>(blob_file_additions_.size());
119
0
    for (const auto& blob : blob_file_additions_) {
120
0
      stats_.bytes_written_blob += blob.GetTotalBlobBytes();
121
0
    }
122
0
  }
123
124
  // Finish the current output file
125
  Status Finish(const Status& intput_status,
126
                const SeqnoToTimeMapping& seqno_to_time_mapping);
127
128
  // Update output table properties from already populated TableProperties.
129
  // Used for remote compaction
130
0
  void UpdateTableProperties(const TableProperties& table_properties) {
131
0
    current_output().table_properties =
132
0
        std::make_shared<TableProperties>(table_properties);
133
0
  }
134
  // Update output table properties from table builder
135
1.19k
  void UpdateTableProperties() {
136
1.19k
    current_output().table_properties =
137
1.19k
        std::make_shared<TableProperties>(GetTableProperties());
138
1.19k
  }
139
140
  IOStatus WriterSyncClose(const Status& intput_status, SystemClock* clock,
141
                           Statistics* statistics, bool use_fsync);
142
143
2.39k
  TableProperties GetTableProperties() {
144
2.39k
    return builder_->GetTableProperties();
145
2.39k
  }
146
147
1.19k
  Slice SmallestUserKey() const {
148
1.19k
    if (!outputs_.empty() && outputs_[0].finished) {
149
1.19k
      return outputs_[0].meta.smallest.user_key();
150
1.19k
    } else {
151
0
      return Slice{nullptr, 0};
152
0
    }
153
1.19k
  }
154
155
1.19k
  Slice LargestUserKey() const {
156
1.19k
    if (!outputs_.empty() && outputs_.back().finished) {
157
1.19k
      return outputs_.back().meta.largest.user_key();
158
1.19k
    } else {
159
0
      return Slice{nullptr, 0};
160
0
    }
161
1.19k
  }
162
163
  // In case the last output file is empty, it doesn't need to be kept.
164
6.14k
  void RemoveLastEmptyOutput() {
165
6.14k
    if (!outputs_.empty() && !outputs_.back().meta.fd.file_size) {
166
      // An error occurred, so ignore the last output.
167
437
      outputs_.pop_back();
168
437
    }
169
6.14k
  }
170
171
  // Remove the last output, for example the last output doesn't have data (no
172
  // entry and no range-dels), but file_size might not be 0, as it has SST
173
  // metadata.
174
0
  void RemoveLastOutput() {
175
0
    assert(!outputs_.empty());
176
0
    outputs_.pop_back();
177
0
  }
178
179
8.90k
  bool HasBuilder() const { return builder_ != nullptr; }
180
181
4.46k
  FileMetaData* GetMetaData() { return &current_output().meta; }
182
183
6.30k
  bool HasOutput() const { return !outputs_.empty(); }
184
185
1.63k
  uint64_t NumEntries() const { return builder_->NumEntries(); }
186
187
3.07k
  uint64_t GetWorkerCPUMicros() const {
188
3.07k
    return worker_cpu_micros_ + (builder_ ? builder_->GetWorkerCPUMicros() : 0);
189
3.07k
  }
190
191
1.63k
  void ResetBuilder() {
192
1.63k
    builder_.reset();
193
1.63k
    current_output_file_size_ = 0;
194
1.63k
  }
195
196
  // Add range deletions from the range_del_agg_ to the current output file.
197
  // Input parameters, `range_tombstone_lower_bound_` and current output's
198
  // metadata determine the bounds on range deletions to add. Updates output
199
  // file metadata boundary if extended by range tombstones.
200
  //
201
  // @param comp_start_user_key and comp_end_user_key include timestamp if
202
  // user-defined timestamp is enabled. Their timestamp should be max timestamp.
203
  // @param next_table_min_key internal key lower bound for the next compaction
204
  // output.
205
  // @param full_history_ts_low used for range tombstone garbage collection.
206
  Status AddRangeDels(
207
      CompactionRangeDelAggregator& range_del_agg,
208
      const Slice* comp_start_user_key, const Slice* comp_end_user_key,
209
      CompactionIterationStats& range_del_out_stats, bool bottommost_level,
210
      const InternalKeyComparator& icmp, SequenceNumber earliest_snapshot,
211
      std::pair<SequenceNumber, SequenceNumber> keep_seqno_range,
212
      const Slice& next_table_min_key, const std::string& full_history_ts_low);
213
214
0
  void SetNumOutputRecords(uint64_t num_output_records) {
215
0
    stats_.num_output_records = num_output_records;
216
0
  }
217
218
 private:
219
  friend class SubcompactionState;
220
221
  void FillFilesToCutForTtl();
222
223
  void SetOutputSlitKey(const std::optional<Slice> start,
224
3.07k
                        const std::optional<Slice> end) {
225
3.07k
    const InternalKeyComparator* icmp =
226
3.07k
        &compaction_->column_family_data()->internal_comparator();
227
228
3.07k
    const InternalKey* output_split_key = compaction_->GetOutputSplitKey();
229
    // Invalid output_split_key indicates that we do not need to split
230
3.07k
    if (output_split_key != nullptr) {
231
      // We may only split the output when the cursor is in the range. Split
232
0
      if ((!end.has_value() ||
233
0
           icmp->user_comparator()->Compare(
234
0
               ExtractUserKey(output_split_key->Encode()), *end) < 0) &&
235
0
          (!start.has_value() ||
236
0
           icmp->user_comparator()->Compare(
237
0
               ExtractUserKey(output_split_key->Encode()), *start) > 0)) {
238
0
        local_output_split_key_ = output_split_key;
239
0
      }
240
0
    }
241
3.07k
  }
242
243
  // Returns true iff we should stop building the current output
244
  // before processing the current key in compaction iterator.
245
  bool ShouldStopBefore(const CompactionIterator& c_iter);
246
247
6.14k
  void Cleanup() {
248
6.14k
    if (builder_ != nullptr) {
249
      // May happen if we get a shutdown call in the middle of compaction
250
0
      builder_->Abandon();
251
0
      builder_.reset();
252
0
    }
253
6.14k
  }
254
255
  // Updates states related to file cutting for TTL.
256
  // Returns a boolean value indicating whether the current
257
  // compaction output file should be cut before `internal_key`.
258
  //
259
  // @param internal_key the current key to be added to output.
260
  bool UpdateFilesToCutForTTLStates(const Slice& internal_key);
261
262
  // update tracked grandparents information like grandparent index, if it's
263
  // in the gap between 2 grandparent files, accumulated grandparent files size
264
  // etc.
265
  // It returns how many boundaries it crosses by including current key.
266
  size_t UpdateGrandparentBoundaryInfo(const Slice& internal_key);
267
268
  // helper function to get the overlapped grandparent files size; it's only
269
  // used for calculating the first key's overlap.
270
  uint64_t GetCurrentKeyGrandparentOverlappedBytes(
271
      const Slice& internal_key) const;
272
273
  // Add current key from compaction_iterator to the output file. If needed
274
  // close and open new compaction output with the functions provided.
275
  Status AddToOutput(const CompactionIterator& c_iter,
276
                     const CompactionFileOpenFunc& open_file_func,
277
                     const CompactionFileCloseFunc& close_file_func,
278
                     const ParsedInternalKey& prev_iter_output_internal_key);
279
280
  // Close the current output. `open_file_func` is needed for creating new file
281
  // for range-dels only output file.
282
  Status CloseOutput(const Status& curr_status,
283
                     CompactionRangeDelAggregator* range_del_agg,
284
                     const CompactionFileOpenFunc& open_file_func,
285
3.07k
                     const CompactionFileCloseFunc& close_file_func) {
286
3.07k
    Status status = curr_status;
287
    // Handle subcompaction containing only range deletions. They could
288
    // be dropped or sent to another output level, so this is only an
289
    // over-approximate check for whether opening is needed.
290
3.07k
    if (status.ok() && !HasBuilder() && !HasOutput() && range_del_agg &&
291
838
        !range_del_agg->IsEmpty()) {
292
0
      status = open_file_func(*this);
293
0
    }
294
295
3.07k
    if (HasBuilder()) {
296
1.63k
      const ParsedInternalKey empty_internal_key{};
297
1.63k
      const Slice empty_key{};
298
1.63k
      Status s = close_file_func(status, empty_internal_key, empty_key,
299
1.63k
                                 nullptr /* c_iter */, *this);
300
1.63k
      if (!s.ok() && status.ok()) {
301
0
        status = s;
302
0
      }
303
1.63k
    }
304
305
3.07k
    return status;
306
3.07k
  }
307
308
  // This subcompaction's output could be empty if compaction was aborted before
309
  // this subcompaction had a chance to generate any output files. When
310
  // subcompactions are executed sequentially this is more likely and will be
311
  // particularly likely for the later subcompactions to be empty. Once they are
312
  // run in parallel however it should be much rarer.
313
  // It's caller's responsibility to make sure it's not empty.
314
11.0k
  Output& current_output() {
315
11.0k
    assert(!outputs_.empty());
316
11.0k
    return outputs_.back();
317
11.0k
  }
318
319
  const Compaction* compaction_;
320
321
  // current output builder and writer
322
  std::unique_ptr<TableBuilder> builder_;
323
  std::unique_ptr<WritableFileWriter> file_writer_;
324
  uint64_t current_output_file_size_ = 0;
325
  SequenceNumber smallest_preferred_seqno_ = kMaxSequenceNumber;
326
327
  // Sum of all the GetWorkerCPUMicros() for all the closed builders so far.
328
  uint64_t worker_cpu_micros_ = 0;
329
330
  // all the compaction outputs so far
331
  std::vector<Output> outputs_;
332
333
  // BlobDB info
334
  std::vector<BlobFileAddition> blob_file_additions_;
335
  std::unique_ptr<BlobGarbageMeter> blob_garbage_meter_;
336
337
  // All file paths (SST and blob) created during compaction.
338
  // Used for cleanup on abort - ensures orphan files are deleted even if
339
  // they were removed from outputs_ or blob_file_additions_ (e.g., by
340
  // RemoveLastEmptyOutput when file_size is 0 because builder was abandoned).
341
  std::vector<std::string> output_file_paths_;
342
343
  // Per level's output stat
344
  InternalStats::CompactionStats stats_;
345
346
  // indicates if this CompactionOutputs obj is for proximal_level; should always
347
  // be false if per_key_placement feature is not enabled.
348
  const bool is_proximal_level_;
349
350
  // partitioner information
351
  std::string last_key_for_partitioner_;
352
  std::unique_ptr<SstPartitioner> partitioner_;
353
354
  // A flag that indicates whether this subcompaction has been split by the cursor
355
  // for RoundRobin compaction
356
  bool is_split_ = false;
357
358
  // We also maintain the output split key for each subcompaction to avoid
359
  // repetitive comparison in ShouldStopBefore()
360
  const InternalKey* local_output_split_key_ = nullptr;
361
362
  // Some identified files with old oldest ancester time and the range should be
363
  // isolated out so that the output file(s) in that range can be merged down
364
  // for TTL and clear the timestamps for the range.
365
  std::vector<FileMetaData*> files_to_cut_for_ttl_;
366
  int cur_files_to_cut_for_ttl_ = -1;
367
  int next_files_to_cut_for_ttl_ = 0;
368
369
  // An index that used to speed up ShouldStopBefore().
370
  size_t grandparent_index_ = 0;
371
372
  // if the output key is in the gap between grandparent files, i.e.:
373
  //  key > grandparents[grandparent_index_ - 1].largest &&
374
  //  key < grandparents[grandparent_index_].smallest
375
  bool being_grandparent_gap_ = true;
376
377
  // The number of bytes overlapping between the current output and
378
  // grandparent files used in ShouldStopBefore().
379
  uint64_t grandparent_overlapped_bytes_ = 0;
380
381
  // A flag that indicates whether the key has been seen in ShouldStopBefore()
382
  bool seen_key_ = false;
383
384
  // for the current output file, how many file boundaries has it crossed,
385
  // basically number of files overlapped * 2
386
  size_t grandparent_boundary_switched_num_ = 0;
387
388
  // The smallest key of the current output file; this is set when the current
389
  // output file's smallest key is a range tombstone start key.
390
  InternalKey range_tombstone_lower_bound_;
391
392
  // Used for calls to compaction->KeyRangeNotExistsBeyondOutputLevel() in
393
  // CompactionOutputs::AddRangeDels().
394
  // level_ptrs_[i] holds index of the file that was checked during the last
395
  // call to compaction->KeyRangeNotExistsBeyondOutputLevel(). This allows
396
  // future calls to the function to pick up where it left off, since each
397
  // range tombstone added to output file within each subcompaction is in
398
  // increasing key range.
399
  std::vector<size_t> level_ptrs_;
400
};
401
402
// helper struct to concatenate the last level and proximal level outputs
403
// which could be replaced by std::ranges::join_view() in c++20
404
struct OutputIterator {
405
 public:
406
  explicit OutputIterator(const std::vector<CompactionOutputs::Output>& a,
407
                          const std::vector<CompactionOutputs::Output>& b)
408
7.14k
      : a_(a), b_(b) {
409
7.14k
    within_a = !a_.empty();
410
7.14k
    idx_ = 0;
411
7.14k
  }
412
413
7.14k
  OutputIterator begin() { return *this; }
414
415
7.14k
  OutputIterator end() { return *this; }
416
417
0
  size_t size() { return a_.size() + b_.size(); }
418
419
3.59k
  const CompactionOutputs::Output& operator*() const {
420
3.59k
    return within_a ? a_[idx_] : b_[idx_];
421
3.59k
  }
422
423
3.59k
  OutputIterator& operator++() {
424
3.59k
    idx_++;
425
3.59k
    if (within_a && idx_ >= a_.size()) {
426
0
      within_a = false;
427
0
      idx_ = 0;
428
0
    }
429
3.59k
    assert(within_a || idx_ <= b_.size());
430
3.59k
    return *this;
431
3.59k
  }
432
433
10.7k
  bool operator!=(const OutputIterator& /*rhs*/) const {
434
10.7k
    return within_a || idx_ < b_.size();
435
10.7k
  }
436
437
 private:
438
  const std::vector<CompactionOutputs::Output>& a_;
439
  const std::vector<CompactionOutputs::Output>& b_;
440
  bool within_a;
441
  size_t idx_;
442
};
443
444
}  // namespace ROCKSDB_NAMESPACE