/src/rocksdb/db/range_del_aggregator.h
Line | Count | Source
1 | | // Copyright (c) 2018-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | #pragma once |
7 | | |
8 | | #include <algorithm> |
9 | | #include <iterator> |
10 | | #include <list> |
11 | | #include <map> |
12 | | #include <set> |
13 | | #include <string> |
14 | | #include <vector> |
15 | | |
16 | | #include "db/compaction/compaction_iteration_stats.h" |
17 | | #include "db/dbformat.h" |
18 | | #include "db/pinned_iterators_manager.h" |
19 | | #include "db/range_del_aggregator.h" |
20 | | #include "db/range_tombstone_fragmenter.h" |
21 | | #include "db/version_edit.h" |
22 | | #include "rocksdb/comparator.h" |
23 | | #include "rocksdb/types.h" |
24 | | #include "table/internal_iterator.h" |
25 | | #include "table/table_builder.h" |
26 | | #include "util/heap.h" |
27 | | #include "util/kv_map.h" |
28 | | |
29 | | namespace ROCKSDB_NAMESPACE { |
30 | | |
31 | | class TruncatedRangeDelIterator { |
32 | | public: |
33 | | TruncatedRangeDelIterator( |
34 | | std::unique_ptr<FragmentedRangeTombstoneIterator> iter, |
35 | | const InternalKeyComparator* icmp, const InternalKey* smallest, |
36 | | const InternalKey* largest); |
37 | | |
38 | 0 | void SetRangeDelReadSeqno(SequenceNumber read_seqno) { |
39 | 0 | iter_->SetRangeDelReadSeqno(read_seqno); |
40 | 0 | } |
41 | | |
42 | | bool Valid() const; |
43 | | |
44 | 254k | void Next() { iter_->TopNext(); } |
45 | 0 | void Prev() { iter_->TopPrev(); } |
46 | | |
47 | 74.7M | void InternalNext() { iter_->Next(); } |
48 | | |
49 | | // Seeks to the tombstone with the highest visible sequence number that covers |
50 | | // target (a user key). If no such tombstone exists, the position will be at |
51 | | // the earliest tombstone that ends after target. |
52 | | // REQUIRES: target is a user key. |
53 | | void Seek(const Slice& target); |
54 | | |
55 | | // Seeks to the first range tombstone with end_key() > target. |
56 | | void SeekInternalKey(const Slice& target); |
57 | | |
58 | | // Seeks to the tombstone with the highest visible sequence number that covers |
59 | | // target (a user key). If no such tombstone exists, the position will be at |
60 | | // the latest tombstone that starts before target. |
61 | | void SeekForPrev(const Slice& target); |
62 | | |
63 | | void SeekToFirst(); |
64 | | void SeekToLast(); |
65 | | |
66 | 149M | ParsedInternalKey start_key() const { |
67 | 149M | return (smallest_ == nullptr || |
68 | 149M | icmp_->Compare(*smallest_, iter_->parsed_start_key()) <= 0) |
69 | 149M | ? iter_->parsed_start_key() |
70 | 149M | : *smallest_; |
71 | 149M | } |
72 | | |
73 | 75.1M | ParsedInternalKey end_key() const { |
74 | 75.1M | return (largest_ == nullptr || |
75 | 75.1M | icmp_->Compare(iter_->parsed_end_key(), *largest_) <= 0) |
76 | 75.1M | ? iter_->parsed_end_key() |
77 | 75.1M | : *largest_; |
78 | 75.1M | } |
79 | | |
80 | 149M | SequenceNumber seq() const { return iter_->seq(); } |
81 | 0 | Slice timestamp() const { |
82 | 0 | assert(icmp_->user_comparator()->timestamp_size()); |
83 | 0 | return iter_->timestamp(); |
84 | 0 | } |
85 | 2.90k | void SetTimestampUpperBound(const Slice* ts_upper_bound) { |
86 | 2.90k | iter_->SetTimestampUpperBound(ts_upper_bound); |
87 | 2.90k | } |
88 | | |
89 | | std::map<SequenceNumber, std::unique_ptr<TruncatedRangeDelIterator>> |
90 | | SplitBySnapshot(const std::vector<SequenceNumber>& snapshots); |
91 | | |
92 | 2.90k | SequenceNumber upper_bound() const { return iter_->upper_bound(); } |
93 | | |
94 | 2.90k | SequenceNumber lower_bound() const { return iter_->lower_bound(); } |
95 | | |
96 | | private: |
97 | | std::unique_ptr<FragmentedRangeTombstoneIterator> iter_; |
98 | | const InternalKeyComparator* icmp_; |
99 | | const ParsedInternalKey* smallest_ = nullptr; |
100 | | const ParsedInternalKey* largest_ = nullptr; |
101 | | std::list<ParsedInternalKey> pinned_bounds_; |
102 | | |
103 | | const InternalKey* smallest_ikey_; |
104 | | const InternalKey* largest_ikey_; |
105 | | }; |
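
// Illustrative usage sketch (hedged): `frag_iter`, `icmp`, `file_smallest`,
// `file_largest`, and `user_key` below are placeholder names, not RocksDB
// symbols. The truncated iterator clamps each tombstone it exposes to the
// owning file's key range via start_key()/end_key():
//
//   TruncatedRangeDelIterator trunc(std::move(frag_iter), &icmp,
//                                   &file_smallest, &file_largest);
//   trunc.Seek(user_key);  // highest visible tombstone covering user_key, or
//                          // the earliest tombstone ending after it
//   if (trunc.Valid()) {
//     // trunc.start_key() >= file_smallest, trunc.end_key() <= file_largest,
//     // and trunc.seq() is the tombstone's sequence number.
//   }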
106 | | |
107 | | struct SeqMaxComparator { |
108 | | bool operator()(const TruncatedRangeDelIterator* a, |
109 | 0 | const TruncatedRangeDelIterator* b) const { |
110 | 0 | return a->seq() > b->seq(); |
111 | 0 | } |
112 | | }; |
113 | | |
114 | | struct StartKeyMinComparator { |
115 | 25.4k | explicit StartKeyMinComparator(const InternalKeyComparator* c) : icmp(c) {} |
116 | | |
117 | | bool operator()(const TruncatedRangeDelIterator* a, |
118 | 0 | const TruncatedRangeDelIterator* b) const { |
119 | 0 | return icmp->Compare(a->start_key(), b->start_key()) > 0; |
120 | 0 | } |
121 | | |
122 | | const InternalKeyComparator* icmp; |
123 | | }; |
124 | | |
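// ForwardRangeDelIterator answers ShouldDelete() queries for keys visited in
// increasing internal key order. As laid out below, it keeps the truncated
// tombstone iterators in two heaps plus a multiset:
//  - inactive_iters_: iterators whose current tombstone starts after the most
//    recently queried key, ordered by ascending start key;
//  - active_iters_ / active_seqnums_: iterators whose current tombstone has
//    already started, ordered by ascending end key and by descending sequence
//    number respectively, so the highest covering sequence number is cheap to
//    look up. ReverseRangeDelIterator (further below) mirrors this layout for
//    keys visited in decreasing order.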
125 | | class ForwardRangeDelIterator { |
126 | | public: |
127 | | explicit ForwardRangeDelIterator(const InternalKeyComparator* icmp); |
128 | | |
129 | | bool ShouldDelete(const ParsedInternalKey& parsed); |
130 | | void Invalidate(); |
131 | | |
132 | | void AddNewIter(TruncatedRangeDelIterator* iter, |
133 | 2.59k | const ParsedInternalKey& parsed) { |
134 | 2.59k | iter->Seek(parsed.user_key); |
135 | 2.59k | PushIter(iter, parsed); |
136 | 2.59k | assert(active_iters_.size() == active_seqnums_.size()); |
137 | 2.59k | } |
138 | | |
139 | 118k | size_t UnusedIdx() const { return unused_idx_; } |
140 | 2.59k | void IncUnusedIdx() { unused_idx_++; } |
141 | | |
142 | | private: |
143 | | using ActiveSeqSet = |
144 | | std::multiset<TruncatedRangeDelIterator*, SeqMaxComparator>; |
145 | | |
146 | | struct EndKeyMinComparator { |
147 | 15.9k | explicit EndKeyMinComparator(const InternalKeyComparator* c) : icmp(c) {} |
148 | | |
149 | | bool operator()(const ActiveSeqSet::const_iterator& a, |
150 | 0 | const ActiveSeqSet::const_iterator& b) const { |
151 | 0 | return icmp->Compare((*a)->end_key(), (*b)->end_key()) > 0; |
152 | 0 | } |
153 | | |
154 | | const InternalKeyComparator* icmp; |
155 | | }; |
156 | | |
157 | | void PushIter(TruncatedRangeDelIterator* iter, |
158 | 55.6k | const ParsedInternalKey& parsed) { |
159 | 55.6k | if (!iter->Valid()) { |
160 | | // The iterator has been fully consumed, so we don't need to add it to |
161 | | // either of the heaps. |
162 | 1.03k | return; |
163 | 1.03k | } |
164 | 54.6k | int cmp = icmp_->Compare(parsed, iter->start_key()); |
165 | 54.6k | if (cmp < 0) { |
166 | 3.70k | PushInactiveIter(iter); |
167 | 50.9k | } else { |
168 | 50.9k | PushActiveIter(iter); |
169 | 50.9k | } |
170 | 54.6k | } |
171 | | |
172 | 50.9k | void PushActiveIter(TruncatedRangeDelIterator* iter) { |
173 | 50.9k | auto seq_pos = active_seqnums_.insert(iter); |
174 | 50.9k | active_iters_.push(seq_pos); |
175 | 50.9k | } |
176 | | |
177 | 49.5k | TruncatedRangeDelIterator* PopActiveIter() { |
178 | 49.5k | auto active_top = active_iters_.top(); |
179 | 49.5k | auto iter = *active_top; |
180 | 49.5k | active_iters_.pop(); |
181 | 49.5k | active_seqnums_.erase(active_top); |
182 | 49.5k | return iter; |
183 | 49.5k | } |
184 | | |
185 | 3.70k | void PushInactiveIter(TruncatedRangeDelIterator* iter) { |
186 | 3.70k | inactive_iters_.push(iter); |
187 | 3.70k | } |
188 | | |
189 | 3.49k | TruncatedRangeDelIterator* PopInactiveIter() { |
190 | 3.49k | auto* iter = inactive_iters_.top(); |
191 | 3.49k | inactive_iters_.pop(); |
192 | 3.49k | return iter; |
193 | 3.49k | } |
194 | | |
195 | | const InternalKeyComparator* icmp_; |
196 | | size_t unused_idx_; |
197 | | ActiveSeqSet active_seqnums_; |
198 | | BinaryHeap<ActiveSeqSet::const_iterator, EndKeyMinComparator> active_iters_; |
199 | | BinaryHeap<TruncatedRangeDelIterator*, StartKeyMinComparator> inactive_iters_; |
200 | | }; |
201 | | |
202 | | class ReverseRangeDelIterator { |
203 | | public: |
204 | | explicit ReverseRangeDelIterator(const InternalKeyComparator* icmp); |
205 | | |
206 | | bool ShouldDelete(const ParsedInternalKey& parsed); |
207 | | void Invalidate(); |
208 | | |
209 | | void AddNewIter(TruncatedRangeDelIterator* iter, |
210 | 0 | const ParsedInternalKey& parsed) { |
211 | 0 | iter->SeekForPrev(parsed.user_key); |
212 | 0 | PushIter(iter, parsed); |
213 | 0 | assert(active_iters_.size() == active_seqnums_.size()); |
214 | 0 | } |
215 | | |
216 | 0 | size_t UnusedIdx() const { return unused_idx_; } |
217 | 0 | void IncUnusedIdx() { unused_idx_++; } |
218 | | |
219 | | private: |
220 | | using ActiveSeqSet = |
221 | | std::multiset<TruncatedRangeDelIterator*, SeqMaxComparator>; |
222 | | |
223 | | struct EndKeyMaxComparator { |
224 | 15.9k | explicit EndKeyMaxComparator(const InternalKeyComparator* c) : icmp(c) {} |
225 | | |
226 | | bool operator()(const TruncatedRangeDelIterator* a, |
227 | 0 | const TruncatedRangeDelIterator* b) const { |
228 | 0 | return icmp->Compare(a->end_key(), b->end_key()) < 0; |
229 | 0 | } |
230 | | |
231 | | const InternalKeyComparator* icmp; |
232 | | }; |
233 | | struct StartKeyMaxComparator { |
234 | 15.9k | explicit StartKeyMaxComparator(const InternalKeyComparator* c) : icmp(c) {} |
235 | | |
236 | | bool operator()(const ActiveSeqSet::const_iterator& a, |
237 | 0 | const ActiveSeqSet::const_iterator& b) const { |
238 | 0 | return icmp->Compare((*a)->start_key(), (*b)->start_key()) < 0; |
239 | 0 | } |
240 | | |
241 | | const InternalKeyComparator* icmp; |
242 | | }; |
243 | | |
244 | | void PushIter(TruncatedRangeDelIterator* iter, |
245 | 0 | const ParsedInternalKey& parsed) { |
246 | 0 | if (!iter->Valid()) { |
247 | | // The iterator has been fully consumed, so we don't need to add it to |
248 | | // either of the heaps. |
249 | 0 | } else if (icmp_->Compare(iter->end_key(), parsed) <= 0) { |
250 | 0 | PushInactiveIter(iter); |
251 | 0 | } else { |
252 | 0 | PushActiveIter(iter); |
253 | 0 | } |
254 | 0 | } |
255 | | |
256 | 0 | void PushActiveIter(TruncatedRangeDelIterator* iter) { |
257 | 0 | auto seq_pos = active_seqnums_.insert(iter); |
258 | 0 | active_iters_.push(seq_pos); |
259 | 0 | } |
260 | | |
261 | 0 | TruncatedRangeDelIterator* PopActiveIter() { |
262 | 0 | auto active_top = active_iters_.top(); |
263 | 0 | auto iter = *active_top; |
264 | 0 | active_iters_.pop(); |
265 | 0 | active_seqnums_.erase(active_top); |
266 | 0 | return iter; |
267 | 0 | } |
268 | | |
269 | 0 | void PushInactiveIter(TruncatedRangeDelIterator* iter) { |
270 | 0 | inactive_iters_.push(iter); |
271 | 0 | } |
272 | | |
273 | 0 | TruncatedRangeDelIterator* PopInactiveIter() { |
274 | 0 | auto* iter = inactive_iters_.top(); |
275 | 0 | inactive_iters_.pop(); |
276 | 0 | return iter; |
277 | 0 | } |
278 | | |
279 | | const InternalKeyComparator* icmp_; |
280 | | size_t unused_idx_; |
281 | | ActiveSeqSet active_seqnums_; |
282 | | BinaryHeap<ActiveSeqSet::const_iterator, StartKeyMaxComparator> active_iters_; |
283 | | BinaryHeap<TruncatedRangeDelIterator*, EndKeyMaxComparator> inactive_iters_; |
284 | | }; |
285 | | |
286 | | enum class RangeDelPositioningMode { kForwardTraversal, kBackwardTraversal }; |
287 | | class RangeDelAggregator { |
288 | | public: |
289 | | explicit RangeDelAggregator(const InternalKeyComparator* icmp) |
290 | 24.6k | : icmp_(icmp) {} |
291 | 24.6k | virtual ~RangeDelAggregator() {} |
292 | | |
293 | | virtual void AddTombstones( |
294 | | std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter, |
295 | | const InternalKey* smallest = nullptr, |
296 | | const InternalKey* largest = nullptr) = 0; |
297 | | |
298 | 130k | bool ShouldDelete(const Slice& ikey, RangeDelPositioningMode mode) { |
299 | 130k | ParsedInternalKey parsed; |
300 | | |
301 | 130k | Status pik_status = |
302 | 130k | ParseInternalKey(ikey, &parsed, false /* log_err_key */); // TODO |
303 | 130k | assert(pik_status.ok()); |
304 | 130k | if (!pik_status.ok()) { |
305 | 0 | return false; |
306 | 0 | } |
307 | | |
308 | 130k | return ShouldDelete(parsed, mode); |
309 | 130k | } |
310 | | virtual bool ShouldDelete(const ParsedInternalKey& parsed, |
311 | | RangeDelPositioningMode mode) = 0; |
312 | | |
313 | | virtual void InvalidateRangeDelMapPositions() = 0; |
314 | | |
315 | | virtual bool IsEmpty() const = 0; |
316 | | |
317 | 8.21k | bool AddFile(uint64_t file_number) { |
318 | 8.21k | return files_seen_.insert(file_number).second; |
319 | 8.21k | } |
320 | | |
321 | | protected: |
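// StripeRep groups the truncated tombstone iterators belonging to one
// snapshot stripe, i.e. tombstones whose sequence numbers fall inside
// [lower_bound_, upper_bound_], and owns the forward/reverse iterators used
// to answer ShouldDelete() queries within that stripe.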
322 | | class StripeRep { |
323 | | public: |
324 | | StripeRep(const InternalKeyComparator* icmp, SequenceNumber upper_bound, |
325 | | SequenceNumber lower_bound) |
326 | | : icmp_(icmp), |
327 | | forward_iter_(icmp), |
328 | | reverse_iter_(icmp), |
329 | | upper_bound_(upper_bound), |
330 | 15.9k | lower_bound_(lower_bound) {} |
331 | | |
332 | 2.90k | void AddTombstones(std::unique_ptr<TruncatedRangeDelIterator> input_iter) { |
333 | 2.90k | iters_.push_back(std::move(input_iter)); |
334 | 2.90k | } |
335 | | |
336 | 139k | bool IsEmpty() const { return iters_.empty(); } |
337 | | |
338 | | bool ShouldDelete(const ParsedInternalKey& parsed, |
339 | | RangeDelPositioningMode mode); |
340 | | |
341 | 21.0k | void Invalidate() { |
342 | 21.0k | if (!IsEmpty()) { |
343 | 2.90k | InvalidateForwardIter(); |
344 | 2.90k | InvalidateReverseIter(); |
345 | 2.90k | } |
346 | 21.0k | } |
347 | | |
348 | | // If user-defined timestamp is enabled, `start` and `end` are user keys |
349 | | // with timestamp. |
350 | | bool IsRangeOverlapped(const Slice& start, const Slice& end); |
351 | | |
352 | | private: |
353 | 118k | bool InStripe(SequenceNumber seq) const { |
354 | 118k | return lower_bound_ <= seq && seq <= upper_bound_; |
355 | 118k | } |
356 | | |
357 | 2.90k | void InvalidateForwardIter() { forward_iter_.Invalidate(); } |
358 | | |
359 | 121k | void InvalidateReverseIter() { reverse_iter_.Invalidate(); } |
360 | | |
361 | | const InternalKeyComparator* icmp_; |
362 | | std::vector<std::unique_ptr<TruncatedRangeDelIterator>> iters_; |
363 | | ForwardRangeDelIterator forward_iter_; |
364 | | ReverseRangeDelIterator reverse_iter_; |
365 | | SequenceNumber upper_bound_; |
366 | | SequenceNumber lower_bound_; |
367 | | }; |
368 | | |
369 | | const InternalKeyComparator* icmp_; |
370 | | |
371 | | private: |
372 | | std::set<uint64_t> files_seen_; |
373 | | }; |
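
// Sketch of the Slice overload above (hedged; `agg` and `ikey` are
// placeholders): it only parses the internal key and forwards to the pure
// virtual ParsedInternalKey overload, so for a well-formed internal key the
// two calls below are equivalent:
//
//   agg->ShouldDelete(ikey, RangeDelPositioningMode::kForwardTraversal);
//
//   ParsedInternalKey parsed;
//   Status pik_status = ParseInternalKey(ikey, &parsed, false /* log_err_key */);
//   assert(pik_status.ok());
//   agg->ShouldDelete(parsed, RangeDelPositioningMode::kForwardTraversal);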
374 | | |
375 | | class ReadRangeDelAggregator final : public RangeDelAggregator { |
376 | | public: |
377 | | ReadRangeDelAggregator(const InternalKeyComparator* icmp, |
378 | | SequenceNumber upper_bound) |
379 | | : RangeDelAggregator(icmp), |
380 | 13.0k | rep_(icmp, upper_bound, 0 /* lower_bound */) {} |
381 | 13.0k | ~ReadRangeDelAggregator() override {} |
382 | | |
383 | | using RangeDelAggregator::ShouldDelete; |
384 | | void AddTombstones( |
385 | | std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter, |
386 | | const InternalKey* smallest = nullptr, |
387 | | const InternalKey* largest = nullptr) override; |
388 | | |
389 | | bool ShouldDelete(const ParsedInternalKey& parsed, |
390 | 0 | RangeDelPositioningMode mode) final override { |
391 | 0 | if (rep_.IsEmpty()) { |
392 | 0 | return false; |
393 | 0 | } |
394 | 0 | return ShouldDeleteImpl(parsed, mode); |
395 | 0 | } |
396 | | |
397 | | bool IsRangeOverlapped(const Slice& start, const Slice& end); |
398 | | |
399 | 9.08k | void InvalidateRangeDelMapPositions() override { rep_.Invalidate(); } |
400 | | |
401 | 0 | bool IsEmpty() const override { return rep_.IsEmpty(); } |
402 | | |
403 | | private: |
404 | | StripeRep rep_; |
405 | | |
406 | | bool ShouldDeleteImpl(const ParsedInternalKey& parsed, |
407 | | RangeDelPositioningMode mode); |
408 | | }; |
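
// Usage sketch for ReadRangeDelAggregator (hedged; `icmp`, `snapshot_seq`,
// `NewTombstoneIter()`, and `ikey` are illustrative placeholders, not RocksDB
// APIs): a read-path aggregator is scoped to a read sequence number and is
// typically queried in forward traversal order:
//
//   ReadRangeDelAggregator agg(&icmp, snapshot_seq);
//   agg.AddTombstones(NewTombstoneIter());  // one call per tombstone source
//   if (!agg.IsEmpty() &&
//       agg.ShouldDelete(ikey, RangeDelPositioningMode::kForwardTraversal)) {
//     // ikey is covered by a range tombstone visible at snapshot_seq.
//   }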
409 | | |
410 | | class CompactionRangeDelAggregator : public RangeDelAggregator { |
411 | | public: |
412 | | CompactionRangeDelAggregator(const InternalKeyComparator* icmp, |
413 | | const std::vector<SequenceNumber>& snapshots, |
414 | | const std::string* full_history_ts_low = nullptr, |
415 | | const std::string* trim_ts = nullptr) |
416 | 11.6k | : RangeDelAggregator(icmp), snapshots_(&snapshots) { |
417 | 11.6k | if (full_history_ts_low) { |
418 | 2.16k | ts_upper_bound_ = *full_history_ts_low; |
419 | 2.16k | } |
420 | 11.6k | if (trim_ts) { |
421 | 2.16k | trim_ts_ = *trim_ts; |
422 | | // Range tombstones newer than `trim_ts` or `full_history_ts_low` should
423 | | // not be considered in ShouldDelete().
424 | 2.16k | if (ts_upper_bound_.empty()) { |
425 | 2.16k | ts_upper_bound_ = trim_ts_; |
426 | 2.16k | } else if (!trim_ts_.empty() && icmp->user_comparator()->CompareTimestamp( |
427 | 0 | trim_ts_, ts_upper_bound_) < 0) { |
428 | 0 | ts_upper_bound_ = trim_ts_; |
429 | 0 | } |
430 | 2.16k | } |
431 | 11.6k | } |
432 | 11.6k | ~CompactionRangeDelAggregator() override {} |
433 | | |
434 | | void AddTombstones( |
435 | | std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter, |
436 | | const InternalKey* smallest = nullptr, |
437 | | const InternalKey* largest = nullptr) override; |
438 | | |
439 | | using RangeDelAggregator::ShouldDelete; |
440 | | bool ShouldDelete(const ParsedInternalKey& parsed, |
441 | | RangeDelPositioningMode mode) override; |
442 | | |
443 | | bool IsRangeOverlapped(const Slice& start, const Slice& end); |
444 | | |
445 | 9.47k | void InvalidateRangeDelMapPositions() override { |
446 | 9.47k | for (auto& rep : reps_) { |
447 | 2.90k | rep.second.Invalidate(); |
448 | 2.90k | } |
449 | 9.47k | } |
450 | | |
451 | 1.77k | bool IsEmpty() const override { |
452 | 1.77k | for (const auto& rep : reps_) { |
453 | 318 | if (!rep.second.IsEmpty()) { |
454 | 318 | return false; |
455 | 318 | } |
456 | 318 | } |
457 | 1.45k | return true; |
458 | 1.77k | } |
459 | | |
460 | | // Creates an iterator over all the range tombstones in the aggregator, for |
461 | | // use in compaction. |
462 | | // |
463 | | // NOTE: the internal key boundaries are used for optimization purposes to |
464 | | // reduce the number of tombstones that are passed to the fragmenter; they do |
465 | | // not guarantee that the resulting iterator only contains range tombstones |
466 | | // that cover keys in the provided range. If required, these bounds must be |
467 | | // enforced during iteration. |
468 | | std::unique_ptr<FragmentedRangeTombstoneIterator> NewIterator( |
469 | | const Slice* lower_bound = nullptr, const Slice* upper_bound = nullptr); |
470 | | |
471 | | private: |
472 | | std::vector<std::unique_ptr<TruncatedRangeDelIterator>> parent_iters_; |
473 | | std::map<SequenceNumber, StripeRep> reps_; |
474 | | |
475 | | const std::vector<SequenceNumber>* snapshots_; |
476 | | // Minimum of `full_history_ts_low` and `trim_ts_` (over the ones provided)
477 | | Slice ts_upper_bound_{}; |
478 | | Slice trim_ts_{}; |
479 | | }; |
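
// Usage sketch for CompactionRangeDelAggregator (hedged; everything except the
// class's own methods is an illustrative placeholder): a single aggregator
// spans all snapshot stripes of a compaction, and NewIterator() re-fragments
// the collected tombstones so they can be written to the output files:
//
//   CompactionRangeDelAggregator agg(&icmp, existing_snapshots);
//   for (auto& input : inputs) {
//     agg.AddTombstones(input.NewTombstoneIter(), &input.smallest,
//                       &input.largest);
//   }
//   // A point key may be dropped if a tombstone in its snapshot stripe covers it:
//   bool drop = agg.ShouldDelete(parsed_key,
//                                RangeDelPositioningMode::kForwardTraversal);
//   // Emit the surviving tombstones overlapping the output's key range:
//   auto range_del_iter = agg.NewIterator(&out_lower_bound, &out_upper_bound);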
480 | | |
481 | | } // namespace ROCKSDB_NAMESPACE |