Coverage Report

Created: 2026-05-31 07:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/memtable/wbwi_memtable.h
Line
Count
Source
1
//  Copyright (c) Meta Platforms, Inc. and affiliates.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#pragma once
7
#include "db/memtable.h"
8
#include "rocksdb/utilities/write_batch_with_index.h"
9
10
namespace ROCKSDB_NAMESPACE {
11
// An implementation of the ReadOnlyMemTable interface based on the content
12
// of the given write batch with index (WBWI) object. This can be used to ingest
13
// a transaction (which is based on WBWI) into the DB as an immutable memtable.
14
//
15
// REQUIRES: overwrite_key to be true for the WBWI
16
// Since the keys in WBWI do not have sequence number, this class is responsible
17
// for assigning sequence numbers to the keys. This memtable needs to be
18
// assigned a range of sequence numbers through AssignSequenceNumbers(seqno)
19
// before being available for reads.
20
//
21
// The sequence number assignment uses the update count for each key
22
// tracked in WBWI (see WBWIIterator::GetUpdateCount()). For each key, the
23
// sequence number assigned is seqno.lower_bound + update_count - 1. So more
24
// recent updates will have higher sequence number.
25
//
26
// Since WBWI with overwrite mode keeps track of the most recent update for
27
// each key, this memtable contains one update per key usually. However, there
28
// are two exceptions:
29
// 1. Merge operations: a Merge operation does not overwrite existing entries,
30
// if a user uses Merge, multiple entries may be kept.
31
// 2. Overwritten SingleDelete: this memtable needs to emit an extra
32
// SingleDelete even when the SD is overwritten by another update.
33
// Consider the following scenario:
34
// - WBWI has SD(k) then PUT(k, v1)
35
// - DB has PUT(k, v2) in L1
36
// - flush WBWI adds PUT(k, v1) into L0
37
// - live memtable contains SD(k)
38
// - flush live memtable and compact it with L0 will drop SD(k) and PUT(k, v1)
39
// - the PUT(k, v2) in L1 incorrectly becomes visible
40
// So during flush, iterator from this memtable will need to emit overwritten
41
// single deletion. This SD will be assigned seqno.lower_bound.
42
class WBWIMemTable final : public ReadOnlyMemTable {
43
 public:
44
  struct SeqnoRange {
45
    SequenceNumber lower_bound = kMaxSequenceNumber;
46
    SequenceNumber upper_bound = kMaxSequenceNumber;
47
  };
48
  WBWIMemTable(const std::shared_ptr<WriteBatchWithIndex>& wbwi,
49
               const Comparator* cmp, uint32_t cf_id,
50
               const ImmutableOptions* immutable_options,
51
               const MutableCFOptions* cf_options,
52
               const WriteBatchWithIndex::CFStat& stat)
53
0
      : wbwi_(wbwi),
54
0
        comparator_(cmp),
55
0
        ikey_comparator_(comparator_),
56
0
        moptions_(*immutable_options, *cf_options),
57
0
        clock_(immutable_options->clock),
58
        // We need to include overwritten_sd_count in num_entries_ since flush
59
        // verifies number of entries processed and that iterator for this
60
        // memtable will emit overwritten SingleDelete entries during flush. See
61
        // comment above WBWIMemTable for more detail.
62
0
        num_entries_(stat.entry_count + stat.overwritten_sd_count),
63
0
        cf_id_(cf_id) {
64
0
    assert(wbwi->GetOverwriteKey());
65
0
  }
66
67
  // No copying allowed
68
  WBWIMemTable(const WBWIMemTable&) = delete;
69
  WBWIMemTable& operator=(const WBWIMemTable&) = delete;
70
71
0
  ~WBWIMemTable() override { assert(refs_ == 0); }
72
73
0
  const char* Name() const override { return "WBWIMemTable"; }
74
75
0
  size_t ApproximateMemoryUsage() override {
76
    // FIXME: we can calculate for each CF or just divide evenly among CFs
77
    // Used in ReportFlushInputSize(), MemPurgeDecider, flush job event logging,
78
    // and InternalStats::HandleCurSizeAllMemTables
79
0
    return 0;
80
0
  }
81
82
0
  size_t MemoryAllocatedBytes() const override {
83
    // FIXME: similar to ApproximateMemoryUsage().
84
    //   Used in MemTableList to trim memtable history.
85
0
    return 0;
86
0
  }
87
88
  void UniqueRandomSample(
89
      const uint64_t& /* target_sample_size */,
90
0
      std::unordered_set<const char*>* /* entries */) override {
91
    // TODO: support mempurge
92
0
    assert(false);
93
0
  }
94
95
  InternalIterator* NewIterator(const ReadOptions&,
96
                                UnownedPtr<const SeqnoToTimeMapping>,
97
                                Arena* arena,
98
                                const SliceTransform* /* prefix_extractor */,
99
                                bool for_flush) override;
100
101
  // Returns an iterator that wraps a MemTableIterator and logically strips the
102
  // user-defined timestamp of each key. This API is only used by flush when
103
  // user-defined timestamps in MemTable only feature is enabled.
104
  InternalIterator* NewTimestampStrippingIterator(
105
      const ReadOptions&, UnownedPtr<const SeqnoToTimeMapping>, Arena* arena,
106
0
      const SliceTransform*, size_t) override {
107
    // TODO: support UDT
108
0
    assert(false);
109
0
    return NewErrorInternalIterator(
110
0
        Status::NotSupported(
111
0
            "WBWIMemTable does not support NewTimestampStrippingIterator."),
112
0
        arena);
113
0
  }
114
115
  FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator(
116
0
      const ReadOptions&, SequenceNumber, bool) override {
117
    // TODO: support DeleteRange
118
0
    assert(!wbwi_->GetWriteBatch()->HasDeleteRange());
119
0
    return nullptr;
120
0
  }
121
122
  FragmentedRangeTombstoneIterator* NewTimestampStrippingRangeTombstoneIterator(
123
0
      const ReadOptions&, SequenceNumber, size_t) override {
124
    // TODO: support UDT
125
0
    assert(false);
126
0
    return nullptr;
127
0
  }
128
129
  // FIXME: not a good practice to use default parameter with virtual function
130
  using ReadOnlyMemTable::Get;
131
  bool Get(const LookupKey& key, std::string* value,
132
           PinnableWideColumns* columns, std::string* timestamp, Status* s,
133
           MergeContext* merge_context,
134
           SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq,
135
           const ReadOptions& read_opts, bool immutable_memtable,
136
           ReadCallback* callback = nullptr, bool* is_blob_index = nullptr,
137
           bool do_merge = true,
138
           const BlobFetcher* blob_fetcher = nullptr) override;
139
140
  void MultiGet(const ReadOptions& read_options, MultiGetRange* range,
141
                ReadCallback* callback, bool immutable_memtable,
142
                const BlobFetcher* blob_fetcher = nullptr) override;
143
144
0
  uint64_t NumEntries() const override { return num_entries_; }
145
146
0
  uint64_t NumDeletion() const override {
147
    // FIXME: this is used for stats and event logging
148
0
    return 0;
149
0
  }
150
151
0
  uint64_t NumRangeDeletion() const override {
152
    // FIXME
153
0
    assert(!wbwi_->GetWriteBatch()->HasDeleteRange());
154
0
    return 0;
155
0
  }
156
157
0
  uint64_t GetDataSize() const override {
158
    // FIXME: used in event logging in flush_job
159
0
    return 0;
160
0
  }
161
162
0
  SequenceNumber GetEarliestSequenceNumber() override {
163
0
    return assigned_seqno_.lower_bound;
164
0
  }
165
166
0
  bool IsEmpty() const override {
167
    // Ideally also check that wbwi contains updates from this CF. For now, we
168
    // only create WBWIMemTable for CFs with updates in wbwi.
169
0
    return wbwi_->GetWriteBatch()->Count() == 0;
170
0
  }
171
172
0
  SequenceNumber GetFirstSequenceNumber() override {
173
0
    return assigned_seqno_.lower_bound;
174
0
  }
175
176
0
  uint64_t GetMinLogContainingPrepSection() override {
177
    // FIXME: used to retain WAL with pending Prepare
178
0
    return min_prep_log_referenced_;
179
0
  }
180
181
0
  void MarkImmutable() override {}
182
183
0
  void MarkFlushed() override {}
184
185
0
  MemTableStats ApproximateStats(const Slice&, const Slice&) override {
186
    // FIXME: used for query planning
187
0
    return {};
188
0
  }
189
190
0
  const InternalKeyComparator& GetInternalKeyComparator() const override {
191
0
    return ikey_comparator_;
192
0
  }
193
194
0
  uint64_t ApproximateOldestKeyTime() const override {
195
    // FIXME: can use the time when this is added to the DB.
196
0
    return kUnknownOldestAncesterTime;
197
0
  }
198
199
0
  bool IsFragmentedRangeTombstonesConstructed() const override {
200
0
    assert(!wbwi_->GetWriteBatch()->HasDeleteRange());
201
0
    return true;
202
0
  }
203
204
0
  Slice GetNewestUDT() const override {
205
    // FIXME: support UDT
206
0
    assert(false);
207
0
    return Slice();
208
0
  }
209
210
  // Assign a sequence number to the entries in this memtable.
211
0
  void AssignSequenceNumbers(const SeqnoRange& seqno_range) {
212
    // Not expecting to assign seqno multiple times.
213
0
    assert(assigned_seqno_.lower_bound == kMaxSequenceNumber);
214
0
    assert(assigned_seqno_.upper_bound == kMaxSequenceNumber);
215
216
0
    assigned_seqno_ = seqno_range;
217
218
0
    assert(assigned_seqno_.lower_bound <= assigned_seqno_.upper_bound);
219
0
    assert(assigned_seqno_.upper_bound != kMaxSequenceNumber);
220
0
  }
221
222
0
  void SetMinPrepLog(uint64_t min_prep_log) {
223
0
    min_prep_log_referenced_ = min_prep_log;
224
0
  }
225
226
 private:
227
  inline InternalIterator* NewIterator() const;
228
229
  std::shared_ptr<WriteBatchWithIndex> wbwi_;
230
  const Comparator* comparator_;
231
  InternalKeyComparator ikey_comparator_;
232
  SeqnoRange assigned_seqno_;
233
  const ImmutableMemTableOptions moptions_;
234
  SystemClock* clock_;
235
  uint64_t min_prep_log_referenced_{0};
236
  uint64_t num_entries_;
237
  // WBWI can contain updates to multiple CFs. `cf_id_` determines which CF
238
  // this memtable is for.
239
  const uint32_t cf_id_;
240
};
241
242
class WBWIMemTableIterator final : public InternalIterator {
243
 public:
244
  WBWIMemTableIterator(std::unique_ptr<WBWIIterator>&& it,
245
                       const WBWIMemTable::SeqnoRange& assigned_seqno,
246
                       const Comparator* comparator, bool for_flush)
247
0
      : it_(std::move(it)),
248
0
        assigned_seqno_(assigned_seqno),
249
0
        comparator_(comparator),
250
0
        emit_overwritten_single_del_(for_flush) {
251
0
    assert(assigned_seqno_.lower_bound <= assigned_seqno_.upper_bound);
252
0
    assert(assigned_seqno_.upper_bound < kMaxSequenceNumber);
253
0
    s_.PermitUncheckedError();
254
0
  }
255
256
  // No copying allowed
257
  WBWIMemTableIterator(const WBWIMemTableIterator&) = delete;
258
  WBWIMemTableIterator& operator=(const WBWIMemTableIterator&) = delete;
259
260
0
  bool Valid() const override { return valid_; }
261
262
0
  void SeekToFirst() override {
263
0
    it_->SeekToFirst();
264
0
    UpdateKey();
265
0
  }
266
267
0
  void SeekToLast() override {
268
0
    assert(!emit_overwritten_single_del_);
269
0
    it_->SeekToLast();
270
0
    UpdateKey();
271
0
  }
272
273
0
  void Seek(const Slice& target) override {
274
    // `emit_overwritten_single_del_` is only used for flush, which does
275
    // sequential forward scan from the beginning.
276
0
    assert(!emit_overwritten_single_del_);
277
0
    Slice target_user_key = ExtractUserKey(target);
278
    // Moves to first update >= target_user_key
279
0
    it_->Seek(target_user_key);
280
0
    SequenceNumber target_seqno = GetInternalKeySeqno(target);
281
    // Move to the first entry with seqno <= target_seqno for the same
282
    // user key or a different user key.
283
0
    while (it_->Valid() &&
284
0
           comparator_->Compare(it_->Entry().key, target_user_key) == 0 &&
285
0
           target_seqno < CurrentKeySeqno()) {
286
0
      it_->Next();
287
0
    }
288
0
    UpdateKey();
289
0
  }
290
291
0
  void SeekForPrev(const Slice& target) override {
292
0
    assert(!emit_overwritten_single_del_);
293
0
    Slice target_user_key = ExtractUserKey(target);
294
    // Moves to last update <= target_user_key
295
0
    it_->SeekForPrev(target_user_key);
296
0
    SequenceNumber target_seqno = GetInternalKeySeqno(target);
297
    // Move to the first entry with seqno >= target_seqno for the same
298
    // user key or a different user key.
299
0
    while (it_->Valid() &&
300
0
           comparator_->Compare(it_->Entry().key, target_user_key) == 0 &&
301
0
           CurrentKeySeqno() < target_seqno) {
302
0
      it_->Prev();
303
0
    }
304
0
    UpdateKey();
305
0
  }
306
307
0
  void Next() override {
308
0
    assert(Valid());
309
0
    if (emit_overwritten_single_del_) {
310
0
      if (it_->HasOverWrittenSingleDel() && !at_overwritten_single_del_) {
311
        // Merge and SingleDelete on the same key is undefined behavior.
312
0
        assert(it_->Entry().type != kMergeRecord);
313
0
        UpdateSingleDeleteKey();
314
0
        return;
315
0
      }
316
0
      at_overwritten_single_del_ = false;
317
0
    }
318
319
0
    it_->Next();
320
0
    UpdateKey();
321
0
  }
322
323
0
  bool NextAndGetResult(IterateResult* result) override {
324
0
    assert(Valid());
325
0
    Next();
326
0
    bool is_valid = Valid();
327
0
    if (is_valid) {
328
0
      result->key = key();
329
0
      result->bound_check_result = IterBoundCheck::kUnknown;
330
0
      result->value_prepared = true;
331
0
    }
332
0
    return is_valid;
333
0
  }
334
335
0
  void Prev() override {
336
0
    assert(!emit_overwritten_single_del_);
337
0
    assert(Valid());
338
0
    it_->Prev();
339
0
    UpdateKey();
340
0
  }
341
342
0
  Slice key() const override {
343
0
    assert(Valid());
344
0
    return key_;
345
0
  }
346
347
0
  Slice value() const override {
348
0
    assert(Valid());
349
0
    return it_->Entry().value;
350
0
  }
351
352
0
  Status status() const override {
353
0
    assert(it_->status().ok());
354
0
    return s_;
355
0
  }
356
357
0
  bool IsValuePinned() const override { return true; }
358
359
 private:
360
  static const std::unordered_map<WriteType, ValueType> WriteTypeToValueTypeMap;
361
362
0
  SequenceNumber CurrentKeySeqno() {
363
0
    assert(it_->Valid());
364
0
    assert(it_->GetUpdateCount() >= 1);
365
0
    auto seq = assigned_seqno_.lower_bound + it_->GetUpdateCount() - 1;
366
0
    assert(seq <= assigned_seqno_.upper_bound);
367
0
    return seq;
368
0
  }
369
370
  // If it_ is valid, update key_ to an internal key containing it_'s current
371
  // key, CurrentKeySeqno() and a type corresponding to it_'s current entry type.
372
0
  void UpdateKey() {
373
0
    valid_ = it_->Valid();
374
0
    if (!Valid()) {
375
0
      key_.clear();
376
0
      return;
377
0
    }
378
0
    auto t = WriteTypeToValueTypeMap.find(it_->Entry().type);
379
0
    assert(t != WriteTypeToValueTypeMap.end());
380
0
    if (t == WriteTypeToValueTypeMap.end()) {
381
0
      key_.clear();
382
0
      valid_ = false;
383
0
      s_ = Status::Corruption("Unexpected write_batch_with_index entry type " +
384
0
                              std::to_string(it_->Entry().type));
385
0
      return;
386
0
    }
387
0
    key_buf_.SetInternalKey(it_->Entry().key, CurrentKeySeqno(), t->second);
388
0
    key_ = key_buf_.GetInternalKey();
389
0
  }
390
391
0
  void UpdateSingleDeleteKey() {
392
0
    assert(it_->Valid());
393
0
    assert(Valid());
394
    // The key that overwrites this SingleDelete will be assigned at least
395
    // seqno lower_bound + 1 (see CurrentKeySeqno()).
396
0
    key_buf_.SetInternalKey(it_->Entry().key, assigned_seqno_.lower_bound,
397
0
                            kTypeSingleDeletion);
398
0
    key_ = key_buf_.GetInternalKey();
399
0
    at_overwritten_single_del_ = true;
400
0
  }
401
402
  std::unique_ptr<WBWIIterator> it_;
403
  const WBWIMemTable::SeqnoRange assigned_seqno_;
404
  const Comparator* comparator_;
405
  IterKey key_buf_;
406
  // The current internal key.
407
  Slice key_;
408
  Status s_;
409
  bool valid_ = false;
410
  bool at_overwritten_single_del_ = false;
411
  bool emit_overwritten_single_del_ = false;
412
};
413
414
}  // namespace ROCKSDB_NAMESPACE