Coverage Report

Created: 2026-03-31 07:51

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/table/compaction_merging_iterator.cc
Line
Count
Source
1
//  Copyright (c) Meta Platforms, Inc. and affiliates.
2
//
3
//  This source code is licensed under both the GPLv2 (found in the
4
//  COPYING file in the root directory) and Apache 2.0 License
5
//  (found in the LICENSE.Apache file in the root directory).
6
#include "table/compaction_merging_iterator.h"
7
8
#include "db/internal_stats.h"
9
10
namespace ROCKSDB_NAMESPACE {
11
class CompactionMergingIterator : public InternalIterator {
12
 public:
13
  CompactionMergingIterator(
14
      const InternalKeyComparator* comparator, InternalIterator** children,
15
      int n, bool is_arena_mode,
16
      std::vector<std::pair<std::unique_ptr<TruncatedRangeDelIterator>,
17
                            std::unique_ptr<TruncatedRangeDelIterator>**>>&
18
          range_tombstones,
19
      InternalStats* internal_stats)
20
3.07k
      : is_arena_mode_(is_arena_mode),
21
3.07k
        comparator_(comparator),
22
3.07k
        current_(nullptr),
23
3.07k
        minHeap_(CompactionHeapItemComparator(comparator_)),
24
3.07k
        pinned_iters_mgr_(nullptr),
25
3.07k
        internal_stats_(internal_stats),
26
3.07k
        num_sorted_runs_recorded_(0) {
27
3.07k
    children_.resize(n);
28
12.2k
    for (int i = 0; i < n; i++) {
29
9.16k
      children_[i].level = i;
30
9.16k
      children_[i].iter.Set(children[i]);
31
9.16k
      assert(children_[i].type == HeapItem::ITERATOR);
32
9.16k
    }
33
3.07k
    assert(range_tombstones.size() == static_cast<size_t>(n));
34
9.16k
    for (auto& p : range_tombstones) {
35
9.16k
      range_tombstone_iters_.push_back(std::move(p.first));
36
9.16k
    }
37
3.07k
    pinned_heap_item_.resize(n);
38
12.2k
    for (int i = 0; i < n; ++i) {
39
9.16k
      if (range_tombstones[i].second) {
40
        // for LevelIterator
41
2.13k
        *range_tombstones[i].second = &range_tombstone_iters_[i];
42
2.13k
      }
43
9.16k
      pinned_heap_item_[i].level = i;
44
9.16k
      pinned_heap_item_[i].type = HeapItem::DELETE_RANGE_START;
45
9.16k
    }
46
3.07k
    if (internal_stats_) {
47
3.07k
      TEST_SYNC_POINT("CompactionMergingIterator::UpdateInternalStats");
48
      // The size of children_ / range_tombstone_iters_ (n) should not change,
49
      // but to be safe we record the size here so that we decrement by the
50
      // correct amount at destruction time.
51
3.07k
      num_sorted_runs_recorded_ = n;
52
3.07k
      internal_stats_->IncrNumRunningCompactionSortedRuns(
53
3.07k
          num_sorted_runs_recorded_);
54
3.07k
      assert(num_sorted_runs_recorded_ <=
55
3.07k
             internal_stats_->NumRunningCompactionSortedRuns());
56
3.07k
    }
57
3.07k
  }
58
59
5.20k
  void considerStatus(const Status& s) {
60
5.20k
    if (!s.ok() && status_.ok()) {
61
0
      status_ = s;
62
0
    }
63
5.20k
  }
64
65
3.07k
  ~CompactionMergingIterator() override {
66
3.07k
    if (internal_stats_) {
67
3.07k
      assert(num_sorted_runs_recorded_ == range_tombstone_iters_.size());
68
3.07k
      assert(num_sorted_runs_recorded_ <=
69
3.07k
             internal_stats_->NumRunningCompactionSortedRuns());
70
3.07k
      internal_stats_->DecrNumRunningCompactionSortedRuns(
71
3.07k
          num_sorted_runs_recorded_);
72
3.07k
    }
73
74
3.07k
    range_tombstone_iters_.clear();
75
76
9.16k
    for (auto& child : children_) {
77
9.16k
      child.iter.DeleteIter(is_arena_mode_);
78
9.16k
    }
79
3.07k
    status_.PermitUncheckedError();
80
3.07k
  }
81
82
13.9k
  bool Valid() const override { return current_ != nullptr && status_.ok(); }
83
84
4.07k
  Status status() const override { return status_; }
85
86
  void SeekToFirst() override;
87
88
  void Seek(const Slice& target) override;
89
90
  void Next() override;
91
92
6.95k
  Slice key() const override {
93
6.95k
    assert(Valid());
94
6.95k
    return current_->key();
95
6.95k
  }
96
97
6.95k
  Slice value() const override {
98
6.95k
    assert(Valid());
99
6.95k
    if (LIKELY(current_->type == HeapItem::ITERATOR)) {
100
6.95k
      return current_->iter.value();
101
6.95k
    } else {
102
0
      return dummy_tombstone_val;
103
0
    }
104
6.95k
  }
105
106
  // Here we simply relay MayBeOutOfLowerBound/MayBeOutOfUpperBound result
107
  // from current child iterator. Potentially as long as one of child iterator
108
  // report out of bound is not possible, we know current key is within bound.
109
0
  bool MayBeOutOfLowerBound() override {
110
0
    assert(Valid());
111
0
    return current_->type == HeapItem::DELETE_RANGE_START ||
112
0
           current_->iter.MayBeOutOfLowerBound();
113
0
  }
114
115
0
  IterBoundCheck UpperBoundCheckResult() override {
116
0
    assert(Valid());
117
0
    return current_->type == HeapItem::DELETE_RANGE_START
118
0
               ? IterBoundCheck::kUnknown
119
0
               : current_->iter.UpperBoundCheckResult();
120
0
  }
121
122
0
  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
123
0
    pinned_iters_mgr_ = pinned_iters_mgr;
124
0
    for (auto& child : children_) {
125
0
      child.iter.SetPinnedItersMgr(pinned_iters_mgr);
126
0
    }
127
0
  }
128
129
13.9k
  bool IsDeleteRangeSentinelKey() const override {
130
13.9k
    assert(Valid());
131
13.9k
    return current_->type == HeapItem::DELETE_RANGE_START;
132
13.9k
  }
133
134
  // Compaction uses the above subset of InternalIterator interface.
135
0
  void SeekToLast() override { assert(false); }
136
137
0
  void SeekForPrev(const Slice&) override { assert(false); }
138
139
0
  void Prev() override { assert(false); }
140
141
0
  bool NextAndGetResult(IterateResult*) override {
142
0
    assert(false);
143
0
    return false;
144
0
  }
145
146
0
  bool IsKeyPinned() const override {
147
0
    assert(false);
148
0
    return false;
149
0
  }
150
151
0
  bool IsValuePinned() const override {
152
0
    assert(false);
153
0
    return false;
154
0
  }
155
156
0
  bool PrepareValue() override {
157
0
    assert(false);
158
0
    return false;
159
0
  }
160
161
 private:
162
  struct HeapItem {
163
18.3k
    HeapItem() = default;
164
165
    IteratorWrapper iter;
166
    size_t level = 0;
167
    std::string tombstone_str;
168
    enum Type { ITERATOR, DELETE_RANGE_START };
169
    Type type = ITERATOR;
170
171
    explicit HeapItem(size_t _level, InternalIteratorBase<Slice>* _iter)
172
0
        : level(_level), type(Type::ITERATOR) {
173
0
      iter.Set(_iter);
174
0
    }
175
176
0
    void SetTombstoneForCompaction(const ParsedInternalKey&& pik) {
177
0
      tombstone_str.clear();
178
0
      AppendInternalKey(&tombstone_str, pik);
179
0
    }
180
181
28.6k
    [[nodiscard]] Slice key() const {
182
28.6k
      return type == ITERATOR ? iter.key() : tombstone_str;
183
28.6k
    }
184
  };
185
186
  class CompactionHeapItemComparator {
187
   public:
188
    explicit CompactionHeapItemComparator(
189
        const InternalKeyComparator* comparator)
190
3.07k
        : comparator_(comparator) {}
191
192
10.8k
    bool operator()(HeapItem* a, HeapItem* b) const {
193
10.8k
      int r = comparator_->Compare(a->key(), b->key());
194
      // For each file, we assume all range tombstone start keys come before
195
      // its file boundary sentinel key (file's meta.largest key).
196
      // In the case when meta.smallest = meta.largest and range tombstone start
197
      // key is truncated at meta.smallest, the start key will have op_type =
198
      // kMaxValid to make it smaller (see TruncatedRangeDelIterator
199
      // constructor). The following assertion validates this assumption.
200
10.8k
      assert(a->type == b->type || r != 0);
201
10.8k
      return r > 0;
202
10.8k
    }
203
204
   private:
205
    const InternalKeyComparator* comparator_;
206
  };
207
208
  using CompactionMinHeap = BinaryHeap<HeapItem*, CompactionHeapItemComparator>;
209
  bool is_arena_mode_;
210
  const InternalKeyComparator* comparator_;
211
  // HeapItem for all child point iterators.
212
  std::vector<HeapItem> children_;
213
  // HeapItem for range tombstones. pinned_heap_item_[i] corresponds to the
214
  // current range tombstone from range_tombstone_iters_[i].
215
  std::vector<HeapItem> pinned_heap_item_;
216
  // range_tombstone_iters_[i] contains range tombstones in the sorted run that
217
  // corresponds to children_[i]. range_tombstone_iters_[i] ==
218
  // nullptr means the sorted run of children_[i] does not have range
219
  // tombstones (or the current SSTable does not have range tombstones in the
220
  // case of LevelIterator).
221
  std::vector<std::unique_ptr<TruncatedRangeDelIterator>>
222
      range_tombstone_iters_;
223
  // Used as value for range tombstone keys
224
  std::string dummy_tombstone_val{};
225
226
  // Skip file boundary sentinel keys.
227
  void FindNextVisibleKey();
228
229
  // top of minHeap_
230
  HeapItem* current_;
231
  // If any of the children have non-ok status, this is one of them.
232
  Status status_;
233
  CompactionMinHeap minHeap_;
234
  PinnedIteratorsManager* pinned_iters_mgr_;
235
  InternalStats* internal_stats_;
236
  uint64_t num_sorted_runs_recorded_;
237
  // Process a child that is not in the min heap.
238
  // If valid, add to the min heap. Otherwise, check status.
239
  void AddToMinHeapOrCheckStatus(HeapItem*);
240
241
10.0k
  HeapItem* CurrentForward() const {
242
10.0k
    return !minHeap_.empty() ? minHeap_.top() : nullptr;
243
10.0k
  }
244
245
0
  void InsertRangeTombstoneAtLevel(size_t level) {
246
0
    if (range_tombstone_iters_[level]->Valid()) {
247
0
      pinned_heap_item_[level].SetTombstoneForCompaction(
248
0
          range_tombstone_iters_[level]->start_key());
249
0
      minHeap_.push(&pinned_heap_item_[level]);
250
0
    }
251
0
  }
252
};
253
254
3.07k
void CompactionMergingIterator::SeekToFirst() {
255
3.07k
  minHeap_.clear();
256
3.07k
  status_ = Status::OK();
257
9.16k
  for (auto& child : children_) {
258
9.16k
    child.iter.SeekToFirst();
259
9.16k
    AddToMinHeapOrCheckStatus(&child);
260
9.16k
  }
261
262
12.2k
  for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) {
263
9.16k
    if (range_tombstone_iters_[i]) {
264
0
      range_tombstone_iters_[i]->SeekToFirst();
265
0
      InsertRangeTombstoneAtLevel(i);
266
0
    }
267
9.16k
  }
