Coverage Report

Created: 2024-07-27 06:53

/src/rocksdb/db/range_del_aggregator.h
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2018-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#pragma once
7
8
#include <algorithm>
9
#include <iterator>
10
#include <list>
11
#include <map>
12
#include <set>
13
#include <string>
14
#include <vector>
15
16
#include "db/compaction/compaction_iteration_stats.h"
17
#include "db/dbformat.h"
18
#include "db/pinned_iterators_manager.h"
19
#include "db/range_del_aggregator.h"
20
#include "db/range_tombstone_fragmenter.h"
21
#include "db/version_edit.h"
22
#include "rocksdb/comparator.h"
23
#include "rocksdb/types.h"
24
#include "table/internal_iterator.h"
25
#include "table/table_builder.h"
26
#include "util/heap.h"
27
#include "util/kv_map.h"
28
29
namespace ROCKSDB_NAMESPACE {
30
31
class TruncatedRangeDelIterator {
32
 public:
33
  TruncatedRangeDelIterator(
34
      std::unique_ptr<FragmentedRangeTombstoneIterator> iter,
35
      const InternalKeyComparator* icmp, const InternalKey* smallest,
36
      const InternalKey* largest);
37
38
0
  void SetRangeDelReadSeqno(SequenceNumber read_seqno) {
39
0
    iter_->SetRangeDelReadSeqno(read_seqno);
40
0
  }
41
42
  bool Valid() const;
43
44
254k
  void Next() { iter_->TopNext(); }
45
0
  void Prev() { iter_->TopPrev(); }
46
47
74.7M
  void InternalNext() { iter_->Next(); }
48
49
  // Seeks to the tombstone with the highest visible sequence number that covers
50
  // target (a user key). If no such tombstone exists, the position will be at
51
  // the earliest tombstone that ends after target.
52
  // REQUIRES: target is a user key.
53
  void Seek(const Slice& target);
54
55
  // Seeks to the first range tombstone with end_key() > target.
56
  void SeekInternalKey(const Slice& target);
57
58
  // Seeks to the tombstone with the highest visible sequence number that covers
59
  // target (a user key). If no such tombstone exists, the position will be at
60
  // the latest tombstone that starts before target.
61
  void SeekForPrev(const Slice& target);
62
63
  void SeekToFirst();
64
  void SeekToLast();
65
66
149M
  ParsedInternalKey start_key() const {
67
149M
    return (smallest_ == nullptr ||
68
149M
            icmp_->Compare(*smallest_, iter_->parsed_start_key()) <= 0)
69
149M
               ? iter_->parsed_start_key()
70
149M
               : *smallest_;
71
149M
  }
72
73
75.1M
  ParsedInternalKey end_key() const {
74
75.1M
    return (largest_ == nullptr ||
75
75.1M
            icmp_->Compare(iter_->parsed_end_key(), *largest_) <= 0)
76
75.1M
               ? iter_->parsed_end_key()
77
75.1M
               : *largest_;
78
75.1M
  }
79
80
149M
  SequenceNumber seq() const { return iter_->seq(); }
81
0
  Slice timestamp() const {
82
0
    assert(icmp_->user_comparator()->timestamp_size());
83
0
    return iter_->timestamp();
84
0
  }
85
2.90k
  void SetTimestampUpperBound(const Slice* ts_upper_bound) {
86
2.90k
    iter_->SetTimestampUpperBound(ts_upper_bound);
87
2.90k
  }
88
89
  std::map<SequenceNumber, std::unique_ptr<TruncatedRangeDelIterator>>
90
  SplitBySnapshot(const std::vector<SequenceNumber>& snapshots);
91
92
2.90k
  SequenceNumber upper_bound() const { return iter_->upper_bound(); }
93
94
2.90k
  SequenceNumber lower_bound() const { return iter_->lower_bound(); }
95
96
 private:
97
  std::unique_ptr<FragmentedRangeTombstoneIterator> iter_;
98
  const InternalKeyComparator* icmp_;
99
  const ParsedInternalKey* smallest_ = nullptr;
100
  const ParsedInternalKey* largest_ = nullptr;
101
  std::list<ParsedInternalKey> pinned_bounds_;
102
103
  const InternalKey* smallest_ikey_;
104
  const InternalKey* largest_ikey_;
105
};
106
107
struct SeqMaxComparator {
108
  bool operator()(const TruncatedRangeDelIterator* a,
109
0
                  const TruncatedRangeDelIterator* b) const {
110
0
    return a->seq() > b->seq();
111
0
  }
112
};
113
114
struct StartKeyMinComparator {
115
25.4k
  explicit StartKeyMinComparator(const InternalKeyComparator* c) : icmp(c) {}
116
117
  bool operator()(const TruncatedRangeDelIterator* a,
118
0
                  const TruncatedRangeDelIterator* b) const {
119
0
    return icmp->Compare(a->start_key(), b->start_key()) > 0;
120
0
  }
121
122
  const InternalKeyComparator* icmp;
123
};
124
125
class ForwardRangeDelIterator {
126
 public:
127
  explicit ForwardRangeDelIterator(const InternalKeyComparator* icmp);
128
129
  bool ShouldDelete(const ParsedInternalKey& parsed);
130
  void Invalidate();
131
132
  void AddNewIter(TruncatedRangeDelIterator* iter,
133
2.59k
                  const ParsedInternalKey& parsed) {
134
2.59k
    iter->Seek(parsed.user_key);
135
2.59k
    PushIter(iter, parsed);
136
2.59k
    assert(active_iters_.size() == active_seqnums_.size());
137
2.59k
  }
138
139
118k
  size_t UnusedIdx() const { return unused_idx_; }
140
2.59k
  void IncUnusedIdx() { unused_idx_++; }
141
142
 private:
143
  using ActiveSeqSet =
144
      std::multiset<TruncatedRangeDelIterator*, SeqMaxComparator>;
145
146
  struct EndKeyMinComparator {
147
15.9k
    explicit EndKeyMinComparator(const InternalKeyComparator* c) : icmp(c) {}
148
149
    bool operator()(const ActiveSeqSet::const_iterator& a,
150
0
                    const ActiveSeqSet::const_iterator& b) const {
151
0
      return icmp->Compare((*a)->end_key(), (*b)->end_key()) > 0;
152
0
    }
153
154
    const InternalKeyComparator* icmp;
155
  };
156
157
  void PushIter(TruncatedRangeDelIterator* iter,
158
55.6k
                const ParsedInternalKey& parsed) {
159
55.6k
    if (!iter->Valid()) {
160
      // The iterator has been fully consumed, so we don't need to add it to
161
      // either of the heaps.
162
1.03k
      return;
163
1.03k
    }
164
54.6k
    int cmp = icmp_->Compare(parsed, iter->start_key());
165
54.6k
    if (cmp < 0) {
166
3.70k
      PushInactiveIter(iter);
167
50.9k
    } else {
168
50.9k
      PushActiveIter(iter);
169
50.9k
    }
170
54.6k
  }
171
172
50.9k
  void PushActiveIter(TruncatedRangeDelIterator* iter) {
173
50.9k
    auto seq_pos = active_seqnums_.insert(iter);
174
50.9k
    active_iters_.push(seq_pos);
175
50.9k
  }
176
177
49.5k
  TruncatedRangeDelIterator* PopActiveIter() {
178
49.5k
    auto active_top = active_iters_.top();
179
49.5k
    auto iter = *active_top;
180
49.5k
    active_iters_.pop();
181
49.5k
    active_seqnums_.erase(active_top);
182
49.5k
    return iter;
183
49.5k
  }
184
185
3.70k
  void PushInactiveIter(TruncatedRangeDelIterator* iter) {
186
3.70k
    inactive_iters_.push(iter);
187
3.70k
  }
188
189
3.49k
  TruncatedRangeDelIterator* PopInactiveIter() {
190
3.49k
    auto* iter = inactive_iters_.top();
191
3.49k
    inactive_iters_.pop();
192
3.49k
    return iter;
193
3.49k
  }
194
195
  const InternalKeyComparator* icmp_;
196
  size_t unused_idx_;
197
  ActiveSeqSet active_seqnums_;
198
  BinaryHeap<ActiveSeqSet::const_iterator, EndKeyMinComparator> active_iters_;
199
  BinaryHeap<TruncatedRangeDelIterator*, StartKeyMinComparator> inactive_iters_;
200
};
201
202
class ReverseRangeDelIterator {
203
 public:
204
  explicit ReverseRangeDelIterator(const InternalKeyComparator* icmp);
205
206
  bool ShouldDelete(const ParsedInternalKey& parsed);
207
  void Invalidate();
208
209
  void AddNewIter(TruncatedRangeDelIterator* iter,
210
0
                  const ParsedInternalKey& parsed) {
211
0
    iter->SeekForPrev(parsed.user_key);
212
0
    PushIter(iter, parsed);
213
0
    assert(active_iters_.size() == active_seqnums_.size());
214
0
  }
215
216
0
  size_t UnusedIdx() const { return unused_idx_; }
217
0
  void IncUnusedIdx() { unused_idx_++; }
218
219
 private:
220
  using ActiveSeqSet =
221
      std::multiset<TruncatedRangeDelIterator*, SeqMaxComparator>;
222
223
  struct EndKeyMaxComparator {
224
15.9k
    explicit EndKeyMaxComparator(const InternalKeyComparator* c) : icmp(c) {}
225
226
    bool operator()(const TruncatedRangeDelIterator* a,
227
0
                    const TruncatedRangeDelIterator* b) const {
228
0
      return icmp->Compare(a->end_key(), b->end_key()) < 0;
229
0
    }
230
231
    const InternalKeyComparator* icmp;
232
  };
233
  struct StartKeyMaxComparator {
234
15.9k
    explicit StartKeyMaxComparator(const InternalKeyComparator* c) : icmp(c) {}
235
236
    bool operator()(const ActiveSeqSet::const_iterator& a,
237
0
                    const ActiveSeqSet::const_iterator& b) const {
238
0
      return icmp->Compare((*a)->start_key(), (*b)->start_key()) < 0;
239
0
    }
240
241
    const InternalKeyComparator* icmp;
242
  };
243
244
  void PushIter(TruncatedRangeDelIterator* iter,
245
0
                const ParsedInternalKey& parsed) {
246
0
    if (!iter->Valid()) {
247
      // The iterator has been fully consumed, so we don't need to add it to
248
      // either of the heaps.
249
0
    } else if (icmp_->Compare(iter->end_key(), parsed) <= 0) {
250
0
      PushInactiveIter(iter);
251
0
    } else {
252
0
      PushActiveIter(iter);
253
0
    }
254
0
  }
255
256
0
  void PushActiveIter(TruncatedRangeDelIterator* iter) {
257
0
    auto seq_pos = active_seqnums_.insert(iter);
258
0
    active_iters_.push(seq_pos);
259
0
  }
260
261
0
  TruncatedRangeDelIterator* PopActiveIter() {
262
0
    auto active_top = active_iters_.top();
263
0
    auto iter = *active_top;
264
0
    active_iters_.pop();
265
0
    active_seqnums_.erase(active_top);
266
0
    return iter;
267
0
  }
268
269
0
  void PushInactiveIter(TruncatedRangeDelIterator* iter) {
270
0
    inactive_iters_.push(iter);
271
0
  }
272
273
0
  TruncatedRangeDelIterator* PopInactiveIter() {
274
0
    auto* iter = inactive_iters_.top();
275
0
    inactive_iters_.pop();
276
0
    return iter;
277
0
  }
278
279
  const InternalKeyComparator* icmp_;
280
  size_t unused_idx_;
281
  ActiveSeqSet active_seqnums_;
282
  BinaryHeap<ActiveSeqSet::const_iterator, StartKeyMaxComparator> active_iters_;
283
  BinaryHeap<TruncatedRangeDelIterator*, EndKeyMaxComparator> inactive_iters_;
284
};
285
286
enum class RangeDelPositioningMode { kForwardTraversal, kBackwardTraversal };
287
class RangeDelAggregator {
288
 public:
289
  explicit RangeDelAggregator(const InternalKeyComparator* icmp)
290
24.6k
      : icmp_(icmp) {}
291
24.6k
  virtual ~RangeDelAggregator() {}
292
293
  virtual void AddTombstones(
294
      std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter,
295
      const InternalKey* smallest = nullptr,
296
      const InternalKey* largest = nullptr) = 0;
297
298
130k
  bool ShouldDelete(const Slice& ikey, RangeDelPositioningMode mode) {
299
130k
    ParsedInternalKey parsed;
300
301
130k
    Status pik_status =
302
130k
        ParseInternalKey(ikey, &parsed, false /* log_err_key */);  // TODO
303
130k
    assert(pik_status.ok());
304
130k
    if (!pik_status.ok()) {
305
0
      return false;
306
0
    }
307
308
130k
    return ShouldDelete(parsed, mode);
309
130k
  }
310
  virtual bool ShouldDelete(const ParsedInternalKey& parsed,
311
                            RangeDelPositioningMode mode) = 0;
312
313
  virtual void InvalidateRangeDelMapPositions() = 0;
314
315
  virtual bool IsEmpty() const = 0;
316
317
8.21k
  bool AddFile(uint64_t file_number) {
318
8.21k
    return files_seen_.insert(file_number).second;
319
8.21k
  }
320
321
 protected:
322
  class StripeRep {
323
   public:
324
    StripeRep(const InternalKeyComparator* icmp, SequenceNumber upper_bound,
325
              SequenceNumber lower_bound)
326
        : icmp_(icmp),
327
          forward_iter_(icmp),
328
          reverse_iter_(icmp),
329
          upper_bound_(upper_bound),
330
15.9k
          lower_bound_(lower_bound) {}
331
332
2.90k
    void AddTombstones(std::unique_ptr<TruncatedRangeDelIterator> input_iter) {
333
2.90k
      iters_.push_back(std::move(input_iter));
334
2.90k
    }
335
336
139k
    bool IsEmpty() const { return iters_.empty(); }
337
338
    bool ShouldDelete(const ParsedInternalKey& parsed,
339
                      RangeDelPositioningMode mode);
340
341
21.0k
    void Invalidate() {
342
21.0k
      if (!IsEmpty()) {
343
2.90k
        InvalidateForwardIter();
344
2.90k
        InvalidateReverseIter();
345
2.90k
      }
346
21.0k
    }
347
348
    // If user-defined timestamp is enabled, `start` and `end` are user keys
349
    // with timestamp.
350
    bool IsRangeOverlapped(const Slice& start, const Slice& end);
351
352
   private:
353
118k
    bool InStripe(SequenceNumber seq) const {
354
118k
      return lower_bound_ <= seq && seq <= upper_bound_;
355
118k
    }
356
357
2.90k
    void InvalidateForwardIter() { forward_iter_.Invalidate(); }
358
359
121k
    void InvalidateReverseIter() { reverse_iter_.Invalidate(); }
360
361
    const InternalKeyComparator* icmp_;
362
    std::vector<std::unique_ptr<TruncatedRangeDelIterator>> iters_;
363
    ForwardRangeDelIterator forward_iter_;
364
    ReverseRangeDelIterator reverse_iter_;
365
    SequenceNumber upper_bound_;
366
    SequenceNumber lower_bound_;
367
  };
368
369
  const InternalKeyComparator* icmp_;
370
371
 private:
372
  std::set<uint64_t> files_seen_;
373
};
374
375
class ReadRangeDelAggregator final : public RangeDelAggregator {
376
 public:
377
  ReadRangeDelAggregator(const InternalKeyComparator* icmp,
378
                         SequenceNumber upper_bound)
379
      : RangeDelAggregator(icmp),
380
13.0k
        rep_(icmp, upper_bound, 0 /* lower_bound */) {}
381
13.0k
  ~ReadRangeDelAggregator() override {}
382
383
  using RangeDelAggregator::ShouldDelete;
384
  void AddTombstones(
385
      std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter,
386
      const InternalKey* smallest = nullptr,
387
      const InternalKey* largest = nullptr) override;
388
389
  bool ShouldDelete(const ParsedInternalKey& parsed,
390
0
                    RangeDelPositioningMode mode) final override {
391
0
    if (rep_.IsEmpty()) {
392
0
      return false;
393
0
    }
394
0
    return ShouldDeleteImpl(parsed, mode);
395
0
  }
396
397
  bool IsRangeOverlapped(const Slice& start, const Slice& end);
398
399
9.08k
  void InvalidateRangeDelMapPositions() override { rep_.Invalidate(); }
400
401
0
  bool IsEmpty() const override { return rep_.IsEmpty(); }
402
403
 private:
404
  StripeRep rep_;
405
406
  bool ShouldDeleteImpl(const ParsedInternalKey& parsed,
407
                        RangeDelPositioningMode mode);
408
};
409
410
class CompactionRangeDelAggregator : public RangeDelAggregator {
411
 public:
412
  CompactionRangeDelAggregator(const InternalKeyComparator* icmp,
413
                               const std::vector<SequenceNumber>& snapshots,
414
                               const std::string* full_history_ts_low = nullptr,
415
                               const std::string* trim_ts = nullptr)
416
11.6k
      : RangeDelAggregator(icmp), snapshots_(&snapshots) {
417
11.6k
    if (full_history_ts_low) {
418
2.16k
      ts_upper_bound_ = *full_history_ts_low;
419
2.16k
    }
420
11.6k
    if (trim_ts) {
421
2.16k
      trim_ts_ = *trim_ts;
422
      // Range tombstone newer than `trim_ts` or `full_history_ts_low` should
423
      // not be considered in ShouldDelete().
424
2.16k
      if (ts_upper_bound_.empty()) {
425
2.16k
        ts_upper_bound_ = trim_ts_;
426
2.16k
      } else if (!trim_ts_.empty() && icmp->user_comparator()->CompareTimestamp(
427
0
                                          trim_ts_, ts_upper_bound_) < 0) {
428
0
        ts_upper_bound_ = trim_ts_;
429
0
      }
430
2.16k
    }
431
11.6k
  }
432
11.6k
  ~CompactionRangeDelAggregator() override {}
433
434
  void AddTombstones(
435
      std::unique_ptr<FragmentedRangeTombstoneIterator> input_iter,
436
      const InternalKey* smallest = nullptr,
437
      const InternalKey* largest = nullptr) override;
438
439
  using RangeDelAggregator::ShouldDelete;
440
  bool ShouldDelete(const ParsedInternalKey& parsed,
441
                    RangeDelPositioningMode mode) override;
442
443
  bool IsRangeOverlapped(const Slice& start, const Slice& end);
444
445
9.47k
  void InvalidateRangeDelMapPositions() override {
446
9.47k
    for (auto& rep : reps_) {
447
2.90k
      rep.second.Invalidate();
448
2.90k
    }
449
9.47k
  }
450
451
1.77k
  bool IsEmpty() const override {
452
1.77k
    for (const auto& rep : reps_) {
453
318
      if (!rep.second.IsEmpty()) {
454
318
        return false;
455
318
      }
456
318
    }
457
1.45k
    return true;
458
1.77k
  }
459
460
  // Creates an iterator over all the range tombstones in the aggregator, for
461
  // use in compaction.
462
  //
463
  // NOTE: the internal key boundaries are used for optimization purposes to
464
  // reduce the number of tombstones that are passed to the fragmenter; they do
465
  // not guarantee that the resulting iterator only contains range tombstones
466
  // that cover keys in the provided range. If required, these bounds must be
467
  // enforced during iteration.
468
  std::unique_ptr<FragmentedRangeTombstoneIterator> NewIterator(
469
      const Slice* lower_bound = nullptr, const Slice* upper_bound = nullptr);
470
471
 private:
472
  std::vector<std::unique_ptr<TruncatedRangeDelIterator>> parent_iters_;
473
  std::map<SequenceNumber, StripeRep> reps_;
474
475
  const std::vector<SequenceNumber>* snapshots_;
476
  // min over full_history_ts_low and trim_ts_
477
  Slice ts_upper_bound_{};
478
  Slice trim_ts_{};
479
};
480
481
}  // namespace ROCKSDB_NAMESPACE