/src/rocksdb/memtable/wbwi_memtable.h
Line | Count | Source |
1 | | // Copyright (c) Meta Platforms, Inc. and affiliates. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | #pragma once |
7 | | #include "db/memtable.h" |
8 | | #include "rocksdb/utilities/write_batch_with_index.h" |
9 | | |
10 | | namespace ROCKSDB_NAMESPACE { |
11 | | // An implementation of the ReadOnlyMemTable interface based on the content |
12 | | // of the given write batch with index (WBWI) object. This can be used to ingest |
13 | | // a transaction (which is based on WBWI) into the DB as an immutable memtable. |
14 | | // |
15 | | // REQUIRES: overwrite_key to be true for the WBWI |
16 | | // Since the keys in WBWI do not have sequence number, this class is responsible |
17 | | // for assigning sequence numbers to the keys. This memtable needs to be |
18 | | // assigned a range of sequence numbers through AssignSequenceNumbers(seqno) |
19 | | // before being available for reads. |
20 | | // |
21 | | // The sequence number assignment uses the update count for each key |
22 | | // tracked in WBWI (see WBWIIterator::GetUpdateCount()). For each key, the |
23 | | // sequence number assigned is seqno.lower_bound + update_count - 1. So more |
24 | | // recent updates will have higher sequence number. |
25 | | // |
26 | | // Since WBWI with overwrite mode keeps track of the most recent update for |
27 | | // each key, this memtable contains one update per key usually. However, there |
28 | | // are two exceptions: |
29 | | // 1. Merge operations: Each Merge operation do not overwrite existing entries, |
30 | | // if a user uses Merge, multiple entries may be kept. |
31 | | // 2. Overwriten SingleDelete: this memtable needs to emit an extra |
32 | | // SingleDelete even when the SD is overwritten by another update. |
33 | | // Consider the following scenario: |
34 | | // - WBWI has SD(k) then PUT(k, v1) |
35 | | // - DB has PUT(k, v2) in L1 |
36 | | // - flush WBWI adds PUT(k, v1) into L0 |
37 | | // - live memtable contains SD(k) |
38 | | // - flush live memtable and compact it with L0 will drop SD(k) and PUT(k, v1) |
39 | | // - the PUT(k, v2) in L1 incorrectly becomes visible |
40 | | // So during flush, iterator from this memtable will need emit overwritten |
41 | | // single deletion. This SD will be assigned seqno.lower_bound. |
42 | | class WBWIMemTable final : public ReadOnlyMemTable { |
43 | | public: |
44 | | struct SeqnoRange { |
45 | | SequenceNumber lower_bound = kMaxSequenceNumber; |
46 | | SequenceNumber upper_bound = kMaxSequenceNumber; |
47 | | }; |
48 | | WBWIMemTable(const std::shared_ptr<WriteBatchWithIndex>& wbwi, |
49 | | const Comparator* cmp, uint32_t cf_id, |
50 | | const ImmutableOptions* immutable_options, |
51 | | const MutableCFOptions* cf_options, |
52 | | const WriteBatchWithIndex::CFStat& stat) |
53 | 0 | : wbwi_(wbwi), |
54 | 0 | comparator_(cmp), |
55 | 0 | ikey_comparator_(comparator_), |
56 | 0 | moptions_(*immutable_options, *cf_options), |
57 | 0 | clock_(immutable_options->clock), |
58 | | // We need to include overwritten_sd_count in num_entries_ since flush |
59 | | // verifies number of entries processed and that iterator for this |
60 | | // memtable will emit overwritten SingleDelete entries during flush, See |
61 | | // comment above WBWIMemTableIterator for more detail. |
62 | 0 | num_entries_(stat.entry_count + stat.overwritten_sd_count), |
63 | 0 | cf_id_(cf_id) { |
64 | 0 | assert(wbwi->GetOverwriteKey()); |
65 | 0 | } |
66 | | |
67 | | // No copying allowed |
68 | | WBWIMemTable(const WBWIMemTable&) = delete; |
69 | | WBWIMemTable& operator=(const WBWIMemTable&) = delete; |
70 | | |
71 | 0 | ~WBWIMemTable() override { assert(refs_ == 0); } |
72 | | |
73 | 0 | const char* Name() const override { return "WBWIMemTable"; } |
74 | | |
75 | 0 | size_t ApproximateMemoryUsage() override { |
76 | | // FIXME: we can calculate for each CF or just divide evenly among CFs |
77 | | // Used in ReportFlushInputSize(), MemPurgeDecider, flush job event logging, |
78 | | // and InternalStats::HandleCurSizeAllMemTables |
79 | 0 | return 0; |
80 | 0 | } |
81 | | |
82 | 0 | size_t MemoryAllocatedBytes() const override { |
83 | | // FIXME: similar to ApproximateMemoryUsage(). |
84 | | // Used in MemTableList to trim memtable history. |
85 | 0 | return 0; |
86 | 0 | } |
87 | | |
88 | | void UniqueRandomSample( |
89 | | const uint64_t& /* target_sample_size */, |
90 | 0 | std::unordered_set<const char*>* /* entries */) override { |
91 | | // TODO: support mempurge |
92 | 0 | assert(false); |
93 | 0 | } |
94 | | |
95 | | InternalIterator* NewIterator(const ReadOptions&, |
96 | | UnownedPtr<const SeqnoToTimeMapping>, |
97 | | Arena* arena, |
98 | | const SliceTransform* /* prefix_extractor */, |
99 | | bool for_flush) override; |
100 | | |
101 | | // Returns an iterator that wraps a MemTableIterator and logically strips the |
102 | | // user-defined timestamp of each key. This API is only used by flush when |
103 | | // user-defined timestamps in MemTable only feature is enabled. |
104 | | InternalIterator* NewTimestampStrippingIterator( |
105 | | const ReadOptions&, UnownedPtr<const SeqnoToTimeMapping>, Arena* arena, |
106 | 0 | const SliceTransform*, size_t) override { |
107 | | // TODO: support UDT |
108 | 0 | assert(false); |
109 | 0 | return NewErrorInternalIterator( |
110 | 0 | Status::NotSupported( |
111 | 0 | "WBWIMemTable does not support NewTimestampStrippingIterator."), |
112 | 0 | arena); |
113 | 0 | } |
114 | | |
115 | | FragmentedRangeTombstoneIterator* NewRangeTombstoneIterator( |
116 | 0 | const ReadOptions&, SequenceNumber, bool) override { |
117 | | // TODO: support DeleteRange |
118 | 0 | assert(!wbwi_->GetWriteBatch()->HasDeleteRange()); |
119 | 0 | return nullptr; |
120 | 0 | } |
121 | | |
122 | | FragmentedRangeTombstoneIterator* NewTimestampStrippingRangeTombstoneIterator( |
123 | 0 | const ReadOptions&, SequenceNumber, size_t) override { |
124 | | // TODO: support UDT |
125 | 0 | assert(false); |
126 | 0 | return nullptr; |
127 | 0 | } |
128 | | |
129 | | // FIXME: not a good practice to use default parameter with virtual function |
130 | | using ReadOnlyMemTable::Get; |
131 | | bool Get(const LookupKey& key, std::string* value, |
132 | | PinnableWideColumns* columns, std::string* timestamp, Status* s, |
133 | | MergeContext* merge_context, |
134 | | SequenceNumber* max_covering_tombstone_seq, SequenceNumber* seq, |
135 | | const ReadOptions& read_opts, bool immutable_memtable, |
136 | | ReadCallback* callback = nullptr, bool* is_blob_index = nullptr, |
137 | | bool do_merge = true, |
138 | | const BlobFetcher* blob_fetcher = nullptr) override; |
139 | | |
140 | | void MultiGet(const ReadOptions& read_options, MultiGetRange* range, |
141 | | ReadCallback* callback, bool immutable_memtable, |
142 | | const BlobFetcher* blob_fetcher = nullptr) override; |
143 | | |
144 | 0 | uint64_t NumEntries() const override { return num_entries_; } |
145 | | |
146 | 0 | uint64_t NumDeletion() const override { |
147 | | // FIXME: this is used for stats and event logging |
148 | 0 | return 0; |
149 | 0 | } |
150 | | |
151 | 0 | uint64_t NumRangeDeletion() const override { |
152 | | // FIXME |
153 | 0 | assert(!wbwi_->GetWriteBatch()->HasDeleteRange()); |
154 | 0 | return 0; |
155 | 0 | } |
156 | | |
157 | 0 | uint64_t GetDataSize() const override { |
158 | | // FIXME: used in event logging in flush_job |
159 | 0 | return 0; |
160 | 0 | } |
161 | | |
162 | 0 | SequenceNumber GetEarliestSequenceNumber() override { |
163 | 0 | return assigned_seqno_.lower_bound; |
164 | 0 | } |
165 | | |
166 | 0 | bool IsEmpty() const override { |
167 | | // Ideally also check that wbwi contains updates from this CF. For now, we |
168 | | // only create WBWIMemTable for CFs with updates in wbwi. |
169 | 0 | return wbwi_->GetWriteBatch()->Count() == 0; |
170 | 0 | } |
171 | | |
172 | 0 | SequenceNumber GetFirstSequenceNumber() override { |
173 | 0 | return assigned_seqno_.lower_bound; |
174 | 0 | } |
175 | | |
176 | 0 | uint64_t GetMinLogContainingPrepSection() override { |
177 | | // FIXME: used to retain WAL with pending Prepare |
178 | 0 | return min_prep_log_referenced_; |
179 | 0 | } |
180 | | |
181 | 0 | void MarkImmutable() override {} |
182 | | |
183 | 0 | void MarkFlushed() override {} |
184 | | |
185 | 0 | MemTableStats ApproximateStats(const Slice&, const Slice&) override { |
186 | | // FIXME: used for query planning |
187 | 0 | return {}; |
188 | 0 | } |
189 | | |
190 | 0 | const InternalKeyComparator& GetInternalKeyComparator() const override { |
191 | 0 | return ikey_comparator_; |
192 | 0 | } |
193 | | |
194 | 0 | uint64_t ApproximateOldestKeyTime() const override { |
195 | | // FIXME: can use the time when this is added to the DB. |
196 | 0 | return kUnknownOldestAncesterTime; |
197 | 0 | } |
198 | | |
199 | 0 | bool IsFragmentedRangeTombstonesConstructed() const override { |
200 | 0 | assert(!wbwi_->GetWriteBatch()->HasDeleteRange()); |
201 | 0 | return true; |
202 | 0 | } |
203 | | |
204 | 0 | Slice GetNewestUDT() const override { |
205 | | // FIXME: support UDT |
206 | 0 | assert(false); |
207 | 0 | return Slice(); |
208 | 0 | } |
209 | | |
210 | | // Assign a sequence number to the entries in this memtable. |
211 | 0 | void AssignSequenceNumbers(const SeqnoRange& seqno_range) { |
212 | | // Not expecting to assign seqno multiple times. |
213 | 0 | assert(assigned_seqno_.lower_bound == kMaxSequenceNumber); |
214 | 0 | assert(assigned_seqno_.upper_bound == kMaxSequenceNumber); |
215 | |
|
216 | 0 | assigned_seqno_ = seqno_range; |
217 | |
|
218 | 0 | assert(assigned_seqno_.lower_bound <= assigned_seqno_.upper_bound); |
219 | 0 | assert(assigned_seqno_.upper_bound != kMaxSequenceNumber); |
220 | 0 | } |
221 | | |
222 | 0 | void SetMinPrepLog(uint64_t min_prep_log) { |
223 | 0 | min_prep_log_referenced_ = min_prep_log; |
224 | 0 | } |
225 | | |
226 | | private: |
227 | | inline InternalIterator* NewIterator() const; |
228 | | |
229 | | std::shared_ptr<WriteBatchWithIndex> wbwi_; |
230 | | const Comparator* comparator_; |
231 | | InternalKeyComparator ikey_comparator_; |
232 | | SeqnoRange assigned_seqno_; |
233 | | const ImmutableMemTableOptions moptions_; |
234 | | SystemClock* clock_; |
235 | | uint64_t min_prep_log_referenced_{0}; |
236 | | uint64_t num_entries_; |
237 | | // WBWI can contains updates to multiple CFs. `cf_id_` determines which CF |
238 | | // this memtable is for. |
239 | | const uint32_t cf_id_; |
240 | | }; |
241 | | |
242 | | class WBWIMemTableIterator final : public InternalIterator { |
243 | | public: |
244 | | WBWIMemTableIterator(std::unique_ptr<WBWIIterator>&& it, |
245 | | const WBWIMemTable::SeqnoRange& assigned_seqno, |
246 | | const Comparator* comparator, bool for_flush) |
247 | 0 | : it_(std::move(it)), |
248 | 0 | assigned_seqno_(assigned_seqno), |
249 | 0 | comparator_(comparator), |
250 | 0 | emit_overwritten_single_del_(for_flush) { |
251 | 0 | assert(assigned_seqno_.lower_bound <= assigned_seqno_.upper_bound); |
252 | 0 | assert(assigned_seqno_.upper_bound < kMaxSequenceNumber); |
253 | 0 | s_.PermitUncheckedError(); |
254 | 0 | } |
255 | | |
256 | | // No copying allowed |
257 | | WBWIMemTableIterator(const WBWIMemTableIterator&) = delete; |
258 | | WBWIMemTableIterator& operator=(const WBWIMemTableIterator&) = delete; |
259 | | |
260 | 0 | bool Valid() const override { return valid_; } |
261 | | |
262 | 0 | void SeekToFirst() override { |
263 | 0 | it_->SeekToFirst(); |
264 | 0 | UpdateKey(); |
265 | 0 | } |
266 | | |
267 | 0 | void SeekToLast() override { |
268 | 0 | assert(!emit_overwritten_single_del_); |
269 | 0 | it_->SeekToLast(); |
270 | 0 | UpdateKey(); |
271 | 0 | } |
272 | | |
273 | 0 | void Seek(const Slice& target) override { |
274 | | // `emit_overwritten_single_del_` is only used for flush, which does |
275 | | // sequential forward scan from the beginning. |
276 | 0 | assert(!emit_overwritten_single_del_); |
277 | 0 | Slice target_user_key = ExtractUserKey(target); |
278 | | // Moves to first update >= target_user_key |
279 | 0 | it_->Seek(target_user_key); |
280 | 0 | SequenceNumber target_seqno = GetInternalKeySeqno(target); |
281 | | // Move to the first entry with seqno <= target_seqno for the same |
282 | | // user key or a different user key. |
283 | 0 | while (it_->Valid() && |
284 | 0 | comparator_->Compare(it_->Entry().key, target_user_key) == 0 && |
285 | 0 | target_seqno < CurrentKeySeqno()) { |
286 | 0 | it_->Next(); |
287 | 0 | } |
288 | 0 | UpdateKey(); |
289 | 0 | } |
290 | | |
291 | 0 | void SeekForPrev(const Slice& target) override { |
292 | 0 | assert(!emit_overwritten_single_del_); |
293 | 0 | Slice target_user_key = ExtractUserKey(target); |
294 | | // Moves to last update <= target_user_key |
295 | 0 | it_->SeekForPrev(target_user_key); |
296 | 0 | SequenceNumber target_seqno = GetInternalKeySeqno(target); |
297 | | // Move to the first entry with seqno >= target_seqno for the same |
298 | | // user key or a different user key. |
299 | 0 | while (it_->Valid() && |
300 | 0 | comparator_->Compare(it_->Entry().key, target_user_key) == 0 && |
301 | 0 | CurrentKeySeqno() < target_seqno) { |
302 | 0 | it_->Prev(); |
303 | 0 | } |
304 | 0 | UpdateKey(); |
305 | 0 | } |
306 | | |
307 | 0 | void Next() override { |
308 | 0 | assert(Valid()); |
309 | 0 | if (emit_overwritten_single_del_) { |
310 | 0 | if (it_->HasOverWrittenSingleDel() && !at_overwritten_single_del_) { |
311 | | // Merge and SingleDelete on the same key is undefined behavior. |
312 | 0 | assert(it_->Entry().type != kMergeRecord); |
313 | 0 | UpdateSingleDeleteKey(); |
314 | 0 | return; |
315 | 0 | } |
316 | 0 | at_overwritten_single_del_ = false; |
317 | 0 | } |
318 | | |
319 | 0 | it_->Next(); |
320 | 0 | UpdateKey(); |
321 | 0 | } |
322 | | |
323 | 0 | bool NextAndGetResult(IterateResult* result) override { |
324 | 0 | assert(Valid()); |
325 | 0 | Next(); |
326 | 0 | bool is_valid = Valid(); |
327 | 0 | if (is_valid) { |
328 | 0 | result->key = key(); |
329 | 0 | result->bound_check_result = IterBoundCheck::kUnknown; |
330 | 0 | result->value_prepared = true; |
331 | 0 | } |
332 | 0 | return is_valid; |
333 | 0 | } |
334 | | |
335 | 0 | void Prev() override { |
336 | 0 | assert(!emit_overwritten_single_del_); |
337 | 0 | assert(Valid()); |
338 | 0 | it_->Prev(); |
339 | 0 | UpdateKey(); |
340 | 0 | } |
341 | | |
342 | 0 | Slice key() const override { |
343 | 0 | assert(Valid()); |
344 | 0 | return key_; |
345 | 0 | } |
346 | | |
347 | 0 | Slice value() const override { |
348 | 0 | assert(Valid()); |
349 | 0 | return it_->Entry().value; |
350 | 0 | } |
351 | | |
352 | 0 | Status status() const override { |
353 | 0 | assert(it_->status().ok()); |
354 | 0 | return s_; |
355 | 0 | } |
356 | | |
357 | 0 | bool IsValuePinned() const override { return true; } |
358 | | |
359 | | private: |
360 | | static const std::unordered_map<WriteType, ValueType> WriteTypeToValueTypeMap; |
361 | | |
362 | 0 | SequenceNumber CurrentKeySeqno() { |
363 | 0 | assert(it_->Valid()); |
364 | 0 | assert(it_->GetUpdateCount() >= 1); |
365 | 0 | auto seq = assigned_seqno_.lower_bound + it_->GetUpdateCount() - 1; |
366 | 0 | assert(seq <= assigned_seqno_.upper_bound); |
367 | 0 | return seq; |
368 | 0 | } |
369 | | |
370 | | // If it_ is valid, udate key_ to an internal key containing it_ current |
371 | | // key, CurrentKeySeqno() and a type corresponding to it_ current entry type. |
372 | 0 | void UpdateKey() { |
373 | 0 | valid_ = it_->Valid(); |
374 | 0 | if (!Valid()) { |
375 | 0 | key_.clear(); |
376 | 0 | return; |
377 | 0 | } |
378 | 0 | auto t = WriteTypeToValueTypeMap.find(it_->Entry().type); |
379 | 0 | assert(t != WriteTypeToValueTypeMap.end()); |
380 | 0 | if (t == WriteTypeToValueTypeMap.end()) { |
381 | 0 | key_.clear(); |
382 | 0 | valid_ = false; |
383 | 0 | s_ = Status::Corruption("Unexpected write_batch_with_index entry type " + |
384 | 0 | std::to_string(it_->Entry().type)); |
385 | 0 | return; |
386 | 0 | } |
387 | 0 | key_buf_.SetInternalKey(it_->Entry().key, CurrentKeySeqno(), t->second); |
388 | 0 | key_ = key_buf_.GetInternalKey(); |
389 | 0 | } |
390 | | |
391 | 0 | void UpdateSingleDeleteKey() { |
392 | 0 | assert(it_->Valid()); |
393 | 0 | assert(Valid()); |
394 | | // The key that overwrites this SingleDelete will be assigned at least |
395 | | // seqno lower_bound + 1 (see CurrentKeySeqno()). |
396 | 0 | key_buf_.SetInternalKey(it_->Entry().key, assigned_seqno_.lower_bound, |
397 | 0 | kTypeSingleDeletion); |
398 | 0 | key_ = key_buf_.GetInternalKey(); |
399 | 0 | at_overwritten_single_del_ = true; |
400 | 0 | } |
401 | | |
402 | | std::unique_ptr<WBWIIterator> it_; |
403 | | const WBWIMemTable::SeqnoRange assigned_seqno_; |
404 | | const Comparator* comparator_; |
405 | | IterKey key_buf_; |
406 | | // The current internal key. |
407 | | Slice key_; |
408 | | Status s_; |
409 | | bool valid_ = false; |
410 | | bool at_overwritten_single_del_ = false; |
411 | | bool emit_overwritten_single_del_ = false; |
412 | | }; |
413 | | |
414 | | } // namespace ROCKSDB_NAMESPACE |