Coverage Report

Created: 2026-03-31 07:51

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/compaction/compaction_iterator.h
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
#pragma once
6
7
#include <algorithm>
8
#include <cinttypes>
9
#include <deque>
10
#include <string>
11
#include <unordered_set>
12
#include <vector>
13
14
#include "db/compaction/compaction.h"
15
#include "db/compaction/compaction_iteration_stats.h"
16
#include "db/merge_helper.h"
17
#include "db/pinned_iterators_manager.h"
18
#include "db/range_del_aggregator.h"
19
#include "db/snapshot_checker.h"
20
#include "options/cf_options.h"
21
#include "rocksdb/compaction_filter.h"
22
23
namespace ROCKSDB_NAMESPACE {
24
25
class BlobFileBuilder;
26
class BlobFetcher;
27
class PrefetchBufferCollection;
28
29
// A wrapper of internal iterator whose purpose is to count how
30
// many entries there are in the iterator.
31
// Counting behavior: Next() increments the counter for each entry that is not
// a delete-range sentinel key. Seek() either forwards to the wrapped iterator
// and marks the count as unavailable (HasNumItered() becomes false), or, when
// need_count_entries is true, is emulated with repeated Next() calls so that
// the count stays exact.
class SequenceIterWrapper : public InternalIterator {
32
 public:
33
  // `iter` is the wrapped iterator and is not owned. `cmp` is used to build
  // the InternalKeyComparator that the emulated Seek() compares keys with.
  // `need_count_entries` selects the emulated (counting) Seek() path.
  SequenceIterWrapper(InternalIterator* iter, const Comparator* cmp,
34
                      bool need_count_entries)
35
18.8k
      : icmp_(cmp),
36
18.8k
        inner_iter_(iter),
37
18.8k
        need_count_entries_(need_count_entries) {}
38
313k
  // Valid() and status() forward directly to the wrapped iterator.
  bool Valid() const override { return inner_iter_->Valid(); }
39
17.8k
  Status status() const override { return inner_iter_->status(); }
40
239k
  // Counts the entry currently pointed at (unless it is a delete-range
  // sentinel key) and then advances the wrapped iterator.
  void Next() override {
41
239k
    if (!inner_iter_->IsDeleteRangeSentinelKey()) {
42
239k
      num_itered_++;
43
239k
    }
44
239k
    inner_iter_->Next();
45
239k
  }
46
0
  // Positions the iterator at the first entry at or after `target`.
  void Seek(const Slice& target) override {
47
0
    if (!need_count_entries_) {
48
0
      // Entries skipped by a real Seek() are not counted, so from here on the
      // count is reported as unavailable.
      has_num_itered_ = false;
49
0
      inner_iter_->Seek(target);
50
0
    } else {
51
      // Need to count total number of entries,
52
      // so we do Next() rather than Seek().
53
0
      // This path only moves forward: if the wrapped iterator is already at or
      // past `target`, the loop body never runs.
      while (inner_iter_->Valid() &&
54
0
             icmp_.Compare(inner_iter_->key(), target) < 0) {
55
0
        Next();
56
0
      }
57
0
    }
58
0
  }
59
239k
  // key() and value() forward directly to the wrapped iterator.
  Slice key() const override { return inner_iter_->key(); }
60
239k
  Slice value() const override { return inner_iter_->value(); }
61
62
  // Unused InternalIterator methods
63
0
  void SeekToFirst() override { assert(false); }
64
0
  void Prev() override { assert(false); }
65
0
  void SeekForPrev(const Slice& /* target */) override { assert(false); }
66
0
  void SeekToLast() override { assert(false); }
67
68
18.8k
  // Number of entries counted by Next() so far (delete-range sentinel keys are
  // excluded).
  uint64_t NumItered() const { return num_itered_; }
69
3.07k
  // False once a real (non-counting) Seek() has been forwarded to the wrapped
  // iterator; in that case NumItered() no longer covers every input entry.
  bool HasNumItered() const { return has_num_itered_; }
70
239k
  // REQUIRED: Valid() must be true (checked with assert).
  bool IsDeleteRangeSentinelKey() const override {
71
239k
    assert(Valid());
72
239k
    return inner_iter_->IsDeleteRangeSentinelKey();
73
239k
  }
74
75
 private:
76
  // Built from the user comparator passed to the constructor; only used by the
  // emulated Seek().
  InternalKeyComparator icmp_;
77
  InternalIterator* inner_iter_;  // not owned
78
  uint64_t num_itered_ = 0;
79
  bool need_count_entries_;
80
  bool has_num_itered_ = true;
81
};
82
83
class CompactionIterator {
84
 public:
85
  // A wrapper around Compaction. Has a much smaller interface, only what
86
  // CompactionIterator uses. Tests can override it.
87
  // Every method is pure virtual. RealCompaction is the implementation in this
  // header; it forwards each call to an actual Compaction object.
  class CompactionProxy {
87
   public:
88
3.07k
    // Virtual because CompactionIterator owns its proxy through a
    // std::unique_ptr<CompactionProxy>.
    virtual ~CompactionProxy() = default;
89
90
    virtual int level() const = 0;
91
92
    // `level_ptrs` is caller-owned state that is passed back in on every call;
    // CompactionIterator keeps it in its level_ptrs_ member.
    virtual bool KeyNotExistsBeyondOutputLevel(
93
        const Slice& user_key, std::vector<size_t>* level_ptrs) const = 0;
94
95
    virtual bool bottommost_level() const = 0;
96
97
    virtual int number_levels() const = 0;
98
99
    // Result includes timestamp if user-defined timestamp is enabled.
100
    virtual Slice GetLargestUserKey() const = 0;
101
102
    virtual bool allow_ingest_behind() const = 0;
103
104
    virtual bool allow_mmap_reads() const = 0;
105
106
    virtual bool enable_blob_garbage_collection() const = 0;
107
108
    virtual double blob_garbage_collection_age_cutoff() const = 0;
109
110
    virtual uint64_t blob_compaction_readahead_size() const = 0;
111
112
    virtual const Version* input_version() const = 0;
113
114
    virtual bool DoesInputReferenceBlobFiles() const = 0;
115
116
    // Returns the underlying Compaction object, if any.
    // NOTE(review): test proxies presumably return nullptr here -- confirm
    // before dereferencing the result.
    virtual const Compaction* real_compaction() const = 0;
117
118
    virtual bool SupportsPerKeyPlacement() const = 0;
119
  };
121
122
  // CompactionProxy implementation backed by a Compaction object. Each
  // override forwards to compaction_, either to an accessor of the same name
  // or to a field of its immutable / mutable options.
  class RealCompaction : public CompactionProxy {
122
   public:
123
    // `compaction` must be non-null (checked with assert). Only the pointer is
    // stored.
    explicit RealCompaction(const Compaction* compaction)
124
3.07k
        : compaction_(compaction) {
125
3.07k
      assert(compaction_);
126
3.07k
    }
127
128
3.07k
    int level() const override { return compaction_->level(); }
129
130
    bool KeyNotExistsBeyondOutputLevel(
131
1.62k
        const Slice& user_key, std::vector<size_t>* level_ptrs) const override {
132
1.62k
      return compaction_->KeyNotExistsBeyondOutputLevel(user_key, level_ptrs);
133
1.62k
    }
134
135
3.07k
    bool bottommost_level() const override {
136
3.07k
      return compaction_->bottommost_level();
137
3.07k
    }
138
139
3.07k
    int number_levels() const override { return compaction_->number_levels(); }
140
141
    // Result includes timestamp if user-defined timestamp is enabled.
142
0
    Slice GetLargestUserKey() const override {
143
0
      return compaction_->GetLargestUserKey();
144
0
    }
145
146
4.69k
    // True if either ingest-behind flag in the immutable options is set
    // (cf_allow_ingest_behind or allow_ingest_behind).
    bool allow_ingest_behind() const override {
147
4.69k
      return compaction_->immutable_options().cf_allow_ingest_behind ||
148
4.69k
             compaction_->immutable_options().allow_ingest_behind;
149
4.69k
    }
150
151
3.07k
    // Read from the immutable options.
    bool allow_mmap_reads() const override {
152
3.07k
      return compaction_->immutable_options().allow_mmap_reads;
153
3.07k
    }
154
155
3.07k
    bool enable_blob_garbage_collection() const override {
156
3.07k
      return compaction_->enable_blob_garbage_collection();
157
3.07k
    }
158
159
0
    double blob_garbage_collection_age_cutoff() const override {
160
0
      return compaction_->blob_garbage_collection_age_cutoff();
161
0
    }
162
163
3.07k
    // Read from the mutable column family options, unlike the other option
    // getters in this class.
    uint64_t blob_compaction_readahead_size() const override {
164
3.07k
      return compaction_->mutable_cf_options().blob_compaction_readahead_size;
165
3.07k
    }
166
167
6.14k
    const Version* input_version() const override {
168
6.14k
      return compaction_->input_version();
169
6.14k
    }
170
171
0
    bool DoesInputReferenceBlobFiles() const override {
172
0
      return compaction_->DoesInputReferenceBlobFiles();
173
0
    }
174
175
0
    // Returns the wrapped Compaction pointer itself (never null, see the
    // constructor's assert).
    const Compaction* real_compaction() const override { return compaction_; }
176
177
0
    bool SupportsPerKeyPlacement() const override {
178
0
      return compaction_->SupportsPerKeyPlacement();
179
0
    }
180
181
   private:
182
    // Raw pointer to the wrapped Compaction. This class defines no destructor,
    // so it never deletes the object.
    const Compaction* compaction_;
183
  };
185
186
  // @param must_count_input_entries Controls input entry counting accuracy vs
187
  // performance:
188
  //   - If true: `NumInputEntryScanned()` always returns the exact count of
189
  //   input keys
190
  //     scanned. The iterator will use sequential `Next()` calls instead of
191
  //     `Seek()` to maintain count accuracy as `Seek()` will not count the
192
  //     skipped input entries, which is slower but guarantees correctness.
193
  //   - If false: `NumInputEntryScanned()` returns the count only if no
194
  //   `Seek()` operations
195
  //     were performed on the input iterator. When compaction filters request
196
  //     skipping ranges of keys or other optimizations trigger seek operations,
197
  //     the count becomes unreliable. Always call `HasNumInputEntryScanned()`
198
  //     first to verify if the count is accurate before using
199
  //     `NumInputEntryScanned()`.
200
  CompactionIterator(
201
      InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper,
202
      SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots,
203
      SequenceNumber earliest_snapshot,
204
      SequenceNumber earliest_write_conflict_snapshot,
205
      SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker,
206
      Env* env, bool report_detailed_time,
207
      CompactionRangeDelAggregator* range_del_agg,
208
      BlobFileBuilder* blob_file_builder, bool allow_data_in_errors,
209
      bool enforce_single_del_contracts,
210
      const std::atomic<bool>& manual_compaction_canceled,
211
      bool must_count_input_entries, const Compaction* compaction = nullptr,
212
      const CompactionFilter* compaction_filter = nullptr,
213
      const std::atomic<bool>* shutting_down = nullptr,
214
      const std::shared_ptr<Logger> info_log = nullptr,
215
      const std::string* full_history_ts_low = nullptr,
216
      std::optional<SequenceNumber> preserve_seqno_min = {});
217
218
  // Constructor with custom CompactionProxy, used for tests.
219
  CompactionIterator(InternalIterator* input, const Comparator* cmp,
220
                     MergeHelper* merge_helper, SequenceNumber last_sequence,
221
                     std::vector<SequenceNumber>* snapshots,
222
                     SequenceNumber earliest_snapshot,
223
                     SequenceNumber earliest_write_conflict_snapshot,
224
                     SequenceNumber job_snapshot,
225
                     const SnapshotChecker* snapshot_checker, Env* env,
226
                     bool report_detailed_time,
227
                     CompactionRangeDelAggregator* range_del_agg,
228
                     BlobFileBuilder* blob_file_builder,
229
                     bool allow_data_in_errors,
230
                     bool enforce_single_del_contracts,
231
                     const std::atomic<bool>& manual_compaction_canceled,
232
                     std::unique_ptr<CompactionProxy> compaction,
233
                     bool must_count_input_entries,
234
                     const CompactionFilter* compaction_filter = nullptr,
235
                     const std::atomic<bool>* shutting_down = nullptr,
236
                     const std::shared_ptr<Logger> info_log = nullptr,
237
                     const std::string* full_history_ts_low = nullptr,
238
                     std::optional<SequenceNumber> preserve_seqno_min = {});
239
240
  ~CompactionIterator();
241
242
  void ResetRecordCounts();
243
244
  // Seek to the beginning of the compaction iterator output.
245
  //
246
  // REQUIRED: Call only once.
247
  void SeekToFirst();
248
249
  // Produces the next record in the compaction.
250
  //
251
  // REQUIRED: SeekToFirst() has been called.
252
  void Next();
253
254
  // Getters
255
41.5k
  const Slice& key() const { return key_; }
256
37.7k
  const Slice& value() const { return value_; }
257
19.7k
  const Status& status() const { return status_; }
258
39.6k
  const ParsedInternalKey& ikey() const { return ikey_; }
259
540k
  inline bool Valid() const { return validity_info_.IsValid(); }
260
0
  // Returns the user key of the current output. When the current output is a
  // range tombstone start key (is_range_del_ is set), the key is taken from
  // the parsed internal key ikey_; otherwise current_user_key_ is returned.
  // NOTE(review): presumably current_user_key_ is not refreshed for range
  // tombstone outputs -- confirm in NextFromInput().
  const Slice& user_key() const {
261
0
    if (UNLIKELY(is_range_del_)) {
262
0
      return ikey_.user_key;
263
0
    }
264
0
    return current_user_key_;
265
0
  }
266
9.76k
  const CompactionIterationStats& iter_stats() const { return iter_stats_; }
267
3.07k
  bool HasNumInputEntryScanned() const { return input_.HasNumItered(); }
268
269
  // This method should only be used when `HasNumInputEntryScanned()` returns
270
  // true, unless `must_count_input_entries=true` was specified during iterator
271
  // creation (which ensures the count is always accurate).
272
18.8k
  uint64_t NumInputEntryScanned() const { return input_.NumItered(); }
273
274
  // Returns true if the current valid key was already scanned/counted during
275
  // a lookahead operation in a previous iteration.
276
  //
277
  // REQUIRED: Valid() must be true
278
0
  bool IsCurrentKeyAlreadyScanned() const {
279
0
    assert(Valid());
280
0
    return at_next_ || merge_out_iter_.Valid();
281
0
  }
282
283
0
  Status InputStatus() const { return input_.status(); }
284
285
1.89k
  bool IsDeleteRangeSentinelKey() const { return is_range_del_; }
286
287
 private:
288
  // Processes the input stream to find the next output
289
  void NextFromInput();
290
291
  // Do final preparations before presenting the output to the callee.
292
  void PrepareOutput();
293
294
  // Passes the output value to the blob file builder (if any), and replaces it
295
  // with the corresponding blob reference if it has been actually written to a
296
  // blob file (i.e. if it passed the value size check). Returns true if the
297
  // value got extracted to a blob file, false otherwise.
298
  bool ExtractLargeValueIfNeededImpl();
299
300
  // Extracts large values as described above, and updates the internal key's
301
  // type to kTypeBlobIndex if the value got extracted. Should only be called
302
  // for regular values (kTypeValue).
303
  void ExtractLargeValueIfNeeded();
304
305
  // Relocates valid blobs residing in the oldest blob files if garbage
306
  // collection is enabled. Relocated blobs are written to new blob files or
307
  // inlined in the LSM tree depending on the current settings (i.e.
308
  // enable_blob_files and min_blob_size). Should only be called for blob
309
  // references (kTypeBlobIndex).
310
  //
311
  // Note: the stacked BlobDB implementation's compaction filter based GC
312
  // algorithm is also called from here.
313
  void GarbageCollectBlobIfNeeded();
314
315
  // Invoke compaction filter if needed.
316
  // Return true on success, false on failures (e.g.: kIOError).
317
  bool InvokeFilterIfNeeded(bool* need_skip, Slice* skip_until);
318
319
  // Given a sequence number, return the sequence number of the
320
  // earliest snapshot that this sequence number is visible in.
321
  // The snapshots themselves are arranged in ascending order of
322
  // sequence numbers.
323
  // Employ a sequential search because the total number of
324
  // snapshots is typically small.
325
  inline SequenceNumber findEarliestVisibleSnapshot(
326
      SequenceNumber in, SequenceNumber* prev_snapshot);
327
328
71.8k
  // Returns true if no snapshot checker is configured, or if the checker
  // reports `sequence` as kInSnapshot with respect to job_snapshot_. Any other
  // checker result yields false.
  inline bool KeyCommitted(SequenceNumber sequence) {
329
71.8k
    return snapshot_checker_ == nullptr ||
330
0
           snapshot_checker_->CheckInSnapshot(sequence, job_snapshot_) ==
331
0
               SnapshotCheckerResult::kInSnapshot;
332
71.8k
  }
333
334
  bool DefinitelyInSnapshot(SequenceNumber seq, SequenceNumber snapshot);
335
336
  bool DefinitelyNotInSnapshot(SequenceNumber seq, SequenceNumber snapshot);
337
338
  // Extract user-defined timestamp from user key if possible and compare it
339
  // with *full_history_ts_low_ if applicable.
340
71.8k
  // Refreshes curr_ts_ from ikey_.user_key and, when full_history_ts_low_ is
  // set, refreshes cmp_with_history_ts_low_ as well.
  inline void UpdateTimestampAndCompareWithFullHistoryLow() {
341
71.8k
    // A timestamp size of zero means there is nothing to extract.
    if (!timestamp_size_) {
342
71.8k
      return;
343
71.8k
    }
344
0
    Slice ts = ExtractTimestampFromUserKey(ikey_.user_key, timestamp_size_);
345
0
    // Keep an owned copy of the timestamp bytes in curr_ts_.
    curr_ts_.assign(ts.data(), ts.size());
346
0
    // When full_history_ts_low_ is null, cmp_with_history_ts_low_ keeps its
    // previous value.
    if (full_history_ts_low_) {
347
0
      cmp_with_history_ts_low_ =
348
0
          cmp_->CompareTimestamp(ts, *full_history_ts_low_);
349
0
    }
350
0
  }
351
352
  static uint64_t ComputeBlobGarbageCollectionCutoffFileNumber(
353
      const CompactionProxy* compaction);
354
  static std::unique_ptr<BlobFetcher> CreateBlobFetcherIfNeeded(
355
      const CompactionProxy* compaction);
356
  static std::unique_ptr<PrefetchBufferCollection>
357
  CreatePrefetchBufferCollectionIfNeeded(const CompactionProxy* compaction);
358
359
  SequenceIterWrapper input_;
360
  const Comparator* cmp_;
361
  MergeHelper* merge_helper_;
362
  const std::vector<SequenceNumber>* snapshots_;
363
  // List of snapshots released during compaction.
364
  // findEarliestVisibleSnapshot() identifies them from the result returned by
365
  // snapshot_checker, and makes sure they will not be returned as
366
  // earliest visible snapshot of an older value.
367
  // See WritePreparedTransactionTest::ReleaseSnapshotDuringCompaction3.
368
  std::unordered_set<SequenceNumber> released_snapshots_;
369
  const SequenceNumber earliest_write_conflict_snapshot_;
370
  const SequenceNumber job_snapshot_;
371
  const SnapshotChecker* const snapshot_checker_;
372
  Env* env_;
373
  SystemClock* clock_;
374
  const bool report_detailed_time_;
375
  CompactionRangeDelAggregator* range_del_agg_;
376
  BlobFileBuilder* blob_file_builder_;
377
  std::unique_ptr<CompactionProxy> compaction_;
378
  const CompactionFilter* compaction_filter_;
379
  const std::atomic<bool>* shutting_down_;
380
  const std::atomic<bool>& manual_compaction_canceled_;
381
  const bool bottommost_level_;
382
  const bool visible_at_tip_;
383
  const SequenceNumber earliest_snapshot_;
384
385
  std::shared_ptr<Logger> info_log_;
386
387
  const bool allow_data_in_errors_;
388
389
  const bool enforce_single_del_contracts_;
390
391
  // Comes from comparator.
392
  const size_t timestamp_size_;
393
394
  // Lower bound timestamp to retain full history in terms of user-defined
395
  // timestamp. If a key's timestamp is older than full_history_ts_low_, then
396
  // the key *may* be eligible for garbage collection (GC). The skipping logic
397
  // is in `NextFromInput()` and `PrepareOutput()`.
398
  // If nullptr, NO GC will be performed and all history will be preserved.
399
  const std::string* const full_history_ts_low_;
400
401
  // State
402
  //
403
  // Tag stored together with the valid bit in ValidityInfo (written by
  // ValidityInfo::SetValid(), read back by ValidityInfo::GetContext()).
  // Values must fit in 7 bits because SetValid() shifts them left by one into
  // a uint8_t.
  // NOTE(review): each value presumably names the code path that marked the
  // current output as valid -- confirm against the .cc file.
  enum ValidContext : uint8_t {
404
    kMerge1 = 0,
405
    kMerge2 = 1,
406
    kParseKeyError = 2,
407
    kCurrentKeyUncommitted = 3,
408
    kKeepSDAndClearPut = 4,
409
    kKeepTsHistory = 5,
410
    kKeepSDForConflictCheck = 6,
411
    kKeepSDForSnapshot = 7,
412
    kKeepSD = 8,
413
    kKeepDel = 9,
414
    kNewUserKey = 10,
415
    kRangeDeletion = 11,
416
    kSwapPreferredSeqno = 12,
417
  };
418
419
  // Packs the iterator's validity and the context that made it valid into one
  // byte: bit 0 of `rep` is the valid flag, the remaining bits hold the
  // ValidContext value.
  struct ValidityInfo {
420
540k
    // Tests bit 0 of `rep`.
    inline bool IsValid() const { return rep & 1; }
421
0
    // Returns the bits above the valid flag as a ValidContext. After
    // Invalidate() this yields the enumerator with value 0.
    ValidContext GetContext() const {
422
0
      return static_cast<ValidContext>(rep >> 1);
423
0
    }
424
37.7k
    // Stores `ctx` in the upper bits and sets the valid flag.
    inline void SetValid(uint8_t ctx) { rep = (ctx << 1) | 1; }
425
56.5k
    // Clears both the valid flag and the stored context.
    inline void Invalidate() { rep = 0; }
426
427
    uint8_t rep{0};
428
  } validity_info_;
429
430
  // Points to a copy of the current compaction iterator output (current_key_)
431
  // if valid.
432
  Slice key_;
433
  // Points to the value in the underlying iterator that corresponds to the
434
  // current output.
435
  Slice value_;
436
  // The status is OK unless compaction iterator encounters a merge operand
437
  // while not having a merge operator defined.
438
  Status status_;
439
  // Stores the user key, sequence number and type of the current compaction
440
  // iterator output (or current key in the underlying iterator during
441
  // NextFromInput()).
442
  ParsedInternalKey ikey_;
443
444
  // Stores whether current_user_key_ is valid. If so, current_user_key_
445
  // stores the user key of the last key seen by the iterator.
446
  // If false, treat the next key to read as a new user key.
447
  bool has_current_user_key_ = false;
448
  // If false, the iterator holds a copy of the current compaction iterator
449
  // output (or current key in the underlying iterator during NextFromInput()).
450
  bool at_next_ = false;
451
452
  // A copy of the current internal key.
453
  IterKey current_key_;
454
  Slice current_user_key_;
455
  std::string curr_ts_;
456
  SequenceNumber current_user_key_sequence_;
457
  SequenceNumber current_user_key_snapshot_;
458
459
  // True if the iterator has already returned a record for the current key.
460
  bool has_outputted_key_ = false;
461
462
  // Truncate the value of the next key and output it without applying any
463
  // compaction rules. This is an optimization for outputting a put after
464
  // a single delete. See more in `NextFromInput()` under Optimization 3.
465
  bool clear_and_output_next_key_ = false;
466
467
  MergeOutputIterator merge_out_iter_;
468
  Status merge_until_status_;
469
  // PinnedIteratorsManager used to pin input_ Iterator blocks while reading
470
  // merge operands and then releasing them after consuming them.
471
  PinnedIteratorsManager pinned_iters_mgr_;
472
473
  uint64_t blob_garbage_collection_cutoff_file_number_;
474
475
  std::unique_ptr<BlobFetcher> blob_fetcher_;
476
  std::unique_ptr<PrefetchBufferCollection> prefetch_buffers_;
477
478
  std::string blob_index_;
479
  PinnableSlice blob_value_;
480
  std::string compaction_filter_value_;
481
  InternalKey compaction_filter_skip_until_;
482
  // "level_ptrs" holds indices that remember which file of an associated
483
  // level we were last checking during the last call to compaction->
484
  // KeyNotExistsBeyondOutputLevel(). This allows future calls to the function
485
  // to pick off where it left off since each subcompaction's key range is
486
  // increasing so a later call to the function must be looking for a key that
487
  // is in or beyond the last file checked during the previous call
488
  std::vector<size_t> level_ptrs_;
489
  CompactionIterationStats iter_stats_;
490
491
  // Used to avoid purging uncommitted values. The application can specify
492
  // uncommitted values by providing a SnapshotChecker object.
493
  bool current_key_committed_;
494
495
  // Saved result of cmp_->CompareTimestamp(curr_ts_, *full_history_ts_low_)
496
  int cmp_with_history_ts_low_;
497
498
  const int level_;
499
500
  // True if the previous internal key (same user key)'s sequence number has
501
  // just been zeroed out during bottommost compaction.
502
  bool last_key_seq_zeroed_{false};
503
504
  // Max seqno that can be zeroed out at last level (various reasons)
505
  const SequenceNumber preserve_seqno_after_ = kMaxSequenceNumber;
506
507
239k
  // Steps the input iterator by one entry. The SequenceIterWrapper counts the
  // entry it is leaving unless that entry is a delete-range sentinel key.
  void AdvanceInputIter() { input_.Next(); }
508
509
0
  // Moves the input iterator forward to `skip_until`. Depending on how input_
  // was constructed this is either a real Seek() on the underlying iterator
  // (after which HasNumInputEntryScanned() returns false) or a sequence of
  // counted Next() calls.
  void SkipUntil(const Slice& skip_until) { input_.Seek(skip_until); }
510
511
259k
  // Returns true if a shutdown flag was supplied (shutting_down_ is non-null)
  // and it is currently set.
  bool IsShuttingDown() {
512
    // This is a best-effort facility, so memory_order_relaxed is sufficient.
513
259k
    return shutting_down_ && shutting_down_->load(std::memory_order_relaxed);
514
259k
  }
515
516
295k
  // Returns the current value of the manual_compaction_canceled_ flag. The
  // flag is held by reference, so no null check is needed here.
  bool IsPausingManualCompaction() {
517
    // This is a best-effort facility, so memory_order_relaxed is sufficient.
518
295k
    return manual_compaction_canceled_.load(std::memory_order_relaxed);
519
295k
  }
520
521
  // Stores whether the current compaction iterator output
522
  // is a range tombstone start key.
523
  bool is_range_del_{false};
524
};
525
526
// Forwards to DataIsDefinitelyInSnapshot() with this iterator's
// snapshot_checker_, which may be null.
inline bool CompactionIterator::DefinitelyInSnapshot(SequenceNumber seq,
527
3.52k
                                                     SequenceNumber snapshot) {
528
3.52k
  return DataIsDefinitelyInSnapshot(seq, snapshot, snapshot_checker_);
529
3.52k
}
530
531
// Forwards to DataIsDefinitelyNotInSnapshot() with this iterator's
// snapshot_checker_, which may be null.
inline bool CompactionIterator::DefinitelyNotInSnapshot(
532
0
    SequenceNumber seq, SequenceNumber snapshot) {
533
0
  return DataIsDefinitelyNotInSnapshot(seq, snapshot, snapshot_checker_);
534
0
}
535
536
}  // namespace ROCKSDB_NAMESPACE