/src/rocksdb/db/memtable.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | // |
6 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
7 | | // Use of this source code is governed by a BSD-style license that can be |
8 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
9 | | |
10 | | #include "db/memtable.h" |
11 | | |
12 | | #include <algorithm> |
13 | | #include <array> |
14 | | #include <limits> |
15 | | #include <memory> |
16 | | #include <optional> |
17 | | |
18 | | #include "db/dbformat.h" |
19 | | #include "db/kv_checksum.h" |
20 | | #include "db/merge_context.h" |
21 | | #include "db/merge_helper.h" |
22 | | #include "db/pinned_iterators_manager.h" |
23 | | #include "db/range_tombstone_fragmenter.h" |
24 | | #include "db/read_callback.h" |
25 | | #include "db/wide/wide_column_serialization.h" |
26 | | #include "logging/logging.h" |
27 | | #include "memory/arena.h" |
28 | | #include "memory/memory_usage.h" |
29 | | #include "monitoring/perf_context_imp.h" |
30 | | #include "monitoring/statistics_impl.h" |
31 | | #include "port/lang.h" |
32 | | #include "port/port.h" |
33 | | #include "rocksdb/comparator.h" |
34 | | #include "rocksdb/env.h" |
35 | | #include "rocksdb/iterator.h" |
36 | | #include "rocksdb/merge_operator.h" |
37 | | #include "rocksdb/slice_transform.h" |
38 | | #include "rocksdb/types.h" |
39 | | #include "rocksdb/write_buffer_manager.h" |
40 | | #include "table/internal_iterator.h" |
41 | | #include "table/iterator_wrapper.h" |
42 | | #include "table/merging_iterator.h" |
43 | | #include "util/autovector.h" |
44 | | #include "util/coding.h" |
45 | | #include "util/mutexlock.h" |
46 | | |
47 | | namespace ROCKSDB_NAMESPACE { |
48 | | |
49 | | ImmutableMemTableOptions::ImmutableMemTableOptions( |
50 | | const ImmutableOptions& ioptions, |
51 | | const MutableCFOptions& mutable_cf_options) |
52 | 141k | : arena_block_size(mutable_cf_options.arena_block_size), |
53 | | memtable_prefix_bloom_bits( |
54 | 141k | static_cast<uint32_t>( |
55 | 141k | static_cast<double>(mutable_cf_options.write_buffer_size) * |
56 | 141k | mutable_cf_options.memtable_prefix_bloom_size_ratio) * |
57 | 141k | 8u), |
58 | 141k | memtable_huge_page_size(mutable_cf_options.memtable_huge_page_size), |
59 | | memtable_whole_key_filtering( |
60 | 141k | mutable_cf_options.memtable_whole_key_filtering), |
61 | 141k | inplace_update_support(ioptions.inplace_update_support), |
62 | 141k | inplace_update_num_locks(mutable_cf_options.inplace_update_num_locks), |
63 | 141k | inplace_callback(ioptions.inplace_callback), |
64 | 141k | max_successive_merges(mutable_cf_options.max_successive_merges), |
65 | | strict_max_successive_merges( |
66 | 141k | mutable_cf_options.strict_max_successive_merges), |
67 | 141k | statistics(ioptions.stats), |
68 | 141k | merge_operator(ioptions.merge_operator.get()), |
69 | 141k | info_log(ioptions.logger), |
70 | | protection_bytes_per_key( |
71 | 141k | mutable_cf_options.memtable_protection_bytes_per_key), |
72 | 141k | allow_data_in_errors(ioptions.allow_data_in_errors), |
73 | 141k | paranoid_memory_checks(mutable_cf_options.paranoid_memory_checks) {} |
74 | | |
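A note on the `memtable_prefix_bloom_bits` initializer above: the option is a ratio, so the code takes `write_buffer_size * memtable_prefix_bloom_size_ratio` as a byte budget and multiplies by 8 to get a bit count. A minimal standalone sketch of that arithmetic, with made-up option values (the 64 MiB buffer and 2% ratio are illustrative assumptions, not defaults):

```cpp
#include <cstdint>
#include <iostream>

int main() {
  // Hypothetical option values, for illustration only.
  const uint64_t write_buffer_size = 64ull << 20;        // 64 MiB memtable budget
  const double memtable_prefix_bloom_size_ratio = 0.02;  // 2% of the write buffer

  // Same arithmetic as the ImmutableMemTableOptions initializer: a byte budget
  // derived from the ratio, converted to a number of bloom filter bits.
  const uint32_t memtable_prefix_bloom_bits =
      static_cast<uint32_t>(static_cast<double>(write_buffer_size) *
                            memtable_prefix_bloom_size_ratio) *
      8u;

  // 64 MiB * 0.02 is roughly 1.28 MiB for the filter, i.e. ~10.7 million bits.
  std::cout << memtable_prefix_bloom_bits << " bits\n";
  return 0;
}
```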
75 | | MemTable::MemTable(const InternalKeyComparator& cmp, |
76 | | const ImmutableOptions& ioptions, |
77 | | const MutableCFOptions& mutable_cf_options, |
78 | | WriteBufferManager* write_buffer_manager, |
79 | | SequenceNumber latest_seq, uint32_t column_family_id) |
80 | 141k | : comparator_(cmp), |
81 | 141k | moptions_(ioptions, mutable_cf_options), |
82 | 141k | kArenaBlockSize(Arena::OptimizeBlockSize(moptions_.arena_block_size)), |
83 | 141k | mem_tracker_(write_buffer_manager), |
84 | 141k | arena_(moptions_.arena_block_size, |
85 | 141k | (write_buffer_manager != nullptr && |
86 | 141k | (write_buffer_manager->enabled() || |
87 | 141k | write_buffer_manager->cost_to_cache())) |
88 | 141k | ? &mem_tracker_ |
89 | 141k | : nullptr, |
90 | 141k | mutable_cf_options.memtable_huge_page_size), |
91 | 141k | table_(ioptions.memtable_factory->CreateMemTableRep( |
92 | 141k | comparator_, &arena_, mutable_cf_options.prefix_extractor.get(), |
93 | 141k | ioptions.logger, column_family_id)), |
94 | 141k | range_del_table_(SkipListFactory().CreateMemTableRep( |
95 | 141k | comparator_, &arena_, nullptr /* transform */, ioptions.logger, |
96 | 141k | column_family_id)), |
97 | 141k | is_range_del_table_empty_(true), |
98 | 141k | data_size_(0), |
99 | 141k | num_entries_(0), |
100 | 141k | num_deletes_(0), |
101 | 141k | num_range_deletes_(0), |
102 | 141k | write_buffer_size_(mutable_cf_options.write_buffer_size), |
103 | 141k | first_seqno_(0), |
104 | 141k | earliest_seqno_(latest_seq), |
105 | 141k | creation_seq_(latest_seq), |
106 | 141k | min_prep_log_referenced_(0), |
107 | 141k | locks_(moptions_.inplace_update_support |
108 | 141k | ? moptions_.inplace_update_num_locks |
109 | 141k | : 0), |
110 | 141k | prefix_extractor_(mutable_cf_options.prefix_extractor.get()), |
111 | 141k | flush_state_(FLUSH_NOT_REQUESTED), |
112 | 141k | clock_(ioptions.clock), |
113 | | insert_with_hint_prefix_extractor_( |
114 | 141k | ioptions.memtable_insert_with_hint_prefix_extractor.get()), |
115 | 141k | oldest_key_time_(std::numeric_limits<uint64_t>::max()), |
116 | 141k | approximate_memory_usage_(0), |
117 | | memtable_max_range_deletions_( |
118 | 141k | mutable_cf_options.memtable_max_range_deletions) { |
119 | 141k | UpdateFlushState(); |
120 | | // something went wrong if we need to flush before inserting anything |
121 | 141k | assert(!ShouldScheduleFlush()); |
122 | | |
123 | | // use bloom_filter_ for both whole key and prefix bloom filter |
124 | 141k | if ((prefix_extractor_ || moptions_.memtable_whole_key_filtering) && |
125 | 141k | moptions_.memtable_prefix_bloom_bits > 0) { |
126 | 0 | bloom_filter_.reset( |
127 | 0 | new DynamicBloom(&arena_, moptions_.memtable_prefix_bloom_bits, |
128 | 0 | 6 /* hard coded 6 probes */, |
129 | 0 | moptions_.memtable_huge_page_size, ioptions.logger)); |
130 | 0 | } |
131 | | // Initialize cached_range_tombstone_ here since it could |
132 | | // be read before it is constructed in MemTable::Add(), which could also lead |
133 | | // to a data race on the global mutex table backing atomic shared_ptr. |
134 | 141k | auto new_cache = std::make_shared<FragmentedRangeTombstoneListCache>(); |
135 | 141k | size_t size = cached_range_tombstone_.Size(); |
136 | 4.65M | for (size_t i = 0; i < size; ++i) { |
137 | 4.51M | std::shared_ptr<FragmentedRangeTombstoneListCache>* local_cache_ref_ptr = |
138 | 4.51M | cached_range_tombstone_.AccessAtCore(i); |
139 | 4.51M | auto new_local_cache_ref = std::make_shared< |
140 | 4.51M | const std::shared_ptr<FragmentedRangeTombstoneListCache>>(new_cache); |
141 | 4.51M | std::atomic_store_explicit( |
142 | 4.51M | local_cache_ref_ptr, |
143 | 4.51M | std::shared_ptr<FragmentedRangeTombstoneListCache>(new_local_cache_ref, |
144 | 4.51M | new_cache.get()), |
145 | 4.51M | std::memory_order_relaxed); |
146 | 4.51M | } |
147 | 141k | const Comparator* ucmp = cmp.user_comparator(); |
148 | 141k | assert(ucmp); |
149 | 141k | ts_sz_ = ucmp->timestamp_size(); |
150 | 141k | } |
151 | | |
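The per-core `cached_range_tombstone_` initialization in the constructor above uses the `shared_ptr` aliasing constructor: each core's slot shares a core-local control block but points at the one shared cache object, so reference-count updates stay core-local (see the longer comment in `MemTable::Add()` below). A minimal sketch of that pattern with standard-library types only; `CacheObj` and the plain vector of slots are stand-ins, not the core-local structure the real code uses:

```cpp
#include <atomic>
#include <memory>
#include <vector>

// Stand-in for FragmentedRangeTombstoneListCache.
struct CacheObj {
  int payload = 0;
};

int main() {
  // One slot per "core"; the real code uses a core-local array helper.
  std::vector<std::shared_ptr<CacheObj>> per_core_slots(4);

  auto new_cache = std::make_shared<CacheObj>();
  for (auto& slot : per_core_slots) {
    // A per-slot owner of a reference to the shared cache...
    auto local_ref =
        std::make_shared<const std::shared_ptr<CacheObj>>(new_cache);
    // ...and an aliasing shared_ptr: it shares local_ref's control block but
    // points at the cache object itself, so each slot bumps its own refcount
    // rather than contending on new_cache's control block.
    std::shared_ptr<CacheObj> aliased(local_ref, new_cache.get());
    std::atomic_store_explicit(&slot, std::move(aliased),
                               std::memory_order_relaxed);
  }
  return 0;
}
```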
152 | 141k | MemTable::~MemTable() { |
153 | 141k | mem_tracker_.FreeMem(); |
154 | 141k | assert(refs_ == 0); |
155 | 141k | } |
156 | | |
157 | 8.95k | size_t MemTable::ApproximateMemoryUsage() { |
158 | 8.95k | autovector<size_t> usages = { |
159 | 8.95k | arena_.ApproximateMemoryUsage(), table_->ApproximateMemoryUsage(), |
160 | 8.95k | range_del_table_->ApproximateMemoryUsage(), |
161 | 8.95k | ROCKSDB_NAMESPACE::ApproximateMemoryUsage(insert_hints_)}; |
162 | 8.95k | size_t total_usage = 0; |
163 | 35.8k | for (size_t usage : usages) { |
164 | | // If usage + total_usage would reach or exceed the maximum size_t, return |
165 | | // that maximum; the check is written this way to avoid numeric overflow. |
166 | 35.8k | if (usage >= std::numeric_limits<size_t>::max() - total_usage) { |
167 | 0 | return std::numeric_limits<size_t>::max(); |
168 | 0 | } |
169 | 35.8k | total_usage += usage; |
170 | 35.8k | } |
171 | 8.95k | approximate_memory_usage_.store(total_usage, std::memory_order_relaxed); |
172 | | // otherwise, return the actual usage |
173 | 8.95k | return total_usage; |
174 | 8.95k | } |
175 | | |
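The loop above is a saturating sum: if adding the next component would reach or exceed the `size_t` ceiling, it reports the ceiling rather than wrapping around. The same guard in isolation (the function name is illustrative):

```cpp
#include <cassert>
#include <cstddef>
#include <initializer_list>
#include <limits>

// Sum usages with the same pre-addition overflow check as
// ApproximateMemoryUsage(): clamp to the maximum size_t instead of wrapping.
size_t SaturatingSum(std::initializer_list<size_t> usages) {
  size_t total = 0;
  for (size_t usage : usages) {
    if (usage >= std::numeric_limits<size_t>::max() - total) {
      return std::numeric_limits<size_t>::max();
    }
    total += usage;
  }
  return total;
}

int main() {
  assert(SaturatingSum({1, 2, 3}) == 6);
  assert(SaturatingSum({std::numeric_limits<size_t>::max(), 1}) ==
         std::numeric_limits<size_t>::max());
  return 0;
}
```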
176 | 2.03M | bool MemTable::ShouldFlushNow() { |
177 | 2.03M | if (IsMarkedForFlush()) { |
178 | | // TODO: dedicated flush reason when marked for flush |
179 | 0 | return true; |
180 | 0 | } |
181 | | |
182 | | // Flush if memtable_max_range_deletions is set (> 0) and at least |
183 | | // that many range deletions have been added to this memtable |
184 | 2.03M | if (memtable_max_range_deletions_ > 0 && |
185 | 2.03M | num_range_deletes_.load(std::memory_order_relaxed) >= |
186 | 0 | static_cast<uint64_t>(memtable_max_range_deletions_)) { |
187 | 0 | return true; |
188 | 0 | } |
189 | | |
190 | 2.03M | size_t write_buffer_size = write_buffer_size_.load(std::memory_order_relaxed); |
191 | | // Often we cannot allocate arena blocks that exactly match the |
192 | | // buffer size. Thus we have to decide whether to over-allocate or |
193 | | // under-allocate. |
194 | | // This constant variable can be interpreted as: if we still have more than |
195 | | // "kAllowOverAllocationRatio * kArenaBlockSize" space left, we'd try to over |
196 | | // allocate one more block. |
197 | 2.03M | const double kAllowOverAllocationRatio = 0.6; |
198 | | |
199 | | // range deletions use a skip list, which allocates all memory through `arena_` |
200 | 2.03M | assert(range_del_table_->ApproximateMemoryUsage() == 0); |
201 | | // If the arena still has room for a new block allocation, we can safely say |
202 | | // it shouldn't flush. |
203 | 2.03M | auto allocated_memory = table_->ApproximateMemoryUsage() + |
204 | 2.03M | arena_.MemoryAllocatedBytes(); |
205 | | |
206 | 2.03M | approximate_memory_usage_.store(allocated_memory, std::memory_order_relaxed); |
207 | | |
208 | | // if we can still allocate one more block without exceeding the |
209 | | // over-allocation ratio, then we should not flush. |
210 | 2.03M | if (allocated_memory + kArenaBlockSize < |
211 | 2.03M | write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) { |
212 | 2.03M | return false; |
213 | 2.03M | } |
214 | | |
215 | | // if the user keeps adding entries and we exceed write_buffer_size, we need |
216 | | // to flush earlier even though we still have much available memory left. |
217 | 0 | if (allocated_memory > |
218 | 0 | write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) { |
219 | 0 | return true; |
220 | 0 | } |
221 | | |
222 | | // In this code path, Arena has already allocated its "last block", which |
223 | | // means the total allocated memory size is either: |
224 | | // (1) "moderately" over-allocated (by no more than `0.6 * arena |
225 | | // block size`), or |
226 | | // (2) less than the write buffer size, but we'll stop |
227 | | // here since allocating a new arena block would over-allocate too much |
228 | | // more (half of the arena block size) memory. |
229 | | // |
230 | | // In either case, to avoid over-allocating, the last block will stop allocation |
231 | | // when its usage reaches a certain ratio, which we carefully choose "0.75 |
232 | | // full" as the stop condition because it addresses the following issue with |
233 | | // great simplicity: What if the next inserted entry's size is |
234 | | // bigger than AllocatedAndUnused()? |
235 | | // |
236 | | // The answer is: if the entry size is also bigger than 0.25 * |
237 | | // kArenaBlockSize, a dedicated block will be allocated for it; otherwise |
238 | | // arena will anyway skip the AllocatedAndUnused() and allocate a new, empty |
239 | | // and regular block. In either case, we *overly* over-allocated. |
240 | | // |
241 | | // Therefore, setting the last block to be at most "0.75 full" avoids both |
242 | | // cases. |
243 | | // |
244 | | // NOTE: the average percentage of wasted space of this approach can be |
245 | | // estimated as "arena block size * 0.25 / write buffer size". Users who specify |
246 | | // a small write buffer size and/or a big arena block size may suffer. |
247 | 0 | return arena_.AllocatedAndUnused() < kArenaBlockSize / 4; |
248 | 0 | } |
249 | | |
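To make the two thresholds above concrete, here is a standalone sketch with invented sizes (the 64 MiB write buffer, 8 MiB arena block, and 62 MiB of allocated memory are assumptions for illustration only):

```cpp
#include <cstddef>
#include <iostream>

int main() {
  // Hypothetical sizes, for illustration only.
  const size_t write_buffer_size = 64u << 20;    // 64 MiB target
  const size_t kArenaBlockSize = 8u << 20;       // 8 MiB arena blocks
  const double kAllowOverAllocationRatio = 0.6;  // same constant as above
  const size_t allocated_memory = 62u << 20;     // what the memtable holds now

  // Mirrors the two comparisons in ShouldFlushNow().
  const bool room_for_one_more_block =
      allocated_memory + kArenaBlockSize <
      write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio;
  const bool clearly_over_budget =
      allocated_memory >
      write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio;

  // 62 + 8 = 70 MiB vs 64 + 4.8 = 68.8 MiB: no room for another full block,
  // but not clearly over budget either, so the real code falls through to the
  // "is the last block more than 0.75 full" heuristic.
  std::cout << room_for_one_more_block << " " << clearly_over_budget << "\n";
  return 0;
}
```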
250 | 2.03M | void MemTable::UpdateFlushState() { |
251 | 2.03M | auto state = flush_state_.load(std::memory_order_relaxed); |
252 | 2.03M | if (state == FLUSH_NOT_REQUESTED && ShouldFlushNow()) { |
253 | | // ignore CAS failure, because that means somebody else requested |
254 | | // a flush |
255 | 0 | flush_state_.compare_exchange_strong(state, FLUSH_REQUESTED, |
256 | 0 | std::memory_order_relaxed, |
257 | 0 | std::memory_order_relaxed); |
258 | 0 | } |
259 | 2.03M | } |
260 | | |
261 | 1.89M | void MemTable::UpdateOldestKeyTime() { |
262 | 1.89M | uint64_t oldest_key_time = oldest_key_time_.load(std::memory_order_relaxed); |
263 | 1.89M | if (oldest_key_time == std::numeric_limits<uint64_t>::max()) { |
264 | 65.2k | int64_t current_time = 0; |
265 | 65.2k | auto s = clock_->GetCurrentTime(¤t_time); |
266 | 65.2k | if (s.ok()) { |
267 | 65.2k | assert(current_time >= 0); |
268 | | // If the compare-exchange below fails, the timestamp has already been set. |
269 | 65.2k | oldest_key_time_.compare_exchange_strong( |
270 | 65.2k | oldest_key_time, static_cast<uint64_t>(current_time), |
271 | 65.2k | std::memory_order_relaxed, std::memory_order_relaxed); |
272 | 65.2k | } |
273 | 65.2k | } |
274 | 1.89M | } |
275 | | |
276 | | Status MemTable::VerifyEntryChecksum(const char* entry, |
277 | | uint32_t protection_bytes_per_key, |
278 | 0 | bool allow_data_in_errors) { |
279 | 0 | if (protection_bytes_per_key == 0) { |
280 | 0 | return Status::OK(); |
281 | 0 | } |
282 | 0 | uint32_t key_length; |
283 | 0 | const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); |
284 | 0 | if (key_ptr == nullptr) { |
285 | 0 | return Status::Corruption("Unable to parse internal key length"); |
286 | 0 | } |
287 | 0 | if (key_length < 8) { |
288 | 0 | return Status::Corruption("Memtable entry internal key length too short."); |
289 | 0 | } |
290 | 0 | Slice user_key = Slice(key_ptr, key_length - 8); |
291 | |
292 | 0 | const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); |
293 | 0 | ValueType type; |
294 | 0 | SequenceNumber seq; |
295 | 0 | UnPackSequenceAndType(tag, &seq, &type); |
296 | |
297 | 0 | uint32_t value_length = 0; |
298 | 0 | const char* value_ptr = GetVarint32Ptr( |
299 | 0 | key_ptr + key_length, key_ptr + key_length + 5, &value_length); |
300 | 0 | if (value_ptr == nullptr) { |
301 | 0 | return Status::Corruption("Unable to parse internal key value"); |
302 | 0 | } |
303 | 0 | Slice value = Slice(value_ptr, value_length); |
304 | |
305 | 0 | const char* checksum_ptr = value_ptr + value_length; |
306 | 0 | bool match = |
307 | 0 | ProtectionInfo64() |
308 | 0 | .ProtectKVO(user_key, value, type) |
309 | 0 | .ProtectS(seq) |
310 | 0 | .Verify(static_cast<uint8_t>(protection_bytes_per_key), checksum_ptr); |
311 | 0 | if (!match) { |
312 | 0 | std::string msg( |
313 | 0 | "Corrupted memtable entry, per key-value checksum verification " |
314 | 0 | "failed."); |
315 | 0 | if (allow_data_in_errors) { |
316 | 0 | msg.append("Unrecognized value type: " + |
317 | 0 | std::to_string(static_cast<int>(type)) + ". "); |
318 | 0 | msg.append("User key: " + user_key.ToString(/*hex=*/true) + ". "); |
319 | 0 | msg.append("seq: " + std::to_string(seq) + "."); |
320 | 0 | } |
321 | 0 | return Status::Corruption(msg.c_str()); |
322 | 0 | } |
323 | 0 | return Status::OK(); |
324 | 0 | } |
325 | | |
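`VerifyEntryChecksum()` above re-parses the entry and unpacks the internal-key tag: the last 8 bytes of the internal key hold a 56-bit sequence number and a one-byte value type. A minimal sketch of that packing with stand-in helper names (not RocksDB's actual functions):

```cpp
#include <cassert>
#include <cstdint>

using SequenceNumber = uint64_t;

// Stand-ins for PackSequenceAndType / UnPackSequenceAndType: the tag stored in
// the last 8 bytes of an internal key is (sequence << 8) | value_type.
uint64_t PackSeqAndType(SequenceNumber seq, uint8_t type) {
  assert(seq <= ((UINT64_C(1) << 56) - 1));  // sequence numbers fit in 56 bits
  return (seq << 8) | type;
}

void UnpackSeqAndType(uint64_t tag, SequenceNumber* seq, uint8_t* type) {
  *seq = tag >> 8;
  *type = static_cast<uint8_t>(tag & 0xff);
}

int main() {
  SequenceNumber seq = 0;
  uint8_t type = 0;
  UnpackSeqAndType(PackSeqAndType(1234, /*kTypeValue=*/0x1), &seq, &type);
  assert(seq == 1234 && type == 0x1);
  return 0;
}
```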
326 | | int MemTable::KeyComparator::operator()(const char* prefix_len_key1, |
327 | 0 | const char* prefix_len_key2) const { |
328 | | // Internal keys are encoded as length-prefixed strings. |
329 | 0 | Slice k1 = GetLengthPrefixedSlice(prefix_len_key1); |
330 | 0 | Slice k2 = GetLengthPrefixedSlice(prefix_len_key2); |
331 | 0 | return comparator.CompareKeySeq(k1, k2); |
332 | 0 | } |
333 | | |
334 | | int MemTable::KeyComparator::operator()( |
335 | 18.1M | const char* prefix_len_key, const KeyComparator::DecodedType& key) const { |
336 | | // Internal keys are encoded as length-prefixed strings. |
337 | 18.1M | Slice a = GetLengthPrefixedSlice(prefix_len_key); |
338 | 18.1M | return comparator.CompareKeySeq(a, key); |
339 | 18.1M | } |
340 | | |
341 | 0 | void MemTableRep::InsertConcurrently(KeyHandle /*handle*/) { |
342 | 0 | throw std::runtime_error("concurrent insert not supported"); |
343 | 0 | } |
344 | | |
345 | 0 | Slice MemTableRep::UserKey(const char* key) const { |
346 | 0 | Slice slice = GetLengthPrefixedSlice(key); |
347 | 0 | return Slice(slice.data(), slice.size() - 8); |
348 | 0 | } |
349 | | |
350 | 0 | KeyHandle MemTableRep::Allocate(const size_t len, char** buf) { |
351 | 0 | *buf = allocator_->Allocate(len); |
352 | 0 | return static_cast<KeyHandle>(*buf); |
353 | 0 | } |
354 | | |
355 | | // Encode a suitable internal key target for "target" and return it. |
356 | | // Uses *scratch as scratch space, and the returned pointer will point |
357 | | // into this scratch space. |
358 | 10.1k | const char* EncodeKey(std::string* scratch, const Slice& target) { |
359 | 10.1k | scratch->clear(); |
360 | 10.1k | PutVarint32(scratch, static_cast<uint32_t>(target.size())); |
361 | 10.1k | scratch->append(target.data(), target.size()); |
362 | 10.1k | return scratch->data(); |
363 | 10.1k | } |
364 | | |
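`EncodeKey()` above produces the length-prefixed framing used throughout this file: a varint32 length followed by the raw key bytes, which `GetLengthPrefixedSlice()` later undoes. A self-contained round trip of that framing; the varint helpers are re-implemented here for the sketch rather than taken from RocksDB:

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Append v as a base-128 varint, as PutVarint32 does.
void AppendVarint32(std::string* dst, uint32_t v) {
  while (v >= 0x80) {
    dst->push_back(static_cast<char>((v & 0x7f) | 0x80));
    v >>= 7;
  }
  dst->push_back(static_cast<char>(v));
}

// Decode a varint32 and return a pointer just past it.
const char* DecodeVarint32(const char* p, uint32_t* out) {
  uint32_t result = 0;
  for (int shift = 0; shift <= 28; shift += 7) {
    const uint32_t byte = static_cast<uint8_t>(*p++);
    result |= (byte & 0x7f) << shift;
    if ((byte & 0x80) == 0) {
      break;
    }
  }
  *out = result;
  return p;
}

int main() {
  const std::string target = "internal-key-bytes";

  // EncodeKey(): length prefix, then the raw key bytes.
  std::string scratch;
  AppendVarint32(&scratch, static_cast<uint32_t>(target.size()));
  scratch.append(target);

  // GetLengthPrefixedSlice() equivalent: read the length, then the payload.
  uint32_t len = 0;
  const char* payload = DecodeVarint32(scratch.data(), &len);
  assert(len == target.size());
  assert(std::string(payload, len) == target);
  return 0;
}
```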
365 | | class MemTableIterator : public InternalIterator { |
366 | | public: |
367 | | enum Kind { kPointEntries, kRangeDelEntries }; |
368 | | MemTableIterator( |
369 | | Kind kind, const MemTable& mem, const ReadOptions& read_options, |
370 | | UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping = nullptr, |
371 | | Arena* arena = nullptr, |
372 | | const SliceTransform* cf_prefix_extractor = nullptr) |
373 | 48.7k | : bloom_(nullptr), |
374 | 48.7k | prefix_extractor_(mem.prefix_extractor_), |
375 | 48.7k | comparator_(mem.comparator_), |
376 | 48.7k | seqno_to_time_mapping_(seqno_to_time_mapping), |
377 | 48.7k | status_(Status::OK()), |
378 | 48.7k | logger_(mem.moptions_.info_log), |
379 | 48.7k | ts_sz_(mem.ts_sz_), |
380 | 48.7k | protection_bytes_per_key_(mem.moptions_.protection_bytes_per_key), |
381 | 48.7k | valid_(false), |
382 | | value_pinned_( |
383 | 48.7k | !mem.GetImmutableMemTableOptions()->inplace_update_support), |
384 | 48.7k | arena_mode_(arena != nullptr), |
385 | 48.7k | paranoid_memory_checks_(mem.moptions_.paranoid_memory_checks), |
386 | 48.7k | allow_data_in_error(mem.moptions_.allow_data_in_errors) { |
387 | 48.7k | if (kind == kRangeDelEntries) { |
388 | 3.72k | iter_ = mem.range_del_table_->GetIterator(arena); |
389 | 45.0k | } else if (prefix_extractor_ != nullptr && |
390 | | // NOTE: checking extractor equivalence when not pointer |
391 | | // equivalent is arguably too expensive for memtable |
392 | 45.0k | prefix_extractor_ == cf_prefix_extractor && |
393 | 45.0k | (read_options.prefix_same_as_start || |
394 | 0 | (!read_options.total_order_seek && |
395 | 0 | !read_options.auto_prefix_mode))) { |
396 | | // Auto prefix mode is not implemented in memtable yet. |
397 | 0 | assert(kind == kPointEntries); |
398 | 0 | bloom_ = mem.bloom_filter_.get(); |
399 | 0 | iter_ = mem.table_->GetDynamicPrefixIterator(arena); |
400 | 45.0k | } else { |
401 | 45.0k | assert(kind == kPointEntries); |
402 | 45.0k | iter_ = mem.table_->GetIterator(arena); |
403 | 45.0k | } |
404 | 48.7k | status_.PermitUncheckedError(); |
405 | 48.7k | } |
406 | | // No copying allowed |
407 | | MemTableIterator(const MemTableIterator&) = delete; |
408 | | void operator=(const MemTableIterator&) = delete; |
409 | | |
410 | 48.7k | ~MemTableIterator() override { |
411 | | #ifndef NDEBUG |
412 | | // Assert that the MemTableIterator is never deleted while |
413 | | // Pinning is Enabled. |
414 | | assert(!pinned_iters_mgr_ || !pinned_iters_mgr_->PinningEnabled()); |
415 | | #endif |
416 | 48.7k | if (arena_mode_) { |
417 | 45.0k | iter_->~Iterator(); |
418 | 45.0k | } else { |
419 | 3.72k | delete iter_; |
420 | 3.72k | } |
421 | 48.7k | status_.PermitUncheckedError(); |
422 | 48.7k | } |
423 | | |
424 | | #ifndef NDEBUG |
425 | | void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override { |
426 | | pinned_iters_mgr_ = pinned_iters_mgr; |
427 | | } |
428 | | PinnedIteratorsManager* pinned_iters_mgr_ = nullptr; |
429 | | #endif |
430 | | |
431 | 1.54M | bool Valid() const override { |
432 | | // If inner iter_ is not valid, then this iter should also not be valid. |
433 | 1.54M | assert(iter_->Valid() || !(valid_ && status_.ok())); |
434 | 1.54M | return valid_ && status_.ok(); |
435 | 1.54M | } |
436 | | |
437 | 5.46k | void Seek(const Slice& k) override { |
438 | 5.46k | PERF_TIMER_GUARD(seek_on_memtable_time); |
439 | 5.46k | PERF_COUNTER_ADD(seek_on_memtable_count, 1); |
440 | 5.46k | status_ = Status::OK(); |
441 | 5.46k | if (bloom_) { |
442 | | // iterator should only use prefix bloom filter |
443 | 0 | Slice user_k_without_ts(ExtractUserKeyAndStripTimestamp(k, ts_sz_)); |
444 | 0 | if (prefix_extractor_->InDomain(user_k_without_ts)) { |
445 | 0 | Slice prefix = prefix_extractor_->Transform(user_k_without_ts); |
446 | 0 | if (!bloom_->MayContain(prefix)) { |
447 | 0 | PERF_COUNTER_ADD(bloom_memtable_miss_count, 1); |
448 | 0 | valid_ = false; |
449 | 0 | return; |
450 | 0 | } else { |
451 | 0 | PERF_COUNTER_ADD(bloom_memtable_hit_count, 1); |
452 | 0 | } |
453 | 0 | } |
454 | 0 | } |
455 | 5.46k | if (paranoid_memory_checks_) { |
456 | 0 | status_ = iter_->SeekAndValidate(k, nullptr, allow_data_in_error); |
457 | 5.46k | } else { |
458 | 5.46k | iter_->Seek(k, nullptr); |
459 | 5.46k | } |
460 | 5.46k | valid_ = iter_->Valid(); |
461 | 5.46k | VerifyEntryChecksum(); |
462 | 5.46k | } |
463 | 4.66k | void SeekForPrev(const Slice& k) override { |
464 | 4.66k | PERF_TIMER_GUARD(seek_on_memtable_time); |
465 | 4.66k | PERF_COUNTER_ADD(seek_on_memtable_count, 1); |
466 | 4.66k | status_ = Status::OK(); |
467 | 4.66k | if (bloom_) { |
468 | 0 | Slice user_k_without_ts(ExtractUserKeyAndStripTimestamp(k, ts_sz_)); |
469 | 0 | if (prefix_extractor_->InDomain(user_k_without_ts)) { |
470 | 0 | if (!bloom_->MayContain( |
471 | 0 | prefix_extractor_->Transform(user_k_without_ts))) { |
472 | 0 | PERF_COUNTER_ADD(bloom_memtable_miss_count, 1); |
473 | 0 | valid_ = false; |
474 | 0 | return; |
475 | 0 | } else { |
476 | 0 | PERF_COUNTER_ADD(bloom_memtable_hit_count, 1); |
477 | 0 | } |
478 | 0 | } |
479 | 0 | } |
480 | 4.66k | if (paranoid_memory_checks_) { |
481 | 0 | status_ = iter_->SeekAndValidate(k, nullptr, allow_data_in_error); |
482 | 4.66k | } else { |
483 | 4.66k | iter_->Seek(k, nullptr); |
484 | 4.66k | } |
485 | 4.66k | valid_ = iter_->Valid(); |
486 | 4.66k | VerifyEntryChecksum(); |
487 | 4.66k | if (!Valid() && status().ok()) { |
488 | 4.14k | SeekToLast(); |
489 | 4.14k | } |
490 | 5.18k | while (Valid() && comparator_.comparator.Compare(k, key()) < 0) { |
491 | 523 | Prev(); |
492 | 523 | } |
493 | 4.66k | } |
494 | 39.1k | void SeekToFirst() override { |
495 | 39.1k | status_ = Status::OK(); |
496 | 39.1k | iter_->SeekToFirst(); |
497 | 39.1k | valid_ = iter_->Valid(); |
498 | 39.1k | VerifyEntryChecksum(); |
499 | 39.1k | } |
500 | 4.14k | void SeekToLast() override { |
501 | 4.14k | status_ = Status::OK(); |
502 | 4.14k | iter_->SeekToLast(); |
503 | 4.14k | valid_ = iter_->Valid(); |
504 | 4.14k | VerifyEntryChecksum(); |
505 | 4.14k | } |
506 | 1.35M | void Next() override { |
507 | 1.35M | PERF_COUNTER_ADD(next_on_memtable_count, 1); |
508 | 1.35M | assert(Valid()); |
509 | 1.35M | if (paranoid_memory_checks_) { |
510 | 0 | status_ = iter_->NextAndValidate(allow_data_in_error); |
511 | 1.35M | } else { |
512 | 1.35M | iter_->Next(); |
513 | 1.35M | TEST_SYNC_POINT_CALLBACK("MemTableIterator::Next:0", iter_); |
514 | 1.35M | } |
515 | 1.35M | valid_ = iter_->Valid(); |
516 | 1.35M | VerifyEntryChecksum(); |
517 | 1.35M | } |
518 | 6.17k | bool NextAndGetResult(IterateResult* result) override { |
519 | 6.17k | Next(); |
520 | 6.17k | bool is_valid = Valid(); |
521 | 6.17k | if (is_valid) { |
522 | 4.62k | result->key = key(); |
523 | 4.62k | result->bound_check_result = IterBoundCheck::kUnknown; |
524 | 4.62k | result->value_prepared = true; |
525 | 4.62k | } |
526 | 6.17k | return is_valid; |
527 | 6.17k | } |
528 | 7.28k | void Prev() override { |
529 | 7.28k | PERF_COUNTER_ADD(prev_on_memtable_count, 1); |
530 | 7.28k | assert(Valid()); |
531 | 7.28k | if (paranoid_memory_checks_) { |
532 | 0 | status_ = iter_->PrevAndValidate(allow_data_in_error); |
533 | 7.28k | } else { |
534 | 7.28k | iter_->Prev(); |
535 | 7.28k | } |
536 | 7.28k | valid_ = iter_->Valid(); |
537 | 7.28k | VerifyEntryChecksum(); |
538 | 7.28k | } |
539 | 2.20M | Slice key() const override { |
540 | 2.20M | assert(Valid()); |
541 | 2.20M | return GetLengthPrefixedSlice(iter_->key()); |
542 | 2.20M | } |
543 | | |
544 | 5.57k | uint64_t write_unix_time() const override { |
545 | 5.57k | assert(Valid()); |
546 | 5.57k | ParsedInternalKey pikey; |
547 | 5.57k | Status s = ParseInternalKey(key(), &pikey, /*log_err_key=*/false); |
548 | 5.57k | if (!s.ok()) { |
549 | 0 | return std::numeric_limits<uint64_t>::max(); |
550 | 5.57k | } else if (kTypeValuePreferredSeqno == pikey.type) { |
551 | 0 | return ParsePackedValueForWriteTime(value()); |
552 | 5.57k | } else if (!seqno_to_time_mapping_ || seqno_to_time_mapping_->Empty()) { |
553 | 5.57k | return std::numeric_limits<uint64_t>::max(); |
554 | 5.57k | } |
555 | 0 | return seqno_to_time_mapping_->GetProximalTimeBeforeSeqno(pikey.sequence); |
556 | 5.57k | } |
557 | | |
558 | 1.35M | Slice value() const override { |
559 | 1.35M | assert(Valid()); |
560 | 1.35M | Slice key_slice = GetLengthPrefixedSlice(iter_->key()); |
561 | 1.35M | return GetLengthPrefixedSlice(key_slice.data() + key_slice.size()); |
562 | 1.35M | } |
563 | | |
564 | 76.0k | Status status() const override { return status_; } |
565 | | |
566 | 830k | bool IsKeyPinned() const override { |
567 | | // memtable data is always pinned |
568 | 830k | return true; |
569 | 830k | } |
570 | | |
571 | 418k | bool IsValuePinned() const override { |
572 | | // memtable value is always pinned, except if we allow inplace update. |
573 | 418k | return value_pinned_; |
574 | 418k | } |
575 | | |
576 | | private: |
577 | | DynamicBloom* bloom_; |
578 | | const SliceTransform* const prefix_extractor_; |
579 | | const MemTable::KeyComparator comparator_; |
580 | | MemTableRep::Iterator* iter_; |
581 | | // The seqno to time mapping is owned by the SuperVersion. |
582 | | UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping_; |
583 | | Status status_; |
584 | | Logger* logger_; |
585 | | size_t ts_sz_; |
586 | | uint32_t protection_bytes_per_key_; |
587 | | bool valid_; |
588 | | bool value_pinned_; |
589 | | bool arena_mode_; |
590 | | const bool paranoid_memory_checks_; |
591 | | const bool allow_data_in_error; |
592 | | |
593 | 1.41M | void VerifyEntryChecksum() { |
594 | 1.41M | if (protection_bytes_per_key_ > 0 && Valid()) { |
595 | 0 | status_ = MemTable::VerifyEntryChecksum(iter_->key(), |
596 | 0 | protection_bytes_per_key_); |
597 | 0 | if (!status_.ok()) { |
598 | 0 | ROCKS_LOG_ERROR(logger_, "In MemtableIterator: %s", status_.getState()); |
599 | 0 | } |
600 | 0 | } |
601 | 1.41M | } |
602 | | }; |
603 | | |
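In `Seek()` and `SeekForPrev()` above, the prefix bloom filter is consulted only when the (timestamp-stripped) user key is in the prefix extractor's domain; a definite "not present" answer lets the iterator skip the memtable walk. A rough sketch of that gating, with a fixed-length prefix extractor and an `std::unordered_set` standing in for `DynamicBloom` (both are illustrative stand-ins; a real bloom filter can return false positives but never false negatives):

```cpp
#include <optional>
#include <string>
#include <string_view>
#include <unordered_set>

// Hypothetical fixed-length prefix extractor: a key is InDomain() iff it has
// at least kPrefixLen bytes.
constexpr size_t kPrefixLen = 4;
std::optional<std::string_view> ExtractPrefix(std::string_view user_key) {
  if (user_key.size() < kPrefixLen) {
    return std::nullopt;
  }
  return user_key.substr(0, kPrefixLen);
}

// Returns false only when the filter says the prefix is definitely absent,
// mirroring the early-out in MemTableIterator::Seek().
bool MayNeedSeek(const std::unordered_set<std::string>& prefix_filter,
                 std::string_view user_key) {
  const auto prefix = ExtractPrefix(user_key);
  if (!prefix.has_value()) {
    return true;  // out of domain: the filter cannot be used
  }
  return prefix_filter.count(std::string(*prefix)) != 0;
}

int main() {
  const std::unordered_set<std::string> prefix_filter = {"user"};
  const bool a = MayNeedSeek(prefix_filter, "user-0001");  // prefix present
  const bool b = MayNeedSeek(prefix_filter, "blob-0001");  // definitely absent
  const bool c = MayNeedSeek(prefix_filter, "abc");        // out of domain
  return (a && !b && c) ? 0 : 1;
}
```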
604 | | InternalIterator* MemTable::NewIterator( |
605 | | const ReadOptions& read_options, |
606 | | UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena, |
607 | 45.0k | const SliceTransform* prefix_extractor, bool /*for_flush*/) { |
608 | 45.0k | assert(arena != nullptr); |
609 | 45.0k | auto mem = arena->AllocateAligned(sizeof(MemTableIterator)); |
610 | 45.0k | return new (mem) |
611 | 45.0k | MemTableIterator(MemTableIterator::kPointEntries, *this, read_options, |
612 | 45.0k | seqno_to_time_mapping, arena, prefix_extractor); |
613 | 45.0k | } |
614 | | |
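`NewIterator()` above placement-constructs the iterator inside the caller's arena, which is why `~MemTableIterator()` calls the destructor explicitly instead of `delete` when `arena_mode_` is set. A small sketch of the allocate / placement-new / explicit-destroy pattern, with a toy bump allocator standing in for `Arena` (all names are illustrative):

```cpp
#include <cstddef>
#include <new>
#include <string>

// Toy stand-in for Arena: hands out storage from one buffer and never frees
// individual allocations.
struct ToyArena {
  alignas(std::max_align_t) char buf[1024];
  size_t used = 0;
  void* AllocateAligned(size_t n) {
    n = (n + alignof(std::max_align_t) - 1) & ~(alignof(std::max_align_t) - 1);
    void* p = buf + used;
    used += n;
    return p;
  }
};

struct ToyIterator {
  std::string state = "at-first-key";
};

int main() {
  ToyArena arena;
  // Placement-new into arena memory, as MemTable::NewIterator() does.
  void* mem = arena.AllocateAligned(sizeof(ToyIterator));
  ToyIterator* it = new (mem) ToyIterator();

  // No delete: the arena owns the bytes, so only the destructor runs,
  // mirroring the arena_mode_ branch of ~MemTableIterator().
  it->~ToyIterator();
  return 0;
}
```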
615 | | // An iterator wrapper that wraps a MemTableIterator and logically strips each |
616 | | // key's user-defined timestamp. |
617 | | class TimestampStrippingIterator : public InternalIterator { |
618 | | public: |
619 | | TimestampStrippingIterator( |
620 | | MemTableIterator::Kind kind, const MemTable& memtable, |
621 | | const ReadOptions& read_options, |
622 | | UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena, |
623 | | const SliceTransform* cf_prefix_extractor, size_t ts_sz) |
624 | 0 | : arena_mode_(arena != nullptr), kind_(kind), ts_sz_(ts_sz) { |
625 | 0 | assert(ts_sz_ != 0); |
626 | 0 | void* mem = arena ? arena->AllocateAligned(sizeof(MemTableIterator)) |
627 | 0 | : operator new(sizeof(MemTableIterator)); |
628 | 0 | iter_ = new (mem) |
629 | 0 | MemTableIterator(kind, memtable, read_options, seqno_to_time_mapping, |
630 | 0 | arena, cf_prefix_extractor); |
631 | 0 | } |
632 | | |
633 | | // No copying allowed |
634 | | TimestampStrippingIterator(const TimestampStrippingIterator&) = delete; |
635 | | void operator=(const TimestampStrippingIterator&) = delete; |
636 | | |
637 | 0 | ~TimestampStrippingIterator() override { |
638 | 0 | if (arena_mode_) { |
639 | 0 | iter_->~MemTableIterator(); |
640 | 0 | } else { |
641 | 0 | delete iter_; |
642 | 0 | } |
643 | 0 | } |
644 | | |
645 | 0 | void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override { |
646 | 0 | iter_->SetPinnedItersMgr(pinned_iters_mgr); |
647 | 0 | } |
648 | | |
649 | 0 | bool Valid() const override { return iter_->Valid(); } |
650 | 0 | void Seek(const Slice& k) override { |
651 | 0 | iter_->Seek(k); |
652 | 0 | UpdateKeyAndValueBuffer(); |
653 | 0 | } |
654 | 0 | void SeekForPrev(const Slice& k) override { |
655 | 0 | iter_->SeekForPrev(k); |
656 | 0 | UpdateKeyAndValueBuffer(); |
657 | 0 | } |
658 | 0 | void SeekToFirst() override { |
659 | 0 | iter_->SeekToFirst(); |
660 | 0 | UpdateKeyAndValueBuffer(); |
661 | 0 | } |
662 | 0 | void SeekToLast() override { |
663 | 0 | iter_->SeekToLast(); |
664 | 0 | UpdateKeyAndValueBuffer(); |
665 | 0 | } |
666 | 0 | void Next() override { |
667 | 0 | iter_->Next(); |
668 | 0 | UpdateKeyAndValueBuffer(); |
669 | 0 | } |
670 | 0 | bool NextAndGetResult(IterateResult* result) override { |
671 | 0 | iter_->Next(); |
672 | 0 | UpdateKeyAndValueBuffer(); |
673 | 0 | bool is_valid = Valid(); |
674 | 0 | if (is_valid) { |
675 | 0 | result->key = key(); |
676 | 0 | result->bound_check_result = IterBoundCheck::kUnknown; |
677 | 0 | result->value_prepared = true; |
678 | 0 | } |
679 | 0 | return is_valid; |
680 | 0 | } |
681 | 0 | void Prev() override { |
682 | 0 | iter_->Prev(); |
683 | 0 | UpdateKeyAndValueBuffer(); |
684 | 0 | } |
685 | 0 | Slice key() const override { |
686 | 0 | assert(Valid()); |
687 | 0 | return key_buf_; |
688 | 0 | } |
689 | | |
690 | 0 | uint64_t write_unix_time() const override { return iter_->write_unix_time(); } |
691 | 0 | Slice value() const override { |
692 | 0 | if (kind_ == MemTableIterator::Kind::kRangeDelEntries) { |
693 | 0 | return value_buf_; |
694 | 0 | } |
695 | 0 | return iter_->value(); |
696 | 0 | } |
697 | 0 | Status status() const override { return iter_->status(); } |
698 | 0 | bool IsKeyPinned() const override { |
699 | | // Key is only in a buffer that is updated in each iteration. |
700 | 0 | return false; |
701 | 0 | } |
702 | 0 | bool IsValuePinned() const override { |
703 | 0 | if (kind_ == MemTableIterator::Kind::kRangeDelEntries) { |
704 | 0 | return false; |
705 | 0 | } |
706 | 0 | return iter_->IsValuePinned(); |
707 | 0 | } |
708 | | |
709 | | private: |
710 | 0 | void UpdateKeyAndValueBuffer() { |
711 | 0 | key_buf_.clear(); |
712 | 0 | if (kind_ == MemTableIterator::Kind::kRangeDelEntries) { |
713 | 0 | value_buf_.clear(); |
714 | 0 | } |
715 | 0 | if (!Valid()) { |
716 | 0 | return; |
717 | 0 | } |
718 | 0 | Slice original_key = iter_->key(); |
719 | 0 | ReplaceInternalKeyWithMinTimestamp(&key_buf_, original_key, ts_sz_); |
720 | 0 | if (kind_ == MemTableIterator::Kind::kRangeDelEntries) { |
721 | 0 | Slice original_value = iter_->value(); |
722 | 0 | AppendUserKeyWithMinTimestamp(&value_buf_, original_value, ts_sz_); |
723 | 0 | } |
724 | 0 | } |
725 | | bool arena_mode_; |
726 | | MemTableIterator::Kind kind_; |
727 | | size_t ts_sz_; |
728 | | MemTableIterator* iter_; |
729 | | std::string key_buf_; |
730 | | std::string value_buf_; |
731 | | }; |
732 | | |
733 | | InternalIterator* MemTable::NewTimestampStrippingIterator( |
734 | | const ReadOptions& read_options, |
735 | | UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena, |
736 | 0 | const SliceTransform* prefix_extractor, size_t ts_sz) { |
737 | 0 | assert(arena != nullptr); |
738 | 0 | auto mem = arena->AllocateAligned(sizeof(TimestampStrippingIterator)); |
739 | 0 | return new (mem) TimestampStrippingIterator( |
740 | 0 | MemTableIterator::kPointEntries, *this, read_options, |
741 | 0 | seqno_to_time_mapping, arena, prefix_extractor, ts_sz); |
742 | 0 | } |
743 | | |
744 | | FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIterator( |
745 | | const ReadOptions& read_options, SequenceNumber read_seq, |
746 | 67.9k | bool immutable_memtable) { |
747 | 67.9k | if (read_options.ignore_range_deletions || |
748 | 67.9k | is_range_del_table_empty_.load(std::memory_order_relaxed)) { |
749 | 64.2k | return nullptr; |
750 | 64.2k | } |
751 | 3.72k | return NewRangeTombstoneIteratorInternal(read_options, read_seq, |
752 | 3.72k | immutable_memtable); |
753 | 67.9k | } |
754 | | |
755 | | FragmentedRangeTombstoneIterator* |
756 | | MemTable::NewTimestampStrippingRangeTombstoneIterator( |
757 | 0 | const ReadOptions& read_options, SequenceNumber read_seq, size_t ts_sz) { |
758 | 0 | if (read_options.ignore_range_deletions || |
759 | 0 | is_range_del_table_empty_.load(std::memory_order_relaxed)) { |
760 | 0 | return nullptr; |
761 | 0 | } |
762 | 0 | if (!timestamp_stripping_fragmented_range_tombstone_list_) { |
763 | | // TODO: plumb Env::IOActivity, Env::IOPriority |
764 | 0 | auto* unfragmented_iter = new TimestampStrippingIterator( |
765 | 0 | MemTableIterator::kRangeDelEntries, *this, ReadOptions(), |
766 | 0 | /*seqno_to_time_mapping*/ nullptr, /* arena */ nullptr, |
767 | 0 | /* prefix_extractor */ nullptr, ts_sz); |
768 | |
769 | 0 | timestamp_stripping_fragmented_range_tombstone_list_ = |
770 | 0 | std::make_unique<FragmentedRangeTombstoneList>( |
771 | 0 | std::unique_ptr<InternalIterator>(unfragmented_iter), |
772 | 0 | comparator_.comparator); |
773 | 0 | } |
774 | 0 | return new FragmentedRangeTombstoneIterator( |
775 | 0 | timestamp_stripping_fragmented_range_tombstone_list_.get(), |
776 | 0 | comparator_.comparator, read_seq, read_options.timestamp); |
777 | 0 | } |
778 | | |
779 | | FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIteratorInternal( |
780 | | const ReadOptions& read_options, SequenceNumber read_seq, |
781 | 3.72k | bool immutable_memtable) { |
782 | 3.72k | if (immutable_memtable) { |
783 | | // Note that caller should already have verified that |
784 | | // !is_range_del_table_empty_ |
785 | 0 | assert(IsFragmentedRangeTombstonesConstructed()); |
786 | 0 | return new FragmentedRangeTombstoneIterator( |
787 | 0 | fragmented_range_tombstone_list_.get(), comparator_.comparator, |
788 | 0 | read_seq, read_options.timestamp); |
789 | 0 | } |
790 | | |
791 | | // takes current cache |
792 | 3.72k | std::shared_ptr<FragmentedRangeTombstoneListCache> cache = |
793 | 3.72k | std::atomic_load_explicit(cached_range_tombstone_.Access(), |
794 | 3.72k | std::memory_order_relaxed); |
795 | | // construct fragmented tombstone list if necessary |
796 | 3.72k | if (!cache->initialized.load(std::memory_order_acquire)) { |
797 | 3.72k | cache->reader_mutex.lock(); |
798 | 3.72k | if (!cache->tombstones) { |
799 | 3.72k | auto* unfragmented_iter = new MemTableIterator( |
800 | 3.72k | MemTableIterator::kRangeDelEntries, *this, read_options); |
801 | 3.72k | cache->tombstones.reset(new FragmentedRangeTombstoneList( |
802 | 3.72k | std::unique_ptr<InternalIterator>(unfragmented_iter), |
803 | 3.72k | comparator_.comparator)); |
804 | 3.72k | cache->initialized.store(true, std::memory_order_release); |
805 | 3.72k | } |
806 | 3.72k | cache->reader_mutex.unlock(); |
807 | 3.72k | } |
808 | | |
809 | 3.72k | auto* fragmented_iter = new FragmentedRangeTombstoneIterator( |
810 | 3.72k | cache, comparator_.comparator, read_seq, read_options.timestamp); |
811 | 3.72k | return fragmented_iter; |
812 | 3.72k | } |
813 | | |
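The cache construction above is a double-checked initialization: an acquire load of `initialized` on the fast path, then a mutex and a second check before building the fragmented list, so concurrent readers construct it at most once. A generic sketch of that shape; `Expensive`, `LazyCache`, and `Get()` are placeholders, not the RocksDB types:

```cpp
#include <atomic>
#include <memory>
#include <mutex>
#include <vector>

// Placeholder for the lazily built fragmented tombstone list.
struct Expensive {
  std::vector<int> fragments{1, 2, 3};
};

struct LazyCache {
  std::atomic<bool> initialized{false};
  std::mutex reader_mutex;
  std::unique_ptr<Expensive> payload;

  const Expensive* Get() {
    // Fast path: already built and published with release semantics.
    if (!initialized.load(std::memory_order_acquire)) {
      std::lock_guard<std::mutex> lock(reader_mutex);
      // Second check under the lock: another reader may have built it while
      // we were waiting.
      if (!payload) {
        payload = std::make_unique<Expensive>();
        initialized.store(true, std::memory_order_release);
      }
    }
    return payload.get();
  }
};

int main() {
  LazyCache cache;
  return cache.Get() == cache.Get() ? 0 : 1;
}
```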
814 | 2.23k | void MemTable::ConstructFragmentedRangeTombstones() { |
815 | | // There should be no concurrent construction. |
816 | | // We could also check fragmented_range_tombstone_list_ to avoid repeated |
817 | | // constructions. We just construct them here again to be safe. |
818 | 2.23k | if (!is_range_del_table_empty_.load(std::memory_order_relaxed)) { |
819 | | // TODO: plumb Env::IOActivity, Env::IOPriority |
820 | 0 | auto* unfragmented_iter = new MemTableIterator( |
821 | 0 | MemTableIterator::kRangeDelEntries, *this, ReadOptions()); |
822 | |
823 | 0 | fragmented_range_tombstone_list_ = |
824 | 0 | std::make_unique<FragmentedRangeTombstoneList>( |
825 | 0 | std::unique_ptr<InternalIterator>(unfragmented_iter), |
826 | 0 | comparator_.comparator); |
827 | 0 | } |
828 | 2.23k | } |
829 | | |
830 | 0 | port::RWMutex* MemTable::GetLock(const Slice& key) { |
831 | 0 | return &locks_[GetSliceRangedNPHash(key, locks_.size())]; |
832 | 0 | } |
833 | | |
834 | | ReadOnlyMemTable::MemTableStats MemTable::ApproximateStats( |
835 | 0 | const Slice& start_ikey, const Slice& end_ikey) { |
836 | 0 | uint64_t entry_count = table_->ApproximateNumEntries(start_ikey, end_ikey); |
837 | 0 | entry_count += range_del_table_->ApproximateNumEntries(start_ikey, end_ikey); |
838 | 0 | if (entry_count == 0) { |
839 | 0 | return {0, 0}; |
840 | 0 | } |
841 | 0 | uint64_t n = num_entries_.load(std::memory_order_relaxed); |
842 | 0 | if (n == 0) { |
843 | 0 | return {0, 0}; |
844 | 0 | } |
845 | 0 | if (entry_count > n) { |
846 | | // (range_del_)table_->ApproximateNumEntries() is just an estimate so it can |
847 | | // be larger than actual entries we have. Cap it to entries we have to limit |
848 | | // the inaccuracy. |
849 | 0 | entry_count = n; |
850 | 0 | } |
851 | 0 | uint64_t data_size = data_size_.load(std::memory_order_relaxed); |
852 | 0 | return {entry_count * (data_size / n), entry_count}; |
853 | 0 | } |
854 | | |
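`ApproximateStats()` above scales the memtable's total `data_size_` by the estimated number of entries in the range, i.e. roughly `entry_count * (data_size / num_entries)`, after capping `entry_count` at the real entry count. A tiny numeric sketch of that estimate (all numbers are invented):

```cpp
#include <cstdint>
#include <iostream>

int main() {
  // Invented counters standing in for the memtable's atomics.
  const uint64_t num_entries = 10000;   // entries in the whole memtable
  const uint64_t data_size = 5000000;   // total encoded bytes
  uint64_t entry_count = 1200;          // estimate for [start_ikey, end_ikey)

  // Cap the estimate at the real entry count, as ApproximateStats() does.
  if (entry_count > num_entries) {
    entry_count = num_entries;
  }
  const uint64_t approximate_size = entry_count * (data_size / num_entries);

  // 1200 entries * (5000000 / 10000) bytes/entry = 600000 bytes.
  std::cout << approximate_size << " bytes over " << entry_count << " entries\n";
  return 0;
}
```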
855 | | Status MemTable::VerifyEncodedEntry(Slice encoded, |
856 | 931k | const ProtectionInfoKVOS64& kv_prot_info) { |
857 | 931k | uint32_t ikey_len = 0; |
858 | 931k | if (!GetVarint32(&encoded, &ikey_len)) { |
859 | 0 | return Status::Corruption("Unable to parse internal key length"); |
860 | 0 | } |
861 | 931k | if (ikey_len < 8 + ts_sz_) { |
862 | 0 | return Status::Corruption("Internal key length too short"); |
863 | 0 | } |
864 | 931k | if (ikey_len > encoded.size()) { |
865 | 0 | return Status::Corruption("Internal key length too long"); |
866 | 0 | } |
867 | 931k | uint32_t value_len = 0; |
868 | 931k | const size_t user_key_len = ikey_len - 8; |
869 | 931k | Slice key(encoded.data(), user_key_len); |
870 | 931k | encoded.remove_prefix(user_key_len); |
871 | | |
872 | 931k | uint64_t packed = DecodeFixed64(encoded.data()); |
873 | 931k | ValueType value_type = kMaxValue; |
874 | 931k | SequenceNumber sequence_number = kMaxSequenceNumber; |
875 | 931k | UnPackSequenceAndType(packed, &sequence_number, &value_type); |
876 | 931k | encoded.remove_prefix(8); |
877 | | |
878 | 931k | if (!GetVarint32(&encoded, &value_len)) { |
879 | 0 | return Status::Corruption("Unable to parse value length"); |
880 | 0 | } |
881 | 931k | if (value_len < encoded.size()) { |
882 | 0 | return Status::Corruption("Value length too short"); |
883 | 0 | } |
884 | 931k | if (value_len > encoded.size()) { |
885 | 0 | return Status::Corruption("Value length too long"); |
886 | 0 | } |
887 | 931k | Slice value(encoded.data(), value_len); |
888 | | |
889 | 931k | return kv_prot_info.StripS(sequence_number) |
890 | 931k | .StripKVO(key, value, value_type) |
891 | 931k | .GetStatus(); |
892 | 931k | } |
893 | | |
894 | | void MemTable::UpdateEntryChecksum(const ProtectionInfoKVOS64* kv_prot_info, |
895 | | const Slice& key, const Slice& value, |
896 | | ValueType type, SequenceNumber s, |
897 | 1.89M | char* checksum_ptr) { |
898 | 1.89M | if (moptions_.protection_bytes_per_key == 0) { |
899 | 1.89M | return; |
900 | 1.89M | } |
901 | | |
902 | 0 | if (kv_prot_info == nullptr) { |
903 | 0 | ProtectionInfo64() |
904 | 0 | .ProtectKVO(key, value, type) |
905 | 0 | .ProtectS(s) |
906 | 0 | .Encode(static_cast<uint8_t>(moptions_.protection_bytes_per_key), |
907 | 0 | checksum_ptr); |
908 | 0 | } else { |
909 | 0 | kv_prot_info->Encode( |
910 | 0 | static_cast<uint8_t>(moptions_.protection_bytes_per_key), checksum_ptr); |
911 | 0 | } |
912 | 0 | } |
913 | | |
914 | | Status MemTable::Add(SequenceNumber s, ValueType type, |
915 | | const Slice& key, /* user key */ |
916 | | const Slice& value, |
917 | | const ProtectionInfoKVOS64* kv_prot_info, |
918 | | bool allow_concurrent, |
919 | 1.89M | MemTablePostProcessInfo* post_process_info, void** hint) { |
920 | | // Format of an entry is concatenation of: |
921 | | // key_size : varint32 of internal_key.size() |
922 | | // key bytes : char[internal_key.size()] |
923 | | // value_size : varint32 of value.size() |
924 | | // value bytes : char[value.size()] |
925 | | // checksum : char[moptions_.protection_bytes_per_key] |
926 | 1.89M | uint32_t key_size = static_cast<uint32_t>(key.size()); |
927 | 1.89M | uint32_t val_size = static_cast<uint32_t>(value.size()); |
928 | 1.89M | uint32_t internal_key_size = key_size + 8; |
929 | 1.89M | const uint32_t encoded_len = VarintLength(internal_key_size) + |
930 | 1.89M | internal_key_size + VarintLength(val_size) + |
931 | 1.89M | val_size + moptions_.protection_bytes_per_key; |
932 | 1.89M | char* buf = nullptr; |
933 | 1.89M | std::unique_ptr<MemTableRep>& table = |
934 | 1.89M | type == kTypeRangeDeletion ? range_del_table_ : table_; |
935 | 1.89M | KeyHandle handle = table->Allocate(encoded_len, &buf); |
936 | | |
937 | 1.89M | char* p = EncodeVarint32(buf, internal_key_size); |
938 | 1.89M | memcpy(p, key.data(), key_size); |
939 | 1.89M | Slice key_slice(p, key_size); |
940 | 1.89M | p += key_size; |
941 | 1.89M | uint64_t packed = PackSequenceAndType(s, type); |
942 | 1.89M | EncodeFixed64(p, packed); |
943 | 1.89M | p += 8; |
944 | 1.89M | p = EncodeVarint32(p, val_size); |
945 | 1.89M | memcpy(p, value.data(), val_size); |
946 | 1.89M | assert((unsigned)(p + val_size - buf + moptions_.protection_bytes_per_key) == |
947 | 1.89M | (unsigned)encoded_len); |
948 | | |
949 | 1.89M | UpdateEntryChecksum(kv_prot_info, key, value, type, s, |
950 | 1.89M | buf + encoded_len - moptions_.protection_bytes_per_key); |
951 | 1.89M | Slice encoded(buf, encoded_len - moptions_.protection_bytes_per_key); |
952 | 1.89M | if (kv_prot_info != nullptr) { |
953 | 931k | TEST_SYNC_POINT_CALLBACK("MemTable::Add:Encoded", &encoded); |
954 | 931k | Status status = VerifyEncodedEntry(encoded, *kv_prot_info); |
955 | 931k | if (!status.ok()) { |
956 | 0 | return status; |
957 | 0 | } |
958 | 931k | } |
959 | | |
960 | 1.89M | Slice key_without_ts = StripTimestampFromUserKey(key, ts_sz_); |
961 | | |
962 | 1.89M | if (!allow_concurrent) { |
963 | | // Extract prefix for insert with hint. Hints are for point key table |
964 | | // (`table_`) only, not `range_del_table_`. |
965 | 1.89M | if (table == table_ && insert_with_hint_prefix_extractor_ != nullptr && |
966 | 1.89M | insert_with_hint_prefix_extractor_->InDomain(key_slice)) { |
967 | 0 | Slice prefix = insert_with_hint_prefix_extractor_->Transform(key_slice); |
968 | 0 | bool res = table->InsertKeyWithHint(handle, &insert_hints_[prefix]); |
969 | 0 | if (UNLIKELY(!res)) { |
970 | 0 | return Status::TryAgain("key+seq exists"); |
971 | 0 | } |
972 | 1.89M | } else { |
973 | 1.89M | bool res = table->InsertKey(handle); |
974 | 1.89M | if (UNLIKELY(!res)) { |
975 | 0 | return Status::TryAgain("key+seq exists"); |
976 | 0 | } |
977 | 1.89M | } |
978 | | |
979 | | // this is a bit ugly, but is the way to avoid locked instructions |
980 | | // when incrementing an atomic |
981 | 1.89M | num_entries_.store(num_entries_.load(std::memory_order_relaxed) + 1, |
982 | 1.89M | std::memory_order_relaxed); |
983 | 1.89M | data_size_.store(data_size_.load(std::memory_order_relaxed) + encoded_len, |
984 | 1.89M | std::memory_order_relaxed); |
985 | 1.89M | if (type == kTypeDeletion || type == kTypeSingleDeletion || |
986 | 1.89M | type == kTypeDeletionWithTimestamp) { |
987 | 247k | num_deletes_.store(num_deletes_.load(std::memory_order_relaxed) + 1, |
988 | 247k | std::memory_order_relaxed); |
989 | 1.64M | } else if (type == kTypeRangeDeletion) { |
990 | 829k | uint64_t val = num_range_deletes_.load(std::memory_order_relaxed) + 1; |
991 | 829k | num_range_deletes_.store(val, std::memory_order_relaxed); |
992 | 829k | } |
993 | | |
994 | 1.89M | if (bloom_filter_ && prefix_extractor_ && |
995 | 1.89M | prefix_extractor_->InDomain(key_without_ts)) { |
996 | 0 | bloom_filter_->Add(prefix_extractor_->Transform(key_without_ts)); |
997 | 0 | } |
998 | 1.89M | if (bloom_filter_ && moptions_.memtable_whole_key_filtering) { |
999 | 0 | bloom_filter_->Add(key_without_ts); |
1000 | 0 | } |
1001 | | |
1002 | | // The first sequence number inserted into the memtable |
1003 | 1.89M | assert(first_seqno_ == 0 || s >= first_seqno_); |
1004 | 1.89M | if (first_seqno_ == 0) { |
1005 | 65.2k | first_seqno_.store(s, std::memory_order_relaxed); |
1006 | | |
1007 | 65.2k | if (earliest_seqno_ == kMaxSequenceNumber) { |
1008 | 0 | earliest_seqno_.store(GetFirstSequenceNumber(), |
1009 | 0 | std::memory_order_relaxed); |
1010 | 0 | } |
1011 | 65.2k | assert(first_seqno_.load() >= earliest_seqno_.load()); |
1012 | 65.2k | } |
1013 | 1.89M | assert(post_process_info == nullptr); |
1014 | | // TODO(yuzhangyu): support updating newest UDT for when `allow_concurrent` |
1015 | | // is true. |
1016 | 1.89M | MaybeUpdateNewestUDT(key_slice); |
1017 | 1.89M | UpdateFlushState(); |
1018 | 1.89M | } else { |
1019 | 0 | bool res = (hint == nullptr) |
1020 | 0 | ? table->InsertKeyConcurrently(handle) |
1021 | 0 | : table->InsertKeyWithHintConcurrently(handle, hint); |
1022 | 0 | if (UNLIKELY(!res)) { |
1023 | 0 | return Status::TryAgain("key+seq exists"); |
1024 | 0 | } |
1025 | | |
1026 | 0 | assert(post_process_info != nullptr); |
1027 | 0 | post_process_info->num_entries++; |
1028 | 0 | post_process_info->data_size += encoded_len; |
1029 | 0 | if (type == kTypeDeletion) { |
1030 | 0 | post_process_info->num_deletes++; |
1031 | 0 | } |
1032 | |
1033 | 0 | if (bloom_filter_ && prefix_extractor_ && |
1034 | 0 | prefix_extractor_->InDomain(key_without_ts)) { |
1035 | 0 | bloom_filter_->AddConcurrently( |
1036 | 0 | prefix_extractor_->Transform(key_without_ts)); |
1037 | 0 | } |
1038 | 0 | if (bloom_filter_ && moptions_.memtable_whole_key_filtering) { |
1039 | 0 | bloom_filter_->AddConcurrently(key_without_ts); |
1040 | 0 | } |
1041 | | |
1042 | | // atomically update first_seqno_ and earliest_seqno_. |
1043 | 0 | uint64_t cur_seq_num = first_seqno_.load(std::memory_order_relaxed); |
1044 | 0 | while ((cur_seq_num == 0 || s < cur_seq_num) && |
1045 | 0 | !first_seqno_.compare_exchange_weak(cur_seq_num, s)) { |
1046 | 0 | } |
1047 | 0 | uint64_t cur_earliest_seqno = |
1048 | 0 | earliest_seqno_.load(std::memory_order_relaxed); |
1049 | 0 | while ( |
1050 | 0 | (cur_earliest_seqno == kMaxSequenceNumber || s < cur_earliest_seqno) && |
1051 | 0 | !earliest_seqno_.compare_exchange_weak(cur_earliest_seqno, s)) { |
1052 | 0 | } |
1053 | 0 | } |
1054 | 1.89M | if (type == kTypeRangeDeletion) { |
1055 | 829k | auto new_cache = std::make_shared<FragmentedRangeTombstoneListCache>(); |
1056 | 829k | size_t size = cached_range_tombstone_.Size(); |
1057 | 829k | if (allow_concurrent) { |
1058 | 0 | post_process_info->num_range_deletes++; |
1059 | 0 | range_del_mutex_.lock(); |
1060 | 0 | } |
1061 | 27.3M | for (size_t i = 0; i < size; ++i) { |
1062 | 26.5M | std::shared_ptr<FragmentedRangeTombstoneListCache>* local_cache_ref_ptr = |
1063 | 26.5M | cached_range_tombstone_.AccessAtCore(i); |
1064 | 26.5M | auto new_local_cache_ref = std::make_shared< |
1065 | 26.5M | const std::shared_ptr<FragmentedRangeTombstoneListCache>>(new_cache); |
1066 | | // It is okay for some reader to load old cache during invalidation as |
1067 | | // the new sequence number is not published yet. |
1068 | | // Each core will have a shared_ptr to a shared_ptr to the cached |
1069 | | // fragmented range tombstones, so that the ref count is maintained locally |
1070 | | // per-core using the per-core shared_ptr. |
1071 | 26.5M | std::atomic_store_explicit( |
1072 | 26.5M | local_cache_ref_ptr, |
1073 | 26.5M | std::shared_ptr<FragmentedRangeTombstoneListCache>( |
1074 | 26.5M | new_local_cache_ref, new_cache.get()), |
1075 | 26.5M | std::memory_order_relaxed); |
1076 | 26.5M | } |
1077 | | |
1078 | 829k | if (allow_concurrent) { |
1079 | 0 | range_del_mutex_.unlock(); |
1080 | 0 | } |
1081 | 829k | is_range_del_table_empty_.store(false, std::memory_order_relaxed); |
1082 | 829k | } |
1083 | 1.89M | UpdateOldestKeyTime(); |
1084 | | |
1085 | 1.89M | TEST_SYNC_POINT_CALLBACK("MemTable::Add:BeforeReturn:Encoded", &encoded); |
1086 | 1.89M | return Status::OK(); |
1087 | 1.89M | } |
1088 | | |
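A standalone sketch of the entry layout documented at the top of `Add()`: varint32 internal key size, internal key (user key plus 8-byte tag), varint32 value size, value bytes, then `protection_bytes_per_key` checksum bytes. The varint and fixed-64 helpers are re-implemented for the example, and the checksum is left empty; only the layout mirrors the code above:

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>

void AppendVarint32(std::string* dst, uint32_t v) {
  while (v >= 0x80) {
    dst->push_back(static_cast<char>((v & 0x7f) | 0x80));
    v >>= 7;
  }
  dst->push_back(static_cast<char>(v));
}

void AppendFixed64(std::string* dst, uint64_t v) {
  char buf[8];
  std::memcpy(buf, &v, sizeof(buf));  // little-endian host assumed for brevity
  dst->append(buf, sizeof(buf));
}

int main() {
  const std::string user_key = "foo";
  const std::string value = "bar";
  const uint64_t seq = 42;
  const uint8_t kTypeValue = 0x1;
  const size_t protection_bytes_per_key = 0;  // checksum disabled in this sketch

  std::string entry;
  AppendVarint32(&entry, static_cast<uint32_t>(user_key.size() + 8));  // key_size
  entry.append(user_key);                                              // key bytes
  AppendFixed64(&entry, (seq << 8) | kTypeValue);  // 8-byte tag (seq + type)
  AppendVarint32(&entry, static_cast<uint32_t>(value.size()));         // value_size
  entry.append(value);                                                 // value bytes
  entry.append(protection_bytes_per_key, '\0');    // checksum placeholder

  // 1 (varint 11) + 3 + 8 + 1 (varint 3) + 3 + 0 = 16 bytes total.
  assert(entry.size() == 16);
  return 0;
}
```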
1089 | | // Callback from MemTable::Get() |
1090 | | namespace { |
1091 | | |
1092 | | struct Saver { |
1093 | | Status* status; |
1094 | | const LookupKey* key; |
1095 | | bool* found_final_value; // Is value set correctly? Used by KeyMayExist |
1096 | | bool* merge_in_progress; |
1097 | | std::string* value; |
1098 | | PinnableWideColumns* columns; |
1099 | | SequenceNumber seq; |
1100 | | std::string* timestamp; |
1101 | | const MergeOperator* merge_operator; |
1102 | | // the merge operations encountered; |
1103 | | MergeContext* merge_context; |
1104 | | SequenceNumber max_covering_tombstone_seq; |
1105 | | MemTable* mem; |
1106 | | Logger* logger; |
1107 | | Statistics* statistics; |
1108 | | bool inplace_update_support; |
1109 | | bool do_merge; |
1110 | | SystemClock* clock; |
1111 | | |
1112 | | ReadCallback* callback_; |
1113 | | bool* is_blob_index; |
1114 | | bool allow_data_in_errors; |
1115 | | uint32_t protection_bytes_per_key; |
1116 | 21.7k | bool CheckCallback(SequenceNumber _seq) { |
1117 | 21.7k | if (callback_) { |
1118 | 0 | return callback_->IsVisible(_seq); |
1119 | 0 | } |
1120 | 21.7k | return true; |
1121 | 21.7k | } |
1122 | | }; |
1123 | | } // anonymous namespace |
1124 | | |
1125 | 22.8k | static bool SaveValue(void* arg, const char* entry) { |
1126 | 22.8k | Saver* s = static_cast<Saver*>(arg); |
1127 | 22.8k | assert(s != nullptr); |
1128 | 22.8k | assert(!s->value || !s->columns); |
1129 | 22.8k | assert(!*(s->found_final_value)); |
1130 | 22.8k | assert(s->status->ok() || s->status->IsMergeInProgress()); |
1131 | | |
1132 | 22.8k | MergeContext* merge_context = s->merge_context; |
1133 | 22.8k | SequenceNumber max_covering_tombstone_seq = s->max_covering_tombstone_seq; |
1134 | 22.8k | const MergeOperator* merge_operator = s->merge_operator; |
1135 | | |
1136 | 22.8k | assert(merge_context != nullptr); |
1137 | | |
1138 | | // Refer to comments under MemTable::Add() for entry format. |
1139 | | // Check that it belongs to same user key. |
1140 | 22.8k | uint32_t key_length = 0; |
1141 | 22.8k | const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); |
1142 | 22.8k | assert(key_length >= 8); |
1143 | 22.8k | Slice user_key_slice = Slice(key_ptr, key_length - 8); |
1144 | 22.8k | const Comparator* user_comparator = |
1145 | 22.8k | s->mem->GetInternalKeyComparator().user_comparator(); |
1146 | 22.8k | size_t ts_sz = user_comparator->timestamp_size(); |
1147 | 22.8k | if (ts_sz && s->timestamp && max_covering_tombstone_seq > 0) { |
1148 | | // timestamp should already be set to range tombstone timestamp |
1149 | 0 | assert(s->timestamp->size() == ts_sz); |
1150 | 0 | } |
1151 | 22.8k | if (user_comparator->EqualWithoutTimestamp(user_key_slice, |
1152 | 22.8k | s->key->user_key())) { |
1153 | | // Correct user key |
1154 | 21.7k | TEST_SYNC_POINT_CALLBACK("Memtable::SaveValue:Found:entry", &entry); |
1155 | 21.7k | std::optional<ReadLock> read_lock; |
1156 | 21.7k | if (s->inplace_update_support) { |
1157 | 0 | read_lock.emplace(s->mem->GetLock(s->key->user_key())); |
1158 | 0 | } |
1159 | | |
1160 | 21.7k | if (s->protection_bytes_per_key > 0) { |
1161 | 0 | *(s->status) = MemTable::VerifyEntryChecksum( |
1162 | 0 | entry, s->protection_bytes_per_key, s->allow_data_in_errors); |
1163 | 0 | if (!s->status->ok()) { |
1164 | 0 | *(s->found_final_value) = true; |
1165 | 0 | ROCKS_LOG_ERROR(s->logger, "In SaveValue: %s", s->status->getState()); |
1166 | | // Memtable entry corrupted |
1167 | 0 | return false; |
1168 | 0 | } |
1169 | 0 | } |
1170 | | |
1171 | 21.7k | const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); |
1172 | 21.7k | ValueType type; |
1173 | 21.7k | SequenceNumber seq; |
1174 | 21.7k | UnPackSequenceAndType(tag, &seq, &type); |
1175 | | // If the value is not in the snapshot, skip it |
1176 | 21.7k | if (!s->CheckCallback(seq)) { |
1177 | 0 | return true; // to continue to the next seq |
1178 | 0 | } |
1179 | | |
1180 | 21.7k | if (s->seq == kMaxSequenceNumber) { |
1181 | 21.7k | s->seq = seq; |
1182 | 21.7k | if (s->seq > max_covering_tombstone_seq) { |
1183 | 21.7k | if (ts_sz && s->timestamp != nullptr) { |
1184 | | // `timestamp` was set to range tombstone's timestamp before |
1185 | | // `SaveValue` is ever called. This key has a higher sequence number |
1186 | | // than range tombstone, and is the key with the highest seqno across |
1187 | | // all keys with this user_key, so we update timestamp here. |
1188 | 0 | Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz); |
1189 | 0 | s->timestamp->assign(ts.data(), ts_sz); |
1190 | 0 | } |
1191 | 21.7k | } else { |
1192 | 0 | s->seq = max_covering_tombstone_seq; |
1193 | 0 | } |
1194 | 21.7k | } |
1195 | | |
1196 | 21.7k | if (ts_sz > 0 && s->timestamp != nullptr) { |
1197 | 0 | if (!s->timestamp->empty()) { |
1198 | 0 | assert(ts_sz == s->timestamp->size()); |
1199 | 0 | } |
1200 | | // TODO optimize for smaller size ts |
1201 | 0 | const std::string kMaxTs(ts_sz, '\xff'); |
1202 | 0 | if (s->timestamp->empty() || |
1203 | 0 | user_comparator->CompareTimestamp(*(s->timestamp), kMaxTs) == 0) { |
1204 | 0 | Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz); |
1205 | 0 | s->timestamp->assign(ts.data(), ts_sz); |
1206 | 0 | } |
1207 | 0 | } |
1208 | | |
1209 | 21.7k | if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex || |
1210 | 21.7k | type == kTypeWideColumnEntity || type == kTypeDeletion || |
1211 | 21.7k | type == kTypeSingleDeletion || type == kTypeDeletionWithTimestamp || |
1212 | 21.7k | type == kTypeValuePreferredSeqno) && |
1213 | 21.7k | max_covering_tombstone_seq > seq) { |
1214 | 0 | type = kTypeRangeDeletion; |
1215 | 0 | } |
1216 | 21.7k | switch (type) { |
1217 | 0 | case kTypeBlobIndex: { |
1218 | 0 | if (!s->do_merge) { |
1219 | 0 | *(s->status) = Status::NotSupported( |
1220 | 0 | "GetMergeOperands not supported by stacked BlobDB"); |
1221 | 0 | *(s->found_final_value) = true; |
1222 | 0 | return false; |
1223 | 0 | } |
1224 | | |
1225 | 0 | if (*(s->merge_in_progress)) { |
1226 | 0 | *(s->status) = Status::NotSupported( |
1227 | 0 | "Merge operator not supported by stacked BlobDB"); |
1228 | 0 | *(s->found_final_value) = true; |
1229 | 0 | return false; |
1230 | 0 | } |
1231 | | |
1232 | 0 | if (s->is_blob_index == nullptr) { |
1233 | 0 | ROCKS_LOG_ERROR(s->logger, "Encountered unexpected blob index."); |
1234 | 0 | *(s->status) = Status::NotSupported( |
1235 | 0 | "Encountered unexpected blob index. Please open DB with " |
1236 | 0 | "ROCKSDB_NAMESPACE::blob_db::BlobDB."); |
1237 | 0 | *(s->found_final_value) = true; |
1238 | 0 | return false; |
1239 | 0 | } |
1240 | | |
1241 | 0 | Slice v = GetLengthPrefixedSlice(key_ptr + key_length); |
1242 | |
1243 | 0 | *(s->status) = Status::OK(); |
1244 | |
1245 | 0 | if (s->value) { |
1246 | 0 | s->value->assign(v.data(), v.size()); |
1247 | 0 | } else if (s->columns) { |
1248 | 0 | s->columns->SetPlainValue(v); |
1249 | 0 | } |
1250 | |
1251 | 0 | *(s->found_final_value) = true; |
1252 | 0 | *(s->is_blob_index) = true; |
1253 | |
1254 | 0 | return false; |
1255 | 0 | } |
1256 | 21.4k | case kTypeValue: |
1257 | 21.4k | case kTypeValuePreferredSeqno: { |
1258 | 21.4k | Slice v = GetLengthPrefixedSlice(key_ptr + key_length); |
1259 | 21.4k | if (type == kTypeValuePreferredSeqno) { |
1260 | 0 | v = ParsePackedValueForValue(v); |
1261 | 0 | } |
1262 | | |
1263 | 21.4k | ReadOnlyMemTable::HandleTypeValue( |
1264 | 21.4k | s->key->user_key(), v, s->inplace_update_support == false, |
1265 | 21.4k | s->do_merge, *(s->merge_in_progress), merge_context, |
1266 | 21.4k | s->merge_operator, s->clock, s->statistics, s->logger, s->status, |
1267 | 21.4k | s->value, s->columns, s->is_blob_index); |
1268 | 21.4k | *(s->found_final_value) = true; |
1269 | 21.4k | return false; |
1270 | 21.4k | } |
1271 | 0 | case kTypeWideColumnEntity: { |
1272 | 0 | Slice v = GetLengthPrefixedSlice(key_ptr + key_length); |
1273 | |
1274 | 0 | *(s->status) = Status::OK(); |
1275 | |
1276 | 0 | if (!s->do_merge) { |
1277 | | // Preserve the value with the goal of returning it as part of |
1278 | | // raw merge operands to the user |
1279 | |
1280 | 0 | Slice value_of_default; |
1281 | 0 | *(s->status) = WideColumnSerialization::GetValueOfDefaultColumn( |
1282 | 0 | v, value_of_default); |
1283 | |
1284 | 0 | if (s->status->ok()) { |
1285 | 0 | merge_context->PushOperand( |
1286 | 0 | value_of_default, |
1287 | 0 | s->inplace_update_support == false /* operand_pinned */); |
1288 | 0 | } |
1289 | 0 | } else if (*(s->merge_in_progress)) { |
1290 | 0 | assert(s->do_merge); |
1291 | |
1292 | 0 | if (s->value || s->columns) { |
1293 | | // `op_failure_scope` (an output parameter) is not provided (set |
1294 | | // to nullptr) since a failure must be propagated regardless of |
1295 | | // its value. |
1296 | 0 | *(s->status) = MergeHelper::TimedFullMerge( |
1297 | 0 | merge_operator, s->key->user_key(), MergeHelper::kWideBaseValue, |
1298 | 0 | v, merge_context->GetOperands(), s->logger, s->statistics, |
1299 | 0 | s->clock, /* update_num_ops_stats */ true, |
1300 | 0 | /* op_failure_scope */ nullptr, s->value, s->columns); |
1301 | 0 | } |
1302 | 0 | } else if (s->value) { |
1303 | 0 | Slice value_of_default; |
1304 | 0 | *(s->status) = WideColumnSerialization::GetValueOfDefaultColumn( |
1305 | 0 | v, value_of_default); |
1306 | 0 | if (s->status->ok()) { |
1307 | 0 | s->value->assign(value_of_default.data(), value_of_default.size()); |
1308 | 0 | } |
1309 | 0 | } else if (s->columns) { |
1310 | 0 | *(s->status) = s->columns->SetWideColumnValue(v); |
1311 | 0 | } |
1312 | |
1313 | 0 | *(s->found_final_value) = true; |
1314 | |
1315 | 0 | if (s->is_blob_index != nullptr) { |
1316 | 0 | *(s->is_blob_index) = false; |
1317 | 0 | } |
1318 | |
1319 | 0 | return false; |
1320 | 21.4k | } |
1321 | 365 | case kTypeDeletion: |
1322 | 365 | case kTypeDeletionWithTimestamp: |
1323 | 365 | case kTypeSingleDeletion: |
1324 | 365 | case kTypeRangeDeletion: { |
1325 | 365 | ReadOnlyMemTable::HandleTypeDeletion( |
1326 | 365 | s->key->user_key(), *(s->merge_in_progress), s->merge_context, |
1327 | 365 | s->merge_operator, s->clock, s->statistics, s->logger, s->status, |
1328 | 365 | s->value, s->columns); |
1329 | 365 | *(s->found_final_value) = true; |
1330 | 365 | return false; |
1331 | 365 | } |
1332 | 0 | case kTypeMerge: { |
1333 | 0 | Slice v = GetLengthPrefixedSlice(key_ptr + key_length); |
1334 | 0 | *(s->merge_in_progress) = true; |
1335 | 0 | *(s->found_final_value) = ReadOnlyMemTable::HandleTypeMerge( |
1336 | 0 | s->key->user_key(), v, s->inplace_update_support == false, |
1337 | 0 | s->do_merge, merge_context, s->merge_operator, s->clock, |
1338 | 0 | s->statistics, s->logger, s->status, s->value, s->columns); |
1339 | 0 | return !*(s->found_final_value); |
1340 | 365 | } |
1341 | 0 | default: { |
1342 | 0 | std::string msg("Corrupted value not expected."); |
1343 | 0 | if (s->allow_data_in_errors) { |
1344 | 0 | msg.append("Unrecognized value type: " + |
1345 | 0 | std::to_string(static_cast<int>(type)) + ". "); |
1346 | 0 | msg.append("User key: " + user_key_slice.ToString(/*hex=*/true) + |
1347 | 0 | ". "); |
1348 | 0 | msg.append("seq: " + std::to_string(seq) + "."); |
1349 | 0 | } |
1350 | 0 | *(s->found_final_value) = true; |
1351 | 0 | *(s->status) = Status::Corruption(msg.c_str()); |
1352 | 0 | return false; |
1353 | 365 | } |
1354 | 21.7k | } |
1355 | 21.7k | } |
1356 | | |
1357 | | // At this point the lookup outcome could be corruption, merge-in-progress, or not-found
1358 | 1.05k | return false; |
1359 | 22.8k | } |
1360 | | |
1361 | | bool MemTable::Get(const LookupKey& key, std::string* value, |
1362 | | PinnableWideColumns* columns, std::string* timestamp, |
1363 | | Status* s, MergeContext* merge_context, |
1364 | | SequenceNumber* max_covering_tombstone_seq, |
1365 | | SequenceNumber* seq, const ReadOptions& read_opts, |
1366 | | bool immutable_memtable, ReadCallback* callback, |
1367 | 24.7k | bool* is_blob_index, bool do_merge) { |
1368 | | // The sequence number is updated synchronously in version_set.h |
1369 | 24.7k | if (IsEmpty()) { |
1370 | | // Avoiding recording stats for speed. |
1371 | 1.79k | return false; |
1372 | 1.79k | } |
1373 | | |
1374 | 22.9k | PERF_TIMER_GUARD(get_from_memtable_time); |
1375 | | |
1376 | 22.9k | std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter( |
1377 | 22.9k | NewRangeTombstoneIterator(read_opts, |
1378 | 22.9k | GetInternalKeySeqno(key.internal_key()), |
1379 | 22.9k | immutable_memtable)); |
1380 | 22.9k | if (range_del_iter != nullptr) { |
1381 | 0 | SequenceNumber covering_seq = |
1382 | 0 | range_del_iter->MaxCoveringTombstoneSeqnum(key.user_key()); |
1383 | 0 | if (covering_seq > *max_covering_tombstone_seq) { |
1384 | 0 | *max_covering_tombstone_seq = covering_seq; |
1385 | 0 | if (timestamp) { |
1386 | | // Will be overwritten in SaveValue() if there is a point key with |
1387 | | // a higher seqno. |
1388 | 0 | timestamp->assign(range_del_iter->timestamp().data(), |
1389 | 0 | range_del_iter->timestamp().size()); |
1390 | 0 | } |
1391 | 0 | } |
1392 | 0 | } |
1393 | | |
1394 | 22.9k | bool found_final_value = false; |
1395 | 22.9k | bool merge_in_progress = s->IsMergeInProgress(); |
1396 | 22.9k | bool may_contain = true; |
1397 | 22.9k | Slice user_key_without_ts = StripTimestampFromUserKey(key.user_key(), ts_sz_); |
1398 | 22.9k | bool bloom_checked = false; |
1399 | 22.9k | if (bloom_filter_) { |
1400 | | // when both memtable_whole_key_filtering and prefix_extractor_ are set, |
1401 | | // only do whole key filtering for Get() to save CPU |
1402 | 0 | if (moptions_.memtable_whole_key_filtering) { |
1403 | 0 | may_contain = bloom_filter_->MayContain(user_key_without_ts); |
1404 | 0 | bloom_checked = true; |
1405 | 0 | } else { |
1406 | 0 | assert(prefix_extractor_); |
1407 | 0 | if (prefix_extractor_->InDomain(user_key_without_ts)) { |
1408 | 0 | may_contain = bloom_filter_->MayContain( |
1409 | 0 | prefix_extractor_->Transform(user_key_without_ts)); |
1410 | 0 | bloom_checked = true; |
1411 | 0 | } |
1412 | 0 | } |
1413 | 0 | } |
1414 | | |
1415 | 22.9k | if (bloom_filter_ && !may_contain) { |
1416 | | // Bloom filter says the key definitely does not exist; skip the table lookup
1417 | 0 | PERF_COUNTER_ADD(bloom_memtable_miss_count, 1); |
1418 | 0 | *seq = kMaxSequenceNumber; |
1419 | 22.9k | } else { |
1420 | 22.9k | if (bloom_checked) { |
1421 | 0 | PERF_COUNTER_ADD(bloom_memtable_hit_count, 1); |
1422 | 0 | } |
1423 | 22.9k | GetFromTable(key, *max_covering_tombstone_seq, do_merge, callback, |
1424 | 22.9k | is_blob_index, value, columns, timestamp, s, merge_context, |
1425 | 22.9k | seq, &found_final_value, &merge_in_progress); |
1426 | 22.9k | } |
1427 | | |
1428 | | // No change to value, since we have not yet found a Put/Delete |
1429 | | // Propagate corruption error |
1430 | 22.9k | if (!found_final_value && merge_in_progress) { |
1431 | 0 | if (s->ok()) { |
1432 | 0 | *s = Status::MergeInProgress(); |
1433 | 0 | } else { |
1434 | 0 | assert(s->IsMergeInProgress()); |
1435 | 0 | } |
1436 | 0 | } |
1437 | 22.9k | PERF_COUNTER_ADD(get_from_memtable_count, 1); |
1438 | 22.9k | return found_final_value; |
1439 | 24.7k | } |
1440 | | |
1441 | | void MemTable::GetFromTable(const LookupKey& key, |
1442 | | SequenceNumber max_covering_tombstone_seq, |
1443 | | bool do_merge, ReadCallback* callback, |
1444 | | bool* is_blob_index, std::string* value, |
1445 | | PinnableWideColumns* columns, |
1446 | | std::string* timestamp, Status* s, |
1447 | | MergeContext* merge_context, SequenceNumber* seq, |
1448 | 22.9k | bool* found_final_value, bool* merge_in_progress) { |
1449 | 22.9k | Saver saver; |
1450 | 22.9k | saver.status = s; |
1451 | 22.9k | saver.found_final_value = found_final_value; |
1452 | 22.9k | saver.merge_in_progress = merge_in_progress; |
1453 | 22.9k | saver.key = &key; |
1454 | 22.9k | saver.value = value; |
1455 | 22.9k | saver.columns = columns; |
1456 | 22.9k | saver.timestamp = timestamp; |
1457 | 22.9k | saver.seq = kMaxSequenceNumber; |
1458 | 22.9k | saver.mem = this; |
1459 | 22.9k | saver.merge_context = merge_context; |
1460 | 22.9k | saver.max_covering_tombstone_seq = max_covering_tombstone_seq; |
1461 | 22.9k | saver.merge_operator = moptions_.merge_operator; |
1462 | 22.9k | saver.logger = moptions_.info_log; |
1463 | 22.9k | saver.inplace_update_support = moptions_.inplace_update_support; |
1464 | 22.9k | saver.statistics = moptions_.statistics; |
1465 | 22.9k | saver.clock = clock_; |
1466 | 22.9k | saver.callback_ = callback; |
1467 | 22.9k | saver.is_blob_index = is_blob_index; |
1468 | 22.9k | saver.do_merge = do_merge; |
1469 | 22.9k | saver.allow_data_in_errors = moptions_.allow_data_in_errors; |
1470 | 22.9k | saver.protection_bytes_per_key = moptions_.protection_bytes_per_key; |
1471 | | |
1472 | 22.9k | if (!moptions_.paranoid_memory_checks) { |
1473 | 22.9k | table_->Get(key, &saver, SaveValue); |
1474 | 22.9k | } else { |
1475 | 0 | Status check_s = table_->GetAndValidate(key, &saver, SaveValue, |
1476 | 0 | moptions_.allow_data_in_errors); |
1477 | 0 | if (check_s.IsCorruption()) { |
1478 | 0 | *(saver.status) = check_s; |
1479 | | // Should stop searching the LSM. |
1480 | 0 | *(saver.found_final_value) = true; |
1481 | 0 | } |
1482 | 0 | } |
1483 | 22.9k | assert(s->ok() || s->IsMergeInProgress() || *found_final_value); |
1484 | 22.9k | *seq = saver.seq; |
1485 | 22.9k | } |
1486 | | |
1487 | | void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range, |
1488 | 0 | ReadCallback* callback, bool immutable_memtable) { |
1489 | | // The sequence number is updated synchronously in version_set.h |
1490 | 0 | if (IsEmpty()) { |
1491 | | // Avoiding recording stats for speed. |
1492 | 0 | return; |
1493 | 0 | } |
1494 | 0 | PERF_TIMER_GUARD(get_from_memtable_time); |
1495 | | |
1496 | | // For now, memtable Bloom filter is effectively disabled if there are any |
1497 | | // range tombstones. This is the simplest way to ensure range tombstones are |
1498 | | // handled. TODO: allow Bloom checks where max_covering_tombstone_seq==0 |
1499 | 0 | bool no_range_del = read_options.ignore_range_deletions || |
1500 | 0 | is_range_del_table_empty_.load(std::memory_order_relaxed); |
1501 | 0 | MultiGetRange temp_range(*range, range->begin(), range->end()); |
1502 | 0 | if (bloom_filter_ && no_range_del) { |
1503 | 0 | bool whole_key = |
1504 | 0 | !prefix_extractor_ || moptions_.memtable_whole_key_filtering; |
1505 | 0 | std::array<Slice, MultiGetContext::MAX_BATCH_SIZE> bloom_keys; |
1506 | 0 | std::array<bool, MultiGetContext::MAX_BATCH_SIZE> may_match; |
1507 | 0 | std::array<size_t, MultiGetContext::MAX_BATCH_SIZE> range_indexes; |
1508 | 0 | int num_keys = 0; |
1509 | 0 | for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) { |
1510 | 0 | if (whole_key) { |
1511 | 0 | bloom_keys[num_keys] = iter->ukey_without_ts; |
1512 | 0 | range_indexes[num_keys++] = iter.index(); |
1513 | 0 | } else if (prefix_extractor_->InDomain(iter->ukey_without_ts)) { |
1514 | 0 | bloom_keys[num_keys] = |
1515 | 0 | prefix_extractor_->Transform(iter->ukey_without_ts); |
1516 | 0 | range_indexes[num_keys++] = iter.index(); |
1517 | 0 | } |
1518 | 0 | } |
1519 | 0 | bloom_filter_->MayContain(num_keys, bloom_keys.data(), may_match.data()); |
1520 | 0 | for (int i = 0; i < num_keys; ++i) { |
1521 | 0 | if (!may_match[i]) { |
1522 | 0 | temp_range.SkipIndex(range_indexes[i]); |
1523 | 0 | PERF_COUNTER_ADD(bloom_memtable_miss_count, 1); |
1524 | 0 | } else { |
1525 | 0 | PERF_COUNTER_ADD(bloom_memtable_hit_count, 1); |
1526 | 0 | } |
1527 | 0 | } |
1528 | 0 | } |
1529 | 0 | for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) { |
1530 | 0 | bool found_final_value{false}; |
1531 | 0 | bool merge_in_progress = iter->s->IsMergeInProgress(); |
1532 | 0 | if (!no_range_del) { |
1533 | 0 | std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter( |
1534 | 0 | NewRangeTombstoneIteratorInternal( |
1535 | 0 | read_options, GetInternalKeySeqno(iter->lkey->internal_key()), |
1536 | 0 | immutable_memtable)); |
1537 | 0 | SequenceNumber covering_seq = |
1538 | 0 | range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key()); |
1539 | 0 | if (covering_seq > iter->max_covering_tombstone_seq) { |
1540 | 0 | iter->max_covering_tombstone_seq = covering_seq; |
1541 | 0 | if (iter->timestamp) { |
1542 | | // Will be overwritten in SaveValue() if there is a point key with |
1543 | | // a higher seqno. |
1544 | 0 | iter->timestamp->assign(range_del_iter->timestamp().data(), |
1545 | 0 | range_del_iter->timestamp().size()); |
1546 | 0 | } |
1547 | 0 | } |
1548 | 0 | } |
1549 | 0 | SequenceNumber dummy_seq; |
1550 | 0 | GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true, |
1551 | 0 | callback, &iter->is_blob_index, |
1552 | 0 | iter->value ? iter->value->GetSelf() : nullptr, iter->columns, |
1553 | 0 | iter->timestamp, iter->s, &(iter->merge_context), &dummy_seq, |
1554 | 0 | &found_final_value, &merge_in_progress); |
1555 | |
1556 | 0 | if (!found_final_value && merge_in_progress) { |
1557 | 0 | if (iter->s->ok()) { |
1558 | 0 | *(iter->s) = Status::MergeInProgress(); |
1559 | 0 | } else { |
1560 | 0 | assert(iter->s->IsMergeInProgress()); |
1561 | 0 | } |
1562 | 0 | } |
1563 | |
1564 | 0 | if (found_final_value || |
1565 | 0 | (!iter->s->ok() && !iter->s->IsMergeInProgress())) { |
1566 | | // `found_final_value` should be set if an error/corruption occurs. |
1567 | | // The check on iter->s is just there in case GetFromTable() did not |
1568 | | // set `found_final_value` properly. |
1569 | 0 | assert(found_final_value); |
1570 | 0 | if (iter->value) { |
1571 | 0 | iter->value->PinSelf(); |
1572 | 0 | range->AddValueSize(iter->value->size()); |
1573 | 0 | } else { |
1574 | 0 | assert(iter->columns); |
1575 | 0 | range->AddValueSize(iter->columns->serialized_size()); |
1576 | 0 | } |
1577 | |
1578 | 0 | range->MarkKeyDone(iter); |
1579 | 0 | RecordTick(moptions_.statistics, MEMTABLE_HIT); |
1580 | 0 | if (range->GetValueSize() > read_options.value_size_soft_limit) { |
1581 | | // Set all remaining keys in range to Abort |
1582 | 0 | for (auto range_iter = range->begin(); range_iter != range->end(); |
1583 | 0 | ++range_iter) { |
1584 | 0 | range->MarkKeyDone(range_iter); |
1585 | 0 | *(range_iter->s) = Status::Aborted(); |
1586 | 0 | } |
1587 | 0 | break; |
1588 | 0 | } |
1589 | 0 | } |
1590 | 0 | } |
1591 | 0 | PERF_COUNTER_ADD(get_from_memtable_count, 1); |
1592 | 0 | } |
1593 | | |
1594 | | Status MemTable::Update(SequenceNumber seq, ValueType value_type, |
1595 | | const Slice& key, const Slice& value, |
1596 | 0 | const ProtectionInfoKVOS64* kv_prot_info) { |
1597 | 0 | LookupKey lkey(key, seq); |
1598 | 0 | Slice mem_key = lkey.memtable_key(); |
1599 | |
1600 | 0 | std::unique_ptr<MemTableRep::Iterator> iter( |
1601 | 0 | table_->GetDynamicPrefixIterator()); |
1602 | 0 | iter->Seek(lkey.internal_key(), mem_key.data()); |
1603 | |
1604 | 0 | if (iter->Valid()) { |
1605 | | // Refer to comments under MemTable::Add() for entry format. |
1606 | | // Check that it belongs to same user key. We do not check the |
1607 | | // sequence number since the Seek() call above should have skipped |
1608 | | // all entries with overly large sequence numbers. |
1609 | 0 | const char* entry = iter->key(); |
1610 | 0 | uint32_t key_length = 0; |
1611 | 0 | const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); |
1612 | 0 | if (comparator_.comparator.user_comparator()->Equal( |
1613 | 0 | Slice(key_ptr, key_length - 8), lkey.user_key())) { |
1614 | | // Correct user key |
1615 | 0 | const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); |
1616 | 0 | ValueType type; |
1617 | 0 | SequenceNumber existing_seq; |
1618 | 0 | UnPackSequenceAndType(tag, &existing_seq, &type); |
1619 | 0 | assert(existing_seq != seq); |
1620 | 0 | if (type == value_type) { |
1621 | 0 | Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); |
1622 | 0 | uint32_t prev_size = static_cast<uint32_t>(prev_value.size()); |
1623 | 0 | uint32_t new_size = static_cast<uint32_t>(value.size()); |
1624 | | |
1625 | | // Update value, if new value size <= previous value size |
1626 | 0 | if (new_size <= prev_size) { |
1627 | 0 | WriteLock wl(GetLock(lkey.user_key())); |
1628 | 0 | char* p = |
1629 | 0 | EncodeVarint32(const_cast<char*>(key_ptr) + key_length, new_size); |
1630 | 0 | memcpy(p, value.data(), value.size()); |
1631 | 0 | assert((unsigned)((p + value.size()) - entry) == |
1632 | 0 | (unsigned)(VarintLength(key_length) + key_length + |
1633 | 0 | VarintLength(value.size()) + value.size())); |
1634 | 0 | RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED); |
1635 | 0 | if (kv_prot_info != nullptr) { |
1636 | 0 | ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info); |
1637 | | // `seq` is swallowed and `existing_seq` prevails. |
1638 | 0 | updated_kv_prot_info.UpdateS(seq, existing_seq); |
1639 | 0 | UpdateEntryChecksum(&updated_kv_prot_info, key, value, type, |
1640 | 0 | existing_seq, p + value.size()); |
1641 | 0 | Slice encoded(entry, p + value.size() - entry); |
1642 | 0 | return VerifyEncodedEntry(encoded, updated_kv_prot_info); |
1643 | 0 | } else { |
1644 | 0 | UpdateEntryChecksum(nullptr, key, value, type, existing_seq, |
1645 | 0 | p + value.size()); |
1646 | 0 | } |
1647 | 0 | return Status::OK(); |
1648 | 0 | } |
1649 | 0 | } |
1650 | 0 | } |
1651 | 0 | } |
1652 | | |
1653 | | // The latest value is not value_type or key doesn't exist |
1654 | 0 | return Add(seq, value_type, key, value, kv_prot_info); |
1655 | 0 | } |
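
For readers following the `key_ptr` / `key_length` arithmetic in Update() above (and in SaveValue() and UpdateCallback() below): memtable entries use the encoding documented at MemTable::Add() — a varint32 internal-key length, the internal key (user key plus an 8-byte packed sequence/type tag), then a length-prefixed value. The standalone sketch below is not part of memtable.cc; ParsedEntry and DecodeMemtableEntry are hypothetical names used only to illustrate that layout with the same coding helpers.

// Illustrative sketch only; not part of memtable.cc.
#include "db/dbformat.h"  // Slice, SequenceNumber, ValueType, UnPackSequenceAndType
#include "util/coding.h"  // GetVarint32Ptr, GetLengthPrefixedSlice, DecodeFixed64

namespace ROCKSDB_NAMESPACE {

struct ParsedEntry {
  Slice user_key;      // user key (may still carry a trailing timestamp)
  SequenceNumber seq;  // sequence number unpacked from the 8-byte tag
  ValueType type;      // value type unpacked from the 8-byte tag
  Slice value;         // length-prefixed value that follows the internal key
};

inline bool DecodeMemtableEntry(const char* entry, ParsedEntry* out) {
  // varint32 length of the internal key, followed by the internal key itself.
  uint32_t key_length = 0;
  const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
  if (key_ptr == nullptr || key_length < 8) {
    return false;  // malformed entry
  }
  // Internal key = user key + 8-byte tag packing (sequence << 8) | type.
  out->user_key = Slice(key_ptr, key_length - 8);
  const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
  UnPackSequenceAndType(tag, &out->seq, &out->type);
  // The value is stored right after the internal key, length-prefixed.
  out->value = GetLengthPrefixedSlice(key_ptr + key_length);
  return true;
}

}  // namespace ROCKSDB_NAMESPACE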
1656 | | |
1657 | | Status MemTable::UpdateCallback(SequenceNumber seq, const Slice& key, |
1658 | | const Slice& delta, |
1659 | 0 | const ProtectionInfoKVOS64* kv_prot_info) { |
1660 | 0 | LookupKey lkey(key, seq); |
1661 | 0 | Slice memkey = lkey.memtable_key(); |
1662 | |
1663 | 0 | std::unique_ptr<MemTableRep::Iterator> iter( |
1664 | 0 | table_->GetDynamicPrefixIterator()); |
1665 | 0 | iter->Seek(lkey.internal_key(), memkey.data()); |
1666 | |
1667 | 0 | if (iter->Valid()) { |
1668 | | // Refer to comments under MemTable::Add() for entry format. |
1669 | | // Check that it belongs to same user key. We do not check the |
1670 | | // sequence number since the Seek() call above should have skipped |
1671 | | // all entries with overly large sequence numbers. |
1672 | 0 | const char* entry = iter->key(); |
1673 | 0 | uint32_t key_length = 0; |
1674 | 0 | const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); |
1675 | 0 | if (comparator_.comparator.user_comparator()->Equal( |
1676 | 0 | Slice(key_ptr, key_length - 8), lkey.user_key())) { |
1677 | | // Correct user key |
1678 | 0 | const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); |
1679 | 0 | ValueType type; |
1680 | 0 | uint64_t existing_seq; |
1681 | 0 | UnPackSequenceAndType(tag, &existing_seq, &type); |
1682 | 0 | if (type == kTypeValue) { |
1683 | 0 | Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); |
1684 | 0 | uint32_t prev_size = static_cast<uint32_t>(prev_value.size()); |
1685 | |
1686 | 0 | char* prev_buffer = const_cast<char*>(prev_value.data()); |
1687 | 0 | uint32_t new_prev_size = prev_size; |
1688 | |
1689 | 0 | std::string str_value; |
1690 | 0 | WriteLock wl(GetLock(lkey.user_key())); |
1691 | 0 | auto status = moptions_.inplace_callback(prev_buffer, &new_prev_size, |
1692 | 0 | delta, &str_value); |
1693 | 0 | if (status == UpdateStatus::UPDATED_INPLACE) { |
1694 | | // Value already updated by callback. |
1695 | 0 | assert(new_prev_size <= prev_size); |
1696 | 0 | if (new_prev_size < prev_size) { |
1697 | | // overwrite the new prev_size |
1698 | 0 | char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length, |
1699 | 0 | new_prev_size); |
1700 | 0 | if (VarintLength(new_prev_size) < VarintLength(prev_size)) { |
1701 | | // shift the value buffer as well. |
1702 | 0 | memcpy(p, prev_buffer, new_prev_size); |
1703 | 0 | prev_buffer = p; |
1704 | 0 | } |
1705 | 0 | } |
1706 | 0 | RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED); |
1707 | 0 | UpdateFlushState(); |
1708 | 0 | Slice new_value(prev_buffer, new_prev_size); |
1709 | 0 | if (kv_prot_info != nullptr) { |
1710 | 0 | ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info); |
1711 | | // `seq` is swallowed and `existing_seq` prevails. |
1712 | 0 | updated_kv_prot_info.UpdateS(seq, existing_seq); |
1713 | 0 | updated_kv_prot_info.UpdateV(delta, new_value); |
1714 | 0 | Slice encoded(entry, prev_buffer + new_prev_size - entry); |
1715 | 0 | UpdateEntryChecksum(&updated_kv_prot_info, key, new_value, type, |
1716 | 0 | existing_seq, prev_buffer + new_prev_size); |
1717 | 0 | return VerifyEncodedEntry(encoded, updated_kv_prot_info); |
1718 | 0 | } else { |
1719 | 0 | UpdateEntryChecksum(nullptr, key, new_value, type, existing_seq, |
1720 | 0 | prev_buffer + new_prev_size); |
1721 | 0 | } |
1722 | 0 | return Status::OK(); |
1723 | 0 | } else if (status == UpdateStatus::UPDATED) { |
1724 | 0 | Status s; |
1725 | 0 | if (kv_prot_info != nullptr) { |
1726 | 0 | ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info); |
1727 | 0 | updated_kv_prot_info.UpdateV(delta, str_value); |
1728 | 0 | s = Add(seq, kTypeValue, key, Slice(str_value), |
1729 | 0 | &updated_kv_prot_info); |
1730 | 0 | } else { |
1731 | 0 | s = Add(seq, kTypeValue, key, Slice(str_value), |
1732 | 0 | nullptr /* kv_prot_info */); |
1733 | 0 | } |
1734 | 0 | RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN); |
1735 | 0 | UpdateFlushState(); |
1736 | 0 | return s; |
1737 | 0 | } else if (status == UpdateStatus::UPDATE_FAILED) { |
1738 | | // `UPDATE_FAILED` is named incorrectly. It indicates no update |
1739 | | // happened. It does not indicate a failure happened. |
1740 | 0 | UpdateFlushState(); |
1741 | 0 | return Status::OK(); |
1742 | 0 | } |
1743 | 0 | } |
1744 | 0 | } |
1745 | 0 | } |
1746 | | // The latest value is not `kTypeValue` or key doesn't exist |
1747 | 0 | return Status::NotFound(); |
1748 | 0 | } |
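
UpdateCallback() above dispatches on the UpdateStatus returned by the user-supplied `inplace_callback` from the column family options: UPDATED_INPLACE means the existing buffer was modified (and possibly shrunk), UPDATED means `merged_value` carries a replacement value to be added as a new entry, and UPDATE_FAILED means nothing changed. A minimal sketch of such a callback follows, assuming that contract; ExampleInplaceCallback is a hypothetical name and its overwrite-or-replace policy is purely illustrative.

// Illustrative sketch only; not part of memtable.cc.
#include <cstdint>
#include <cstring>
#include <string>

#include "rocksdb/options.h"  // UpdateStatus
#include "rocksdb/slice.h"

// Overwrite the existing value when the new value fits in the existing
// buffer; otherwise hand back the full new value via merged_value.
ROCKSDB_NAMESPACE::UpdateStatus ExampleInplaceCallback(
    char* existing_value, uint32_t* existing_value_size,
    ROCKSDB_NAMESPACE::Slice delta_value, std::string* merged_value) {
  if (existing_value == nullptr) {
    // No existing plain value; no in-place update happened.
    return ROCKSDB_NAMESPACE::UpdateStatus::UPDATE_FAILED;
  }
  if (delta_value.size() <= *existing_value_size) {
    // Fits in the existing buffer: update in place and shrink the size.
    std::memcpy(existing_value, delta_value.data(), delta_value.size());
    *existing_value_size = static_cast<uint32_t>(delta_value.size());
    return ROCKSDB_NAMESPACE::UpdateStatus::UPDATED_INPLACE;
  }
  // Too large for an in-place update: return the replacement value.
  merged_value->assign(delta_value.data(), delta_value.size());
  return ROCKSDB_NAMESPACE::UpdateStatus::UPDATED;
}

Such a callback would be installed through the column family options (inplace_callback, with inplace_update_support enabled), matching the code path exercised above.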
1749 | | |
1750 | | size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key, |
1751 | 0 | size_t limit) { |
1752 | 0 | Slice memkey = key.memtable_key(); |
1753 | | |
1754 | | // A total ordered iterator is costly for some memtablerep (prefix aware |
1755 | | // reps). By passing in the user key, we allow efficient iterator creation. |
1756 | | // The iterator only needs to be ordered within the same user key. |
1757 | 0 | std::unique_ptr<MemTableRep::Iterator> iter( |
1758 | 0 | table_->GetDynamicPrefixIterator()); |
1759 | 0 | iter->Seek(key.internal_key(), memkey.data()); |
1760 | |
1761 | 0 | size_t num_successive_merges = 0; |
1762 | |
1763 | 0 | for (; iter->Valid() && num_successive_merges < limit; iter->Next()) { |
1764 | 0 | const char* entry = iter->key(); |
1765 | 0 | uint32_t key_length = 0; |
1766 | 0 | const char* iter_key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); |
1767 | 0 | if (!comparator_.comparator.user_comparator()->Equal( |
1768 | 0 | Slice(iter_key_ptr, key_length - 8), key.user_key())) { |
1769 | 0 | break; |
1770 | 0 | } |
1771 | | |
1772 | 0 | const uint64_t tag = DecodeFixed64(iter_key_ptr + key_length - 8); |
1773 | 0 | ValueType type; |
1774 | 0 | uint64_t unused; |
1775 | 0 | UnPackSequenceAndType(tag, &unused, &type); |
1776 | 0 | if (type != kTypeMerge) { |
1777 | 0 | break; |
1778 | 0 | } |
1779 | | |
1780 | 0 | ++num_successive_merges; |
1781 | 0 | } |
1782 | |
1783 | 0 | return num_successive_merges; |
1784 | 0 | } |
1785 | | |
1786 | | void MemTableRep::Get(const LookupKey& k, void* callback_args, |
1787 | 0 | bool (*callback_func)(void* arg, const char* entry)) { |
1788 | 0 | auto iter = GetDynamicPrefixIterator(); |
1789 | 0 | for (iter->Seek(k.internal_key(), k.memtable_key().data()); |
1790 | 0 | iter->Valid() && callback_func(callback_args, iter->key()); |
1791 | 0 | iter->Next()) { |
1792 | 0 | } |
1793 | 0 | } |
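
MemTableRep::Get() above keeps handing entries, in increasing internal-key order starting at the lookup key, to `callback_func` until it returns false; that is how SaveValue() earlier in this file terminates the scan once a final value, tombstone, or error has been recorded. The sketch below, with hypothetical names and not part of the MemTable code, shows a callback obeying that contract: it counts consecutive entries for one user key and stops as soon as a different user key is seen.

// Illustrative sketch only; not part of memtable.cc.
#include <cstddef>
#include <cstdint>

#include "rocksdb/comparator.h"  // Comparator
#include "rocksdb/slice.h"       // Slice
#include "util/coding.h"         // GetVarint32Ptr

namespace ROCKSDB_NAMESPACE {

// Hypothetical callback state passed as callback_args to MemTableRep::Get().
struct CountEntriesState {
  Slice target_user_key;   // user key being looked up (without timestamp)
  const Comparator* ucmp;  // user comparator
  size_t count = 0;        // entries seen for target_user_key
};

// Return true to receive the next entry, false to stop the scan.
static bool CountEntriesForKey(void* arg, const char* entry) {
  auto* state = static_cast<CountEntriesState*>(arg);
  uint32_t key_length = 0;
  const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
  // Entries are ordered, so the first non-matching user key ends the scan.
  if (key_ptr == nullptr || key_length < 8 ||
      !state->ucmp->Equal(Slice(key_ptr, key_length - 8),
                          state->target_user_key)) {
    return false;
  }
  ++state->count;
  return true;
}

}  // namespace ROCKSDB_NAMESPACE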
1794 | | |
1795 | 0 | void MemTable::RefLogContainingPrepSection(uint64_t log) { |
1796 | 0 | assert(log > 0); |
1797 | 0 | auto cur = min_prep_log_referenced_.load(); |
1798 | 0 | while ((log < cur || cur == 0) && |
1799 | 0 | !min_prep_log_referenced_.compare_exchange_strong(cur, log)) { |
1800 | 0 | cur = min_prep_log_referenced_.load(); |
1801 | 0 | } |
1802 | 0 | } |
1803 | | |
1804 | 0 | uint64_t MemTable::GetMinLogContainingPrepSection() { |
1805 | 0 | return min_prep_log_referenced_.load(); |
1806 | 0 | } |
1807 | | |
1808 | 1.89M | void MemTable::MaybeUpdateNewestUDT(const Slice& user_key) { |
1809 | 1.89M | if (ts_sz_ == 0) { |
1810 | 1.89M | return; |
1811 | 1.89M | } |
1812 | 0 | const Comparator* ucmp = GetInternalKeyComparator().user_comparator(); |
1813 | 0 | Slice udt = ExtractTimestampFromUserKey(user_key, ts_sz_); |
1814 | 0 | if (newest_udt_.empty() || ucmp->CompareTimestamp(udt, newest_udt_) > 0) { |
1815 | 0 | newest_udt_ = udt; |
1816 | 0 | } |
1817 | 0 | } |
1818 | | |
1819 | 0 | const Slice& MemTable::GetNewestUDT() const { |
1820 | 0 | assert(ts_sz_ > 0); |
1821 | 0 | return newest_udt_; |
1822 | 0 | } |
1823 | | |
1824 | | } // namespace ROCKSDB_NAMESPACE |