/src/rocksdb/table/compaction_merging_iterator.cc
Line | Count | Source |
1 | | // Copyright (c) Meta Platforms, Inc. and affiliates. |
2 | | // |
3 | | // This source code is licensed under both the GPLv2 (found in the |
4 | | // COPYING file in the root directory) and Apache 2.0 License |
5 | | // (found in the LICENSE.Apache file in the root directory). |
6 | | #include "table/compaction_merging_iterator.h" |
7 | | |
8 | | #include "db/internal_stats.h" |
9 | | |
10 | | namespace ROCKSDB_NAMESPACE { |
// Merging iterator used by compaction. It merges point keys from `n` child
// iterators (one per sorted run) together with range tombstone START keys
// (as special heap items) so compaction observes range tombstones in key
// order alongside point keys. Forward iteration only: SeekToLast/Prev and
// friends assert(false).
class CompactionMergingIterator : public InternalIterator {
 public:
  // `range_tombstones[i].first` is the range tombstone iterator for
  // children[i] (may be nullptr if that run has no tombstones).
  // If `range_tombstones[i].second` is non-null, this iterator publishes the
  // address of its owned tombstone-iterator slot through it (used by
  // LevelIterator to swap in per-file tombstone iterators).
  // `internal_stats`, when non-null, tracks the number of sorted runs in
  // running compactions; the count recorded here is decremented in the dtor.
  CompactionMergingIterator(
      const InternalKeyComparator* comparator, InternalIterator** children,
      int n, bool is_arena_mode,
      std::vector<std::pair<std::unique_ptr<TruncatedRangeDelIterator>,
                            std::unique_ptr<TruncatedRangeDelIterator>**>>&
          range_tombstones,
      InternalStats* internal_stats)
      : is_arena_mode_(is_arena_mode),
        comparator_(comparator),
        current_(nullptr),
        minHeap_(CompactionHeapItemComparator(comparator_)),
        pinned_iters_mgr_(nullptr),
        internal_stats_(internal_stats),
        num_sorted_runs_recorded_(0) {
    children_.resize(n);
    for (int i = 0; i < n; i++) {
      children_[i].level = i;
      children_[i].iter.Set(children[i]);
      assert(children_[i].type == HeapItem::ITERATOR);
    }
    assert(range_tombstones.size() == static_cast<size_t>(n));
    // Take ownership of all tombstone iterators before publishing slot
    // addresses below, so the vector is fully sized and stable.
    for (auto& p : range_tombstones) {
      range_tombstone_iters_.push_back(std::move(p.first));
    }
    pinned_heap_item_.resize(n);
    for (int i = 0; i < n; ++i) {
      if (range_tombstones[i].second) {
        // for LevelIterator
        *range_tombstones[i].second = &range_tombstone_iters_[i];
      }
      pinned_heap_item_[i].level = i;
      pinned_heap_item_[i].type = HeapItem::DELETE_RANGE_START;
    }
    if (internal_stats_) {
      TEST_SYNC_POINT("CompactionMergingIterator::UpdateInternalStats");
      // The size of children_ or range_tombstone_iters_ (n) should not change
      // but to be safe, we can record the size here so we decrement by the
      // correct amount at destruction time
      num_sorted_runs_recorded_ = n;
      internal_stats_->IncrNumRunningCompactionSortedRuns(
          num_sorted_runs_recorded_);
      assert(num_sorted_runs_recorded_ <=
             internal_stats_->NumRunningCompactionSortedRuns());
    }
  }

  // Record the first non-ok status seen from any child; later errors are
  // intentionally ignored so the original cause is preserved.
  void considerStatus(const Status& s) {
    if (!s.ok() && status_.ok()) {
      status_ = s;
    }
  }

  ~CompactionMergingIterator() override {
    if (internal_stats_) {
      assert(num_sorted_runs_recorded_ == range_tombstone_iters_.size());
      assert(num_sorted_runs_recorded_ <=
             internal_stats_->NumRunningCompactionSortedRuns());
      // Undo the increment performed in the constructor, by the same amount.
      internal_stats_->DecrNumRunningCompactionSortedRuns(
          num_sorted_runs_recorded_);
    }

    range_tombstone_iters_.clear();

    // Child iterators may be arena-allocated; DeleteIter handles both modes.
    for (auto& child : children_) {
      child.iter.DeleteIter(is_arena_mode_);
    }
    status_.PermitUncheckedError();
  }

  // Invalid as soon as any child reported an error, even if the heap still
  // has entries.
  bool Valid() const override { return current_ != nullptr && status_.ok(); }

  Status status() const override { return status_; }

  void SeekToFirst() override;

  void Seek(const Slice& target) override;

  void Next() override;

  Slice key() const override {
    assert(Valid());
    return current_->key();
  }

  // For range tombstone heap items there is no real value; a dummy empty
  // slice is returned instead.
  Slice value() const override {
    assert(Valid());
    if (LIKELY(current_->type == HeapItem::ITERATOR)) {
      return current_->iter.value();
    } else {
      return dummy_tombstone_val;
    }
  }

  // Here we simply relay MayBeOutOfLowerBound/MayBeOutOfUpperBound result
  // from current child iterator. Potentially as long as one of child iterator
  // report out of bound is not possible, we know current key is within bound.
  bool MayBeOutOfLowerBound() override {
    assert(Valid());
    return current_->type == HeapItem::DELETE_RANGE_START ||
           current_->iter.MayBeOutOfLowerBound();
  }

  IterBoundCheck UpperBoundCheckResult() override {
    assert(Valid());
    return current_->type == HeapItem::DELETE_RANGE_START
               ? IterBoundCheck::kUnknown
               : current_->iter.UpperBoundCheckResult();
  }

  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
    pinned_iters_mgr_ = pinned_iters_mgr;
    for (auto& child : children_) {
      child.iter.SetPinnedItersMgr(pinned_iters_mgr);
    }
  }

  // True when positioned on a range tombstone start key (not a point key).
  // Note: file-boundary sentinel keys are skipped internally by
  // FindNextVisibleKey() and are never exposed here.
  bool IsDeleteRangeSentinelKey() const override {
    assert(Valid());
    return current_->type == HeapItem::DELETE_RANGE_START;
  }

  // Compaction uses the above subset of InternalIterator interface.
  void SeekToLast() override { assert(false); }

  void SeekForPrev(const Slice&) override { assert(false); }

  void Prev() override { assert(false); }

  bool NextAndGetResult(IterateResult*) override {
    assert(false);
    return false;
  }

  bool IsKeyPinned() const override {
    assert(false);
    return false;
  }

  bool IsValuePinned() const override {
    assert(false);
    return false;
  }

  bool PrepareValue() override {
    assert(false);
    return false;
  }

 private:
  // A heap entry: either wraps a child point iterator (ITERATOR) or holds a
  // serialized range tombstone start key (DELETE_RANGE_START).
  struct HeapItem {
    HeapItem() = default;

    IteratorWrapper iter;
    size_t level = 0;
    // Serialized internal key of the current tombstone start; only meaningful
    // when type == DELETE_RANGE_START.
    std::string tombstone_str;
    enum Type { ITERATOR, DELETE_RANGE_START };
    Type type = ITERATOR;

    explicit HeapItem(size_t _level, InternalIteratorBase<Slice>* _iter)
        : level(_level), type(Type::ITERATOR) {
      iter.Set(_iter);
    }

    void SetTombstoneForCompaction(const ParsedInternalKey&& pik) {
      tombstone_str.clear();
      AppendInternalKey(&tombstone_str, pik);
    }

    [[nodiscard]] Slice key() const {
      return type == ITERATOR ? iter.key() : tombstone_str;
    }
  };

  // Min-heap ordering: returns true when `a` should sort AFTER `b`
  // (BinaryHeap treats operator() as "greater-than" for a min-heap).
  class CompactionHeapItemComparator {
   public:
    explicit CompactionHeapItemComparator(
        const InternalKeyComparator* comparator)
        : comparator_(comparator) {}

    bool operator()(HeapItem* a, HeapItem* b) const {
      int r = comparator_->Compare(a->key(), b->key());
      // For each file, we assume all range tombstone start keys come before
      // its file boundary sentinel key (file's meta.largest key).
      // In the case when meta.smallest = meta.largest and range tombstone start
      // key is truncated at meta.smallest, the start key will have op_type =
      // kMaxValid to make it smaller (see TruncatedRangeDelIterator
      // constructor). The following assertion validates this assumption.
      assert(a->type == b->type || r != 0);
      return r > 0;
    }

   private:
    const InternalKeyComparator* comparator_;
  };

  using CompactionMinHeap = BinaryHeap<HeapItem*, CompactionHeapItemComparator>;
  bool is_arena_mode_;
  const InternalKeyComparator* comparator_;
  // HeapItem for all child point iterators.
  std::vector<HeapItem> children_;
  // HeapItem for range tombstones. pinned_heap_item_[i] corresponds to the
  // current range tombstone from range_tombstone_iters_[i].
  std::vector<HeapItem> pinned_heap_item_;
  // range_tombstone_iters_[i] contains range tombstones in the sorted run that
  // corresponds to children_[i]. range_tombstone_iters_[i] ==
  // nullptr means the sorted run of children_[i] does not have range
  // tombstones (or the current SSTable does not have range tombstones in the
  // case of LevelIterator).
  std::vector<std::unique_ptr<TruncatedRangeDelIterator>>
      range_tombstone_iters_;
  // Used as value for range tombstone keys
  std::string dummy_tombstone_val{};

  // Skip file boundary sentinel keys.
  void FindNextVisibleKey();

  // top of minHeap_
  HeapItem* current_;
  // If any of the children have non-ok status, this is one of them.
  Status status_;
  CompactionMinHeap minHeap_;
  PinnedIteratorsManager* pinned_iters_mgr_;
  InternalStats* internal_stats_;
  // Sorted-run count reported to internal_stats_ at construction; saved so
  // the destructor decrements by exactly the same amount.
  uint64_t num_sorted_runs_recorded_;
  // Process a child that is not in the min heap.
  // If valid, add to the min heap. Otherwise, check status.
  void AddToMinHeapOrCheckStatus(HeapItem*);

  HeapItem* CurrentForward() const {
    return !minHeap_.empty() ? minHeap_.top() : nullptr;
  }

  // If range_tombstone_iters_[level] is valid, pin its current start key
  // into pinned_heap_item_[level] and push that item onto the heap.
  void InsertRangeTombstoneAtLevel(size_t level) {
    if (range_tombstone_iters_[level]->Valid()) {
      pinned_heap_item_[level].SetTombstoneForCompaction(
          range_tombstone_iters_[level]->start_key());
      minHeap_.push(&pinned_heap_item_[level]);
    }
  }
};
253 | | |
254 | 3.07k | void CompactionMergingIterator::SeekToFirst() { |
255 | 3.07k | minHeap_.clear(); |
256 | 3.07k | status_ = Status::OK(); |
257 | 9.16k | for (auto& child : children_) { |
258 | 9.16k | child.iter.SeekToFirst(); |
259 | 9.16k | AddToMinHeapOrCheckStatus(&child); |
260 | 9.16k | } |
261 | | |
262 | 12.2k | for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) { |
263 | 9.16k | if (range_tombstone_iters_[i]) { |
264 | 0 | range_tombstone_iters_[i]->SeekToFirst(); |
265 | 0 | InsertRangeTombstoneAtLevel(i); |
266 | 0 | } |
267 | 9.16k | } |
268 | | |
269 | 3.07k | FindNextVisibleKey(); |
270 | 3.07k | current_ = CurrentForward(); |
271 | 3.07k | } |
272 | | |
273 | 0 | void CompactionMergingIterator::Seek(const Slice& target) { |
274 | 0 | minHeap_.clear(); |
275 | 0 | status_ = Status::OK(); |
276 | 0 | for (auto& child : children_) { |
277 | 0 | child.iter.Seek(target); |
278 | 0 | AddToMinHeapOrCheckStatus(&child); |
279 | 0 | } |
280 | |
|
281 | 0 | ParsedInternalKey pik; |
282 | 0 | ParseInternalKey(target, &pik, false /* log_err_key */) |
283 | 0 | .PermitUncheckedError(); |
284 | 0 | for (size_t i = 0; i < range_tombstone_iters_.size(); ++i) { |
285 | 0 | if (range_tombstone_iters_[i]) { |
286 | 0 | range_tombstone_iters_[i]->Seek(pik.user_key); |
287 | | // For compaction, output keys should all be after seek target. |
288 | 0 | while (range_tombstone_iters_[i]->Valid() && |
289 | 0 | comparator_->Compare(range_tombstone_iters_[i]->start_key(), pik) < |
290 | 0 | 0) { |
291 | 0 | range_tombstone_iters_[i]->Next(); |
292 | 0 | } |
293 | 0 | InsertRangeTombstoneAtLevel(i); |
294 | 0 | } |
295 | 0 | } |
296 | |
|
297 | 0 | FindNextVisibleKey(); |
298 | 0 | current_ = CurrentForward(); |
299 | 0 | } |
300 | | |
// Advance past the current key. Advances whichever source (point iterator or
// tombstone iterator) produced the heap top, restores the heap property, then
// skips any file-boundary sentinel keys before exposing the next key.
void CompactionMergingIterator::Next() {
  assert(Valid());
  // For the heap modifications below to be correct, current_ must be the
  // current top of the heap.
  assert(current_ == CurrentForward());
  // as the current points to the current record. move the iterator forward.
  if (current_->type == HeapItem::ITERATOR) {
    current_->iter.Next();
    if (current_->iter.Valid()) {
      // current is still valid after the Next() call above. Call
      // replace_top() to restore the heap property. When the same child
      // iterator yields a sequence of keys, this is cheap.
      assert(current_->iter.status().ok());
      minHeap_.replace_top(current_);
    } else {
      // current stopped being valid, remove it from the heap.
      considerStatus(current_->iter.status());
      minHeap_.pop();
    }
  } else {
    // Heap top is a range tombstone start key: advance that level's
    // tombstone iterator instead of a point iterator.
    assert(current_->type == HeapItem::DELETE_RANGE_START);
    size_t level = current_->level;
    assert(range_tombstone_iters_[level]);
    range_tombstone_iters_[level]->Next();
    if (range_tombstone_iters_[level]->Valid()) {
      // Re-pin the next tombstone's start key into the same heap item and
      // sift it back into place.
      pinned_heap_item_[level].SetTombstoneForCompaction(
          range_tombstone_iters_[level]->start_key());
      minHeap_.replace_top(&pinned_heap_item_[level]);
    } else {
      // No more tombstones at this level.
      minHeap_.pop();
    }
  }
  FindNextVisibleKey();
  current_ = CurrentForward();
}
336 | | |
// Pop-and-advance loop that skips file-boundary sentinel keys at the heap
// top. Range tombstone start keys (DELETE_RANGE_START items) and ordinary
// point keys are "visible" and stop the loop; sentinel keys from a
// LevelIterator trigger a Next() that moves it into the following SST file,
// whose tombstones are then inserted into the heap.
void CompactionMergingIterator::FindNextVisibleKey() {
  while (!minHeap_.empty()) {
    HeapItem* current = minHeap_.top();
    // IsDeleteRangeSentinelKey() here means file boundary sentinel keys.
    if (current->type != HeapItem::ITERATOR ||
        !current->iter.IsDeleteRangeSentinelKey()) {
      return;
    }
    // range tombstone start keys from the same SSTable should have been
    // exhausted
    assert(!range_tombstone_iters_[current->level] ||
           !range_tombstone_iters_[current->level]->Valid());
    // current->iter is a LevelIterator, and it enters a new SST file in the
    // Next() call here.
    current->iter.Next();
    if (current->iter.Valid()) {
      assert(current->iter.status().ok());
      minHeap_.replace_top(current);
    } else {
      considerStatus(current->iter.status());
      minHeap_.pop();
    }
    // The new file may carry range tombstones (LevelIterator updated the
    // slot published in the constructor); push its first one if present.
    if (range_tombstone_iters_[current->level]) {
      InsertRangeTombstoneAtLevel(current->level);
    }
  }
}
364 | | |
365 | 9.16k | void CompactionMergingIterator::AddToMinHeapOrCheckStatus(HeapItem* child) { |
366 | 9.16k | if (child->iter.Valid()) { |
367 | 9.16k | assert(child->iter.status().ok()); |
368 | 9.16k | minHeap_.push(child); |
369 | 9.16k | } else { |
370 | 0 | considerStatus(child->iter.status()); |
371 | 0 | } |
372 | 9.16k | } |
373 | | |
374 | | InternalIterator* NewCompactionMergingIterator( |
375 | | const InternalKeyComparator* comparator, InternalIterator** children, int n, |
376 | | std::vector<std::pair<std::unique_ptr<TruncatedRangeDelIterator>, |
377 | | std::unique_ptr<TruncatedRangeDelIterator>**>>& |
378 | | range_tombstone_iters, |
379 | 3.07k | Arena* arena, InternalStats* stats) { |
380 | 3.07k | assert(n >= 0); |
381 | 3.07k | if (n == 0) { |
382 | 0 | return NewEmptyInternalIterator<Slice>(arena); |
383 | 3.07k | } else { |
384 | 3.07k | if (arena == nullptr) { |
385 | 3.07k | return new CompactionMergingIterator(comparator, children, n, |
386 | 3.07k | false /* is_arena_mode */, |
387 | 3.07k | range_tombstone_iters, stats); |
388 | 3.07k | } else { |
389 | 0 | auto mem = arena->AllocateAligned(sizeof(CompactionMergingIterator)); |
390 | 0 | return new (mem) CompactionMergingIterator(comparator, children, n, |
391 | 0 | true /* is_arena_mode */, |
392 | 0 | range_tombstone_iters, stats); |
393 | 0 | } |
394 | 3.07k | } |
395 | 3.07k | } |
396 | | } // namespace ROCKSDB_NAMESPACE |