268
269
3.07k
  FindNextVisibleKey();
270
3.07k
  current_ = CurrentForward();
271
3.07k
}
272
273
0
void CompactionMergingIterator::Seek(const Slice& target) {
274
0
  minHeap_.clear();
275
0
  status_ = Status::OK();
276
0
  for (auto& child : children_) {
277
0
    child.iter.Seek(target);
278
0
    AddToMinHeapOrCheckStatus(&child);
279
0
  }
280
281
0
  ParsedInternalKey pik;
282
0
  ParseInternalKey(target, &pik, false /* log_err_key */)
283
0
      .PermitUncheckedError();
284
0
  for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) {
285
0
    if (range_tombstone_iters_[i]) {
286
0
      range_tombstone_iters_[i]->Seek(pik.user_key);
287
      // For compaction, output keys should all be after seek target.
288
0
      while (range_tombstone_iters_[i]->Valid() &&
289
0
             comparator_->Compare(range_tombstone_iters_[i]->start_key(), pik) <
290
0
                 0) {
291
0
        range_tombstone_iters_[i]->Next();
292
0
      }
293
0
      InsertRangeTombstoneAtLevel(i);
294
0
    }
295
0
  }
296
297
0
  FindNextVisibleKey();
298
0
  current_ = CurrentForward();
