Coverage Report

Created: 2026-05-16 07:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/compaction/compaction_outputs.h
Line
Count
Source
1
//  Copyright (c) Meta Platforms, Inc. and affiliates.
2
//
3
//  This source code is licensed under both the GPLv2 (found in the
4
//  COPYING file in the root directory) and Apache 2.0 License
5
//  (found in the LICENSE.Apache file in the root directory).
6
//
7
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
8
// Use of this source code is governed by a BSD-style license that can be
9
// found in the LICENSE file. See the AUTHORS file for names of contributors.
10
11
#pragma once
12
13
#include "db/blob/blob_garbage_meter.h"
14
#include "db/compaction/compaction.h"
15
#include "db/compaction/compaction_iterator.h"
16
#include "db/internal_stats.h"
17
#include "db/output_validator.h"
18
19
namespace ROCKSDB_NAMESPACE {
20
21
class CompactionOutputs;
22
// Callback type for opening a new compaction output file. It receives the
// CompactionOutputs being written and returns a Status.
using CompactionFileOpenFunc = std::function<Status(CompactionOutputs&)>;
23
// Callback type for closing the current compaction output file. It receives
// a Status, a ParsedInternalKey, a Slice, a CompactionIterator pointer
// (CompactionOutputs::CloseOutput() passes nullptr) and the CompactionOutputs
// being written, and returns a Status.
using CompactionFileCloseFunc =
24
    std::function<Status(const Status&, const ParsedInternalKey&, const Slice&,
25
                         const CompactionIterator*, CompactionOutputs&)>;
26
27
// Files produced by subcompaction, most of the functions are used by
28
// compaction_job Open/Close compaction file functions.
29
class CompactionOutputs {
30
 public:
31
  // A single compaction output file: its metadata plus the bookkeeping kept
  // alongside it while the file is being produced.
32
  struct Output {
    // Takes ownership of `_meta` (moved from). `_icmp`, `_enable_hash` and
    // `precalculated_hash` are forwarded to the OutputValidator constructor;
    // `_finished` and `_is_proximal_level` initialise the matching flags.
33
    Output(FileMetaData&& _meta, const InternalKeyComparator& _icmp,
34
           bool _enable_hash, bool _finished, uint64_t precalculated_hash,
35
           bool _is_proximal_level)
36
2.29k
        : meta(std::move(_meta)),
37
2.29k
          validator(_icmp, _enable_hash, precalculated_hash),
38
2.29k
          finished(_finished),
39
2.29k
          is_proximal_level(_is_proximal_level) {}
    // Metadata of the output file.
40
    FileMetaData meta;
    // Validator associated with this output.
41
    OutputValidator validator;
    // Smallest/LargestUserKey() only report keys of outputs with this set.
42
    bool finished;
    // Copy of the owning CompactionOutputs' proximal-level flag.
43
    bool is_proximal_level;
    // Null until CompactionOutputs::UpdateTableProperties() assigns it.
44
    std::shared_ptr<const TableProperties> table_properties;
45
  };
46
47
  CompactionOutputs() = delete;
48
49
  explicit CompactionOutputs(const Compaction* compaction,
50
                             const bool is_proximal_level);
51
52
2.29k
  // Returns the `is_proximal_level_` flag, which is const and therefore fixed
  // when this object is constructed.
  bool IsProximalLevel() const { return is_proximal_level_; }
53
54
  // Add generated output to the list.
  //
  // Appends a new Output constructed in place from `meta` (moved from),
  // `icmp`, `enable_hash`, `finished` and `precalculated_hash`, tagged with
  // this object's `is_proximal_level_`. The appended entry is what
  // current_output() returns afterwards.
55
  void AddOutput(FileMetaData&& meta, const InternalKeyComparator& icmp,
56
                 bool enable_hash, bool finished = false,
57
2.29k
                 uint64_t precalculated_hash = 0) {
58
2.29k
    outputs_.emplace_back(std::move(meta), icmp, enable_hash, finished,
59
2.29k
                          precalculated_hash, is_proximal_level_);
60
2.29k
  }
61
62
0
  // Read-only view of all outputs added so far, in insertion order.
  const std::vector<Output>& GetOutputs() const { return outputs_; }
63
0
  // Mutable access to the output list; callers may modify it directly.
  std::vector<Output>& GetMutableOutputs() { return outputs_; }
64
65
  // Set new table builder for the current output
66
  void NewBuilder(const TableBuilderOptions& tboptions);
67
68
  // Assign a new WritableFileWriter to the current output.
  // Takes ownership of `writer`: it is stored in the `file_writer_`
  // unique_ptr, and any writer held before is destroyed.
69
2.29k
  void AssignFileWriter(WritableFileWriter* writer) {
70
2.29k
    file_writer_.reset(writer);
71
2.29k
  }
72
73
  // TODO: Move the BlobDB builder into CompactionOutputs
74
2.89k
  const std::vector<BlobFileAddition>& GetBlobFileAdditions() const {
75
2.89k
    if (is_proximal_level_) {
76
0
      assert(blob_file_additions_.empty());
77
0
    }
78
2.89k
    return blob_file_additions_;
79
2.89k
  }
80
81
0
  // Returns a pointer to the blob file additions so that a caller can fill
  // them in. Must not be used on proximal-level outputs (asserted).
  std::vector<BlobFileAddition>* GetBlobFileAdditionsPtr() {
82
0
    assert(!is_proximal_level_);
83
0
    return &blob_file_additions_;
84
0
  }
85
86
2.89k
  // True when at least one blob file addition has been recorded.
  bool HasBlobFileAdditions() const { return !blob_file_additions_.empty(); }
87
88
  // Get all file paths (SST and blob) created during compaction.
  // Returns a read-only reference to the internal `output_file_paths_` list.
89
0
  const std::vector<std::string>& GetOutputFilePaths() const {
90
0
    return output_file_paths_;
91
0
  }
92
93
0
  // Returns a pointer to the internal list of created file paths so that a
  // caller can append to it directly. The pointer is never null.
  std::vector<std::string>* GetOutputFilePathsPtr() {
94
0
    return &output_file_paths_;
95
0
  }
96
97
2.29k
  // Records `path` (copied) in the list of file paths created during this
  // compaction. No de-duplication is performed.
  void AddOutputFilePath(const std::string& path) {
98
2.29k
    output_file_paths_.push_back(path);
99
2.29k
  }
100
101
0
  // Creates a fresh BlobGarbageMeter owned by this object, replacing any
  // previously created one, and returns a non-owning pointer to it.
  // Must not be used on proximal-level outputs (asserted).
  BlobGarbageMeter* CreateBlobGarbageMeter() {
102
0
    assert(!is_proximal_level_);
103
0
    blob_garbage_meter_ = std::make_unique<BlobGarbageMeter>();
104
0
    return blob_garbage_meter_.get();
105
0
  }
106
107
2.89k
  // Returns a non-owning pointer to the blob garbage meter, or nullptr when
  // none has been created. Proximal-level outputs always yield nullptr.
  BlobGarbageMeter* GetBlobGarbageMeter() const {
    if (!is_proximal_level_) {
      return blob_garbage_meter_.get();
    }
    // blobdb doesn't support per_key_placement yet
    assert(blob_garbage_meter_ == nullptr);
    return nullptr;
  }
115
116
0
  void UpdateBlobStats() {
117
0
    assert(!is_proximal_level_);
118
0
    stats_.num_output_files_blob =
119
0
        static_cast<int>(blob_file_additions_.size());
120
0
    for (const auto& blob : blob_file_additions_) {
121
0
      stats_.bytes_written_blob += blob.GetTotalBlobBytes();
122
0
    }
123
0
  }
124
125
  // Finish the current output file
126
  Status Finish(const Status& intput_status,
127
                const SeqnoToTimeMapping& seqno_to_time_mapping);
128
129
  // Update output table properties from already populated TableProperties.
130
  // Used for remote compaction.
  // Stores a copy of `table_properties` on the current output; requires at
  // least one output to exist (see current_output()).
131
0
  void UpdateTableProperties(const TableProperties& table_properties) {
132
0
    current_output().table_properties =
133
0
        std::make_shared<TableProperties>(table_properties);
134
0
  }
135
  // Update output table properties from table builder.
  // Stores the result of GetTableProperties() on the current output, so both
  // a builder and at least one output must exist when this is called.
136
1.69k
  void UpdateTableProperties() {
137
1.69k
    current_output().table_properties =
138
1.69k
        std::make_shared<TableProperties>(GetTableProperties());
139
1.69k
  }
140
141
  IOStatus WriterSyncClose(const Status& intput_status, SystemClock* clock,
142
                           Statistics* statistics, bool use_fsync);
143
144
3.39k
  // Returns, by value, the table properties reported by the current builder.
  // `builder_` is dereferenced unconditionally, so callers must ensure
  // HasBuilder() is true.
  TableProperties GetTableProperties() {
145
3.39k
    return builder_->GetTableProperties();
146
3.39k
  }
147
148
1.69k
  // Returns the smallest user key of the first output, provided that output
  // exists and is marked finished. Otherwise returns an empty Slice with a
  // null data pointer.
  Slice SmallestUserKey() const {
    if (outputs_.empty() || !outputs_.front().finished) {
      return Slice{nullptr, 0};
    }
    return outputs_.front().meta.smallest.user_key();
  }
155
156
1.69k
  // Returns the largest user key of the last output, provided that output
  // exists and is marked finished. Otherwise returns an empty Slice with a
  // null data pointer.
  Slice LargestUserKey() const {
    if (outputs_.empty() || !outputs_.back().finished) {
      return Slice{nullptr, 0};
    }
    return outputs_.back().meta.largest.user_key();
  }
163
164
  // In case the last output file is empty, which doesn't need to keep.
165
8.50k
  void RemoveLastEmptyOutput() {
166
8.50k
    if (!outputs_.empty() && !outputs_.back().meta.fd.file_size) {
167
      // An error occurred, so ignore the last output.
168
595
      outputs_.pop_back();
169
595
    }
170
8.50k
  }
171
172
  // Remove the last output, for example the last output doesn't have data (no
173
  // entry and no range-dels), but file_size might not be 0, as it has SST
174
  // metadata.
  // Unlike RemoveLastEmptyOutput(), the removal is unconditional; the output
  // list must not be empty (asserted).
175
0
  void RemoveLastOutput() {
176
0
    assert(!outputs_.empty());
177
0
    outputs_.pop_back();
178
0
  }
179
180
12.4k
  // True while a table builder is set for the current output.
  bool HasBuilder() const { return builder_ != nullptr; }
181
182
6.28k
  // Pointer to the metadata of the current (last added) output. Requires at
  // least one output, since current_output() asserts non-emptiness.
  FileMetaData* GetMetaData() { return &current_output().meta; }
183
184
8.84k
  // True when at least one output has been added and not removed.
  bool HasOutput() const { return !outputs_.empty(); }
185
186
2.29k
  // Entry count reported by the current builder. `builder_` is dereferenced
  // unconditionally, so callers must ensure HasBuilder() is true.
  uint64_t NumEntries() const { return builder_->NumEntries(); }
187
188
4.25k
  uint64_t GetWorkerCPUMicros() const {
189
4.25k
    return worker_cpu_micros_ + (builder_ ? builder_->GetWorkerCPUMicros() : 0);
190
4.25k
  }
191
192
2.29k
  // Destroys the current table builder (if any) and resets the tracked size
  // of the current output file to zero. The builder is released as-is; this
  // method does not call Abandon() on it (contrast with Cleanup()).
  void ResetBuilder() {
193
2.29k
    builder_.reset();
194
2.29k
    current_output_file_size_ = 0;
195
2.29k
  }
196
197
  // Add range deletions from the range_del_agg_ to the current output file.
198
  // Input parameters, `range_tombstone_lower_bound_` and current output's
199
  // metadata determine the bounds on range deletions to add. Updates output
200
  // file metadata boundary if extended by range tombstones.
201
  //
202
  // @param comp_start_user_key and comp_end_user_key include timestamp if
203
  // user-defined timestamp is enabled. Their timestamp should be max timestamp.
204
  // @param next_table_min_key internal key lower bound for the next compaction
205
  // output.
206
  // @param full_history_ts_low used for range tombstone garbage collection.
207
  Status AddRangeDels(
208
      CompactionRangeDelAggregator& range_del_agg,
209
      const Slice* comp_start_user_key, const Slice* comp_end_user_key,
210
      CompactionIterationStats& range_del_out_stats, bool bottommost_level,
211
      const InternalKeyComparator& icmp, SequenceNumber earliest_snapshot,
212
      std::pair<SequenceNumber, SequenceNumber> keep_seqno_range,
213
      const Slice& next_table_min_key, const std::string& full_history_ts_low);
214
215
0
  // Overwrites the output record count kept in this object's per-level
  // stats with `num_output_records`.
  void SetNumOutputRecords(uint64_t num_output_records) {
216
0
    stats_.num_output_records = num_output_records;
217
0
  }
218
219
 private:
220
  friend class SubcompactionState;
221
222
  void FillFilesToCutForTtl();
223
224
  void SetOutputSlitKey(const std::optional<Slice> start,
225
4.25k
                        const std::optional<Slice> end) {
226
4.25k
    const InternalKeyComparator* icmp =
227
4.25k
        &compaction_->column_family_data()->internal_comparator();
228
229
4.25k
    const InternalKey* output_split_key = compaction_->GetOutputSplitKey();
230
    // Invalid output_split_key indicates that we do not need to split
231
4.25k
    if (output_split_key != nullptr) {
232
      // We may only split the output when the cursor is in the range. Split
233
0
      if ((!end.has_value() ||
234
0
           icmp->user_comparator()->Compare(
235
0
               ExtractUserKey(output_split_key->Encode()), *end) < 0) &&
236
0
          (!start.has_value() ||
237
0
           icmp->user_comparator()->Compare(
238
0
               ExtractUserKey(output_split_key->Encode()), *start) > 0)) {
239
0
        local_output_split_key_ = output_split_key;
240
0
      }
241
0
    }
242
4.25k
  }
243
244
  // Returns true iff we should stop building the current output
245
  // before processing the current key in compaction iterator.
246
  bool ShouldStopBefore(const CompactionIterator& c_iter);
247
248
8.50k
  void Cleanup() {
249
8.50k
    if (builder_ != nullptr) {
250
      // May happen if we get a shutdown call in the middle of compaction
251
0
      builder_->Abandon();
252
0
      builder_.reset();
253
0
    }
254
8.50k
  }
255
256
  // Updates states related to file cutting for TTL.
257
  // Returns a boolean value indicating whether the current
258
  // compaction output file should be cut before `internal_key`.
259
  //
260
  // @param internal_key the current key to be added to output.
261
  bool UpdateFilesToCutForTTLStates(const Slice& internal_key);
262
263
  // update tracked grandparents information like grandparent index, if it's
264
  // in the gap between 2 grandparent files, accumulated grandparent files size
265
  // etc.
266
  // It returns how many boundaries it crosses by including current key.
267
  size_t UpdateGrandparentBoundaryInfo(const Slice& internal_key);
268
269
  // helper function to get the overlapped grandparent files size, it's only
270
  // used for calculating the first key's overlap.
271
  uint64_t GetCurrentKeyGrandparentOverlappedBytes(
272
      const Slice& internal_key) const;
273
274
  // Add current key from compaction_iterator to the output file. If needed
275
  // close and open new compaction output with the functions provided.
276
  Status AddToOutput(const CompactionIterator& c_iter,
277
                     const CompactionFileOpenFunc& open_file_func,
278
                     const CompactionFileCloseFunc& close_file_func,
279
                     const ParsedInternalKey& prev_iter_output_internal_key);
280
281
  // Close the current output. `open_file_func` is needed for creating new file
282
  // for range-dels only output file.
283
  Status CloseOutput(const Status& curr_status,
284
                     CompactionRangeDelAggregator* range_del_agg,
285
                     const CompactionFileOpenFunc& open_file_func,
286
4.25k
                     const CompactionFileCloseFunc& close_file_func) {
287
4.25k
    Status status = curr_status;
288
    // Handle subcompaction containing only range deletions. They could
289
    // be dropped or sent to another output level, so this is only an
290
    // over-approximate check for whether opening is needed.
291
4.25k
    if (status.ok() && !HasBuilder() && !HasOutput() && range_del_agg &&
292
1.19k
        !range_del_agg->IsEmpty()) {
293
0
      status = open_file_func(*this);
294
0
    }
295
296
4.25k
    if (HasBuilder()) {
297
2.29k
      const ParsedInternalKey empty_internal_key{};
298
2.29k
      const Slice empty_key{};
299
2.29k
      Status s = close_file_func(status, empty_internal_key, empty_key,
300
2.29k
                                 nullptr /* c_iter */, *this);
301
2.29k
      if (!s.ok() && status.ok()) {
302
0
        status = s;
303
0
      }
304
2.29k
    }
305
306
4.25k
    return status;
307
4.25k
  }
308
309
  // This subcompaction's output could be empty if compaction was aborted before
310
  // this subcompaction had a chance to generate any output files. When
311
  // subcompactions are executed sequentially this is more likely and will be
312
  // particularly likely for the later subcompactions to be empty. Once they are
313
  // run in parallel however it should be much rarer.
314
  // It's caller's responsibility to make sure it's not empty.
  // Returns a mutable reference to the most recently added output; emptiness
  // is only checked by an assert.
315
15.6k
  Output& current_output() {
316
15.6k
    assert(!outputs_.empty());
317
15.6k
    return outputs_.back();
318
15.6k
  }
319
320
  const Compaction* compaction_;
321
322
  // current output builder and writer
323
  std::unique_ptr<TableBuilder> builder_;
324
  std::unique_ptr<WritableFileWriter> file_writer_;
325
  uint64_t current_output_file_size_ = 0;
326
  SequenceNumber smallest_preferred_seqno_ = kMaxSequenceNumber;
327
328
  // Sum of all the GetWorkerCPUMicros() for all the closed builders so far.
329
  uint64_t worker_cpu_micros_ = 0;
330
331
  // all the compaction outputs so far
332
  std::vector<Output> outputs_;
333
334
  // BlobDB info
335
  std::vector<BlobFileAddition> blob_file_additions_;
336
  std::unique_ptr<BlobGarbageMeter> blob_garbage_meter_;
337
338
  // All file paths (SST and blob) created during compaction.
339
  // Used for cleanup on abort - ensures orphan files are deleted even if
340
  // they were removed from outputs_ or blob_file_additions_ (e.g., by
341
  // RemoveLastEmptyOutput when file_size is 0 because builder was abandoned).
342
  std::vector<std::string> output_file_paths_;
343
344
  // Per level's output stat
345
  InternalStats::CompactionStats stats_;
346
347
  // indicate if this CompactionOutputs obj for proximal_level, should always
348
  // be false if per_key_placement feature is not enabled.
349
  const bool is_proximal_level_;
350
351
  // partitioner information
352
  std::string last_key_for_partitioner_;
353
  std::unique_ptr<SstPartitioner> partitioner_;
354
355
  // A flag determines if this subcompaction has been split by the cursor
356
  // for RoundRobin compaction
357
  bool is_split_ = false;
358
359
  // We also maintain the output split key for each subcompaction to avoid
360
  // repetitive comparison in ShouldStopBefore()
361
  const InternalKey* local_output_split_key_ = nullptr;
362
363
  // Some identified files with old oldest ancestor time and the range should be
364
  // isolated out so that the output file(s) in that range can be merged down
365
  // for TTL and clear the timestamps for the range.
366
  std::vector<FileMetaData*> files_to_cut_for_ttl_;
367
  int cur_files_to_cut_for_ttl_ = -1;
368
  int next_files_to_cut_for_ttl_ = 0;
369
370
  // An index that used to speed up ShouldStopBefore().
371
  size_t grandparent_index_ = 0;
372
373
  // Whether the output key is in a gap between grandparent files, i.e.:
374
  //  key > grandparents[grandparent_index_ - 1].largest &&
375
  //  key < grandparents[grandparent_index_].smallest
376
  bool being_grandparent_gap_ = true;
377
378
  // The number of bytes overlapping between the current output and
379
  // grandparent files used in ShouldStopBefore().
380
  uint64_t grandparent_overlapped_bytes_ = 0;
381
382
  // A flag determines whether the key has been seen in ShouldStopBefore()
383
  bool seen_key_ = false;
384
385
  // for the current output file, how many file boundaries has it crossed,
386
  // basically number of files overlapped * 2
387
  size_t grandparent_boundary_switched_num_ = 0;
388
389
  // The smallest key of the current output file, this is set when current
390
  // output file's smallest key is a range tombstone start key.
391
  InternalKey range_tombstone_lower_bound_;
392
393
  // Used for calls to compaction->KeyRangeNotExistsBeyondOutputLevel() in
394
  // CompactionOutputs::AddRangeDels().
395
  // level_ptrs_[i] holds index of the file that was checked during the last
396
  // call to compaction->KeyRangeNotExistsBeyondOutputLevel(). This allows
397
  // future calls to the function to pick up where it left off, since each
398
  // range tombstone added to output file within each subcompaction is in
399
  // increasing key range.
400
  std::vector<size_t> level_ptrs_;
401
};
402
403
// helper struct to concatenate the last level and proximal level outputs
404
// which could be replaced by std::ranges::join_view() in c++20
405
struct OutputIterator {
406
 public:
407
  explicit OutputIterator(const std::vector<CompactionOutputs::Output>& a,
408
                          const std::vector<CompactionOutputs::Output>& b)
409
10.0k
      : a_(a), b_(b) {
410
10.0k
    within_a = !a_.empty();
411
10.0k
    idx_ = 0;
412
10.0k
  }
413
414
10.0k
  OutputIterator begin() { return *this; }
415
416
10.0k
  OutputIterator end() { return *this; }
417
418
0
  size_t size() { return a_.size() + b_.size(); }
419
420
5.09k
  const CompactionOutputs::Output& operator*() const {
421
5.09k
    return within_a ? a_[idx_] : b_[idx_];
422
5.09k
  }
423
424
5.09k
  OutputIterator& operator++() {
425
5.09k
    idx_++;
426
5.09k
    if (within_a && idx_ >= a_.size()) {
427
0
      within_a = false;
428
0
      idx_ = 0;
429
0
    }
430
5.09k
    assert(within_a || idx_ <= b_.size());
431
5.09k
    return *this;
432
5.09k
  }
433
434
15.1k
  bool operator!=(const OutputIterator& /*rhs*/) const {
435
15.1k
    return within_a || idx_ < b_.size();
436
15.1k
  }
437
438
 private:
439
  const std::vector<CompactionOutputs::Output>& a_;
440
  const std::vector<CompactionOutputs::Output>& b_;
441
  bool within_a;
442
  size_t idx_;
443
};
444
445
}  // namespace ROCKSDB_NAMESPACE