/src/rocksdb/db/memtable.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | // |
6 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
7 | | // Use of this source code is governed by a BSD-style license that can be |
8 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
9 | | |
10 | | #include "db/memtable.h" |
11 | | |
12 | | #include <algorithm> |
13 | | #include <array> |
14 | | #include <limits> |
15 | | #include <memory> |
16 | | #include <optional> |
17 | | |
18 | | #include "db/dbformat.h" |
19 | | #include "db/kv_checksum.h" |
20 | | #include "db/merge_context.h" |
21 | | #include "db/merge_helper.h" |
22 | | #include "db/pinned_iterators_manager.h" |
23 | | #include "db/range_tombstone_fragmenter.h" |
24 | | #include "db/read_callback.h" |
25 | | #include "db/wide/wide_column_serialization.h" |
26 | | #include "logging/logging.h" |
27 | | #include "memory/arena.h" |
28 | | #include "memory/memory_usage.h" |
29 | | #include "monitoring/perf_context_imp.h" |
30 | | #include "monitoring/statistics_impl.h" |
31 | | #include "port/lang.h" |
32 | | #include "port/port.h" |
33 | | #include "rocksdb/comparator.h" |
34 | | #include "rocksdb/env.h" |
35 | | #include "rocksdb/iterator.h" |
36 | | #include "rocksdb/merge_operator.h" |
37 | | #include "rocksdb/slice_transform.h" |
38 | | #include "rocksdb/types.h" |
39 | | #include "rocksdb/write_buffer_manager.h" |
40 | | #include "table/internal_iterator.h" |
41 | | #include "table/iterator_wrapper.h" |
42 | | #include "table/merging_iterator.h" |
43 | | #include "util/autovector.h" |
44 | | #include "util/coding.h" |
45 | | #include "util/mutexlock.h" |
46 | | |
47 | | namespace ROCKSDB_NAMESPACE { |
48 | | |
49 | | ImmutableMemTableOptions::ImmutableMemTableOptions( |
50 | | const ImmutableOptions& ioptions, |
51 | | const MutableCFOptions& mutable_cf_options) |
52 | | : arena_block_size(mutable_cf_options.arena_block_size), |
53 | | memtable_prefix_bloom_bits( |
54 | | static_cast<uint32_t>( |
55 | | static_cast<double>(mutable_cf_options.write_buffer_size) * |
56 | | mutable_cf_options.memtable_prefix_bloom_size_ratio) * |
57 | | 8u), |
58 | | memtable_huge_page_size(mutable_cf_options.memtable_huge_page_size), |
59 | | memtable_whole_key_filtering( |
60 | | mutable_cf_options.memtable_whole_key_filtering), |
61 | | inplace_update_support(ioptions.inplace_update_support), |
62 | | inplace_update_num_locks(mutable_cf_options.inplace_update_num_locks), |
63 | | inplace_callback(ioptions.inplace_callback), |
64 | | max_successive_merges(mutable_cf_options.max_successive_merges), |
65 | | strict_max_successive_merges( |
66 | | mutable_cf_options.strict_max_successive_merges), |
67 | | statistics(ioptions.stats), |
68 | | merge_operator(ioptions.merge_operator.get()), |
69 | | info_log(ioptions.logger), |
70 | | protection_bytes_per_key( |
71 | | mutable_cf_options.memtable_protection_bytes_per_key), |
72 | | allow_data_in_errors(ioptions.allow_data_in_errors), |
73 | 17.3k | paranoid_memory_checks(mutable_cf_options.paranoid_memory_checks) {} |
74 | | |
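The memtable_prefix_bloom_bits expression above budgets the bloom filter as a fraction of the write buffer and converts that byte budget to bits. A standalone sketch of the arithmetic with illustrative option values (the 64 MiB buffer and 0.05 ratio are assumptions, not RocksDB defaults):

#include <cstdint>
#include <cstdio>

int main() {
  uint64_t write_buffer_size = 64ull << 20;        // 64 MiB (assumed)
  double memtable_prefix_bloom_size_ratio = 0.05;  // 5% of the buffer (assumed)
  uint32_t bits = static_cast<uint32_t>(
                      static_cast<double>(write_buffer_size) *
                      memtable_prefix_bloom_size_ratio) *
                  8u;  // byte budget expressed in bits
  std::printf("bloom bits: %u\n", bits);  // 3355443 bytes -> 26843544 bits
  return 0;
}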
75 | | MemTable::MemTable(const InternalKeyComparator& cmp, |
76 | | const ImmutableOptions& ioptions, |
77 | | const MutableCFOptions& mutable_cf_options, |
78 | | WriteBufferManager* write_buffer_manager, |
79 | | SequenceNumber latest_seq, uint32_t column_family_id) |
80 | | : comparator_(cmp), |
81 | | moptions_(ioptions, mutable_cf_options), |
82 | | refs_(0), |
83 | | kArenaBlockSize(Arena::OptimizeBlockSize(moptions_.arena_block_size)), |
84 | | mem_tracker_(write_buffer_manager), |
85 | | arena_(moptions_.arena_block_size, |
86 | | (write_buffer_manager != nullptr && |
87 | | (write_buffer_manager->enabled() || |
88 | | write_buffer_manager->cost_to_cache())) |
89 | | ? &mem_tracker_ |
90 | | : nullptr, |
91 | | mutable_cf_options.memtable_huge_page_size), |
92 | | table_(ioptions.memtable_factory->CreateMemTableRep( |
93 | | comparator_, &arena_, mutable_cf_options.prefix_extractor.get(), |
94 | | ioptions.logger, column_family_id)), |
95 | | range_del_table_(SkipListFactory().CreateMemTableRep( |
96 | | comparator_, &arena_, nullptr /* transform */, ioptions.logger, |
97 | | column_family_id)), |
98 | | is_range_del_table_empty_(true), |
99 | | data_size_(0), |
100 | | num_entries_(0), |
101 | | num_deletes_(0), |
102 | | num_range_deletes_(0), |
103 | | write_buffer_size_(mutable_cf_options.write_buffer_size), |
104 | | flush_in_progress_(false), |
105 | | flush_completed_(false), |
106 | | file_number_(0), |
107 | | first_seqno_(0), |
108 | | earliest_seqno_(latest_seq), |
109 | | creation_seq_(latest_seq), |
110 | | mem_next_logfile_number_(0), |
111 | | min_prep_log_referenced_(0), |
112 | | locks_(moptions_.inplace_update_support |
113 | | ? moptions_.inplace_update_num_locks |
114 | | : 0), |
115 | | prefix_extractor_(mutable_cf_options.prefix_extractor.get()), |
116 | | flush_state_(FLUSH_NOT_REQUESTED), |
117 | | clock_(ioptions.clock), |
118 | | insert_with_hint_prefix_extractor_( |
119 | | ioptions.memtable_insert_with_hint_prefix_extractor.get()), |
120 | | oldest_key_time_(std::numeric_limits<uint64_t>::max()), |
121 | | atomic_flush_seqno_(kMaxSequenceNumber), |
122 | | approximate_memory_usage_(0), |
123 | | memtable_max_range_deletions_( |
124 | 17.3k | mutable_cf_options.memtable_max_range_deletions) { |
125 | 17.3k | UpdateFlushState(); |
126 | | // something went wrong if we need to flush before inserting anything |
127 | 17.3k | assert(!ShouldScheduleFlush()); |
128 | | |
129 | | // use bloom_filter_ for both whole key and prefix bloom filter |
130 | 17.3k | if ((prefix_extractor_ || moptions_.memtable_whole_key_filtering) && |
131 | 17.3k | moptions_.memtable_prefix_bloom_bits > 0) { |
132 | 0 | bloom_filter_.reset( |
133 | 0 | new DynamicBloom(&arena_, moptions_.memtable_prefix_bloom_bits, |
134 | 0 | 6 /* hard coded 6 probes */, |
135 | 0 | moptions_.memtable_huge_page_size, ioptions.logger)); |
136 | 0 | } |
137 | | // Initialize cached_range_tombstone_ here since it could |
138 | | // be read before it is constructed in MemTable::Add(), which could also lead |
139 | | // to a data race on the global mutex table backing atomic shared_ptr. |
140 | 17.3k | auto new_cache = std::make_shared<FragmentedRangeTombstoneListCache>(); |
141 | 17.3k | size_t size = cached_range_tombstone_.Size(); |
142 | 572k | for (size_t i = 0; i < size; ++i) { |
143 | 555k | std::shared_ptr<FragmentedRangeTombstoneListCache>* local_cache_ref_ptr = |
144 | 555k | cached_range_tombstone_.AccessAtCore(i); |
145 | 555k | auto new_local_cache_ref = std::make_shared< |
146 | 555k | const std::shared_ptr<FragmentedRangeTombstoneListCache>>(new_cache); |
147 | 555k | std::atomic_store_explicit( |
148 | 555k | local_cache_ref_ptr, |
149 | 555k | std::shared_ptr<FragmentedRangeTombstoneListCache>(new_local_cache_ref, |
150 | 555k | new_cache.get()), |
151 | 555k | std::memory_order_relaxed); |
152 | 555k | } |
153 | 17.3k | const Comparator* ucmp = cmp.user_comparator(); |
154 | 17.3k | assert(ucmp); |
155 | 17.3k | ts_sz_ = ucmp->timestamp_size(); |
156 | 17.3k | persist_user_defined_timestamps_ = ioptions.persist_user_defined_timestamps; |
157 | 17.3k | } |
158 | | |
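The loop in the constructor above seeds every per-core slot of cached_range_tombstone_ via the shared_ptr aliasing constructor: each slot's reference count belongs to a small per-core wrapper while its raw pointer targets the shared cache object, so concurrent readers on different cores bump different counters. A minimal, self-contained sketch of that pattern (Payload, local_ref and slot are illustrative names, not RocksDB types):

#include <atomic>
#include <memory>

struct Payload {
  int value = 42;
};

int main() {
  auto shared = std::make_shared<Payload>();
  // Wrapper whose control block (and ref count) is private to one "core".
  auto local_ref = std::make_shared<const std::shared_ptr<Payload>>(shared);
  // The slot a reader on that core would atomically load from.
  std::shared_ptr<Payload> slot;
  // Aliasing constructor: ownership follows local_ref, but the stored pointer
  // is the shared Payload, mirroring how cached_range_tombstone_ is seeded.
  std::atomic_store_explicit(
      &slot, std::shared_ptr<Payload>(local_ref, shared.get()),
      std::memory_order_relaxed);
  auto loaded = std::atomic_load_explicit(&slot, std::memory_order_relaxed);
  return loaded->value == 42 ? 0 : 1;
}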
159 | 17.3k | MemTable::~MemTable() { |
160 | 17.3k | mem_tracker_.FreeMem(); |
161 | 17.3k | assert(refs_ == 0); |
162 | 17.3k | } |
163 | | |
164 | 532 | size_t MemTable::ApproximateMemoryUsage() { |
165 | 532 | autovector<size_t> usages = { |
166 | 532 | arena_.ApproximateMemoryUsage(), table_->ApproximateMemoryUsage(), |
167 | 532 | range_del_table_->ApproximateMemoryUsage(), |
168 | 532 | ROCKSDB_NAMESPACE::ApproximateMemoryUsage(insert_hints_)}; |
169 | 532 | size_t total_usage = 0; |
170 | 2.12k | for (size_t usage : usages) { |
171 | | // If usage + total_usage >= kMaxSizet, return kMaxSizet. |
172 | | // The check below is written this way to avoid numeric overflow. |
173 | 2.12k | if (usage >= std::numeric_limits<size_t>::max() - total_usage) { |
174 | 0 | return std::numeric_limits<size_t>::max(); |
175 | 0 | } |
176 | 2.12k | total_usage += usage; |
177 | 2.12k | } |
178 | 532 | approximate_memory_usage_.store(total_usage, std::memory_order_relaxed); |
179 | | // otherwise, return the actual usage |
180 | 532 | return total_usage; |
181 | 532 | } |
182 | | |
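The loop above accumulates the component usages with a saturating add so the total can never wrap around. A minimal sketch of the same guard as a standalone helper (SaturatingAdd is an illustrative name, not a RocksDB function):

#include <cstddef>
#include <limits>

// Clamp to the maximum size_t instead of wrapping when the sum would overflow.
std::size_t SaturatingAdd(std::size_t total, std::size_t usage) {
  if (usage >= std::numeric_limits<std::size_t>::max() - total) {
    return std::numeric_limits<std::size_t>::max();
  }
  return total + usage;
}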
183 | 2.20M | bool MemTable::ShouldFlushNow() { |
184 | | // Flush if memtable_max_range_deletions is set (> 0) and at least |
185 | | // that many range deletions have been added to this memtable. |
186 | 2.20M | if (memtable_max_range_deletions_ > 0 && |
187 | 2.20M | num_range_deletes_.load(std::memory_order_relaxed) >= |
188 | 0 | static_cast<uint64_t>(memtable_max_range_deletions_)) { |
189 | 0 | return true; |
190 | 0 | } |
191 | | |
192 | 2.20M | size_t write_buffer_size = write_buffer_size_.load(std::memory_order_relaxed); |
193 | | // Often we cannot allocate arena blocks that exactly match the buffer |
194 | | // size. Thus we have to decide whether to over-allocate or |
195 | | // under-allocate. |
196 | | // This constant can be interpreted as: if we still have more than |
197 | | // "kAllowOverAllocationRatio * kArenaBlockSize" space left, we'd try to |
198 | | // over-allocate one more block. |
199 | 2.20M | const double kAllowOverAllocationRatio = 0.6; |
200 | | |
201 | | // If the arena still has room for a new block allocation, we can safely |
202 | | // say it shouldn't flush. |
203 | 2.20M | auto allocated_memory = table_->ApproximateMemoryUsage() + |
204 | 2.20M | range_del_table_->ApproximateMemoryUsage() + |
205 | 2.20M | arena_.MemoryAllocatedBytes(); |
206 | | |
207 | 2.20M | approximate_memory_usage_.store(allocated_memory, std::memory_order_relaxed); |
208 | | |
209 | | // if we can still allocate one more block without exceeding the |
210 | | // over-allocation ratio, then we should not flush. |
211 | 2.20M | if (allocated_memory + kArenaBlockSize < |
212 | 2.20M | write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) { |
213 | 2.20M | return false; |
214 | 2.20M | } |
215 | | |
216 | | // if user keeps adding entries that exceeds write_buffer_size, we need to |
217 | | // flush earlier even though we still have much available memory left. |
218 | 0 | if (allocated_memory > |
219 | 0 | write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) { |
220 | 0 | return true; |
221 | 0 | } |
222 | | |
223 | | // In this code path, Arena has already allocated its "last block", which |
224 | | // means the total allocated memory size is either: |
225 | | // (1) "moderately" over-allocated (by no more than `0.6 * arena |
226 | | // block size`). Or, |
227 | | // (2) less than the write buffer size, but we'll stop |
228 | | // here since allocating a new arena block would over-allocate too much |
229 | | // more (half of the arena block size) memory. |
230 | | // |
231 | | // In either case, to avoid over-allocate, the last block will stop allocation |
232 | | // when its usage reaches a certain ratio, which we carefully choose "0.75 |
233 | | // full" as the stop condition because it addresses the following issue with |
234 | | // great simplicity: What if the next inserted entry's size is |
235 | | // bigger than AllocatedAndUnused()? |
236 | | // |
237 | | // The answer is: if the entry size is also bigger than 0.25 * |
238 | | // kArenaBlockSize, a dedicated block will be allocated for it; otherwise |
239 | | // the arena will skip the AllocatedAndUnused() space anyway and allocate a |
240 | | // new, empty regular block. In either case, we *overly* over-allocate. |
241 | | // |
242 | | // Therefore, setting the last block to be at most "0.75 full" avoids both |
243 | | // cases. |
244 | | // |
245 | | // NOTE: the average fraction of wasted space with this approach can be |
246 | | // estimated as "arena block size * 0.25 / write buffer size". Users who |
247 | | // specify a small write buffer size and/or a big arena block size may suffer. |
248 | 0 | return arena_.AllocatedAndUnused() < kArenaBlockSize / 4; |
249 | 0 | } |
250 | | |
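The comment block above reduces to three arithmetic checks. A standalone sketch of the same decision with illustrative numbers (the block size, buffer size and usage figures are assumptions, not the member fields read by ShouldFlushNow()):

#include <cstddef>
#include <cstdio>

int main() {
  const double kAllowOverAllocationRatio = 0.6;
  const std::size_t kArenaBlockSize = 4u << 20;       // 4 MiB block (assumed)
  const std::size_t write_buffer_size = 64u << 20;    // 64 MiB buffer (assumed)
  const std::size_t allocated_memory = 62u << 20;     // bytes allocated so far
  const std::size_t allocated_and_unused = 1u << 20;  // free space in last block

  bool should_flush;
  if (allocated_memory + kArenaBlockSize <
      write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
    should_flush = false;  // room for one more block within the 0.6 slack
  } else if (allocated_memory >
             write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
    should_flush = true;  // already well past the budget
  } else {
    // The last block is in place: flush once it is roughly 75% used, i.e.
    // less than a quarter of the block remains unused.
    should_flush = allocated_and_unused < kArenaBlockSize / 4;
  }
  std::printf("should_flush = %s\n", should_flush ? "true" : "false");
  return 0;
}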
251 | 2.20M | void MemTable::UpdateFlushState() { |
252 | 2.20M | auto state = flush_state_.load(std::memory_order_relaxed); |
253 | 2.20M | if (state == FLUSH_NOT_REQUESTED && ShouldFlushNow()) { |
254 | | // ignore CAS failure, because that means somebody else requested |
255 | | // a flush |
256 | 0 | flush_state_.compare_exchange_strong(state, FLUSH_REQUESTED, |
257 | 0 | std::memory_order_relaxed, |
258 | 0 | std::memory_order_relaxed); |
259 | 0 | } |
260 | 2.20M | } |
261 | | |
262 | 2.18M | void MemTable::UpdateOldestKeyTime() { |
263 | 2.18M | uint64_t oldest_key_time = oldest_key_time_.load(std::memory_order_relaxed); |
264 | 2.18M | if (oldest_key_time == std::numeric_limits<uint64_t>::max()) { |
265 | 10.9k | int64_t current_time = 0; |
266 | 10.9k | auto s = clock_->GetCurrentTime(¤t_time); |
267 | 10.9k | if (s.ok()) { |
268 | 10.9k | assert(current_time >= 0); |
269 | | // If the CAS fails, the timestamp has already been set. |
270 | 10.9k | oldest_key_time_.compare_exchange_strong( |
271 | 10.9k | oldest_key_time, static_cast<uint64_t>(current_time), |
272 | 10.9k | std::memory_order_relaxed, std::memory_order_relaxed); |
273 | 10.9k | } |
274 | 10.9k | } |
275 | 2.18M | } |
276 | | |
277 | | Status MemTable::VerifyEntryChecksum(const char* entry, |
278 | | uint32_t protection_bytes_per_key, |
279 | 0 | bool allow_data_in_errors) { |
280 | 0 | if (protection_bytes_per_key == 0) { |
281 | 0 | return Status::OK(); |
282 | 0 | } |
283 | 0 | uint32_t key_length; |
284 | 0 | const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); |
285 | 0 | if (key_ptr == nullptr) { |
286 | 0 | return Status::Corruption("Unable to parse internal key length"); |
287 | 0 | } |
288 | 0 | if (key_length < 8) { |
289 | 0 | return Status::Corruption("Memtable entry internal key length too short."); |
290 | 0 | } |
291 | 0 | Slice user_key = Slice(key_ptr, key_length - 8); |
292 | |
293 | 0 | const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); |
294 | 0 | ValueType type; |
295 | 0 | SequenceNumber seq; |
296 | 0 | UnPackSequenceAndType(tag, &seq, &type); |
297 | |
298 | 0 | uint32_t value_length = 0; |
299 | 0 | const char* value_ptr = GetVarint32Ptr( |
300 | 0 | key_ptr + key_length, key_ptr + key_length + 5, &value_length); |
301 | 0 | if (value_ptr == nullptr) { |
302 | 0 | return Status::Corruption("Unable to parse internal key value"); |
303 | 0 | } |
304 | 0 | Slice value = Slice(value_ptr, value_length); |
305 | |
306 | 0 | const char* checksum_ptr = value_ptr + value_length; |
307 | 0 | bool match = |
308 | 0 | ProtectionInfo64() |
309 | 0 | .ProtectKVO(user_key, value, type) |
310 | 0 | .ProtectS(seq) |
311 | 0 | .Verify(static_cast<uint8_t>(protection_bytes_per_key), checksum_ptr); |
312 | 0 | if (!match) { |
313 | 0 | std::string msg( |
314 | 0 | "Corrupted memtable entry, per key-value checksum verification " |
315 | 0 | "failed."); |
316 | 0 | if (allow_data_in_errors) { |
317 | 0 | msg.append("Unrecognized value type: " + |
318 | 0 | std::to_string(static_cast<int>(type)) + ". "); |
319 | 0 | msg.append("User key: " + user_key.ToString(/*hex=*/true) + ". "); |
320 | 0 | msg.append("seq: " + std::to_string(seq) + "."); |
321 | 0 | } |
322 | 0 | return Status::Corruption(msg.c_str()); |
323 | 0 | } |
324 | 0 | return Status::OK(); |
325 | 0 | } |
326 | | |
327 | | int MemTable::KeyComparator::operator()(const char* prefix_len_key1, |
328 | 0 | const char* prefix_len_key2) const { |
329 | | // Internal keys are encoded as length-prefixed strings. |
330 | 0 | Slice k1 = GetLengthPrefixedSlice(prefix_len_key1); |
331 | 0 | Slice k2 = GetLengthPrefixedSlice(prefix_len_key2); |
332 | 0 | return comparator.CompareKeySeq(k1, k2); |
333 | 0 | } |
334 | | |
335 | | int MemTable::KeyComparator::operator()( |
336 | 22.8M | const char* prefix_len_key, const KeyComparator::DecodedType& key) const { |
337 | | // Internal keys are encoded as length-prefixed strings. |
338 | 22.8M | Slice a = GetLengthPrefixedSlice(prefix_len_key); |
339 | 22.8M | return comparator.CompareKeySeq(a, key); |
340 | 22.8M | } |
341 | | |
342 | 0 | void MemTableRep::InsertConcurrently(KeyHandle /*handle*/) { |
343 | 0 | throw std::runtime_error("concurrent insert not supported"); |
344 | 0 | } |
345 | | |
346 | 0 | Slice MemTableRep::UserKey(const char* key) const { |
347 | 0 | Slice slice = GetLengthPrefixedSlice(key); |
348 | 0 | return Slice(slice.data(), slice.size() - 8); |
349 | 0 | } |
350 | | |
351 | 0 | KeyHandle MemTableRep::Allocate(const size_t len, char** buf) { |
352 | 0 | *buf = allocator_->Allocate(len); |
353 | 0 | return static_cast<KeyHandle>(*buf); |
354 | 0 | } |
355 | | |
356 | | // Encode a suitable internal key target for "target" and return it. |
357 | | // Uses *scratch as scratch space, and the returned pointer will point |
358 | | // into this scratch space. |
359 | 870 | const char* EncodeKey(std::string* scratch, const Slice& target) { |
360 | 870 | scratch->clear(); |
361 | 870 | PutVarint32(scratch, static_cast<uint32_t>(target.size())); |
362 | 870 | scratch->append(target.data(), target.size()); |
363 | 870 | return scratch->data(); |
364 | 870 | } |
365 | | |
366 | | class MemTableIterator : public InternalIterator { |
367 | | public: |
368 | | MemTableIterator(const MemTable& mem, const ReadOptions& read_options, |
369 | | UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, |
370 | | Arena* arena, bool use_range_del_table = false) |
371 | | : bloom_(nullptr), |
372 | | prefix_extractor_(mem.prefix_extractor_), |
373 | | comparator_(mem.comparator_), |
374 | | seqno_to_time_mapping_(seqno_to_time_mapping), |
375 | | status_(Status::OK()), |
376 | | logger_(mem.moptions_.info_log), |
377 | | ts_sz_(mem.ts_sz_), |
378 | | protection_bytes_per_key_(mem.moptions_.protection_bytes_per_key), |
379 | | valid_(false), |
380 | | value_pinned_( |
381 | | !mem.GetImmutableMemTableOptions()->inplace_update_support), |
382 | | arena_mode_(arena != nullptr), |
383 | | paranoid_memory_checks_(mem.moptions_.paranoid_memory_checks), |
384 | 14.8k | allow_data_in_error(mem.moptions_.allow_data_in_errors) { |
385 | 14.8k | if (use_range_del_table) { |
386 | 3.12k | iter_ = mem.range_del_table_->GetIterator(arena); |
387 | 11.7k | } else if (prefix_extractor_ != nullptr && !read_options.total_order_seek && |
388 | 11.7k | !read_options.auto_prefix_mode) { |
389 | | // Auto prefix mode is not implemented in memtable yet. |
390 | 0 | bloom_ = mem.bloom_filter_.get(); |
391 | 0 | iter_ = mem.table_->GetDynamicPrefixIterator(arena); |
392 | 11.7k | } else { |
393 | 11.7k | iter_ = mem.table_->GetIterator(arena); |
394 | 11.7k | } |
395 | 14.8k | status_.PermitUncheckedError(); |
396 | 14.8k | } |
397 | | // No copying allowed |
398 | | MemTableIterator(const MemTableIterator&) = delete; |
399 | | void operator=(const MemTableIterator&) = delete; |
400 | | |
401 | 14.8k | ~MemTableIterator() override { |
402 | | #ifndef NDEBUG |
403 | | // Assert that the MemTableIterator is never deleted while |
404 | | // Pinning is Enabled. |
405 | | assert(!pinned_iters_mgr_ || !pinned_iters_mgr_->PinningEnabled()); |
406 | | #endif |
407 | 14.8k | if (arena_mode_) { |
408 | 11.7k | iter_->~Iterator(); |
409 | 11.7k | } else { |
410 | 3.12k | delete iter_; |
411 | 3.12k | } |
412 | 14.8k | status_.PermitUncheckedError(); |
413 | 14.8k | } |
414 | | |
415 | | #ifndef NDEBUG |
416 | | void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override { |
417 | | pinned_iters_mgr_ = pinned_iters_mgr; |
418 | | } |
419 | | PinnedIteratorsManager* pinned_iters_mgr_ = nullptr; |
420 | | #endif |
421 | | |
422 | 1.53M | bool Valid() const override { |
423 | | // If inner iter_ is not valid, then this iter should also not be valid. |
424 | 1.53M | assert(iter_->Valid() || !(valid_ && status_.ok())); |
425 | 1.53M | return valid_ && status_.ok(); |
426 | 1.53M | } |
427 | | |
428 | 381 | void Seek(const Slice& k) override { |
429 | 381 | PERF_TIMER_GUARD(seek_on_memtable_time); |
430 | 381 | PERF_COUNTER_ADD(seek_on_memtable_count, 1); |
431 | 381 | status_ = Status::OK(); |
432 | 381 | if (bloom_) { |
433 | | // iterator should only use prefix bloom filter |
434 | 0 | Slice user_k_without_ts(ExtractUserKeyAndStripTimestamp(k, ts_sz_)); |
435 | 0 | if (prefix_extractor_->InDomain(user_k_without_ts)) { |
436 | 0 | if (!bloom_->MayContain( |
437 | 0 | prefix_extractor_->Transform(user_k_without_ts))) { |
438 | 0 | PERF_COUNTER_ADD(bloom_memtable_miss_count, 1); |
439 | 0 | valid_ = false; |
440 | 0 | return; |
441 | 0 | } else { |
442 | 0 | PERF_COUNTER_ADD(bloom_memtable_hit_count, 1); |
443 | 0 | } |
444 | 0 | } |
445 | 0 | } |
446 | 381 | if (paranoid_memory_checks_) { |
447 | 0 | status_ = iter_->SeekAndValidate(k, nullptr, allow_data_in_error); |
448 | 381 | } else { |
449 | 381 | iter_->Seek(k, nullptr); |
450 | 381 | } |
451 | 381 | valid_ = iter_->Valid(); |
452 | 381 | VerifyEntryChecksum(); |
453 | 381 | } |
454 | 489 | void SeekForPrev(const Slice& k) override { |
455 | 489 | PERF_TIMER_GUARD(seek_on_memtable_time); |
456 | 489 | PERF_COUNTER_ADD(seek_on_memtable_count, 1); |
457 | 489 | status_ = Status::OK(); |
458 | 489 | if (bloom_) { |
459 | 0 | Slice user_k_without_ts(ExtractUserKeyAndStripTimestamp(k, ts_sz_)); |
460 | 0 | if (prefix_extractor_->InDomain(user_k_without_ts)) { |
461 | 0 | if (!bloom_->MayContain( |
462 | 0 | prefix_extractor_->Transform(user_k_without_ts))) { |
463 | 0 | PERF_COUNTER_ADD(bloom_memtable_miss_count, 1); |
464 | 0 | valid_ = false; |
465 | 0 | return; |
466 | 0 | } else { |
467 | 0 | PERF_COUNTER_ADD(bloom_memtable_hit_count, 1); |
468 | 0 | } |
469 | 0 | } |
470 | 0 | } |
471 | 489 | if (paranoid_memory_checks_) { |
472 | 0 | status_ = iter_->SeekAndValidate(k, nullptr, allow_data_in_error); |
473 | 489 | } else { |
474 | 489 | iter_->Seek(k, nullptr); |
475 | 489 | } |
476 | 489 | valid_ = iter_->Valid(); |
477 | 489 | VerifyEntryChecksum(); |
478 | 489 | if (!Valid() && status().ok()) { |
479 | 467 | SeekToLast(); |
480 | 467 | } |
481 | 511 | while (Valid() && comparator_.comparator.Compare(k, key()) < 0) { |
482 | 22 | Prev(); |
483 | 22 | } |
484 | 489 | } |
485 | 16.7k | void SeekToFirst() override { |
486 | 16.7k | status_ = Status::OK(); |
487 | 16.7k | iter_->SeekToFirst(); |
488 | 16.7k | valid_ = iter_->Valid(); |
489 | 16.7k | VerifyEntryChecksum(); |
490 | 16.7k | } |
491 | 467 | void SeekToLast() override { |
492 | 467 | status_ = Status::OK(); |
493 | 467 | iter_->SeekToLast(); |
494 | 467 | valid_ = iter_->Valid(); |
495 | 467 | VerifyEntryChecksum(); |
496 | 467 | } |
497 | 1.46M | void Next() override { |
498 | 1.46M | PERF_COUNTER_ADD(next_on_memtable_count, 1); |
499 | 1.46M | assert(Valid()); |
500 | 1.46M | if (paranoid_memory_checks_) { |
501 | 0 | status_ = iter_->NextAndValidate(allow_data_in_error); |
502 | 1.46M | } else { |
503 | 1.46M | iter_->Next(); |
504 | 1.46M | TEST_SYNC_POINT_CALLBACK("MemTableIterator::Next:0", iter_); |
505 | 1.46M | } |
506 | 1.46M | valid_ = iter_->Valid(); |
507 | 1.46M | VerifyEntryChecksum(); |
508 | 1.46M | } |
509 | 189 | bool NextAndGetResult(IterateResult* result) override { |
510 | 189 | Next(); |
511 | 189 | bool is_valid = Valid(); |
512 | 189 | if (is_valid) { |
513 | 105 | result->key = key(); |
514 | 105 | result->bound_check_result = IterBoundCheck::kUnknown; |
515 | 105 | result->value_prepared = true; |
516 | 105 | } |
517 | 189 | return is_valid; |
518 | 189 | } |
519 | 582 | void Prev() override { |
520 | 582 | PERF_COUNTER_ADD(prev_on_memtable_count, 1); |
521 | 582 | assert(Valid()); |
522 | 582 | if (paranoid_memory_checks_) { |
523 | 0 | status_ = iter_->PrevAndValidate(allow_data_in_error); |
524 | 582 | } else { |
525 | 582 | iter_->Prev(); |
526 | 582 | } |
527 | 582 | valid_ = iter_->Valid(); |
528 | 582 | VerifyEntryChecksum(); |
529 | 582 | } |
530 | 2.21M | Slice key() const override { |
531 | 2.21M | assert(Valid()); |
532 | 2.21M | return GetLengthPrefixedSlice(iter_->key()); |
533 | 2.21M | } |
534 | | |
535 | 411 | uint64_t write_unix_time() const override { |
536 | 411 | assert(Valid()); |
537 | 411 | ParsedInternalKey pikey; |
538 | 411 | Status s = ParseInternalKey(key(), &pikey, /*log_err_key=*/false); |
539 | 411 | if (!s.ok()) { |
540 | 0 | return std::numeric_limits<uint64_t>::max(); |
541 | 411 | } else if (kTypeValuePreferredSeqno == pikey.type) { |
542 | 0 | return ParsePackedValueForWriteTime(value()); |
543 | 411 | } else if (!seqno_to_time_mapping_ || seqno_to_time_mapping_->Empty()) { |
544 | 411 | return std::numeric_limits<uint64_t>::max(); |
545 | 411 | } |
546 | 0 | return seqno_to_time_mapping_->GetProximalTimeBeforeSeqno(pikey.sequence); |
547 | 411 | } |
548 | | |
549 | 1.46M | Slice value() const override { |
550 | 1.46M | assert(Valid()); |
551 | 1.46M | Slice key_slice = GetLengthPrefixedSlice(iter_->key()); |
552 | 1.46M | return GetLengthPrefixedSlice(key_slice.data() + key_slice.size()); |
553 | 1.46M | } |
554 | | |
555 | 17.9k | Status status() const override { return status_; } |
556 | | |
557 | 750k | bool IsKeyPinned() const override { |
558 | | // memtable data is always pinned |
559 | 750k | return true; |
560 | 750k | } |
561 | | |
562 | 375k | bool IsValuePinned() const override { |
563 | | // memtable value is always pinned, except if we allow inplace update. |
564 | 375k | return value_pinned_; |
565 | 375k | } |
566 | | |
567 | | private: |
568 | | DynamicBloom* bloom_; |
569 | | const SliceTransform* const prefix_extractor_; |
570 | | const MemTable::KeyComparator comparator_; |
571 | | MemTableRep::Iterator* iter_; |
572 | | // The seqno to time mapping is owned by the SuperVersion. |
573 | | UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping_; |
574 | | Status status_; |
575 | | Logger* logger_; |
576 | | size_t ts_sz_; |
577 | | uint32_t protection_bytes_per_key_; |
578 | | bool valid_; |
579 | | bool value_pinned_; |
580 | | bool arena_mode_; |
581 | | const bool paranoid_memory_checks_; |
582 | | const bool allow_data_in_error; |
583 | | |
584 | 1.48M | void VerifyEntryChecksum() { |
585 | 1.48M | if (protection_bytes_per_key_ > 0 && Valid()) { |
586 | 0 | status_ = MemTable::VerifyEntryChecksum(iter_->key(), |
587 | 0 | protection_bytes_per_key_); |
588 | 0 | if (!status_.ok()) { |
589 | 0 | ROCKS_LOG_ERROR(logger_, "In MemtableIterator: %s", status_.getState()); |
590 | 0 | } |
591 | 0 | } |
592 | 1.48M | } |
593 | | }; |
594 | | |
595 | | InternalIterator* MemTable::NewIterator( |
596 | | const ReadOptions& read_options, |
597 | 11.7k | UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena) { |
598 | 11.7k | assert(arena != nullptr); |
599 | 11.7k | auto mem = arena->AllocateAligned(sizeof(MemTableIterator)); |
600 | 11.7k | return new (mem) |
601 | 11.7k | MemTableIterator(*this, read_options, seqno_to_time_mapping, arena); |
602 | 11.7k | } |
603 | | |
604 | | FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIterator( |
605 | | const ReadOptions& read_options, SequenceNumber read_seq, |
606 | 12.1k | bool immutable_memtable) { |
607 | 12.1k | if (read_options.ignore_range_deletions || |
608 | 12.1k | is_range_del_table_empty_.load(std::memory_order_relaxed)) { |
609 | 9.00k | return nullptr; |
610 | 9.00k | } |
611 | 3.12k | return NewRangeTombstoneIteratorInternal(read_options, read_seq, |
612 | 3.12k | immutable_memtable); |
613 | 12.1k | } |
614 | | |
615 | | FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIteratorInternal( |
616 | | const ReadOptions& read_options, SequenceNumber read_seq, |
617 | 3.12k | bool immutable_memtable) { |
618 | 3.12k | if (immutable_memtable) { |
619 | | // Note that caller should already have verified that |
620 | | // !is_range_del_table_empty_ |
621 | 0 | assert(IsFragmentedRangeTombstonesConstructed()); |
622 | 0 | return new FragmentedRangeTombstoneIterator( |
623 | 0 | fragmented_range_tombstone_list_.get(), comparator_.comparator, |
624 | 0 | read_seq, read_options.timestamp); |
625 | 0 | } |
626 | | |
627 | | // takes current cache |
628 | 3.12k | std::shared_ptr<FragmentedRangeTombstoneListCache> cache = |
629 | 3.12k | std::atomic_load_explicit(cached_range_tombstone_.Access(), |
630 | 3.12k | std::memory_order_relaxed); |
631 | | // construct fragmented tombstone list if necessary |
632 | 3.12k | if (!cache->initialized.load(std::memory_order_acquire)) { |
633 | 3.12k | cache->reader_mutex.lock(); |
634 | 3.12k | if (!cache->tombstones) { |
635 | 3.12k | auto* unfragmented_iter = new MemTableIterator( |
636 | 3.12k | *this, read_options, nullptr /* seqno_to_time_mapping= */, |
637 | 3.12k | nullptr /* arena */, true /* use_range_del_table */); |
638 | 3.12k | cache->tombstones.reset(new FragmentedRangeTombstoneList( |
639 | 3.12k | std::unique_ptr<InternalIterator>(unfragmented_iter), |
640 | 3.12k | comparator_.comparator)); |
641 | 3.12k | cache->initialized.store(true, std::memory_order_release); |
642 | 3.12k | } |
643 | 3.12k | cache->reader_mutex.unlock(); |
644 | 3.12k | } |
645 | | |
646 | 3.12k | auto* fragmented_iter = new FragmentedRangeTombstoneIterator( |
647 | 3.12k | cache, comparator_.comparator, read_seq, read_options.timestamp); |
648 | 3.12k | return fragmented_iter; |
649 | 3.12k | } |
650 | | |
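The cache construction above is a double-checked initialization: a cheap acquire load of the flag, then a re-check of the pointer under reader_mutex, and a release store once the list has been built. A self-contained sketch of the same pattern (Cache, FragmentList and GetOrBuild are illustrative stand-ins, not the RocksDB types):

#include <atomic>
#include <memory>
#include <mutex>

struct FragmentList {};  // stands in for FragmentedRangeTombstoneList

struct Cache {
  std::atomic<bool> initialized{false};
  std::mutex reader_mutex;
  std::unique_ptr<FragmentList> tombstones;
};

const FragmentList* GetOrBuild(Cache& cache) {
  if (!cache.initialized.load(std::memory_order_acquire)) {
    std::lock_guard<std::mutex> lock(cache.reader_mutex);
    if (!cache.tombstones) {  // re-check under the lock
      cache.tombstones = std::make_unique<FragmentList>();
      cache.initialized.store(true, std::memory_order_release);
    }
  }
  return cache.tombstones.get();
}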
651 | 133 | void MemTable::ConstructFragmentedRangeTombstones() { |
652 | | // There should be no concurrent construction. |
653 | | // We could also check fragmented_range_tombstone_list_ to avoid repeated |
654 | | // constructions. We just construct them here again to be safe. |
655 | 133 | if (!is_range_del_table_empty_.load(std::memory_order_relaxed)) { |
656 | | // TODO: plumb Env::IOActivity, Env::IOPriority |
657 | 0 | auto* unfragmented_iter = new MemTableIterator( |
658 | 0 | *this, ReadOptions(), nullptr /*seqno_to_time_mapping=*/, |
659 | 0 | nullptr /* arena */, true /* use_range_del_table */); |
660 | |
661 | 0 | fragmented_range_tombstone_list_ = |
662 | 0 | std::make_unique<FragmentedRangeTombstoneList>( |
663 | 0 | std::unique_ptr<InternalIterator>(unfragmented_iter), |
664 | 0 | comparator_.comparator); |
665 | 0 | } |
666 | 133 | } |
667 | | |
668 | 0 | port::RWMutex* MemTable::GetLock(const Slice& key) { |
669 | 0 | return &locks_[GetSliceRangedNPHash(key, locks_.size())]; |
670 | 0 | } |
671 | | |
672 | | MemTable::MemTableStats MemTable::ApproximateStats(const Slice& start_ikey, |
673 | 0 | const Slice& end_ikey) { |
674 | 0 | uint64_t entry_count = table_->ApproximateNumEntries(start_ikey, end_ikey); |
675 | 0 | entry_count += range_del_table_->ApproximateNumEntries(start_ikey, end_ikey); |
676 | 0 | if (entry_count == 0) { |
677 | 0 | return {0, 0}; |
678 | 0 | } |
679 | 0 | uint64_t n = num_entries_.load(std::memory_order_relaxed); |
680 | 0 | if (n == 0) { |
681 | 0 | return {0, 0}; |
682 | 0 | } |
683 | 0 | if (entry_count > n) { |
684 | | // (range_del_)table_->ApproximateNumEntries() is just an estimate, so it can |
685 | | // be larger than the number of entries we actually have. Cap it to limit |
686 | | // the inaccuracy. |
687 | 0 | entry_count = n; |
688 | 0 | } |
689 | 0 | uint64_t data_size = data_size_.load(std::memory_order_relaxed); |
690 | 0 | return {entry_count * (data_size / n), entry_count}; |
691 | 0 | } |
692 | | |
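The returned size above is just the estimated entry count scaled by the memtable's average entry size. A worked example with illustrative numbers (not taken from any real workload):

#include <cstdint>

int main() {
  uint64_t num_entries = 1000;  // entries currently in the memtable (assumed)
  uint64_t data_size = 128000;  // total encoded bytes in the memtable (assumed)
  uint64_t entry_count = 250;   // ApproximateNumEntries() for [start, end)
  // 128000 / 1000 = 128 bytes average per entry, so 250 * 128 = 32000 bytes.
  uint64_t approx_size = entry_count * (data_size / num_entries);
  return approx_size == 32000 ? 0 : 1;
}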
693 | | Status MemTable::VerifyEncodedEntry(Slice encoded, |
694 | 1.09M | const ProtectionInfoKVOS64& kv_prot_info) { |
695 | 1.09M | uint32_t ikey_len = 0; |
696 | 1.09M | if (!GetVarint32(&encoded, &ikey_len)) { |
697 | 0 | return Status::Corruption("Unable to parse internal key length"); |
698 | 0 | } |
699 | 1.09M | if (ikey_len < 8 + ts_sz_) { |
700 | 0 | return Status::Corruption("Internal key length too short"); |
701 | 0 | } |
702 | 1.09M | if (ikey_len > encoded.size()) { |
703 | 0 | return Status::Corruption("Internal key length too long"); |
704 | 0 | } |
705 | 1.09M | uint32_t value_len = 0; |
706 | 1.09M | const size_t user_key_len = ikey_len - 8; |
707 | 1.09M | Slice key(encoded.data(), user_key_len); |
708 | 1.09M | encoded.remove_prefix(user_key_len); |
709 | | |
710 | 1.09M | uint64_t packed = DecodeFixed64(encoded.data()); |
711 | 1.09M | ValueType value_type = kMaxValue; |
712 | 1.09M | SequenceNumber sequence_number = kMaxSequenceNumber; |
713 | 1.09M | UnPackSequenceAndType(packed, &sequence_number, &value_type); |
714 | 1.09M | encoded.remove_prefix(8); |
715 | | |
716 | 1.09M | if (!GetVarint32(&encoded, &value_len)) { |
717 | 0 | return Status::Corruption("Unable to parse value length"); |
718 | 0 | } |
719 | 1.09M | if (value_len < encoded.size()) { |
720 | 0 | return Status::Corruption("Value length too short"); |
721 | 0 | } |
722 | 1.09M | if (value_len > encoded.size()) { |
723 | 0 | return Status::Corruption("Value length too long"); |
724 | 0 | } |
725 | 1.09M | Slice value(encoded.data(), value_len); |
726 | | |
727 | 1.09M | return kv_prot_info.StripS(sequence_number) |
728 | 1.09M | .StripKVO(key, value, value_type) |
729 | 1.09M | .GetStatus(); |
730 | 1.09M | } |
731 | | |
732 | | void MemTable::UpdateEntryChecksum(const ProtectionInfoKVOS64* kv_prot_info, |
733 | | const Slice& key, const Slice& value, |
734 | | ValueType type, SequenceNumber s, |
735 | 2.18M | char* checksum_ptr) { |
736 | 2.18M | if (moptions_.protection_bytes_per_key == 0) { |
737 | 2.18M | return; |
738 | 2.18M | } |
739 | | |
740 | 0 | if (kv_prot_info == nullptr) { |
741 | 0 | ProtectionInfo64() |
742 | 0 | .ProtectKVO(key, value, type) |
743 | 0 | .ProtectS(s) |
744 | 0 | .Encode(static_cast<uint8_t>(moptions_.protection_bytes_per_key), |
745 | 0 | checksum_ptr); |
746 | 0 | } else { |
747 | 0 | kv_prot_info->Encode( |
748 | 0 | static_cast<uint8_t>(moptions_.protection_bytes_per_key), checksum_ptr); |
749 | 0 | } |
750 | 0 | } |
751 | | |
752 | | Status MemTable::Add(SequenceNumber s, ValueType type, |
753 | | const Slice& key, /* user key */ |
754 | | const Slice& value, |
755 | | const ProtectionInfoKVOS64* kv_prot_info, |
756 | | bool allow_concurrent, |
757 | 2.18M | MemTablePostProcessInfo* post_process_info, void** hint) { |
758 | | // Format of an entry is concatenation of: |
759 | | // key_size : varint32 of internal_key.size() |
760 | | // key bytes : char[internal_key.size()] |
761 | | // value_size : varint32 of value.size() |
762 | | // value bytes : char[value.size()] |
763 | | // checksum : char[moptions_.protection_bytes_per_key] |
764 | 2.18M | uint32_t key_size = static_cast<uint32_t>(key.size()); |
765 | 2.18M | uint32_t val_size = static_cast<uint32_t>(value.size()); |
766 | 2.18M | uint32_t internal_key_size = key_size + 8; |
767 | 2.18M | const uint32_t encoded_len = VarintLength(internal_key_size) + |
768 | 2.18M | internal_key_size + VarintLength(val_size) + |
769 | 2.18M | val_size + moptions_.protection_bytes_per_key; |
770 | 2.18M | char* buf = nullptr; |
771 | 2.18M | std::unique_ptr<MemTableRep>& table = |
772 | 2.18M | type == kTypeRangeDeletion ? range_del_table_ : table_; |
773 | 2.18M | KeyHandle handle = table->Allocate(encoded_len, &buf); |
774 | | |
775 | 2.18M | char* p = EncodeVarint32(buf, internal_key_size); |
776 | 2.18M | memcpy(p, key.data(), key_size); |
777 | 2.18M | Slice key_slice(p, key_size); |
778 | 2.18M | p += key_size; |
779 | 2.18M | uint64_t packed = PackSequenceAndType(s, type); |
780 | 2.18M | EncodeFixed64(p, packed); |
781 | 2.18M | p += 8; |
782 | 2.18M | p = EncodeVarint32(p, val_size); |
783 | 2.18M | memcpy(p, value.data(), val_size); |
784 | 2.18M | assert((unsigned)(p + val_size - buf + moptions_.protection_bytes_per_key) == |
785 | 2.18M | (unsigned)encoded_len); |
786 | | |
787 | 2.18M | UpdateEntryChecksum(kv_prot_info, key, value, type, s, |
788 | 2.18M | buf + encoded_len - moptions_.protection_bytes_per_key); |
789 | 2.18M | Slice encoded(buf, encoded_len - moptions_.protection_bytes_per_key); |
790 | 2.18M | if (kv_prot_info != nullptr) { |
791 | 1.09M | TEST_SYNC_POINT_CALLBACK("MemTable::Add:Encoded", &encoded); |
792 | 1.09M | Status status = VerifyEncodedEntry(encoded, *kv_prot_info); |
793 | 1.09M | if (!status.ok()) { |
794 | 0 | return status; |
795 | 0 | } |
796 | 1.09M | } |
797 | | |
798 | 2.18M | Slice key_without_ts = StripTimestampFromUserKey(key, ts_sz_); |
799 | | |
800 | 2.18M | if (!allow_concurrent) { |
801 | | // Extract prefix for insert with hint. Hints are for point key table |
802 | | // (`table_`) only, not `range_del_table_`. |
803 | 2.18M | if (table == table_ && insert_with_hint_prefix_extractor_ != nullptr && |
804 | 2.18M | insert_with_hint_prefix_extractor_->InDomain(key_slice)) { |
805 | 0 | Slice prefix = insert_with_hint_prefix_extractor_->Transform(key_slice); |
806 | 0 | bool res = table->InsertKeyWithHint(handle, &insert_hints_[prefix]); |
807 | 0 | if (UNLIKELY(!res)) { |
808 | 0 | return Status::TryAgain("key+seq exists"); |
809 | 0 | } |
810 | 2.18M | } else { |
811 | 2.18M | bool res = table->InsertKey(handle); |
812 | 2.18M | if (UNLIKELY(!res)) { |
813 | 0 | return Status::TryAgain("key+seq exists"); |
814 | 0 | } |
815 | 2.18M | } |
816 | | |
817 | | // this is a bit ugly, but is the way to avoid locked instructions |
818 | | // when incrementing an atomic |
819 | 2.18M | num_entries_.store(num_entries_.load(std::memory_order_relaxed) + 1, |
820 | 2.18M | std::memory_order_relaxed); |
821 | 2.18M | data_size_.store(data_size_.load(std::memory_order_relaxed) + encoded_len, |
822 | 2.18M | std::memory_order_relaxed); |
823 | 2.18M | if (type == kTypeDeletion || type == kTypeSingleDeletion || |
824 | 2.18M | type == kTypeDeletionWithTimestamp) { |
825 | 256k | num_deletes_.store(num_deletes_.load(std::memory_order_relaxed) + 1, |
826 | 256k | std::memory_order_relaxed); |
827 | 1.92M | } else if (type == kTypeRangeDeletion) { |
828 | 750k | uint64_t val = num_range_deletes_.load(std::memory_order_relaxed) + 1; |
829 | 750k | num_range_deletes_.store(val, std::memory_order_relaxed); |
830 | 750k | } |
831 | | |
832 | 2.18M | if (bloom_filter_ && prefix_extractor_ && |
833 | 2.18M | prefix_extractor_->InDomain(key_without_ts)) { |
834 | 0 | bloom_filter_->Add(prefix_extractor_->Transform(key_without_ts)); |
835 | 0 | } |
836 | 2.18M | if (bloom_filter_ && moptions_.memtable_whole_key_filtering) { |
837 | 0 | bloom_filter_->Add(key_without_ts); |
838 | 0 | } |
839 | | |
840 | | // The first sequence number inserted into the memtable |
841 | 2.18M | assert(first_seqno_ == 0 || s >= first_seqno_); |
842 | 2.18M | if (first_seqno_ == 0) { |
843 | 10.9k | first_seqno_.store(s, std::memory_order_relaxed); |
844 | | |
845 | 10.9k | if (earliest_seqno_ == kMaxSequenceNumber) { |
846 | 0 | earliest_seqno_.store(GetFirstSequenceNumber(), |
847 | 0 | std::memory_order_relaxed); |
848 | 0 | } |
849 | 10.9k | assert(first_seqno_.load() >= earliest_seqno_.load()); |
850 | 10.9k | } |
851 | 2.18M | assert(post_process_info == nullptr); |
852 | | // TODO(yuzhangyu): support updating newest UDT for when `allow_concurrent` |
853 | | // is true. |
854 | 2.18M | MaybeUpdateNewestUDT(key_slice); |
855 | 2.18M | UpdateFlushState(); |
856 | 2.18M | } else { |
857 | 0 | bool res = (hint == nullptr) |
858 | 0 | ? table->InsertKeyConcurrently(handle) |
859 | 0 | : table->InsertKeyWithHintConcurrently(handle, hint); |
860 | 0 | if (UNLIKELY(!res)) { |
861 | 0 | return Status::TryAgain("key+seq exists"); |
862 | 0 | } |
863 | | |
864 | 0 | assert(post_process_info != nullptr); |
865 | 0 | post_process_info->num_entries++; |
866 | 0 | post_process_info->data_size += encoded_len; |
867 | 0 | if (type == kTypeDeletion) { |
868 | 0 | post_process_info->num_deletes++; |
869 | 0 | } |
870 | |
871 | 0 | if (bloom_filter_ && prefix_extractor_ && |
872 | 0 | prefix_extractor_->InDomain(key_without_ts)) { |
873 | 0 | bloom_filter_->AddConcurrently( |
874 | 0 | prefix_extractor_->Transform(key_without_ts)); |
875 | 0 | } |
876 | 0 | if (bloom_filter_ && moptions_.memtable_whole_key_filtering) { |
877 | 0 | bloom_filter_->AddConcurrently(key_without_ts); |
878 | 0 | } |
879 | | |
880 | | // atomically update first_seqno_ and earliest_seqno_. |
881 | 0 | uint64_t cur_seq_num = first_seqno_.load(std::memory_order_relaxed); |
882 | 0 | while ((cur_seq_num == 0 || s < cur_seq_num) && |
883 | 0 | !first_seqno_.compare_exchange_weak(cur_seq_num, s)) { |
884 | 0 | } |
885 | 0 | uint64_t cur_earliest_seqno = |
886 | 0 | earliest_seqno_.load(std::memory_order_relaxed); |
887 | 0 | while ( |
888 | 0 | (cur_earliest_seqno == kMaxSequenceNumber || s < cur_earliest_seqno) && |
889 | 0 | !earliest_seqno_.compare_exchange_weak(cur_earliest_seqno, s)) { |
890 | 0 | } |
891 | 0 | } |
892 | 2.18M | if (type == kTypeRangeDeletion) { |
893 | 750k | auto new_cache = std::make_shared<FragmentedRangeTombstoneListCache>(); |
894 | 750k | size_t size = cached_range_tombstone_.Size(); |
895 | 750k | if (allow_concurrent) { |
896 | 0 | post_process_info->num_range_deletes++; |
897 | 0 | range_del_mutex_.lock(); |
898 | 0 | } |
899 | 24.7M | for (size_t i = 0; i < size; ++i) { |
900 | 24.0M | std::shared_ptr<FragmentedRangeTombstoneListCache>* local_cache_ref_ptr = |
901 | 24.0M | cached_range_tombstone_.AccessAtCore(i); |
902 | 24.0M | auto new_local_cache_ref = std::make_shared< |
903 | 24.0M | const std::shared_ptr<FragmentedRangeTombstoneListCache>>(new_cache); |
904 | | // It is okay for some reader to load the old cache during invalidation, |
905 | | // as the new sequence number is not published yet. |
906 | | // Each core has a shared_ptr to a shared_ptr to the cached |
907 | | // fragmented range tombstones, so that the ref count is maintained locally |
908 | | // per-core using the per-core shared_ptr. |
909 | 24.0M | std::atomic_store_explicit( |
910 | 24.0M | local_cache_ref_ptr, |
911 | 24.0M | std::shared_ptr<FragmentedRangeTombstoneListCache>( |
912 | 24.0M | new_local_cache_ref, new_cache.get()), |
913 | 24.0M | std::memory_order_relaxed); |
914 | 24.0M | } |
915 | | |
916 | 750k | if (allow_concurrent) { |
917 | 0 | range_del_mutex_.unlock(); |
918 | 0 | } |
919 | 750k | is_range_del_table_empty_.store(false, std::memory_order_relaxed); |
920 | 750k | } |
921 | 2.18M | UpdateOldestKeyTime(); |
922 | | |
923 | 2.18M | TEST_SYNC_POINT_CALLBACK("MemTable::Add:BeforeReturn:Encoded", &encoded); |
924 | 2.18M | return Status::OK(); |
925 | 2.18M | } |
926 | | |
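The entry layout documented at the top of MemTable::Add() can be reproduced in a few lines of standalone code. The sketch below assumes a little-endian host (as RocksDB's EncodeFixed64 produces on common platforms); PutVarint32Local, PutFixed64Local and EncodeEntrySketch are illustrative helpers, not RocksDB APIs, and a real entry would be followed by protection_bytes_per_key checksum bytes:

#include <cstdint>
#include <cstring>
#include <string>

namespace {
void PutVarint32Local(std::string* dst, uint32_t v) {
  while (v >= 0x80) {
    dst->push_back(static_cast<char>((v & 0x7f) | 0x80));
    v >>= 7;
  }
  dst->push_back(static_cast<char>(v));
}
void PutFixed64Local(std::string* dst, uint64_t v) {
  char buf[8];
  std::memcpy(buf, &v, sizeof(v));  // little-endian host assumed
  dst->append(buf, sizeof(buf));
}
}  // namespace

// Layout: varint32(klen + 8) | user key | fixed64(seq << 8 | type) |
//         varint32(vlen) | value
std::string EncodeEntrySketch(const std::string& user_key,
                              const std::string& value, uint64_t seq,
                              uint8_t type) {
  std::string buf;
  PutVarint32Local(&buf, static_cast<uint32_t>(user_key.size()) + 8);
  buf.append(user_key);
  PutFixed64Local(&buf, (seq << 8) | type);  // PackSequenceAndType layout
  PutVarint32Local(&buf, static_cast<uint32_t>(value.size()));
  buf.append(value);
  return buf;
}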
927 | | // Callback from MemTable::Get() |
928 | | namespace { |
929 | | |
930 | | struct Saver { |
931 | | Status* status; |
932 | | const LookupKey* key; |
933 | | bool* found_final_value; // Is value set correctly? Used by KeyMayExist |
934 | | bool* merge_in_progress; |
935 | | std::string* value; |
936 | | PinnableWideColumns* columns; |
937 | | SequenceNumber seq; |
938 | | std::string* timestamp; |
939 | | const MergeOperator* merge_operator; |
940 | | // the merge operations encountered; |
941 | | MergeContext* merge_context; |
942 | | SequenceNumber max_covering_tombstone_seq; |
943 | | MemTable* mem; |
944 | | Logger* logger; |
945 | | Statistics* statistics; |
946 | | bool inplace_update_support; |
947 | | bool do_merge; |
948 | | SystemClock* clock; |
949 | | |
950 | | ReadCallback* callback_; |
951 | | bool* is_blob_index; |
952 | | bool allow_data_in_errors; |
953 | | uint32_t protection_bytes_per_key; |
954 | 399 | bool CheckCallback(SequenceNumber _seq) { |
955 | 399 | if (callback_) { |
956 | 0 | return callback_->IsVisible(_seq); |
957 | 0 | } |
958 | 399 | return true; |
959 | 399 | } |
960 | | }; |
961 | | } // anonymous namespace |
962 | | |
963 | 429 | static bool SaveValue(void* arg, const char* entry) { |
964 | 429 | Saver* s = static_cast<Saver*>(arg); |
965 | 429 | assert(s != nullptr); |
966 | 429 | assert(!s->value || !s->columns); |
967 | 429 | assert(!*(s->found_final_value)); |
968 | 429 | assert(s->status->ok() || s->status->IsMergeInProgress()); |
969 | | |
970 | 429 | MergeContext* merge_context = s->merge_context; |
971 | 429 | SequenceNumber max_covering_tombstone_seq = s->max_covering_tombstone_seq; |
972 | 429 | const MergeOperator* merge_operator = s->merge_operator; |
973 | | |
974 | 429 | assert(merge_context != nullptr); |
975 | | |
976 | | // Refer to comments under MemTable::Add() for entry format. |
977 | | // Check that it belongs to same user key. |
978 | 429 | uint32_t key_length = 0; |
979 | 429 | const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); |
980 | 429 | assert(key_length >= 8); |
981 | 429 | Slice user_key_slice = Slice(key_ptr, key_length - 8); |
982 | 429 | const Comparator* user_comparator = |
983 | 429 | s->mem->GetInternalKeyComparator().user_comparator(); |
984 | 429 | size_t ts_sz = user_comparator->timestamp_size(); |
985 | 429 | if (ts_sz && s->timestamp && max_covering_tombstone_seq > 0) { |
986 | | // timestamp should already be set to range tombstone timestamp |
987 | 0 | assert(s->timestamp->size() == ts_sz); |
988 | 0 | } |
989 | 429 | if (user_comparator->EqualWithoutTimestamp(user_key_slice, |
990 | 429 | s->key->user_key())) { |
991 | | // Correct user key |
992 | 399 | TEST_SYNC_POINT_CALLBACK("Memtable::SaveValue:Found:entry", &entry); |
993 | 399 | std::optional<ReadLock> read_lock; |
994 | 399 | if (s->inplace_update_support) { |
995 | 0 | read_lock.emplace(s->mem->GetLock(s->key->user_key())); |
996 | 0 | } |
997 | | |
998 | 399 | if (s->protection_bytes_per_key > 0) { |
999 | 0 | *(s->status) = MemTable::VerifyEntryChecksum( |
1000 | 0 | entry, s->protection_bytes_per_key, s->allow_data_in_errors); |
1001 | 0 | if (!s->status->ok()) { |
1002 | 0 | *(s->found_final_value) = true; |
1003 | 0 | ROCKS_LOG_ERROR(s->logger, "In SaveValue: %s", s->status->getState()); |
1004 | | // Memtable entry corrupted |
1005 | 0 | return false; |
1006 | 0 | } |
1007 | 0 | } |
1008 | | |
1009 | 399 | const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); |
1010 | 399 | ValueType type; |
1011 | 399 | SequenceNumber seq; |
1012 | 399 | UnPackSequenceAndType(tag, &seq, &type); |
1013 | | // If the value is not in the snapshot, skip it |
1014 | 399 | if (!s->CheckCallback(seq)) { |
1015 | 0 | return true; // to continue to the next seq |
1016 | 0 | } |
1017 | | |
1018 | 399 | if (s->seq == kMaxSequenceNumber) { |
1019 | 399 | s->seq = seq; |
1020 | 399 | if (s->seq > max_covering_tombstone_seq) { |
1021 | 399 | if (ts_sz && s->timestamp != nullptr) { |
1022 | | // `timestamp` was set to range tombstone's timestamp before |
1023 | | // `SaveValue` is ever called. This key has a higher sequence number |
1024 | | // than range tombstone, and is the key with the highest seqno across |
1025 | | // all keys with this user_key, so we update timestamp here. |
1026 | 0 | Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz); |
1027 | 0 | s->timestamp->assign(ts.data(), ts_sz); |
1028 | 0 | } |
1029 | 399 | } else { |
1030 | 0 | s->seq = max_covering_tombstone_seq; |
1031 | 0 | } |
1032 | 399 | } |
1033 | | |
1034 | 399 | if (ts_sz > 0 && s->timestamp != nullptr) { |
1035 | 0 | if (!s->timestamp->empty()) { |
1036 | 0 | assert(ts_sz == s->timestamp->size()); |
1037 | 0 | } |
1038 | | // TODO optimize for smaller size ts |
1039 | 0 | const std::string kMaxTs(ts_sz, '\xff'); |
1040 | 0 | if (s->timestamp->empty() || |
1041 | 0 | user_comparator->CompareTimestamp(*(s->timestamp), kMaxTs) == 0) { |
1042 | 0 | Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz); |
1043 | 0 | s->timestamp->assign(ts.data(), ts_sz); |
1044 | 0 | } |
1045 | 0 | } |
1046 | | |
1047 | 399 | if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex || |
1048 | 399 | type == kTypeWideColumnEntity || type == kTypeDeletion || |
1049 | 399 | type == kTypeSingleDeletion || type == kTypeDeletionWithTimestamp || |
1050 | 399 | type == kTypeValuePreferredSeqno) && |
1051 | 399 | max_covering_tombstone_seq > seq) { |
1052 | 0 | type = kTypeRangeDeletion; |
1053 | 0 | } |
1054 | 399 | switch (type) { |
1055 | 0 | case kTypeBlobIndex: { |
1056 | 0 | if (!s->do_merge) { |
1057 | 0 | *(s->status) = Status::NotSupported( |
1058 | 0 | "GetMergeOperands not supported by stacked BlobDB"); |
1059 | 0 | *(s->found_final_value) = true; |
1060 | 0 | return false; |
1061 | 0 | } |
1062 | | |
1063 | 0 | if (*(s->merge_in_progress)) { |
1064 | 0 | *(s->status) = Status::NotSupported( |
1065 | 0 | "Merge operator not supported by stacked BlobDB"); |
1066 | 0 | *(s->found_final_value) = true; |
1067 | 0 | return false; |
1068 | 0 | } |
1069 | | |
1070 | 0 | if (s->is_blob_index == nullptr) { |
1071 | 0 | ROCKS_LOG_ERROR(s->logger, "Encountered unexpected blob index."); |
1072 | 0 | *(s->status) = Status::NotSupported( |
1073 | 0 | "Encountered unexpected blob index. Please open DB with " |
1074 | 0 | "ROCKSDB_NAMESPACE::blob_db::BlobDB."); |
1075 | 0 | *(s->found_final_value) = true; |
1076 | 0 | return false; |
1077 | 0 | } |
1078 | | |
1079 | 0 | Slice v = GetLengthPrefixedSlice(key_ptr + key_length); |
1080 | |
1081 | 0 | *(s->status) = Status::OK(); |
1082 | |
1083 | 0 | if (s->value) { |
1084 | 0 | s->value->assign(v.data(), v.size()); |
1085 | 0 | } else if (s->columns) { |
1086 | 0 | s->columns->SetPlainValue(v); |
1087 | 0 | } |
1088 | |
1089 | 0 | *(s->found_final_value) = true; |
1090 | 0 | *(s->is_blob_index) = true; |
1091 | |
1092 | 0 | return false; |
1093 | 0 | } |
1094 | 385 | case kTypeValue: |
1095 | 385 | case kTypeValuePreferredSeqno: { |
1096 | 385 | Slice v = GetLengthPrefixedSlice(key_ptr + key_length); |
1097 | | |
1098 | 385 | if (type == kTypeValuePreferredSeqno) { |
1099 | 0 | v = ParsePackedValueForValue(v); |
1100 | 0 | } |
1101 | | |
1102 | 385 | *(s->status) = Status::OK(); |
1103 | | |
1104 | 385 | if (!s->do_merge) { |
1105 | | // Preserve the value with the goal of returning it as part of |
1106 | | // raw merge operands to the user |
1107 | | // TODO(yanqin) update MergeContext so that timestamps information |
1108 | | // can also be retained. |
1109 | |
1110 | 0 | merge_context->PushOperand( |
1111 | 0 | v, s->inplace_update_support == false /* operand_pinned */); |
1112 | 385 | } else if (*(s->merge_in_progress)) { |
1113 | 0 | assert(s->do_merge); |
1114 | |
1115 | 0 | if (s->value || s->columns) { |
1116 | | // `op_failure_scope` (an output parameter) is not provided (set to |
1117 | | // nullptr) since a failure must be propagated regardless of its |
1118 | | // value. |
1119 | 0 | *(s->status) = MergeHelper::TimedFullMerge( |
1120 | 0 | merge_operator, s->key->user_key(), |
1121 | 0 | MergeHelper::kPlainBaseValue, v, merge_context->GetOperands(), |
1122 | 0 | s->logger, s->statistics, s->clock, |
1123 | 0 | /* update_num_ops_stats */ true, /* op_failure_scope */ nullptr, |
1124 | 0 | s->value, s->columns); |
1125 | 0 | } |
1126 | 385 | } else if (s->value) { |
1127 | 385 | s->value->assign(v.data(), v.size()); |
1128 | 385 | } else if (s->columns) { |
1129 | 0 | s->columns->SetPlainValue(v); |
1130 | 0 | } |
1131 | | |
1132 | 385 | *(s->found_final_value) = true; |
1133 | | |
1134 | 385 | if (s->is_blob_index != nullptr) { |
1135 | 0 | *(s->is_blob_index) = false; |
1136 | 0 | } |
1137 | | |
1138 | 385 | return false; |
1139 | 385 | } |
1140 | 0 | case kTypeWideColumnEntity: { |
1141 | 0 | Slice v = GetLengthPrefixedSlice(key_ptr + key_length); |
1142 | |
1143 | 0 | *(s->status) = Status::OK(); |
1144 | |
1145 | 0 | if (!s->do_merge) { |
1146 | | // Preserve the value with the goal of returning it as part of |
1147 | | // raw merge operands to the user |
1148 | |
1149 | 0 | Slice value_of_default; |
1150 | 0 | *(s->status) = WideColumnSerialization::GetValueOfDefaultColumn( |
1151 | 0 | v, value_of_default); |
1152 | |
1153 | 0 | if (s->status->ok()) { |
1154 | 0 | merge_context->PushOperand( |
1155 | 0 | value_of_default, |
1156 | 0 | s->inplace_update_support == false /* operand_pinned */); |
1157 | 0 | } |
1158 | 0 | } else if (*(s->merge_in_progress)) { |
1159 | 0 | assert(s->do_merge); |
1160 | |
1161 | 0 | if (s->value || s->columns) { |
1162 | | // `op_failure_scope` (an output parameter) is not provided (set |
1163 | | // to nullptr) since a failure must be propagated regardless of |
1164 | | // its value. |
1165 | 0 | *(s->status) = MergeHelper::TimedFullMerge( |
1166 | 0 | merge_operator, s->key->user_key(), MergeHelper::kWideBaseValue, |
1167 | 0 | v, merge_context->GetOperands(), s->logger, s->statistics, |
1168 | 0 | s->clock, /* update_num_ops_stats */ true, |
1169 | 0 | /* op_failure_scope */ nullptr, s->value, s->columns); |
1170 | 0 | } |
1171 | 0 | } else if (s->value) { |
1172 | 0 | Slice value_of_default; |
1173 | 0 | *(s->status) = WideColumnSerialization::GetValueOfDefaultColumn( |
1174 | 0 | v, value_of_default); |
1175 | 0 | if (s->status->ok()) { |
1176 | 0 | s->value->assign(value_of_default.data(), value_of_default.size()); |
1177 | 0 | } |
1178 | 0 | } else if (s->columns) { |
1179 | 0 | *(s->status) = s->columns->SetWideColumnValue(v); |
1180 | 0 | } |
1181 | |
1182 | 0 | *(s->found_final_value) = true; |
1183 | |
1184 | 0 | if (s->is_blob_index != nullptr) { |
1185 | 0 | *(s->is_blob_index) = false; |
1186 | 0 | } |
1187 | |
1188 | 0 | return false; |
1189 | 385 | } |
1190 | 14 | case kTypeDeletion: |
1191 | 14 | case kTypeDeletionWithTimestamp: |
1192 | 14 | case kTypeSingleDeletion: |
1193 | 14 | case kTypeRangeDeletion: { |
1194 | 14 | if (*(s->merge_in_progress)) { |
1195 | 0 | if (s->value || s->columns) { |
1196 | | // `op_failure_scope` (an output parameter) is not provided (set to |
1197 | | // nullptr) since a failure must be propagated regardless of its |
1198 | | // value. |
1199 | 0 | *(s->status) = MergeHelper::TimedFullMerge( |
1200 | 0 | merge_operator, s->key->user_key(), MergeHelper::kNoBaseValue, |
1201 | 0 | merge_context->GetOperands(), s->logger, s->statistics, |
1202 | 0 | s->clock, /* update_num_ops_stats */ true, |
1203 | 0 | /* op_failure_scope */ nullptr, s->value, s->columns); |
1204 | 0 | } else { |
1205 | | // We have found a final value (a base deletion) and have newer |
1206 | | // merge operands that we do not intend to merge. Nothing remains |
1207 | | // to be done so assign status to OK. |
1208 | 0 | *(s->status) = Status::OK(); |
1209 | 0 | } |
1210 | 14 | } else { |
1211 | 14 | *(s->status) = Status::NotFound(); |
1212 | 14 | } |
1213 | 14 | *(s->found_final_value) = true; |
1214 | 14 | return false; |
1215 | 14 | } |
1216 | 0 | case kTypeMerge: { |
1217 | 0 | if (!merge_operator) { |
1218 | 0 | *(s->status) = Status::InvalidArgument( |
1219 | 0 | "merge_operator is not properly initialized."); |
1220 | | // Normally we continue the loop (return true) when we see a merge |
1221 | | // operand. But in case of an error, we should stop the loop |
1222 | | // immediately and pretend we have found the value to stop further |
1223 | | // seek. Otherwise, the later call will override this error status. |
1224 | 0 | *(s->found_final_value) = true; |
1225 | 0 | return false; |
1226 | 0 | } |
1227 | 0 | Slice v = GetLengthPrefixedSlice(key_ptr + key_length); |
1228 | 0 | *(s->merge_in_progress) = true; |
1229 | 0 | merge_context->PushOperand( |
1230 | 0 | v, s->inplace_update_support == false /* operand_pinned */); |
1231 | 0 | PERF_COUNTER_ADD(internal_merge_point_lookup_count, 1); |
1232 | |
1233 | 0 | if (s->do_merge && merge_operator->ShouldMerge( |
1234 | 0 | merge_context->GetOperandsDirectionBackward())) { |
1235 | 0 | if (s->value || s->columns) { |
1236 | | // `op_failure_scope` (an output parameter) is not provided (set to |
1237 | | // nullptr) since a failure must be propagated regardless of its |
1238 | | // value. |
1239 | 0 | *(s->status) = MergeHelper::TimedFullMerge( |
1240 | 0 | merge_operator, s->key->user_key(), MergeHelper::kNoBaseValue, |
1241 | 0 | merge_context->GetOperands(), s->logger, s->statistics, |
1242 | 0 | s->clock, /* update_num_ops_stats */ true, |
1243 | 0 | /* op_failure_scope */ nullptr, s->value, s->columns); |
1244 | 0 | } |
1245 | |
1246 | 0 | *(s->found_final_value) = true; |
1247 | 0 | return false; |
1248 | 0 | } |
1249 | 0 | if (merge_context->get_merge_operands_options != nullptr && |
1250 | 0 | merge_context->get_merge_operands_options->continue_cb != nullptr && |
1251 | 0 | !merge_context->get_merge_operands_options->continue_cb(v)) { |
1252 | | // We were told not to continue. |
1253 | 0 | *(s->found_final_value) = true; |
1254 | 0 | return false; |
1255 | 0 | } |
1256 | | |
1257 | 0 | return true; |
1258 | 0 | } |
1259 | 0 | default: { |
1260 | 0 | std::string msg("Corrupted value not expected."); |
1261 | 0 | if (s->allow_data_in_errors) { |
1262 | 0 | msg.append("Unrecognized value type: " + |
1263 | 0 | std::to_string(static_cast<int>(type)) + ". "); |
1264 | 0 | msg.append("User key: " + user_key_slice.ToString(/*hex=*/true) + |
1265 | 0 | ". "); |
1266 | 0 | msg.append("seq: " + std::to_string(seq) + "."); |
1267 | 0 | } |
1268 | 0 | *(s->found_final_value) = true; |
1269 | 0 | *(s->status) = Status::Corruption(msg.c_str()); |
1270 | 0 | return false; |
1271 | 0 | } |
1272 | 399 | } |
1273 | 399 | } |
1274 | | |
1275 | | // s->status could be Corruption, MergeInProgress or NotFound at this point. |
1276 | 30 | return false; |
1277 | 429 | } |
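// Illustrative sketch (not part of memtable.cc): the deletion cases above are
// what turn a memtable tombstone into Status::NotFound() for readers. A
// minimal public-API sketch of that path, assuming a scratch DB at the
// illustrative path "/tmp/memtable_saver_demo" and no flush in between:
#include <cassert>
#include <string>

#include "rocksdb/db.h"

void SaveValueDeletionSketch() {
  ROCKSDB_NAMESPACE::Options options;
  options.create_if_missing = true;
  ROCKSDB_NAMESPACE::DB* db = nullptr;
  ROCKSDB_NAMESPACE::Status s =
      ROCKSDB_NAMESPACE::DB::Open(options, "/tmp/memtable_saver_demo", &db);
  assert(s.ok());

  s = db->Put(ROCKSDB_NAMESPACE::WriteOptions(), "k", "v");
  assert(s.ok());
  s = db->Delete(ROCKSDB_NAMESPACE::WriteOptions(), "k");
  assert(s.ok());

  // Both entries still live in the active memtable, so the read below is
  // resolved by SaveValue(): the newer kTypeDeletion entry wins and the
  // lookup reports NotFound without consulting any SST files.
  std::string value;
  s = db->Get(ROCKSDB_NAMESPACE::ReadOptions(), "k", &value);
  assert(s.IsNotFound());

  delete db;
}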
1278 | | |
1279 | | bool MemTable::Get(const LookupKey& key, std::string* value, |
1280 | | PinnableWideColumns* columns, std::string* timestamp, |
1281 | | Status* s, MergeContext* merge_context, |
1282 | | SequenceNumber* max_covering_tombstone_seq, |
1283 | | SequenceNumber* seq, const ReadOptions& read_opts, |
1284 | | bool immutable_memtable, ReadCallback* callback, |
1285 | 584 | bool* is_blob_index, bool do_merge) { |
1286 | | // The sequence number is updated synchronously in version_set.h |
1287 | 584 | if (IsEmpty()) { |
1288 | | // Avoid recording stats, for speed. |
1289 | 155 | return false; |
1290 | 155 | } |
1291 | | |
1292 | 429 | PERF_TIMER_GUARD(get_from_memtable_time); |
1293 | | |
1294 | 429 | std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter( |
1295 | 429 | NewRangeTombstoneIterator(read_opts, |
1296 | 429 | GetInternalKeySeqno(key.internal_key()), |
1297 | 429 | immutable_memtable)); |
1298 | 429 | if (range_del_iter != nullptr) { |
1299 | 0 | SequenceNumber covering_seq = |
1300 | 0 | range_del_iter->MaxCoveringTombstoneSeqnum(key.user_key()); |
1301 | 0 | if (covering_seq > *max_covering_tombstone_seq) { |
1302 | 0 | *max_covering_tombstone_seq = covering_seq; |
1303 | 0 | if (timestamp) { |
1304 | | // Will be overwritten in SaveValue() if there is a point key with |
1305 | | // a higher seqno. |
1306 | 0 | timestamp->assign(range_del_iter->timestamp().data(), |
1307 | 0 | range_del_iter->timestamp().size()); |
1308 | 0 | } |
1309 | 0 | } |
1310 | 0 | } |
1311 | | |
1312 | 429 | bool found_final_value = false; |
1313 | 429 | bool merge_in_progress = s->IsMergeInProgress(); |
1314 | 429 | bool may_contain = true; |
1315 | 429 | Slice user_key_without_ts = StripTimestampFromUserKey(key.user_key(), ts_sz_); |
1316 | 429 | bool bloom_checked = false; |
1317 | 429 | if (bloom_filter_) { |
1318 | | // when both memtable_whole_key_filtering and prefix_extractor_ are set, |
1319 | | // only do whole key filtering for Get() to save CPU |
1320 | 0 | if (moptions_.memtable_whole_key_filtering) { |
1321 | 0 | may_contain = bloom_filter_->MayContain(user_key_without_ts); |
1322 | 0 | bloom_checked = true; |
1323 | 0 | } else { |
1324 | 0 | assert(prefix_extractor_); |
1325 | 0 | if (prefix_extractor_->InDomain(user_key_without_ts)) { |
1326 | 0 | may_contain = bloom_filter_->MayContain( |
1327 | 0 | prefix_extractor_->Transform(user_key_without_ts)); |
1328 | 0 | bloom_checked = true; |
1329 | 0 | } |
1330 | 0 | } |
1331 | 0 | } |
1332 | | |
1333 | 429 | if (bloom_filter_ && !may_contain) { |
1334 | | // Skip the memtable lookup if the prefix/whole-key Bloom filter says the key is absent |
1335 | 0 | PERF_COUNTER_ADD(bloom_memtable_miss_count, 1); |
1336 | 0 | *seq = kMaxSequenceNumber; |
1337 | 429 | } else { |
1338 | 429 | if (bloom_checked) { |
1339 | 0 | PERF_COUNTER_ADD(bloom_memtable_hit_count, 1); |
1340 | 0 | } |
1341 | 429 | GetFromTable(key, *max_covering_tombstone_seq, do_merge, callback, |
1342 | 429 | is_blob_index, value, columns, timestamp, s, merge_context, |
1343 | 429 | seq, &found_final_value, &merge_in_progress); |
1344 | 429 | } |
1345 | | |
1346 | | // No change to value, since we have not yet found a Put/Delete |
1347 | | // Propagate corruption error |
1348 | 429 | if (!found_final_value && merge_in_progress) { |
1349 | 0 | if (s->ok()) { |
1350 | 0 | *s = Status::MergeInProgress(); |
1351 | 0 | } else { |
1352 | 0 | assert(s->IsMergeInProgress()); |
1353 | 0 | } |
1354 | 0 | } |
1355 | 429 | PERF_COUNTER_ADD(get_from_memtable_count, 1); |
1356 | 429 | return found_final_value; |
1357 | 584 | } |
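// Illustrative sketch (not part of memtable.cc): the Bloom probes in Get()
// above are controlled by three column family options. The values below are
// arbitrary examples, not recommendations.
#include "rocksdb/options.h"
#include "rocksdb/slice_transform.h"

ROCKSDB_NAMESPACE::Options MemtableBloomSketch() {
  ROCKSDB_NAMESPACE::Options options;
  // Sizes the memtable Bloom filter as a fraction of the write buffer; zero
  // (the default) disables it and point lookups skip the probe entirely.
  options.memtable_prefix_bloom_size_ratio = 0.02;
  // Prefix mode: keys are probed via prefix_extractor->Transform() when they
  // are InDomain(), matching the else-branch in Get() above.
  options.prefix_extractor.reset(
      ROCKSDB_NAMESPACE::NewFixedPrefixTransform(4));
  // Whole-key mode: when set together with a prefix extractor, Get() prefers
  // the whole-key probe to save CPU, as the comment in Get() notes.
  options.memtable_whole_key_filtering = true;
  return options;
}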
1358 | | |
1359 | | void MemTable::GetFromTable(const LookupKey& key, |
1360 | | SequenceNumber max_covering_tombstone_seq, |
1361 | | bool do_merge, ReadCallback* callback, |
1362 | | bool* is_blob_index, std::string* value, |
1363 | | PinnableWideColumns* columns, |
1364 | | std::string* timestamp, Status* s, |
1365 | | MergeContext* merge_context, SequenceNumber* seq, |
1366 | 429 | bool* found_final_value, bool* merge_in_progress) { |
1367 | 429 | Saver saver; |
1368 | 429 | saver.status = s; |
1369 | 429 | saver.found_final_value = found_final_value; |
1370 | 429 | saver.merge_in_progress = merge_in_progress; |
1371 | 429 | saver.key = &key; |
1372 | 429 | saver.value = value; |
1373 | 429 | saver.columns = columns; |
1374 | 429 | saver.timestamp = timestamp; |
1375 | 429 | saver.seq = kMaxSequenceNumber; |
1376 | 429 | saver.mem = this; |
1377 | 429 | saver.merge_context = merge_context; |
1378 | 429 | saver.max_covering_tombstone_seq = max_covering_tombstone_seq; |
1379 | 429 | saver.merge_operator = moptions_.merge_operator; |
1380 | 429 | saver.logger = moptions_.info_log; |
1381 | 429 | saver.inplace_update_support = moptions_.inplace_update_support; |
1382 | 429 | saver.statistics = moptions_.statistics; |
1383 | 429 | saver.clock = clock_; |
1384 | 429 | saver.callback_ = callback; |
1385 | 429 | saver.is_blob_index = is_blob_index; |
1386 | 429 | saver.do_merge = do_merge; |
1387 | 429 | saver.allow_data_in_errors = moptions_.allow_data_in_errors; |
1388 | 429 | saver.protection_bytes_per_key = moptions_.protection_bytes_per_key; |
1389 | | |
1390 | 429 | if (!moptions_.paranoid_memory_checks) { |
1391 | 429 | table_->Get(key, &saver, SaveValue); |
1392 | 429 | } else { |
1393 | 0 | Status check_s = table_->GetAndValidate(key, &saver, SaveValue, |
1394 | 0 | moptions_.allow_data_in_errors); |
1395 | 0 | if (check_s.IsCorruption()) { |
1396 | 0 | *(saver.status) = check_s; |
1397 | | // Should stop searching the LSM. |
1398 | 0 | *(saver.found_final_value) = true; |
1399 | 0 | } |
1400 | 0 | } |
1401 | 429 | assert(s->ok() || s->IsMergeInProgress() || *found_final_value); |
1402 | 429 | *seq = saver.seq; |
1403 | 429 | } |
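// Illustrative sketch (not part of memtable.cc): the GetAndValidate() branch
// above is gated by the mutable column family option paranoid_memory_checks,
// which spends extra read CPU to detect in-memory corruption and surface it
// as Status::Corruption() instead of a wrong result.
#include "rocksdb/options.h"

ROCKSDB_NAMESPACE::Options ParanoidMemoryChecksSketch() {
  ROCKSDB_NAMESPACE::Options options;
  // Point lookups take the GetAndValidate() path and stop the whole LSM
  // search as soon as a corruption is detected.
  options.paranoid_memory_checks = true;
  return options;
}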
1404 | | |
1405 | | void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range, |
1406 | 0 | ReadCallback* callback, bool immutable_memtable) { |
1407 | | // The sequence number is updated synchronously in version_set.h |
1408 | 0 | if (IsEmpty()) { |
1409 | | // Avoid recording stats, for speed. |
1410 | 0 | return; |
1411 | 0 | } |
1412 | 0 | PERF_TIMER_GUARD(get_from_memtable_time); |
1413 | | |
1414 | | // For now, memtable Bloom filter is effectively disabled if there are any |
1415 | | // range tombstones. This is the simplest way to ensure range tombstones are |
1416 | | // handled. TODO: allow Bloom checks where max_covering_tombstone_seq==0 |
1417 | 0 | bool no_range_del = read_options.ignore_range_deletions || |
1418 | 0 | is_range_del_table_empty_.load(std::memory_order_relaxed); |
1419 | 0 | MultiGetRange temp_range(*range, range->begin(), range->end()); |
1420 | 0 | if (bloom_filter_ && no_range_del) { |
1421 | 0 | bool whole_key = |
1422 | 0 | !prefix_extractor_ || moptions_.memtable_whole_key_filtering; |
1423 | 0 | std::array<Slice, MultiGetContext::MAX_BATCH_SIZE> bloom_keys; |
1424 | 0 | std::array<bool, MultiGetContext::MAX_BATCH_SIZE> may_match; |
1425 | 0 | std::array<size_t, MultiGetContext::MAX_BATCH_SIZE> range_indexes; |
1426 | 0 | int num_keys = 0; |
1427 | 0 | for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) { |
1428 | 0 | if (whole_key) { |
1429 | 0 | bloom_keys[num_keys] = iter->ukey_without_ts; |
1430 | 0 | range_indexes[num_keys++] = iter.index(); |
1431 | 0 | } else if (prefix_extractor_->InDomain(iter->ukey_without_ts)) { |
1432 | 0 | bloom_keys[num_keys] = |
1433 | 0 | prefix_extractor_->Transform(iter->ukey_without_ts); |
1434 | 0 | range_indexes[num_keys++] = iter.index(); |
1435 | 0 | } |
1436 | 0 | } |
1437 | 0 | bloom_filter_->MayContain(num_keys, bloom_keys.data(), may_match.data()); |
1438 | 0 | for (int i = 0; i < num_keys; ++i) { |
1439 | 0 | if (!may_match[i]) { |
1440 | 0 | temp_range.SkipIndex(range_indexes[i]); |
1441 | 0 | PERF_COUNTER_ADD(bloom_memtable_miss_count, 1); |
1442 | 0 | } else { |
1443 | 0 | PERF_COUNTER_ADD(bloom_memtable_hit_count, 1); |
1444 | 0 | } |
1445 | 0 | } |
1446 | 0 | } |
1447 | 0 | for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) { |
1448 | 0 | bool found_final_value{false}; |
1449 | 0 | bool merge_in_progress = iter->s->IsMergeInProgress(); |
1450 | 0 | if (!no_range_del) { |
1451 | 0 | std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter( |
1452 | 0 | NewRangeTombstoneIteratorInternal( |
1453 | 0 | read_options, GetInternalKeySeqno(iter->lkey->internal_key()), |
1454 | 0 | immutable_memtable)); |
1455 | 0 | SequenceNumber covering_seq = |
1456 | 0 | range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key()); |
1457 | 0 | if (covering_seq > iter->max_covering_tombstone_seq) { |
1458 | 0 | iter->max_covering_tombstone_seq = covering_seq; |
1459 | 0 | if (iter->timestamp) { |
1460 | | // Will be overwritten in SaveValue() if there is a point key with |
1461 | | // a higher seqno. |
1462 | 0 | iter->timestamp->assign(range_del_iter->timestamp().data(), |
1463 | 0 | range_del_iter->timestamp().size()); |
1464 | 0 | } |
1465 | 0 | } |
1466 | 0 | } |
1467 | 0 | SequenceNumber dummy_seq; |
1468 | 0 | GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true, |
1469 | 0 | callback, &iter->is_blob_index, |
1470 | 0 | iter->value ? iter->value->GetSelf() : nullptr, iter->columns, |
1471 | 0 | iter->timestamp, iter->s, &(iter->merge_context), &dummy_seq, |
1472 | 0 | &found_final_value, &merge_in_progress); |
1473 | |
1474 | 0 | if (!found_final_value && merge_in_progress) { |
1475 | 0 | if (iter->s->ok()) { |
1476 | 0 | *(iter->s) = Status::MergeInProgress(); |
1477 | 0 | } else { |
1478 | 0 | assert(iter->s->IsMergeInProgress()); |
1479 | 0 | } |
1480 | 0 | } |
1481 | |
1482 | 0 | if (found_final_value || |
1483 | 0 | (!iter->s->ok() && !iter->s->IsMergeInProgress())) { |
1484 | | // `found_final_value` should be set if an error/corruption occurs. |
1485 | | // The check on iter->s is just there in case GetFromTable() did not |
1486 | | // set `found_final_value` properly. |
1487 | 0 | assert(found_final_value); |
1488 | 0 | if (iter->value) { |
1489 | 0 | iter->value->PinSelf(); |
1490 | 0 | range->AddValueSize(iter->value->size()); |
1491 | 0 | } else { |
1492 | 0 | assert(iter->columns); |
1493 | 0 | range->AddValueSize(iter->columns->serialized_size()); |
1494 | 0 | } |
1495 | |
1496 | 0 | range->MarkKeyDone(iter); |
1497 | 0 | RecordTick(moptions_.statistics, MEMTABLE_HIT); |
1498 | 0 | if (range->GetValueSize() > read_options.value_size_soft_limit) { |
1499 | | // Set all remaining keys in range to Abort |
1500 | 0 | for (auto range_iter = range->begin(); range_iter != range->end(); |
1501 | 0 | ++range_iter) { |
1502 | 0 | range->MarkKeyDone(range_iter); |
1503 | 0 | *(range_iter->s) = Status::Aborted(); |
1504 | 0 | } |
1505 | 0 | break; |
1506 | 0 | } |
1507 | 0 | } |
1508 | 0 | } |
1509 | 0 | PERF_COUNTER_ADD(get_from_memtable_count, 1); |
1510 | 0 | } |
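// Illustrative sketch (not part of memtable.cc): MultiGet() above is the
// memtable leg of the batched read path. A minimal public-API counterpart,
// with illustrative keys; the per-key statuses line up with the key order.
#include <cassert>
#include <string>
#include <vector>

#include "rocksdb/db.h"

void MultiGetSketch(ROCKSDB_NAMESPACE::DB* db) {
  std::vector<ROCKSDB_NAMESPACE::Slice> keys = {"a", "b", "c"};
  std::vector<std::string> values;
  std::vector<ROCKSDB_NAMESPACE::Status> statuses =
      db->MultiGet(ROCKSDB_NAMESPACE::ReadOptions(), keys, &values);
  assert(statuses.size() == keys.size());
  // Keys whose newest memtable entry is a tombstone come back IsNotFound(),
  // mirroring the deletion handling in SaveValue().
}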
1511 | | |
1512 | | Status MemTable::Update(SequenceNumber seq, ValueType value_type, |
1513 | | const Slice& key, const Slice& value, |
1514 | 0 | const ProtectionInfoKVOS64* kv_prot_info) { |
1515 | 0 | LookupKey lkey(key, seq); |
1516 | 0 | Slice mem_key = lkey.memtable_key(); |
1517 | |
1518 | 0 | std::unique_ptr<MemTableRep::Iterator> iter( |
1519 | 0 | table_->GetDynamicPrefixIterator()); |
1520 | 0 | iter->Seek(lkey.internal_key(), mem_key.data()); |
1521 | |
1522 | 0 | if (iter->Valid()) { |
1523 | | // Refer to comments under MemTable::Add() for entry format. |
1524 | | // Check that it belongs to same user key. We do not check the |
1525 | | // sequence number since the Seek() call above should have skipped |
1526 | | // all entries with overly large sequence numbers. |
1527 | 0 | const char* entry = iter->key(); |
1528 | 0 | uint32_t key_length = 0; |
1529 | 0 | const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); |
1530 | 0 | if (comparator_.comparator.user_comparator()->Equal( |
1531 | 0 | Slice(key_ptr, key_length - 8), lkey.user_key())) { |
1532 | | // Correct user key |
1533 | 0 | const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); |
1534 | 0 | ValueType type; |
1535 | 0 | SequenceNumber existing_seq; |
1536 | 0 | UnPackSequenceAndType(tag, &existing_seq, &type); |
1537 | 0 | assert(existing_seq != seq); |
1538 | 0 | if (type == value_type) { |
1539 | 0 | Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); |
1540 | 0 | uint32_t prev_size = static_cast<uint32_t>(prev_value.size()); |
1541 | 0 | uint32_t new_size = static_cast<uint32_t>(value.size()); |
1542 | | |
1543 | | // Update value, if new value size <= previous value size |
1544 | 0 | if (new_size <= prev_size) { |
1545 | 0 | WriteLock wl(GetLock(lkey.user_key())); |
1546 | 0 | char* p = |
1547 | 0 | EncodeVarint32(const_cast<char*>(key_ptr) + key_length, new_size); |
1548 | 0 | memcpy(p, value.data(), value.size()); |
1549 | 0 | assert((unsigned)((p + value.size()) - entry) == |
1550 | 0 | (unsigned)(VarintLength(key_length) + key_length + |
1551 | 0 | VarintLength(value.size()) + value.size())); |
1552 | 0 | RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED); |
1553 | 0 | if (kv_prot_info != nullptr) { |
1554 | 0 | ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info); |
1555 | | // `seq` is swallowed and `existing_seq` prevails. |
1556 | 0 | updated_kv_prot_info.UpdateS(seq, existing_seq); |
1557 | 0 | UpdateEntryChecksum(&updated_kv_prot_info, key, value, type, |
1558 | 0 | existing_seq, p + value.size()); |
1559 | 0 | Slice encoded(entry, p + value.size() - entry); |
1560 | 0 | return VerifyEncodedEntry(encoded, updated_kv_prot_info); |
1561 | 0 | } else { |
1562 | 0 | UpdateEntryChecksum(nullptr, key, value, type, existing_seq, |
1563 | 0 | p + value.size()); |
1564 | 0 | } |
1565 | 0 | return Status::OK(); |
1566 | 0 | } |
1567 | 0 | } |
1568 | 0 | } |
1569 | 0 | } |
1570 | | |
1571 | | // The latest value is not value_type or key doesn't exist |
1572 | 0 | return Add(seq, value_type, key, value, kv_prot_info); |
1573 | 0 | } |
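// Illustrative sketch (not part of memtable.cc): Update() above belongs to
// the in-place update path enabled by inplace_update_support. When the new
// value is no longer than the existing one it is overwritten in place;
// otherwise the code falls back to Add(). A sketch of the option wiring, with
// an illustrative path; in-place update requires single-threaded memtable
// writes.
#include <cassert>

#include "rocksdb/db.h"

void InplaceUpdateSketch() {
  ROCKSDB_NAMESPACE::Options options;
  options.create_if_missing = true;
  options.inplace_update_support = true;
  // Not compatible with concurrent memtable writes.
  options.allow_concurrent_memtable_write = false;
  // Striped locks taken via GetLock() while an entry is rewritten.
  options.inplace_update_num_locks = 10000;

  ROCKSDB_NAMESPACE::DB* db = nullptr;
  ROCKSDB_NAMESPACE::Status s =
      ROCKSDB_NAMESPACE::DB::Open(options, "/tmp/inplace_update_demo", &db);
  assert(s.ok());

  // The second value is shorter than the first, so the existing memtable
  // entry can be rewritten in place instead of appending a new node.
  s = db->Put(ROCKSDB_NAMESPACE::WriteOptions(), "k", "0123456789");
  assert(s.ok());
  s = db->Put(ROCKSDB_NAMESPACE::WriteOptions(), "k", "42");
  assert(s.ok());

  delete db;
}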
1574 | | |
1575 | | Status MemTable::UpdateCallback(SequenceNumber seq, const Slice& key, |
1576 | | const Slice& delta, |
1577 | 0 | const ProtectionInfoKVOS64* kv_prot_info) { |
1578 | 0 | LookupKey lkey(key, seq); |
1579 | 0 | Slice memkey = lkey.memtable_key(); |
1580 | |
1581 | 0 | std::unique_ptr<MemTableRep::Iterator> iter( |
1582 | 0 | table_->GetDynamicPrefixIterator()); |
1583 | 0 | iter->Seek(lkey.internal_key(), memkey.data()); |
1584 | |
1585 | 0 | if (iter->Valid()) { |
1586 | | // Refer to comments under MemTable::Add() for entry format. |
1587 | | // Check that it belongs to same user key. We do not check the |
1588 | | // sequence number since the Seek() call above should have skipped |
1589 | | // all entries with overly large sequence numbers. |
1590 | 0 | const char* entry = iter->key(); |
1591 | 0 | uint32_t key_length = 0; |
1592 | 0 | const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); |
1593 | 0 | if (comparator_.comparator.user_comparator()->Equal( |
1594 | 0 | Slice(key_ptr, key_length - 8), lkey.user_key())) { |
1595 | | // Correct user key |
1596 | 0 | const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); |
1597 | 0 | ValueType type; |
1598 | 0 | uint64_t existing_seq; |
1599 | 0 | UnPackSequenceAndType(tag, &existing_seq, &type); |
1600 | 0 | if (type == kTypeValue) { |
1601 | 0 | Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); |
1602 | 0 | uint32_t prev_size = static_cast<uint32_t>(prev_value.size()); |
1603 | |
1604 | 0 | char* prev_buffer = const_cast<char*>(prev_value.data()); |
1605 | 0 | uint32_t new_prev_size = prev_size; |
1606 | |
1607 | 0 | std::string str_value; |
1608 | 0 | WriteLock wl(GetLock(lkey.user_key())); |
1609 | 0 | auto status = moptions_.inplace_callback(prev_buffer, &new_prev_size, |
1610 | 0 | delta, &str_value); |
1611 | 0 | if (status == UpdateStatus::UPDATED_INPLACE) { |
1612 | | // Value already updated by callback. |
1613 | 0 | assert(new_prev_size <= prev_size); |
1614 | 0 | if (new_prev_size < prev_size) { |
1615 | | // overwrite the new prev_size |
1616 | 0 | char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length, |
1617 | 0 | new_prev_size); |
1618 | 0 | if (VarintLength(new_prev_size) < VarintLength(prev_size)) { |
1619 | | // shift the value buffer as well. |
1620 | 0 | memcpy(p, prev_buffer, new_prev_size); |
1621 | 0 | prev_buffer = p; |
1622 | 0 | } |
1623 | 0 | } |
1624 | 0 | RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED); |
1625 | 0 | UpdateFlushState(); |
1626 | 0 | Slice new_value(prev_buffer, new_prev_size); |
1627 | 0 | if (kv_prot_info != nullptr) { |
1628 | 0 | ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info); |
1629 | | // `seq` is swallowed and `existing_seq` prevails. |
1630 | 0 | updated_kv_prot_info.UpdateS(seq, existing_seq); |
1631 | 0 | updated_kv_prot_info.UpdateV(delta, new_value); |
1632 | 0 | Slice encoded(entry, prev_buffer + new_prev_size - entry); |
1633 | 0 | UpdateEntryChecksum(&updated_kv_prot_info, key, new_value, type, |
1634 | 0 | existing_seq, prev_buffer + new_prev_size); |
1635 | 0 | return VerifyEncodedEntry(encoded, updated_kv_prot_info); |
1636 | 0 | } else { |
1637 | 0 | UpdateEntryChecksum(nullptr, key, new_value, type, existing_seq, |
1638 | 0 | prev_buffer + new_prev_size); |
1639 | 0 | } |
1640 | 0 | return Status::OK(); |
1641 | 0 | } else if (status == UpdateStatus::UPDATED) { |
1642 | 0 | Status s; |
1643 | 0 | if (kv_prot_info != nullptr) { |
1644 | 0 | ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info); |
1645 | 0 | updated_kv_prot_info.UpdateV(delta, str_value); |
1646 | 0 | s = Add(seq, kTypeValue, key, Slice(str_value), |
1647 | 0 | &updated_kv_prot_info); |
1648 | 0 | } else { |
1649 | 0 | s = Add(seq, kTypeValue, key, Slice(str_value), |
1650 | 0 | nullptr /* kv_prot_info */); |
1651 | 0 | } |
1652 | 0 | RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN); |
1653 | 0 | UpdateFlushState(); |
1654 | 0 | return s; |
1655 | 0 | } else if (status == UpdateStatus::UPDATE_FAILED) { |
1656 | | // `UPDATE_FAILED` is named incorrectly. It indicates no update |
1657 | | // happened. It does not indicate a failure happened. |
1658 | 0 | UpdateFlushState(); |
1659 | 0 | return Status::OK(); |
1660 | 0 | } |
1661 | 0 | } |
1662 | 0 | } |
1663 | 0 | } |
1664 | | // The latest value is not `kTypeValue` or key doesn't exist |
1665 | 0 | return Status::NotFound(); |
1666 | 0 | } |
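// Illustrative sketch (not part of memtable.cc): UpdateCallback() above hands
// the existing value buffer, its size, the incoming delta and a scratch
// string to a user-supplied inplace_callback, then acts on the returned
// UpdateStatus: UPDATED_INPLACE keeps the (possibly shrunk) buffer, UPDATED
// adds merged_value as a fresh entry, UPDATE_FAILED leaves everything as is.
// The callback below is a hypothetical example of that contract.
#include <cstdint>
#include <cstring>
#include <string>

#include "rocksdb/options.h"
#include "rocksdb/slice.h"

ROCKSDB_NAMESPACE::UpdateStatus OverwriteIfItFits(
    char* existing_value, uint32_t* existing_value_size,
    ROCKSDB_NAMESPACE::Slice delta, std::string* merged_value) {
  if (existing_value != nullptr && delta.size() <= *existing_value_size) {
    // Shrink/overwrite in place; UpdateCallback() re-encodes the length
    // prefix and entry checksum afterwards.
    std::memcpy(existing_value, delta.data(), delta.size());
    *existing_value_size = static_cast<uint32_t>(delta.size());
    return ROCKSDB_NAMESPACE::UpdateStatus::UPDATED_INPLACE;
  }
  // Does not fit: return a full replacement, which the caller re-adds as a
  // new kTypeValue entry.
  merged_value->assign(delta.data(), delta.size());
  return ROCKSDB_NAMESPACE::UpdateStatus::UPDATED;
}

// Wiring (illustrative): options.inplace_update_support = true;
//                        options.inplace_callback = OverwriteIfItFits;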
1667 | | |
1668 | | size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key, |
1669 | 0 | size_t limit) { |
1670 | 0 | Slice memkey = key.memtable_key(); |
1671 | | |
1672 | | // A total ordered iterator is costly for some memtablerep (prefix aware |
1673 | | // reps). By passing in the user key, we allow efficient iterator creation. |
1674 | | // The iterator only needs to be ordered within the same user key. |
1675 | 0 | std::unique_ptr<MemTableRep::Iterator> iter( |
1676 | 0 | table_->GetDynamicPrefixIterator()); |
1677 | 0 | iter->Seek(key.internal_key(), memkey.data()); |
1678 | |
1679 | 0 | size_t num_successive_merges = 0; |
1680 | |
1681 | 0 | for (; iter->Valid() && num_successive_merges < limit; iter->Next()) { |
1682 | 0 | const char* entry = iter->key(); |
1683 | 0 | uint32_t key_length = 0; |
1684 | 0 | const char* iter_key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); |
1685 | 0 | if (!comparator_.comparator.user_comparator()->Equal( |
1686 | 0 | Slice(iter_key_ptr, key_length - 8), key.user_key())) { |
1687 | 0 | break; |
1688 | 0 | } |
1689 | | |
1690 | 0 | const uint64_t tag = DecodeFixed64(iter_key_ptr + key_length - 8); |
1691 | 0 | ValueType type; |
1692 | 0 | uint64_t unused; |
1693 | 0 | UnPackSequenceAndType(tag, &unused, &type); |
1694 | 0 | if (type != kTypeMerge) { |
1695 | 0 | break; |
1696 | 0 | } |
1697 | | |
1698 | 0 | ++num_successive_merges; |
1699 | 0 | } |
1700 | |
1701 | 0 | return num_successive_merges; |
1702 | 0 | } |
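// Illustrative sketch (not part of memtable.cc): CountSuccessiveMergeEntries()
// exists to support the max_successive_merges option. Once a key's newest
// memtable entries are already `limit` merge operands, the write path can
// collapse them eagerly instead of letting the chain grow. A sketch with a
// small associative merge operator; operator name, path and values are
// illustrative.
#include <cassert>
#include <memory>
#include <string>

#include "rocksdb/db.h"
#include "rocksdb/merge_operator.h"

class CsvAppendOperator : public ROCKSDB_NAMESPACE::AssociativeMergeOperator {
 public:
  bool Merge(const ROCKSDB_NAMESPACE::Slice& /*key*/,
             const ROCKSDB_NAMESPACE::Slice* existing_value,
             const ROCKSDB_NAMESPACE::Slice& value, std::string* new_value,
             ROCKSDB_NAMESPACE::Logger* /*logger*/) const override {
    new_value->clear();
    if (existing_value != nullptr) {
      new_value->assign(existing_value->data(), existing_value->size());
      new_value->push_back(',');
    }
    new_value->append(value.data(), value.size());
    return true;
  }
  const char* Name() const override { return "CsvAppendOperator"; }
};

void MaxSuccessiveMergesSketch() {
  ROCKSDB_NAMESPACE::Options options;
  options.create_if_missing = true;
  options.merge_operator = std::make_shared<CsvAppendOperator>();
  // After this many consecutive operands for one key, further Merge() writes
  // are merged at write time rather than stacked up in the memtable.
  options.max_successive_merges = 8;

  ROCKSDB_NAMESPACE::DB* db = nullptr;
  ROCKSDB_NAMESPACE::Status s =
      ROCKSDB_NAMESPACE::DB::Open(options, "/tmp/successive_merges_demo", &db);
  assert(s.ok());
  for (int i = 0; i < 32; ++i) {
    s = db->Merge(ROCKSDB_NAMESPACE::WriteOptions(), "counter", "1");
    assert(s.ok());
  }
  delete db;
}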
1703 | | |
1704 | | void MemTableRep::Get(const LookupKey& k, void* callback_args, |
1705 | 0 | bool (*callback_func)(void* arg, const char* entry)) { |
1706 | 0 | auto iter = GetDynamicPrefixIterator(); |
1707 | 0 | for (iter->Seek(k.internal_key(), k.memtable_key().data()); |
1708 | 0 | iter->Valid() && callback_func(callback_args, iter->key()); |
1709 | 0 | iter->Next()) { |
1710 | 0 | } |
1711 | 0 | } |
1712 | | |
1713 | 0 | void MemTable::RefLogContainingPrepSection(uint64_t log) { |
1714 | 0 | assert(log > 0); |
1715 | 0 | auto cur = min_prep_log_referenced_.load(); |
1716 | 0 | while ((log < cur || cur == 0) && |
1717 | 0 | !min_prep_log_referenced_.compare_exchange_strong(cur, log)) { |
1718 | 0 | cur = min_prep_log_referenced_.load(); |
1719 | 0 | } |
1720 | 0 | } |
1721 | | |
1722 | 0 | uint64_t MemTable::GetMinLogContainingPrepSection() { |
1723 | 0 | return min_prep_log_referenced_.load(); |
1724 | 0 | } |
1725 | | |
1726 | 2.18M | void MemTable::MaybeUpdateNewestUDT(const Slice& user_key) { |
1727 | 2.18M | if (ts_sz_ == 0 || persist_user_defined_timestamps_) { |
1728 | 2.18M | return; |
1729 | 2.18M | } |
1730 | 0 | const Comparator* ucmp = GetInternalKeyComparator().user_comparator(); |
1731 | 0 | Slice udt = ExtractTimestampFromUserKey(user_key, ts_sz_); |
1732 | 0 | if (newest_udt_.empty() || ucmp->CompareTimestamp(udt, newest_udt_) > 0) { |
1733 | 0 | newest_udt_ = udt; |
1734 | 0 | } |
1735 | 0 | } |
1736 | | |
1737 | 0 | const Slice& MemTable::GetNewestUDT() const { |
1738 | | // This path should not be invoked for MemTables that do not enable the UDT |
1739 | | // in Memtable only feature. |
1740 | 0 | assert(ts_sz_ > 0 && !persist_user_defined_timestamps_); |
1741 | 0 | return newest_udt_; |
1742 | 0 | } |
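// Illustrative sketch (not part of memtable.cc): MaybeUpdateNewestUDT() and
// GetNewestUDT() only matter when user-defined timestamps are enabled but not
// persisted (ts_sz_ > 0 and persist_user_defined_timestamps == false). A
// sketch of column family options for that mode, assuming the built-in 64-bit
// timestamp comparator; writes then carry an 8-byte timestamp slice that is
// stripped again before the memtable is flushed.
#include "rocksdb/comparator.h"
#include "rocksdb/options.h"

ROCKSDB_NAMESPACE::ColumnFamilyOptions UdtInMemtableOnlySketch() {
  ROCKSDB_NAMESPACE::ColumnFamilyOptions cf_opts;
  // User keys gain an 8-byte user-defined timestamp suffix.
  cf_opts.comparator = ROCKSDB_NAMESPACE::BytewiseComparatorWithU64Ts();
  // Keep timestamps in the memtable only; this is what makes the memtable
  // track newest_udt_ above.
  cf_opts.persist_user_defined_timestamps = false;
  return cf_opts;
}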
1743 | | |
1744 | | } // namespace ROCKSDB_NAMESPACE |