299
0
}
300
301
6.95k
void CompactionMergingIterator::Next() {
302
6.95k
  assert(Valid());
303
  // For the heap modifications below to be correct, current_ must be the
304
  // current top of the heap.
305
6.95k
  assert(current_ == CurrentForward());
306
  // current_ points to the current record; move its iterator forward.
307
6.95k
  if (current_->type == HeapItem::ITERATOR) {
308
6.95k
    current_->iter.Next();
309
6.95k
    if (current_->iter.Valid()) {
310
      // current is still valid after the Next() call above.  Call
311
      // replace_top() to restore the heap property.  When the same child
312
      // iterator yields a sequence of keys, this is cheap.
313
3.26k
      assert(current_->iter.status().ok());
314
3.26k
      minHeap_.replace_top(current_);
315
3.68k
    } else {
316
      // current stopped being valid, remove it from the heap.
317
3.68k
      considerStatus(current_->iter.status());
318
3.68k
      minHeap_.pop();
319
3.68k
    }
320
6.95k
  } else {
321
0
    assert(current_->type == HeapItem::DELETE_RANGE_START);
322
0
    size_t level = current_->level;
323
0
    assert(range_tombstone_iters_[level]);
324
0
    range_tombstone_iters_[level]->Next();
325
0
    if (range_tombstone_iters_[level]->Valid()) {
326
0
      pinned_heap_item_[level].SetTombstoneForCompaction(
327
0
          range_tombstone_iters_[level]->start_key());
328
0
      minHeap_.replace_top(&pinned_heap_item_[level]);
329
0
    } else {
330
0
      minHeap_.pop();
331
0
    }
332
0
  }
333
6.95k
  FindNextVisibleKey();
334
6.95k
  current_ = CurrentForward();
335
6.95k
}
336
337
10.0k
void CompactionMergingIterator::FindNextVisibleKey() {
338
12.4k
  while (!minHeap_.empty()) {
339
10.2k
    HeapItem* current = minHeap_.top();
340
    // IsDeleteRangeSentinelKey() here means file boundary sentinel keys.
341
10.2k
    if (current->type != HeapItem::ITERATOR ||
342
10.2k
        !current->iter.IsDeleteRangeSentinelKey()) {
343
7.88k
      return;
344
7.88k
    }
345
    // range tombstone start keys from the same SSTable should have been
346
    // exhausted
347
10.2k
    assert(!range_tombstone_iters_[current->level] ||
348
2.39k
           !range_tombstone_iters_[current->level]->Valid());
349
    // current->iter is a LevelIterator, and it enters a new SST file in the
350
    // Next() call here.
351
2.39k
    current->iter.Next();
352
2.39k
    if (current->iter.Valid()) {
353
867
      assert(current->iter.status().ok());
354
867
      minHeap_.replace_top(current);
355
1.52k
    } else {
356
1.52k
      considerStatus(current->iter.status());
357
1.52k
      minHeap_.pop();
358
1.52k
    }
359
2.39k
    if (range_tombstone_iters_[current->level]) {
360
0
      InsertRangeTombstoneAtLevel(current->level);
361
0
    }
362
2.39k
  }
363
10.0k
}
364
365
9.16k
void CompactionMergingIterator::AddToMinHeapOrCheckStatus(HeapItem* child) {
366
9.16k
  if (child->iter.Valid()) {
367
9.16k
    assert(child->iter.status().ok());
368
9.16k
    minHeap_.push(child);
369
9.16k
  } else {
370
0
    considerStatus(child->iter.status());
371
0
  }
372
9.16k
}
373
374
InternalIterator* NewCompactionMergingIterator(
375
    const InternalKeyComparator* comparator, InternalIterator** children, int n,
376
    std::vector<std::pair<std::unique_ptr<TruncatedRangeDelIterator>,
377
                          std::unique_ptr<TruncatedRangeDelIterator>**>>&
378
        range_tombstone_iters,
379
3.07k
    Arena* arena, InternalStats* stats) {
380
3.07k
  assert(n >= 0);
381
3.07k
  if (n == 0) {
382
0
    return NewEmptyInternalIterator<Slice>(arena);
383
3.07k
  } else {
384
3.07k
    if (arena == nullptr) {
385
3.07k
      return new CompactionMergingIterator(comparator, children, n,
386
3.07k
                                           false /* is_arena_mode */,
387
3.07k
                                           range_tombstone_iters, stats);
388
3.07k
    } else {
389
0
      auto mem = arena->AllocateAligned(sizeof(CompactionMergingIterator));
390
0
      return new (mem) CompactionMergingIterator(comparator, children, n,
391
0
                                                 true /* is_arena_mode */,
392
0
                                                 range_tombstone_iters, stats);
393
0
    }
394
3.07k
  }
395
3.07k
}
396
}  // namespace ROCKSDB_NAMESPACE