Coverage Report

Created: 2025-07-23 07:17

/src/rocksdb/db/memtable.cc
Line | Count | Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10
#include "db/memtable.h"
11
12
#include <algorithm>
13
#include <array>
14
#include <limits>
15
#include <memory>
16
#include <optional>
17
18
#include "db/dbformat.h"
19
#include "db/kv_checksum.h"
20
#include "db/merge_context.h"
21
#include "db/merge_helper.h"
22
#include "db/pinned_iterators_manager.h"
23
#include "db/range_tombstone_fragmenter.h"
24
#include "db/read_callback.h"
25
#include "db/wide/wide_column_serialization.h"
26
#include "logging/logging.h"
27
#include "memory/arena.h"
28
#include "memory/memory_usage.h"
29
#include "monitoring/perf_context_imp.h"
30
#include "monitoring/statistics_impl.h"
31
#include "port/lang.h"
32
#include "port/port.h"
33
#include "rocksdb/comparator.h"
34
#include "rocksdb/env.h"
35
#include "rocksdb/iterator.h"
36
#include "rocksdb/merge_operator.h"
37
#include "rocksdb/slice_transform.h"
38
#include "rocksdb/types.h"
39
#include "rocksdb/write_buffer_manager.h"
40
#include "table/internal_iterator.h"
41
#include "table/iterator_wrapper.h"
42
#include "table/merging_iterator.h"
43
#include "util/autovector.h"
44
#include "util/coding.h"
45
#include "util/mutexlock.h"
46
47
namespace ROCKSDB_NAMESPACE {
48
49
ImmutableMemTableOptions::ImmutableMemTableOptions(
50
    const ImmutableOptions& ioptions,
51
    const MutableCFOptions& mutable_cf_options)
52
141k
    : arena_block_size(mutable_cf_options.arena_block_size),
53
      memtable_prefix_bloom_bits(
54
141k
          static_cast<uint32_t>(
55
141k
              static_cast<double>(mutable_cf_options.write_buffer_size) *
56
141k
              mutable_cf_options.memtable_prefix_bloom_size_ratio) *
57
141k
          8u),
58
141k
      memtable_huge_page_size(mutable_cf_options.memtable_huge_page_size),
59
      memtable_whole_key_filtering(
60
141k
          mutable_cf_options.memtable_whole_key_filtering),
61
141k
      inplace_update_support(ioptions.inplace_update_support),
62
141k
      inplace_update_num_locks(mutable_cf_options.inplace_update_num_locks),
63
141k
      inplace_callback(ioptions.inplace_callback),
64
141k
      max_successive_merges(mutable_cf_options.max_successive_merges),
65
      strict_max_successive_merges(
66
141k
          mutable_cf_options.strict_max_successive_merges),
67
141k
      statistics(ioptions.stats),
68
141k
      merge_operator(ioptions.merge_operator.get()),
69
141k
      info_log(ioptions.logger),
70
      protection_bytes_per_key(
71
141k
          mutable_cf_options.memtable_protection_bytes_per_key),
72
141k
      allow_data_in_errors(ioptions.allow_data_in_errors),
73
141k
      paranoid_memory_checks(mutable_cf_options.paranoid_memory_checks) {}
74
75
MemTable::MemTable(const InternalKeyComparator& cmp,
76
                   const ImmutableOptions& ioptions,
77
                   const MutableCFOptions& mutable_cf_options,
78
                   WriteBufferManager* write_buffer_manager,
79
                   SequenceNumber latest_seq, uint32_t column_family_id)
80
141k
    : comparator_(cmp),
81
141k
      moptions_(ioptions, mutable_cf_options),
82
141k
      kArenaBlockSize(Arena::OptimizeBlockSize(moptions_.arena_block_size)),
83
141k
      mem_tracker_(write_buffer_manager),
84
141k
      arena_(moptions_.arena_block_size,
85
141k
             (write_buffer_manager != nullptr &&
86
141k
              (write_buffer_manager->enabled() ||
87
141k
               write_buffer_manager->cost_to_cache()))
88
141k
                 ? &mem_tracker_
89
141k
                 : nullptr,
90
141k
             mutable_cf_options.memtable_huge_page_size),
91
141k
      table_(ioptions.memtable_factory->CreateMemTableRep(
92
141k
          comparator_, &arena_, mutable_cf_options.prefix_extractor.get(),
93
141k
          ioptions.logger, column_family_id)),
94
141k
      range_del_table_(SkipListFactory().CreateMemTableRep(
95
141k
          comparator_, &arena_, nullptr /* transform */, ioptions.logger,
96
141k
          column_family_id)),
97
141k
      is_range_del_table_empty_(true),
98
141k
      data_size_(0),
99
141k
      num_entries_(0),
100
141k
      num_deletes_(0),
101
141k
      num_range_deletes_(0),
102
141k
      write_buffer_size_(mutable_cf_options.write_buffer_size),
103
141k
      first_seqno_(0),
104
141k
      earliest_seqno_(latest_seq),
105
141k
      creation_seq_(latest_seq),
106
141k
      min_prep_log_referenced_(0),
107
141k
      locks_(moptions_.inplace_update_support
108
141k
                 ? moptions_.inplace_update_num_locks
109
141k
                 : 0),
110
141k
      prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
111
141k
      flush_state_(FLUSH_NOT_REQUESTED),
112
141k
      clock_(ioptions.clock),
113
      insert_with_hint_prefix_extractor_(
114
141k
          ioptions.memtable_insert_with_hint_prefix_extractor.get()),
115
141k
      oldest_key_time_(std::numeric_limits<uint64_t>::max()),
116
141k
      approximate_memory_usage_(0),
117
      memtable_max_range_deletions_(
118
141k
          mutable_cf_options.memtable_max_range_deletions) {
119
141k
  UpdateFlushState();
120
  // something went wrong if we need to flush before inserting anything
121
141k
  assert(!ShouldScheduleFlush());
122
123
  // use bloom_filter_ for both whole key and prefix bloom filter
124
141k
  if ((prefix_extractor_ || moptions_.memtable_whole_key_filtering) &&
125
141k
      moptions_.memtable_prefix_bloom_bits > 0) {
126
0
    bloom_filter_.reset(
127
0
        new DynamicBloom(&arena_, moptions_.memtable_prefix_bloom_bits,
128
0
                         6 /* hard coded 6 probes */,
129
0
                         moptions_.memtable_huge_page_size, ioptions.logger));
130
0
  }
131
  // Initialize cached_range_tombstone_ here since it could
132
  // be read before it is constructed in MemTable::Add(), which could also lead
133
  // to a data race on the global mutex table backing atomic shared_ptr.
134
141k
  auto new_cache = std::make_shared<FragmentedRangeTombstoneListCache>();
135
141k
  size_t size = cached_range_tombstone_.Size();
136
4.65M
  for (size_t i = 0; i < size; ++i) {
137
4.51M
    std::shared_ptr<FragmentedRangeTombstoneListCache>* local_cache_ref_ptr =
138
4.51M
        cached_range_tombstone_.AccessAtCore(i);
139
4.51M
    auto new_local_cache_ref = std::make_shared<
140
4.51M
        const std::shared_ptr<FragmentedRangeTombstoneListCache>>(new_cache);
141
4.51M
    std::atomic_store_explicit(
142
4.51M
        local_cache_ref_ptr,
143
4.51M
        std::shared_ptr<FragmentedRangeTombstoneListCache>(new_local_cache_ref,
144
4.51M
                                                           new_cache.get()),
145
4.51M
        std::memory_order_relaxed);
146
4.51M
  }
147
141k
  const Comparator* ucmp = cmp.user_comparator();
148
141k
  assert(ucmp);
149
141k
  ts_sz_ = ucmp->timestamp_size();
150
141k
}
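// A minimal standalone sketch of the per-core publication pattern used in
// the constructor above: shared_ptr's aliasing constructor wraps the global
// cache in a per-core control block, and atomic_store_explicit publishes it.
// `Cache` and `PublishToAllCores` are illustrative names, not from this file.
#include <atomic>
#include <memory>
#include <vector>

struct Cache {};  // stand-in for FragmentedRangeTombstoneListCache

void PublishToAllCores(std::vector<std::shared_ptr<Cache>>& per_core_slots,
                       const std::shared_ptr<Cache>& new_cache) {
  for (auto& slot : per_core_slots) {
    // Per-core control block holding a reference to the shared cache, so
    // refcount traffic stays local to the core that owns the slot.
    auto per_core_ref =
        std::make_shared<const std::shared_ptr<Cache>>(new_cache);
    // Aliasing constructor: the stored pointer is the cache object itself,
    // but lifetime is managed through the per-core control block.
    std::atomic_store_explicit(
        &slot, std::shared_ptr<Cache>(per_core_ref, new_cache.get()),
        std::memory_order_relaxed);
  }
}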
151
152
141k
MemTable::~MemTable() {
153
141k
  mem_tracker_.FreeMem();
154
141k
  assert(refs_ == 0);
155
141k
}
156
157
8.95k
size_t MemTable::ApproximateMemoryUsage() {
158
8.95k
  autovector<size_t> usages = {
159
8.95k
      arena_.ApproximateMemoryUsage(), table_->ApproximateMemoryUsage(),
160
8.95k
      range_del_table_->ApproximateMemoryUsage(),
161
8.95k
      ROCKSDB_NAMESPACE::ApproximateMemoryUsage(insert_hints_)};
162
8.95k
  size_t total_usage = 0;
163
35.8k
  for (size_t usage : usages) {
164
    // If usage + total_usage >= max size_t, return max size_t.
165
    // The comparison below is written this way to avoid numeric overflow.
166
35.8k
    if (usage >= std::numeric_limits<size_t>::max() - total_usage) {
167
0
      return std::numeric_limits<size_t>::max();
168
0
    }
169
35.8k
    total_usage += usage;
170
35.8k
  }
171
8.95k
  approximate_memory_usage_.store(total_usage, std::memory_order_relaxed);
172
  // otherwise, return the actual usage
173
8.95k
  return total_usage;
174
8.95k
}
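// The overflow guard in the loop above is a saturating add: a + b would
// exceed the size_t range exactly when b >= max - a, so the check happens
// before the addition. A minimal sketch of the same idea:
#include <cstddef>
#include <limits>

size_t SaturatingAdd(size_t a, size_t b) {
  constexpr size_t kMax = std::numeric_limits<size_t>::max();
  // Same comparison as above: clamp to kMax instead of wrapping around.
  return (b >= kMax - a) ? kMax : a + b;
}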
175
176
2.03M
bool MemTable::ShouldFlushNow() {
177
2.03M
  if (IsMarkedForFlush()) {
178
    // TODO: dedicated flush reason when marked for flush
179
0
    return true;
180
0
  }
181
182
  // Flush if memtable_max_range_deletions is set (> 0)
183
  // and at least that many range deletions have been performed.
184
2.03M
  if (memtable_max_range_deletions_ > 0 &&
185
2.03M
      num_range_deletes_.load(std::memory_order_relaxed) >=
186
0
          static_cast<uint64_t>(memtable_max_range_deletions_)) {
187
0
    return true;
188
0
  }
189
190
2.03M
  size_t write_buffer_size = write_buffer_size_.load(std::memory_order_relaxed);
191
  // Often we cannot allocate arena blocks that exactly match the
192
  // buffer size. Thus we have to decide if we should over-allocate or
193
  // under-allocate.
194
  // This constant can be interpreted as: if we still have more than
195
  // "kAllowOverAllocationRatio * kArenaBlockSize" space left, we'd try to over
196
  // allocate one more block.
197
2.03M
  const double kAllowOverAllocationRatio = 0.6;
198
199
  // range deletions use a skip list, which allocates all memory through `arena_`
200
2.03M
  assert(range_del_table_->ApproximateMemoryUsage() == 0);
201
  // If the arena still has room for a new block allocation, we can safely say it
202
  // shouldn't flush.
203
2.03M
  auto allocated_memory = table_->ApproximateMemoryUsage() +
204
2.03M
                          arena_.MemoryAllocatedBytes();
205
206
2.03M
  approximate_memory_usage_.store(allocated_memory, std::memory_order_relaxed);
207
208
  // if we can still allocate one more block without exceeding the
209
  // over-allocation ratio, then we should not flush.
210
2.03M
  if (allocated_memory + kArenaBlockSize <
211
2.03M
      write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
212
2.03M
    return false;
213
2.03M
  }
214
215
  // if the user keeps adding entries that exceed write_buffer_size, we need to
216
  // flush earlier even though we still have much available memory left.
217
0
  if (allocated_memory >
218
0
      write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
219
0
    return true;
220
0
  }
221
222
  // In this code path, Arena has already allocated its "last block", which
223
  // means the total allocated memory size is either:
224
  //  (1) "moderately" over-allocated the memory (no more than `0.6 * arena
225
  // block size`), or
226
  //  (2) the allocated memory is less than write buffer size, but we'll stop
227
  // here, since allocating a new arena block would over-allocate much
228
  // more (half of the arena block size) memory.
229
  //
230
  // In either case, to avoid over-allocating, the last block will stop allocation
231
  // when its usage reaches a certain ratio; we carefully choose "0.75
232
  // full" as the stop condition because it addresses the following issue with
233
  // great simplicity: What if the next inserted entry's size is
234
  // bigger than AllocatedAndUnused()?
235
  //
236
  // The answer is: if the entry size is also bigger than 0.25 *
237
  // kArenaBlockSize, a dedicated block will be allocated for it; otherwise
238
  // the arena will skip AllocatedAndUnused() anyway and allocate a new, empty
239
  // and regular block. In either case, we would have *overly* over-allocated.
240
  //
241
  // Therefore, setting the last block to be at most "0.75 full" avoids both
242
  // cases.
243
  //
244
  // NOTE: the average fraction of wasted space with this approach can be
245
  // as: "arena block size * 0.25 / write buffer size". User who specify a small
246
  // small write buffer size and/or a big arena block size may suffer.
247
0
  return arena_.AllocatedAndUnused() < kArenaBlockSize / 4;
248
0
}
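// A minimal sketch of the decision above with its three branches spelled
// out; the function name, parameters, and the sample numbers in the comments
// are illustrative, not from this file.
#include <cstddef>

bool ShouldFlushSketch(size_t allocated, size_t unused_in_last_block,
                       size_t write_buffer_size, size_t block_size) {
  const double kAllowOverAllocationRatio = 0.6;
  const double budget =
      write_buffer_size + block_size * kAllowOverAllocationRatio;
  if (allocated + block_size < budget) {
    return false;  // room for one more block: e.g. 58 + 8 < 64 + 4.8 (MiB)
  }
  if (allocated > budget) {
    return true;  // already too far over budget: e.g. 70 > 68.8 (MiB)
  }
  // Last block is in play: flush once it is more than 75% used.
  return unused_in_last_block < block_size / 4;
}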
249
250
2.03M
void MemTable::UpdateFlushState() {
251
2.03M
  auto state = flush_state_.load(std::memory_order_relaxed);
252
2.03M
  if (state == FLUSH_NOT_REQUESTED && ShouldFlushNow()) {
253
    // ignore CAS failure, because that means somebody else requested
254
    // a flush
255
0
    flush_state_.compare_exchange_strong(state, FLUSH_REQUESTED,
256
0
                                         std::memory_order_relaxed,
257
0
                                         std::memory_order_relaxed);
258
0
  }
259
2.03M
}
260
261
1.89M
void MemTable::UpdateOldestKeyTime() {
262
1.89M
  uint64_t oldest_key_time = oldest_key_time_.load(std::memory_order_relaxed);
263
1.89M
  if (oldest_key_time == std::numeric_limits<uint64_t>::max()) {
264
65.2k
    int64_t current_time = 0;
265
65.2k
    auto s = clock_->GetCurrentTime(&current_time);
266
65.2k
    if (s.ok()) {
267
65.2k
      assert(current_time >= 0);
268
      // If the CAS fails, another thread has already set the timestamp.
269
65.2k
      oldest_key_time_.compare_exchange_strong(
270
65.2k
          oldest_key_time, static_cast<uint64_t>(current_time),
271
65.2k
          std::memory_order_relaxed, std::memory_order_relaxed);
272
65.2k
    }
273
65.2k
  }
274
1.89M
}
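// UpdateOldestKeyTime() is a "set once" pattern: uint64_t max is the
// sentinel for "unset", and compare_exchange_strong lets only the first
// writer record a time. A minimal standalone sketch:
#include <atomic>
#include <cstdint>
#include <limits>

void RecordFirstKeyTime(std::atomic<uint64_t>& oldest_key_time, uint64_t now) {
  uint64_t expected = std::numeric_limits<uint64_t>::max();  // "unset"
  // If another thread stored a time first, the CAS fails and that earlier
  // time is kept, which is exactly what we want.
  oldest_key_time.compare_exchange_strong(
      expected, now, std::memory_order_relaxed, std::memory_order_relaxed);
}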
275
276
Status MemTable::VerifyEntryChecksum(const char* entry,
277
                                     uint32_t protection_bytes_per_key,
278
0
                                     bool allow_data_in_errors) {
279
0
  if (protection_bytes_per_key == 0) {
280
0
    return Status::OK();
281
0
  }
282
0
  uint32_t key_length;
283
0
  const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
284
0
  if (key_ptr == nullptr) {
285
0
    return Status::Corruption("Unable to parse internal key length");
286
0
  }
287
0
  if (key_length < 8) {
288
0
    return Status::Corruption("Memtable entry internal key length too short.");
289
0
  }
290
0
  Slice user_key = Slice(key_ptr, key_length - 8);
291
292
0
  const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
293
0
  ValueType type;
294
0
  SequenceNumber seq;
295
0
  UnPackSequenceAndType(tag, &seq, &type);
296
297
0
  uint32_t value_length = 0;
298
0
  const char* value_ptr = GetVarint32Ptr(
299
0
      key_ptr + key_length, key_ptr + key_length + 5, &value_length);
300
0
  if (value_ptr == nullptr) {
301
0
    return Status::Corruption("Unable to parse internal key value");
302
0
  }
303
0
  Slice value = Slice(value_ptr, value_length);
304
305
0
  const char* checksum_ptr = value_ptr + value_length;
306
0
  bool match =
307
0
      ProtectionInfo64()
308
0
          .ProtectKVO(user_key, value, type)
309
0
          .ProtectS(seq)
310
0
          .Verify(static_cast<uint8_t>(protection_bytes_per_key), checksum_ptr);
311
0
  if (!match) {
312
0
    std::string msg(
313
0
        "Corrupted memtable entry, per key-value checksum verification "
314
0
        "failed.");
315
0
    if (allow_data_in_errors) {
316
0
      msg.append("Unrecognized value type: " +
317
0
                 std::to_string(static_cast<int>(type)) + ". ");
318
0
      msg.append("User key: " + user_key.ToString(/*hex=*/true) + ". ");
319
0
      msg.append("seq: " + std::to_string(seq) + ".");
320
0
    }
321
0
    return Status::Corruption(msg.c_str());
322
0
  }
323
0
  return Status::OK();
324
0
}
325
326
int MemTable::KeyComparator::operator()(const char* prefix_len_key1,
327
0
                                        const char* prefix_len_key2) const {
328
  // Internal keys are encoded as length-prefixed strings.
329
0
  Slice k1 = GetLengthPrefixedSlice(prefix_len_key1);
330
0
  Slice k2 = GetLengthPrefixedSlice(prefix_len_key2);
331
0
  return comparator.CompareKeySeq(k1, k2);
332
0
}
333
334
int MemTable::KeyComparator::operator()(
335
18.1M
    const char* prefix_len_key, const KeyComparator::DecodedType& key) const {
336
  // Internal keys are encoded as length-prefixed strings.
337
18.1M
  Slice a = GetLengthPrefixedSlice(prefix_len_key);
338
18.1M
  return comparator.CompareKeySeq(a, key);
339
18.1M
}
340
341
0
void MemTableRep::InsertConcurrently(KeyHandle /*handle*/) {
342
0
  throw std::runtime_error("concurrent insert not supported");
343
0
}
344
345
0
Slice MemTableRep::UserKey(const char* key) const {
346
0
  Slice slice = GetLengthPrefixedSlice(key);
347
0
  return Slice(slice.data(), slice.size() - 8);
348
0
}
349
350
0
KeyHandle MemTableRep::Allocate(const size_t len, char** buf) {
351
0
  *buf = allocator_->Allocate(len);
352
0
  return static_cast<KeyHandle>(*buf);
353
0
}
354
355
// Encode a suitable internal key target for "target" and return it.
356
// Uses *scratch as scratch space, and the returned pointer will point
357
// into this scratch space.
358
10.1k
const char* EncodeKey(std::string* scratch, const Slice& target) {
359
10.1k
  scratch->clear();
360
10.1k
  PutVarint32(scratch, static_cast<uint32_t>(target.size()));
361
10.1k
  scratch->append(target.data(), target.size());
362
10.1k
  return scratch->data();
363
10.1k
}
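// EncodeKey() produces the varint32-length-prefixed form that
// GetLengthPrefixedSlice() decodes throughout this file. A self-contained
// sketch of that round trip, with simplified stand-ins for the util/coding.h
// helpers (the *Sketch names are illustrative):
#include <cstdint>
#include <string>
#include <string_view>

void PutVarint32Sketch(std::string* dst, uint32_t v) {
  while (v >= 0x80) {  // 7 payload bits per byte; high bit means "more"
    dst->push_back(static_cast<char>(v | 0x80));
    v >>= 7;
  }
  dst->push_back(static_cast<char>(v));
}

std::string_view GetLengthPrefixedSliceSketch(const char* p) {
  uint32_t len = 0;
  int shift = 0;
  uint8_t byte;
  do {  // decode the varint32 length prefix
    byte = static_cast<uint8_t>(*p++);
    len |= static_cast<uint32_t>(byte & 0x7f) << shift;
    shift += 7;
  } while (byte & 0x80);
  return {p, len};  // the payload immediately follows the length
}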
364
365
class MemTableIterator : public InternalIterator {
366
 public:
367
  enum Kind { kPointEntries, kRangeDelEntries };
368
  MemTableIterator(
369
      Kind kind, const MemTable& mem, const ReadOptions& read_options,
370
      UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping = nullptr,
371
      Arena* arena = nullptr,
372
      const SliceTransform* cf_prefix_extractor = nullptr)
373
48.7k
      : bloom_(nullptr),
374
48.7k
        prefix_extractor_(mem.prefix_extractor_),
375
48.7k
        comparator_(mem.comparator_),
376
48.7k
        seqno_to_time_mapping_(seqno_to_time_mapping),
377
48.7k
        status_(Status::OK()),
378
48.7k
        logger_(mem.moptions_.info_log),
379
48.7k
        ts_sz_(mem.ts_sz_),
380
48.7k
        protection_bytes_per_key_(mem.moptions_.protection_bytes_per_key),
381
48.7k
        valid_(false),
382
        value_pinned_(
383
48.7k
            !mem.GetImmutableMemTableOptions()->inplace_update_support),
384
48.7k
        arena_mode_(arena != nullptr),
385
48.7k
        paranoid_memory_checks_(mem.moptions_.paranoid_memory_checks),
386
48.7k
        allow_data_in_error(mem.moptions_.allow_data_in_errors) {
387
48.7k
    if (kind == kRangeDelEntries) {
388
3.72k
      iter_ = mem.range_del_table_->GetIterator(arena);
389
45.0k
    } else if (prefix_extractor_ != nullptr &&
390
               // NOTE: checking extractor equivalence when not pointer
391
               // equivalent is arguably too expensive for memtable
392
45.0k
               prefix_extractor_ == cf_prefix_extractor &&
393
45.0k
               (read_options.prefix_same_as_start ||
394
0
                (!read_options.total_order_seek &&
395
0
                 !read_options.auto_prefix_mode))) {
396
      // Auto prefix mode is not implemented in memtable yet.
397
0
      assert(kind == kPointEntries);
398
0
      bloom_ = mem.bloom_filter_.get();
399
0
      iter_ = mem.table_->GetDynamicPrefixIterator(arena);
400
45.0k
    } else {
401
45.0k
      assert(kind == kPointEntries);
402
45.0k
      iter_ = mem.table_->GetIterator(arena);
403
45.0k
    }
404
48.7k
    status_.PermitUncheckedError();
405
48.7k
  }
406
  // No copying allowed
407
  MemTableIterator(const MemTableIterator&) = delete;
408
  void operator=(const MemTableIterator&) = delete;
409
410
48.7k
  ~MemTableIterator() override {
411
#ifndef NDEBUG
412
    // Assert that the MemTableIterator is never deleted while
413
    // Pinning is Enabled.
414
    assert(!pinned_iters_mgr_ || !pinned_iters_mgr_->PinningEnabled());
415
#endif
416
48.7k
    if (arena_mode_) {
417
45.0k
      iter_->~Iterator();
418
45.0k
    } else {
419
3.72k
      delete iter_;
420
3.72k
    }
421
48.7k
    status_.PermitUncheckedError();
422
48.7k
  }
423
424
#ifndef NDEBUG
425
  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
426
    pinned_iters_mgr_ = pinned_iters_mgr;
427
  }
428
  PinnedIteratorsManager* pinned_iters_mgr_ = nullptr;
429
#endif
430
431
1.54M
  bool Valid() const override {
432
    // If inner iter_ is not valid, then this iter should also not be valid.
433
1.54M
    assert(iter_->Valid() || !(valid_ && status_.ok()));
434
1.54M
    return valid_ && status_.ok();
435
1.54M
  }
436
437
5.46k
  void Seek(const Slice& k) override {
438
5.46k
    PERF_TIMER_GUARD(seek_on_memtable_time);
439
5.46k
    PERF_COUNTER_ADD(seek_on_memtable_count, 1);
440
5.46k
    status_ = Status::OK();
441
5.46k
    if (bloom_) {
442
      // iterator should only use prefix bloom filter
443
0
      Slice user_k_without_ts(ExtractUserKeyAndStripTimestamp(k, ts_sz_));
444
0
      if (prefix_extractor_->InDomain(user_k_without_ts)) {
445
0
        Slice prefix = prefix_extractor_->Transform(user_k_without_ts);
446
0
        if (!bloom_->MayContain(prefix)) {
447
0
          PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
448
0
          valid_ = false;
449
0
          return;
450
0
        } else {
451
0
          PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
452
0
        }
453
0
      }
454
0
    }
455
5.46k
    if (paranoid_memory_checks_) {
456
0
      status_ = iter_->SeekAndValidate(k, nullptr, allow_data_in_error);
457
5.46k
    } else {
458
5.46k
      iter_->Seek(k, nullptr);
459
5.46k
    }
460
5.46k
    valid_ = iter_->Valid();
461
5.46k
    VerifyEntryChecksum();
462
5.46k
  }
463
4.66k
  void SeekForPrev(const Slice& k) override {
464
4.66k
    PERF_TIMER_GUARD(seek_on_memtable_time);
465
4.66k
    PERF_COUNTER_ADD(seek_on_memtable_count, 1);
466
4.66k
    status_ = Status::OK();
467
4.66k
    if (bloom_) {
468
0
      Slice user_k_without_ts(ExtractUserKeyAndStripTimestamp(k, ts_sz_));
469
0
      if (prefix_extractor_->InDomain(user_k_without_ts)) {
470
0
        if (!bloom_->MayContain(
471
0
                prefix_extractor_->Transform(user_k_without_ts))) {
472
0
          PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
473
0
          valid_ = false;
474
0
          return;
475
0
        } else {
476
0
          PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
477
0
        }
478
0
      }
479
0
    }
480
4.66k
    if (paranoid_memory_checks_) {
481
0
      status_ = iter_->SeekAndValidate(k, nullptr, allow_data_in_error);
482
4.66k
    } else {
483
4.66k
      iter_->Seek(k, nullptr);
484
4.66k
    }
485
4.66k
    valid_ = iter_->Valid();
486
4.66k
    VerifyEntryChecksum();
487
4.66k
    if (!Valid() && status().ok()) {
488
4.14k
      SeekToLast();
489
4.14k
    }
490
5.18k
    while (Valid() && comparator_.comparator.Compare(k, key()) < 0) {
491
523
      Prev();
492
523
    }
493
4.66k
  }
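  // SeekForPrev() above is emulated on top of forward seek: position at the
  // first entry >= k, fall back to the last entry if that ran off the end,
  // then step back while the current key is still greater than k. The same
  // pattern generically (Iter/Key/Cmp are assumed concepts, not from this
  // file):
  // template <typename Iter, typename Key, typename Cmp>
  // void SeekForPrevSketch(Iter& it, const Key& k, const Cmp& cmp) {
  //   it.Seek(k);  // first entry >= k
  //   if (!it.Valid()) {
  //     it.SeekToLast();  // k is past every entry
  //   }
  //   while (it.Valid() && cmp(k, it.key()) < 0) {
  //     it.Prev();  // key() > k, keep backing up
  //   }
  // }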
494
39.1k
  void SeekToFirst() override {
495
39.1k
    status_ = Status::OK();
496
39.1k
    iter_->SeekToFirst();
497
39.1k
    valid_ = iter_->Valid();
498
39.1k
    VerifyEntryChecksum();
499
39.1k
  }
500
4.14k
  void SeekToLast() override {
501
4.14k
    status_ = Status::OK();
502
4.14k
    iter_->SeekToLast();
503
4.14k
    valid_ = iter_->Valid();
504
4.14k
    VerifyEntryChecksum();
505
4.14k
  }
506
1.35M
  void Next() override {
507
1.35M
    PERF_COUNTER_ADD(next_on_memtable_count, 1);
508
1.35M
    assert(Valid());
509
1.35M
    if (paranoid_memory_checks_) {
510
0
      status_ = iter_->NextAndValidate(allow_data_in_error);
511
1.35M
    } else {
512
1.35M
      iter_->Next();
513
1.35M
      TEST_SYNC_POINT_CALLBACK("MemTableIterator::Next:0", iter_);
514
1.35M
    }
515
1.35M
    valid_ = iter_->Valid();
516
1.35M
    VerifyEntryChecksum();
517
1.35M
  }
518
6.17k
  bool NextAndGetResult(IterateResult* result) override {
519
6.17k
    Next();
520
6.17k
    bool is_valid = Valid();
521
6.17k
    if (is_valid) {
522
4.62k
      result->key = key();
523
4.62k
      result->bound_check_result = IterBoundCheck::kUnknown;
524
4.62k
      result->value_prepared = true;
525
4.62k
    }
526
6.17k
    return is_valid;
527
6.17k
  }
528
7.28k
  void Prev() override {
529
7.28k
    PERF_COUNTER_ADD(prev_on_memtable_count, 1);
530
7.28k
    assert(Valid());
531
7.28k
    if (paranoid_memory_checks_) {
532
0
      status_ = iter_->PrevAndValidate(allow_data_in_error);
533
7.28k
    } else {
534
7.28k
      iter_->Prev();
535
7.28k
    }
536
7.28k
    valid_ = iter_->Valid();
537
7.28k
    VerifyEntryChecksum();
538
7.28k
  }
539
2.20M
  Slice key() const override {
540
2.20M
    assert(Valid());
541
2.20M
    return GetLengthPrefixedSlice(iter_->key());
542
2.20M
  }
543
544
5.57k
  uint64_t write_unix_time() const override {
545
5.57k
    assert(Valid());
546
5.57k
    ParsedInternalKey pikey;
547
5.57k
    Status s = ParseInternalKey(key(), &pikey, /*log_err_key=*/false);
548
5.57k
    if (!s.ok()) {
549
0
      return std::numeric_limits<uint64_t>::max();
550
5.57k
    } else if (kTypeValuePreferredSeqno == pikey.type) {
551
0
      return ParsePackedValueForWriteTime(value());
552
5.57k
    } else if (!seqno_to_time_mapping_ || seqno_to_time_mapping_->Empty()) {
553
5.57k
      return std::numeric_limits<uint64_t>::max();
554
5.57k
    }
555
0
    return seqno_to_time_mapping_->GetProximalTimeBeforeSeqno(pikey.sequence);
556
5.57k
  }
557
558
1.35M
  Slice value() const override {
559
1.35M
    assert(Valid());
560
1.35M
    Slice key_slice = GetLengthPrefixedSlice(iter_->key());
561
1.35M
    return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
562
1.35M
  }
563
564
76.0k
  Status status() const override { return status_; }
565
566
830k
  bool IsKeyPinned() const override {
567
    // memtable data is always pinned
568
830k
    return true;
569
830k
  }
570
571
418k
  bool IsValuePinned() const override {
572
    // memtable value is always pinned, except if we allow inplace update.
573
418k
    return value_pinned_;
574
418k
  }
575
576
 private:
577
  DynamicBloom* bloom_;
578
  const SliceTransform* const prefix_extractor_;
579
  const MemTable::KeyComparator comparator_;
580
  MemTableRep::Iterator* iter_;
581
  // The seqno to time mapping is owned by the SuperVersion.
582
  UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping_;
583
  Status status_;
584
  Logger* logger_;
585
  size_t ts_sz_;
586
  uint32_t protection_bytes_per_key_;
587
  bool valid_;
588
  bool value_pinned_;
589
  bool arena_mode_;
590
  const bool paranoid_memory_checks_;
591
  const bool allow_data_in_error;
592
593
1.41M
  void VerifyEntryChecksum() {
594
1.41M
    if (protection_bytes_per_key_ > 0 && Valid()) {
595
0
      status_ = MemTable::VerifyEntryChecksum(iter_->key(),
596
0
                                              protection_bytes_per_key_);
597
0
      if (!status_.ok()) {
598
0
        ROCKS_LOG_ERROR(logger_, "In MemtableIterator: %s", status_.getState());
599
0
      }
600
0
    }
601
1.41M
  }
602
};
603
604
InternalIterator* MemTable::NewIterator(
605
    const ReadOptions& read_options,
606
    UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena,
607
45.0k
    const SliceTransform* prefix_extractor, bool /*for_flush*/) {
608
45.0k
  assert(arena != nullptr);
609
45.0k
  auto mem = arena->AllocateAligned(sizeof(MemTableIterator));
610
45.0k
  return new (mem)
611
45.0k
      MemTableIterator(MemTableIterator::kPointEntries, *this, read_options,
612
45.0k
                       seqno_to_time_mapping, arena, prefix_extractor);
613
45.0k
}
614
615
// An iterator wrapper that wraps a MemTableIterator and logically strips each
616
// key's user-defined timestamp.
617
class TimestampStrippingIterator : public InternalIterator {
618
 public:
619
  TimestampStrippingIterator(
620
      MemTableIterator::Kind kind, const MemTable& memtable,
621
      const ReadOptions& read_options,
622
      UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena,
623
      const SliceTransform* cf_prefix_extractor, size_t ts_sz)
624
0
      : arena_mode_(arena != nullptr), kind_(kind), ts_sz_(ts_sz) {
625
0
    assert(ts_sz_ != 0);
626
0
    void* mem = arena ? arena->AllocateAligned(sizeof(MemTableIterator))
627
0
                      : operator new(sizeof(MemTableIterator));
628
0
    iter_ = new (mem)
629
0
        MemTableIterator(kind, memtable, read_options, seqno_to_time_mapping,
630
0
                         arena, cf_prefix_extractor);
631
0
  }
632
633
  // No copying allowed
634
  TimestampStrippingIterator(const TimestampStrippingIterator&) = delete;
635
  void operator=(const TimestampStrippingIterator&) = delete;
636
637
0
  ~TimestampStrippingIterator() override {
638
0
    if (arena_mode_) {
639
0
      iter_->~MemTableIterator();
640
0
    } else {
641
0
      delete iter_;
642
0
    }
643
0
  }
644
645
0
  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
646
0
    iter_->SetPinnedItersMgr(pinned_iters_mgr);
647
0
  }
648
649
0
  bool Valid() const override { return iter_->Valid(); }
650
0
  void Seek(const Slice& k) override {
651
0
    iter_->Seek(k);
652
0
    UpdateKeyAndValueBuffer();
653
0
  }
654
0
  void SeekForPrev(const Slice& k) override {
655
0
    iter_->SeekForPrev(k);
656
0
    UpdateKeyAndValueBuffer();
657
0
  }
658
0
  void SeekToFirst() override {
659
0
    iter_->SeekToFirst();
660
0
    UpdateKeyAndValueBuffer();
661
0
  }
662
0
  void SeekToLast() override {
663
0
    iter_->SeekToLast();
664
0
    UpdateKeyAndValueBuffer();
665
0
  }
666
0
  void Next() override {
667
0
    iter_->Next();
668
0
    UpdateKeyAndValueBuffer();
669
0
  }
670
0
  bool NextAndGetResult(IterateResult* result) override {
671
0
    iter_->Next();
672
0
    UpdateKeyAndValueBuffer();
673
0
    bool is_valid = Valid();
674
0
    if (is_valid) {
675
0
      result->key = key();
676
0
      result->bound_check_result = IterBoundCheck::kUnknown;
677
0
      result->value_prepared = true;
678
0
    }
679
0
    return is_valid;
680
0
  }
681
0
  void Prev() override {
682
0
    iter_->Prev();
683
0
    UpdateKeyAndValueBuffer();
684
0
  }
685
0
  Slice key() const override {
686
0
    assert(Valid());
687
0
    return key_buf_;
688
0
  }
689
690
0
  uint64_t write_unix_time() const override { return iter_->write_unix_time(); }
691
0
  Slice value() const override {
692
0
    if (kind_ == MemTableIterator::Kind::kRangeDelEntries) {
693
0
      return value_buf_;
694
0
    }
695
0
    return iter_->value();
696
0
  }
697
0
  Status status() const override { return iter_->status(); }
698
0
  bool IsKeyPinned() const override {
699
    // Key is only in a buffer that is updated in each iteration.
700
0
    return false;
701
0
  }
702
0
  bool IsValuePinned() const override {
703
0
    if (kind_ == MemTableIterator::Kind::kRangeDelEntries) {
704
0
      return false;
705
0
    }
706
0
    return iter_->IsValuePinned();
707
0
  }
708
709
 private:
710
0
  void UpdateKeyAndValueBuffer() {
711
0
    key_buf_.clear();
712
0
    if (kind_ == MemTableIterator::Kind::kRangeDelEntries) {
713
0
      value_buf_.clear();
714
0
    }
715
0
    if (!Valid()) {
716
0
      return;
717
0
    }
718
0
    Slice original_key = iter_->key();
719
0
    ReplaceInternalKeyWithMinTimestamp(&key_buf_, original_key, ts_sz_);
720
0
    if (kind_ == MemTableIterator::Kind::kRangeDelEntries) {
721
0
      Slice original_value = iter_->value();
722
0
      AppendUserKeyWithMinTimestamp(&value_buf_, original_value, ts_sz_);
723
0
    }
724
0
  }
725
  bool arena_mode_;
726
  MemTableIterator::Kind kind_;
727
  size_t ts_sz_;
728
  MemTableIterator* iter_;
729
  std::string key_buf_;
730
  std::string value_buf_;
731
};
732
733
InternalIterator* MemTable::NewTimestampStrippingIterator(
734
    const ReadOptions& read_options,
735
    UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena,
736
0
    const SliceTransform* prefix_extractor, size_t ts_sz) {
737
0
  assert(arena != nullptr);
738
0
  auto mem = arena->AllocateAligned(sizeof(TimestampStrippingIterator));
739
0
  return new (mem) TimestampStrippingIterator(
740
0
      MemTableIterator::kPointEntries, *this, read_options,
741
0
      seqno_to_time_mapping, arena, prefix_extractor, ts_sz);
742
0
}
743
744
FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIterator(
745
    const ReadOptions& read_options, SequenceNumber read_seq,
746
67.9k
    bool immutable_memtable) {
747
67.9k
  if (read_options.ignore_range_deletions ||
748
67.9k
      is_range_del_table_empty_.load(std::memory_order_relaxed)) {
749
64.2k
    return nullptr;
750
64.2k
  }
751
3.72k
  return NewRangeTombstoneIteratorInternal(read_options, read_seq,
752
3.72k
                                           immutable_memtable);
753
67.9k
}
754
755
FragmentedRangeTombstoneIterator*
756
MemTable::NewTimestampStrippingRangeTombstoneIterator(
757
0
    const ReadOptions& read_options, SequenceNumber read_seq, size_t ts_sz) {
758
0
  if (read_options.ignore_range_deletions ||
759
0
      is_range_del_table_empty_.load(std::memory_order_relaxed)) {
760
0
    return nullptr;
761
0
  }
762
0
  if (!timestamp_stripping_fragmented_range_tombstone_list_) {
763
    // TODO: plumb Env::IOActivity, Env::IOPriority
764
0
    auto* unfragmented_iter = new TimestampStrippingIterator(
765
0
        MemTableIterator::kRangeDelEntries, *this, ReadOptions(),
766
0
        /*seqno_to_time_mapping*/ nullptr, /* arena */ nullptr,
767
0
        /* prefix_extractor */ nullptr, ts_sz);
768
769
0
    timestamp_stripping_fragmented_range_tombstone_list_ =
770
0
        std::make_unique<FragmentedRangeTombstoneList>(
771
0
            std::unique_ptr<InternalIterator>(unfragmented_iter),
772
0
            comparator_.comparator);
773
0
  }
774
0
  return new FragmentedRangeTombstoneIterator(
775
0
      timestamp_stripping_fragmented_range_tombstone_list_.get(),
776
0
      comparator_.comparator, read_seq, read_options.timestamp);
777
0
}
778
779
FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIteratorInternal(
780
    const ReadOptions& read_options, SequenceNumber read_seq,
781
3.72k
    bool immutable_memtable) {
782
3.72k
  if (immutable_memtable) {
783
    // Note that caller should already have verified that
784
    // !is_range_del_table_empty_
785
0
    assert(IsFragmentedRangeTombstonesConstructed());
786
0
    return new FragmentedRangeTombstoneIterator(
787
0
        fragmented_range_tombstone_list_.get(), comparator_.comparator,
788
0
        read_seq, read_options.timestamp);
789
0
  }
790
791
  // takes current cache
792
3.72k
  std::shared_ptr<FragmentedRangeTombstoneListCache> cache =
793
3.72k
      std::atomic_load_explicit(cached_range_tombstone_.Access(),
794
3.72k
                                std::memory_order_relaxed);
795
  // construct fragmented tombstone list if necessary
796
3.72k
  if (!cache->initialized.load(std::memory_order_acquire)) {
797
3.72k
    cache->reader_mutex.lock();
798
3.72k
    if (!cache->tombstones) {
799
3.72k
      auto* unfragmented_iter = new MemTableIterator(
800
3.72k
          MemTableIterator::kRangeDelEntries, *this, read_options);
801
3.72k
      cache->tombstones.reset(new FragmentedRangeTombstoneList(
802
3.72k
          std::unique_ptr<InternalIterator>(unfragmented_iter),
803
3.72k
          comparator_.comparator));
804
3.72k
      cache->initialized.store(true, std::memory_order_release);
805
3.72k
    }
806
3.72k
    cache->reader_mutex.unlock();
807
3.72k
  }
808
809
3.72k
  auto* fragmented_iter = new FragmentedRangeTombstoneIterator(
810
3.72k
      cache, comparator_.comparator, read_seq, read_options.timestamp);
811
3.72k
  return fragmented_iter;
812
3.72k
}
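// The cache construction above is double-checked initialization: an acquire
// load on the fast path, then a re-check under the mutex before building and
// release-publishing. A minimal standalone sketch (LazyCache and its int
// payload are illustrative):
#include <atomic>
#include <memory>
#include <mutex>

struct LazyCache {
  std::atomic<bool> initialized{false};
  std::mutex reader_mutex;
  std::unique_ptr<int> tombstones;  // stand-in for the fragmented list

  int* Get() {
    if (!initialized.load(std::memory_order_acquire)) {  // fast path
      std::lock_guard<std::mutex> guard(reader_mutex);
      if (!tombstones) {  // re-check: another thread may have built it
        tombstones = std::make_unique<int>(0);
        initialized.store(true, std::memory_order_release);
      }
    }
    return tombstones.get();
  }
};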
813
814
2.23k
void MemTable::ConstructFragmentedRangeTombstones() {
815
  // There should be no concurrent construction.
816
  // We could also check fragmented_range_tombstone_list_ to avoid repeated
817
  // constructions. We just construct them here again to be safe.
818
2.23k
  if (!is_range_del_table_empty_.load(std::memory_order_relaxed)) {
819
    // TODO: plumb Env::IOActivity, Env::IOPriority
820
0
    auto* unfragmented_iter = new MemTableIterator(
821
0
        MemTableIterator::kRangeDelEntries, *this, ReadOptions());
822
823
0
    fragmented_range_tombstone_list_ =
824
0
        std::make_unique<FragmentedRangeTombstoneList>(
825
0
            std::unique_ptr<InternalIterator>(unfragmented_iter),
826
0
            comparator_.comparator);
827
0
  }
828
2.23k
}
829
830
0
port::RWMutex* MemTable::GetLock(const Slice& key) {
831
0
  return &locks_[GetSliceRangedNPHash(key, locks_.size())];
832
0
}
833
834
ReadOnlyMemTable::MemTableStats MemTable::ApproximateStats(
835
0
    const Slice& start_ikey, const Slice& end_ikey) {
836
0
  uint64_t entry_count = table_->ApproximateNumEntries(start_ikey, end_ikey);
837
0
  entry_count += range_del_table_->ApproximateNumEntries(start_ikey, end_ikey);
838
0
  if (entry_count == 0) {
839
0
    return {0, 0};
840
0
  }
841
0
  uint64_t n = num_entries_.load(std::memory_order_relaxed);
842
0
  if (n == 0) {
843
0
    return {0, 0};
844
0
  }
845
0
  if (entry_count > n) {
846
    // (range_del_)table_->ApproximateNumEntries() is just an estimate so it can
847
    // be larger than actual entries we have. Cap it to entries we have to limit
848
    // the inaccuracy.
849
0
    entry_count = n;
850
0
  }
851
0
  uint64_t data_size = data_size_.load(std::memory_order_relaxed);
852
0
  return {entry_count * (data_size / n), entry_count};
853
0
}
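// The estimate above is entry_count * (data_size / n): entries in range
// times the average entry size. With illustrative numbers (not from this
// report): n = 1000 entries occupying data_size = 64000 bytes gives an
// average of 64 bytes/entry, so entry_count = 250 yields an estimated
// 250 * 64 = 16000 bytes for [start_ikey, end_ikey). Note that the integer
// division data_size / n truncates before the multiplication.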
854
855
Status MemTable::VerifyEncodedEntry(Slice encoded,
856
931k
                                    const ProtectionInfoKVOS64& kv_prot_info) {
857
931k
  uint32_t ikey_len = 0;
858
931k
  if (!GetVarint32(&encoded, &ikey_len)) {
859
0
    return Status::Corruption("Unable to parse internal key length");
860
0
  }
861
931k
  if (ikey_len < 8 + ts_sz_) {
862
0
    return Status::Corruption("Internal key length too short");
863
0
  }
864
931k
  if (ikey_len > encoded.size()) {
865
0
    return Status::Corruption("Internal key length too long");
866
0
  }
867
931k
  uint32_t value_len = 0;
868
931k
  const size_t user_key_len = ikey_len - 8;
869
931k
  Slice key(encoded.data(), user_key_len);
870
931k
  encoded.remove_prefix(user_key_len);
871
872
931k
  uint64_t packed = DecodeFixed64(encoded.data());
873
931k
  ValueType value_type = kMaxValue;
874
931k
  SequenceNumber sequence_number = kMaxSequenceNumber;
875
931k
  UnPackSequenceAndType(packed, &sequence_number, &value_type);
876
931k
  encoded.remove_prefix(8);
877
878
931k
  if (!GetVarint32(&encoded, &value_len)) {
879
0
    return Status::Corruption("Unable to parse value length");
880
0
  }
881
931k
  if (value_len < encoded.size()) {
882
0
    return Status::Corruption("Value length too short");
883
0
  }
884
931k
  if (value_len > encoded.size()) {
885
0
    return Status::Corruption("Value length too long");
886
0
  }
887
931k
  Slice value(encoded.data(), value_len);
888
889
931k
  return kv_prot_info.StripS(sequence_number)
890
931k
      .StripKVO(key, value, value_type)
891
931k
      .GetStatus();
892
931k
}
893
894
void MemTable::UpdateEntryChecksum(const ProtectionInfoKVOS64* kv_prot_info,
895
                                   const Slice& key, const Slice& value,
896
                                   ValueType type, SequenceNumber s,
897
1.89M
                                   char* checksum_ptr) {
898
1.89M
  if (moptions_.protection_bytes_per_key == 0) {
899
1.89M
    return;
900
1.89M
  }
901
902
0
  if (kv_prot_info == nullptr) {
903
0
    ProtectionInfo64()
904
0
        .ProtectKVO(key, value, type)
905
0
        .ProtectS(s)
906
0
        .Encode(static_cast<uint8_t>(moptions_.protection_bytes_per_key),
907
0
                checksum_ptr);
908
0
  } else {
909
0
    kv_prot_info->Encode(
910
0
        static_cast<uint8_t>(moptions_.protection_bytes_per_key), checksum_ptr);
911
0
  }
912
0
}
913
914
Status MemTable::Add(SequenceNumber s, ValueType type,
915
                     const Slice& key, /* user key */
916
                     const Slice& value,
917
                     const ProtectionInfoKVOS64* kv_prot_info,
918
                     bool allow_concurrent,
919
1.89M
                     MemTablePostProcessInfo* post_process_info, void** hint) {
920
  // Format of an entry is concatenation of:
921
  //  key_size     : varint32 of internal_key.size()
922
  //  key bytes    : char[internal_key.size()]
923
  //  value_size   : varint32 of value.size()
924
  //  value bytes  : char[value.size()]
925
  //  checksum     : char[moptions_.protection_bytes_per_key]
926
1.89M
  uint32_t key_size = static_cast<uint32_t>(key.size());
927
1.89M
  uint32_t val_size = static_cast<uint32_t>(value.size());
928
1.89M
  uint32_t internal_key_size = key_size + 8;
929
1.89M
  const uint32_t encoded_len = VarintLength(internal_key_size) +
930
1.89M
                               internal_key_size + VarintLength(val_size) +
931
1.89M
                               val_size + moptions_.protection_bytes_per_key;
932
1.89M
  char* buf = nullptr;
933
1.89M
  std::unique_ptr<MemTableRep>& table =
934
1.89M
      type == kTypeRangeDeletion ? range_del_table_ : table_;
935
1.89M
  KeyHandle handle = table->Allocate(encoded_len, &buf);
936
937
1.89M
  char* p = EncodeVarint32(buf, internal_key_size);
938
1.89M
  memcpy(p, key.data(), key_size);
939
1.89M
  Slice key_slice(p, key_size);
940
1.89M
  p += key_size;
941
1.89M
  uint64_t packed = PackSequenceAndType(s, type);
942
1.89M
  EncodeFixed64(p, packed);
943
1.89M
  p += 8;
944
1.89M
  p = EncodeVarint32(p, val_size);
945
1.89M
  memcpy(p, value.data(), val_size);
946
1.89M
  assert((unsigned)(p + val_size - buf + moptions_.protection_bytes_per_key) ==
947
1.89M
         (unsigned)encoded_len);
948
949
1.89M
  UpdateEntryChecksum(kv_prot_info, key, value, type, s,
950
1.89M
                      buf + encoded_len - moptions_.protection_bytes_per_key);
951
1.89M
  Slice encoded(buf, encoded_len - moptions_.protection_bytes_per_key);
952
1.89M
  if (kv_prot_info != nullptr) {
953
931k
    TEST_SYNC_POINT_CALLBACK("MemTable::Add:Encoded", &encoded);
954
931k
    Status status = VerifyEncodedEntry(encoded, *kv_prot_info);
955
931k
    if (!status.ok()) {
956
0
      return status;
957
0
    }
958
931k
  }
959
960
1.89M
  Slice key_without_ts = StripTimestampFromUserKey(key, ts_sz_);
961
962
1.89M
  if (!allow_concurrent) {
963
    // Extract prefix for insert with hint. Hints are for point key table
964
    // (`table_`) only, not `range_del_table_`.
965
1.89M
    if (table == table_ && insert_with_hint_prefix_extractor_ != nullptr &&
966
1.89M
        insert_with_hint_prefix_extractor_->InDomain(key_slice)) {
967
0
      Slice prefix = insert_with_hint_prefix_extractor_->Transform(key_slice);
968
0
      bool res = table->InsertKeyWithHint(handle, &insert_hints_[prefix]);
969
0
      if (UNLIKELY(!res)) {
970
0
        return Status::TryAgain("key+seq exists");
971
0
      }
972
1.89M
    } else {
973
1.89M
      bool res = table->InsertKey(handle);
974
1.89M
      if (UNLIKELY(!res)) {
975
0
        return Status::TryAgain("key+seq exists");
976
0
      }
977
1.89M
    }
978
979
    // this is a bit ugly, but is the way to avoid locked instructions
980
    // when incrementing an atomic
981
1.89M
    num_entries_.store(num_entries_.load(std::memory_order_relaxed) + 1,
982
1.89M
                       std::memory_order_relaxed);
983
1.89M
    data_size_.store(data_size_.load(std::memory_order_relaxed) + encoded_len,
984
1.89M
                     std::memory_order_relaxed);
985
1.89M
    if (type == kTypeDeletion || type == kTypeSingleDeletion ||
986
1.89M
        type == kTypeDeletionWithTimestamp) {
987
247k
      num_deletes_.store(num_deletes_.load(std::memory_order_relaxed) + 1,
988
247k
                         std::memory_order_relaxed);
989
1.64M
    } else if (type == kTypeRangeDeletion) {
990
829k
      uint64_t val = num_range_deletes_.load(std::memory_order_relaxed) + 1;
991
829k
      num_range_deletes_.store(val, std::memory_order_relaxed);
992
829k
    }
993
994
1.89M
    if (bloom_filter_ && prefix_extractor_ &&
995
1.89M
        prefix_extractor_->InDomain(key_without_ts)) {
996
0
      bloom_filter_->Add(prefix_extractor_->Transform(key_without_ts));
997
0
    }
998
1.89M
    if (bloom_filter_ && moptions_.memtable_whole_key_filtering) {
999
0
      bloom_filter_->Add(key_without_ts);
1000
0
    }
1001
1002
    // The first sequence number inserted into the memtable
1003
1.89M
    assert(first_seqno_ == 0 || s >= first_seqno_);
1004
1.89M
    if (first_seqno_ == 0) {
1005
65.2k
      first_seqno_.store(s, std::memory_order_relaxed);
1006
1007
65.2k
      if (earliest_seqno_ == kMaxSequenceNumber) {
1008
0
        earliest_seqno_.store(GetFirstSequenceNumber(),
1009
0
                              std::memory_order_relaxed);
1010
0
      }
1011
65.2k
      assert(first_seqno_.load() >= earliest_seqno_.load());
1012
65.2k
    }
1013
1.89M
    assert(post_process_info == nullptr);
1014
    // TODO(yuzhangyu): support updating newest UDT for when `allow_concurrent`
1015
    // is true.
1016
1.89M
    MaybeUpdateNewestUDT(key_slice);
1017
1.89M
    UpdateFlushState();
1018
1.89M
  } else {
1019
0
    bool res = (hint == nullptr)
1020
0
                   ? table->InsertKeyConcurrently(handle)
1021
0
                   : table->InsertKeyWithHintConcurrently(handle, hint);
1022
0
    if (UNLIKELY(!res)) {
1023
0
      return Status::TryAgain("key+seq exists");
1024
0
    }
1025
1026
0
    assert(post_process_info != nullptr);
1027
0
    post_process_info->num_entries++;
1028
0
    post_process_info->data_size += encoded_len;
1029
0
    if (type == kTypeDeletion) {
1030
0
      post_process_info->num_deletes++;
1031
0
    }
1032
1033
0
    if (bloom_filter_ && prefix_extractor_ &&
1034
0
        prefix_extractor_->InDomain(key_without_ts)) {
1035
0
      bloom_filter_->AddConcurrently(
1036
0
          prefix_extractor_->Transform(key_without_ts));
1037
0
    }
1038
0
    if (bloom_filter_ && moptions_.memtable_whole_key_filtering) {
1039
0
      bloom_filter_->AddConcurrently(key_without_ts);
1040
0
    }
1041
1042
    // atomically update first_seqno_ and earliest_seqno_.
1043
0
    uint64_t cur_seq_num = first_seqno_.load(std::memory_order_relaxed);
1044
0
    while ((cur_seq_num == 0 || s < cur_seq_num) &&
1045
0
           !first_seqno_.compare_exchange_weak(cur_seq_num, s)) {
1046
0
    }
1047
0
    uint64_t cur_earliest_seqno =
1048
0
        earliest_seqno_.load(std::memory_order_relaxed);
1049
0
    while (
1050
0
        (cur_earliest_seqno == kMaxSequenceNumber || s < cur_earliest_seqno) &&
1051
0
        !earliest_seqno_.compare_exchange_weak(cur_earliest_seqno, s)) {
1052
0
    }
1053
0
  }
1054
1.89M
  if (type == kTypeRangeDeletion) {
1055
829k
    auto new_cache = std::make_shared<FragmentedRangeTombstoneListCache>();
1056
829k
    size_t size = cached_range_tombstone_.Size();
1057
829k
    if (allow_concurrent) {
1058
0
      post_process_info->num_range_deletes++;
1059
0
      range_del_mutex_.lock();
1060
0
    }
1061
27.3M
    for (size_t i = 0; i < size; ++i) {
1062
26.5M
      std::shared_ptr<FragmentedRangeTombstoneListCache>* local_cache_ref_ptr =
1063
26.5M
          cached_range_tombstone_.AccessAtCore(i);
1064
26.5M
      auto new_local_cache_ref = std::make_shared<
1065
26.5M
          const std::shared_ptr<FragmentedRangeTombstoneListCache>>(new_cache);
1066
      // It is okay for some reader to load the old cache during invalidation, as
1067
      // the new sequence number is not published yet.
1068
      // Each core will have a shared_ptr to a shared_ptr to the cached
1069
      // fragmented range tombstones, so that the ref count is maintained locally
1070
      // per-core using the per-core shared_ptr.
1071
26.5M
      std::atomic_store_explicit(
1072
26.5M
          local_cache_ref_ptr,
1073
26.5M
          std::shared_ptr<FragmentedRangeTombstoneListCache>(
1074
26.5M
              new_local_cache_ref, new_cache.get()),
1075
26.5M
          std::memory_order_relaxed);
1076
26.5M
    }
1077
1078
829k
    if (allow_concurrent) {
1079
0
      range_del_mutex_.unlock();
1080
0
    }
1081
829k
    is_range_del_table_empty_.store(false, std::memory_order_relaxed);
1082
829k
  }
1083
1.89M
  UpdateOldestKeyTime();
1084
1085
1.89M
  TEST_SYNC_POINT_CALLBACK("MemTable::Add:BeforeReturn:Encoded", &encoded);
1086
1.89M
  return Status::OK();
1087
1.89M
}
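// A self-contained sketch of the entry layout documented at the top of
// Add(): varint32 internal key size, user key bytes, an 8-byte tag packing
// (sequence << 8) | type, varint32 value size, then value bytes. The
// per-key checksum is omitted; PutVarint32Sketch is the helper sketched
// after EncodeKey() above, and little-endian tag encoding is assumed.
#include <cstdint>
#include <cstring>
#include <string>

std::string EncodeMemTableEntrySketch(const std::string& user_key,
                                      const std::string& value, uint64_t seq,
                                      uint8_t type) {
  std::string buf;
  const uint32_t internal_key_size =
      static_cast<uint32_t>(user_key.size()) + 8;
  PutVarint32Sketch(&buf, internal_key_size);  // key_size
  buf.append(user_key);                        // key bytes
  const uint64_t packed = (seq << 8) | type;   // see PackSequenceAndType
  char fixed64[sizeof(packed)];
  std::memcpy(fixed64, &packed, sizeof(packed));  // assumes little-endian
  buf.append(fixed64, sizeof(fixed64));        // 8-byte trailer of the ikey
  PutVarint32Sketch(&buf, static_cast<uint32_t>(value.size()));  // value_size
  buf.append(value);                           // value bytes
  return buf;
}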
1088
1089
// Callback from MemTable::Get()
1090
namespace {
1091
1092
struct Saver {
1093
  Status* status;
1094
  const LookupKey* key;
1095
  bool* found_final_value;  // Is value set correctly? Used by KeyMayExist
1096
  bool* merge_in_progress;
1097
  std::string* value;
1098
  PinnableWideColumns* columns;
1099
  SequenceNumber seq;
1100
  std::string* timestamp;
1101
  const MergeOperator* merge_operator;
1102
  // the merge operations encountered;
1103
  MergeContext* merge_context;
1104
  SequenceNumber max_covering_tombstone_seq;
1105
  MemTable* mem;
1106
  Logger* logger;
1107
  Statistics* statistics;
1108
  bool inplace_update_support;
1109
  bool do_merge;
1110
  SystemClock* clock;
1111
1112
  ReadCallback* callback_;
1113
  bool* is_blob_index;
1114
  bool allow_data_in_errors;
1115
  uint32_t protection_bytes_per_key;
1116
21.7k
  bool CheckCallback(SequenceNumber _seq) {
1117
21.7k
    if (callback_) {
1118
0
      return callback_->IsVisible(_seq);
1119
0
    }
1120
21.7k
    return true;
1121
21.7k
  }
1122
};
1123
}  // anonymous namespace
1124
1125
22.8k
static bool SaveValue(void* arg, const char* entry) {
1126
22.8k
  Saver* s = static_cast<Saver*>(arg);
1127
22.8k
  assert(s != nullptr);
1128
22.8k
  assert(!s->value || !s->columns);
1129
22.8k
  assert(!*(s->found_final_value));
1130
22.8k
  assert(s->status->ok() || s->status->IsMergeInProgress());
1131
1132
22.8k
  MergeContext* merge_context = s->merge_context;
1133
22.8k
  SequenceNumber max_covering_tombstone_seq = s->max_covering_tombstone_seq;
1134
22.8k
  const MergeOperator* merge_operator = s->merge_operator;
1135
1136
22.8k
  assert(merge_context != nullptr);
1137
1138
  // Refer to comments under MemTable::Add() for entry format.
1139
  // Check that it belongs to the same user key.
1140
22.8k
  uint32_t key_length = 0;
1141
22.8k
  const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
1142
22.8k
  assert(key_length >= 8);
1143
22.8k
  Slice user_key_slice = Slice(key_ptr, key_length - 8);
1144
22.8k
  const Comparator* user_comparator =
1145
22.8k
      s->mem->GetInternalKeyComparator().user_comparator();
1146
22.8k
  size_t ts_sz = user_comparator->timestamp_size();
1147
22.8k
  if (ts_sz && s->timestamp && max_covering_tombstone_seq > 0) {
1148
    // timestamp should already be set to range tombstone timestamp
1149
0
    assert(s->timestamp->size() == ts_sz);
1150
0
  }
1151
22.8k
  if (user_comparator->EqualWithoutTimestamp(user_key_slice,
1152
22.8k
                                             s->key->user_key())) {
1153
    // Correct user key
1154
21.7k
    TEST_SYNC_POINT_CALLBACK("Memtable::SaveValue:Found:entry", &entry);
1155
21.7k
    std::optional<ReadLock> read_lock;
1156
21.7k
    if (s->inplace_update_support) {
1157
0
      read_lock.emplace(s->mem->GetLock(s->key->user_key()));
1158
0
    }
1159
1160
21.7k
    if (s->protection_bytes_per_key > 0) {
1161
0
      *(s->status) = MemTable::VerifyEntryChecksum(
1162
0
          entry, s->protection_bytes_per_key, s->allow_data_in_errors);
1163
0
      if (!s->status->ok()) {
1164
0
        *(s->found_final_value) = true;
1165
0
        ROCKS_LOG_ERROR(s->logger, "In SaveValue: %s", s->status->getState());
1166
        // Memtable entry corrupted
1167
0
        return false;
1168
0
      }
1169
0
    }
1170
1171
21.7k
    const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
1172
21.7k
    ValueType type;
1173
21.7k
    SequenceNumber seq;
1174
21.7k
    UnPackSequenceAndType(tag, &seq, &type);
1175
    // If the value is not in the snapshot, skip it
1176
21.7k
    if (!s->CheckCallback(seq)) {
1177
0
      return true;  // to continue to the next seq
1178
0
    }
1179
1180
21.7k
    if (s->seq == kMaxSequenceNumber) {
1181
21.7k
      s->seq = seq;
1182
21.7k
      if (s->seq > max_covering_tombstone_seq) {
1183
21.7k
        if (ts_sz && s->timestamp != nullptr) {
1184
          // `timestamp` was set to range tombstone's timestamp before
1185
          // `SaveValue` is ever called. This key has a higher sequence number
1186
          // than range tombstone, and is the key with the highest seqno across
1187
          // all keys with this user_key, so we update timestamp here.
1188
0
          Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz);
1189
0
          s->timestamp->assign(ts.data(), ts_sz);
1190
0
        }
1191
21.7k
      } else {
1192
0
        s->seq = max_covering_tombstone_seq;
1193
0
      }
1194
21.7k
    }
1195
1196
21.7k
    if (ts_sz > 0 && s->timestamp != nullptr) {
1197
0
      if (!s->timestamp->empty()) {
1198
0
        assert(ts_sz == s->timestamp->size());
1199
0
      }
1200
      // TODO optimize for smaller size ts
1201
0
      const std::string kMaxTs(ts_sz, '\xff');
1202
0
      if (s->timestamp->empty() ||
1203
0
          user_comparator->CompareTimestamp(*(s->timestamp), kMaxTs) == 0) {
1204
0
        Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz);
1205
0
        s->timestamp->assign(ts.data(), ts_sz);
1206
0
      }
1207
0
    }
1208
1209
21.7k
    if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex ||
1210
21.7k
         type == kTypeWideColumnEntity || type == kTypeDeletion ||
1211
21.7k
         type == kTypeSingleDeletion || type == kTypeDeletionWithTimestamp ||
1212
21.7k
         type == kTypeValuePreferredSeqno) &&
1213
21.7k
        max_covering_tombstone_seq > seq) {
1214
0
      type = kTypeRangeDeletion;
1215
0
    }
1216
21.7k
    switch (type) {
1217
0
      case kTypeBlobIndex: {
1218
0
        if (!s->do_merge) {
1219
0
          *(s->status) = Status::NotSupported(
1220
0
              "GetMergeOperands not supported by stacked BlobDB");
1221
0
          *(s->found_final_value) = true;
1222
0
          return false;
1223
0
        }
1224
1225
0
        if (*(s->merge_in_progress)) {
1226
0
          *(s->status) = Status::NotSupported(
1227
0
              "Merge operator not supported by stacked BlobDB");
1228
0
          *(s->found_final_value) = true;
1229
0
          return false;
1230
0
        }
1231
1232
0
        if (s->is_blob_index == nullptr) {
1233
0
          ROCKS_LOG_ERROR(s->logger, "Encountered unexpected blob index.");
1234
0
          *(s->status) = Status::NotSupported(
1235
0
              "Encountered unexpected blob index. Please open DB with "
1236
0
              "ROCKSDB_NAMESPACE::blob_db::BlobDB.");
1237
0
          *(s->found_final_value) = true;
1238
0
          return false;
1239
0
        }
1240
1241
0
        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
1242
1243
0
        *(s->status) = Status::OK();
1244
1245
0
        if (s->value) {
1246
0
          s->value->assign(v.data(), v.size());
1247
0
        } else if (s->columns) {
1248
0
          s->columns->SetPlainValue(v);
1249
0
        }
1250
1251
0
        *(s->found_final_value) = true;
1252
0
        *(s->is_blob_index) = true;
1253
1254
0
        return false;
1255
0
      }
1256
21.4k
      case kTypeValue:
1257
21.4k
      case kTypeValuePreferredSeqno: {
1258
21.4k
        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
1259
21.4k
        if (type == kTypeValuePreferredSeqno) {
1260
0
          v = ParsePackedValueForValue(v);
1261
0
        }
1262
1263
21.4k
        ReadOnlyMemTable::HandleTypeValue(
1264
21.4k
            s->key->user_key(), v, s->inplace_update_support == false,
1265
21.4k
            s->do_merge, *(s->merge_in_progress), merge_context,
1266
21.4k
            s->merge_operator, s->clock, s->statistics, s->logger, s->status,
1267
21.4k
            s->value, s->columns, s->is_blob_index);
1268
21.4k
        *(s->found_final_value) = true;
1269
21.4k
        return false;
1270
21.4k
      }
1271
0
      case kTypeWideColumnEntity: {
1272
0
        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
1273
1274
0
        *(s->status) = Status::OK();
1275
1276
0
        if (!s->do_merge) {
1277
          // Preserve the value with the goal of returning it as part of
1278
          // raw merge operands to the user
1279
1280
0
          Slice value_of_default;
1281
0
          *(s->status) = WideColumnSerialization::GetValueOfDefaultColumn(
1282
0
              v, value_of_default);
1283
1284
0
          if (s->status->ok()) {
1285
0
            merge_context->PushOperand(
1286
0
                value_of_default,
1287
0
                s->inplace_update_support == false /* operand_pinned */);
1288
0
          }
1289
0
        } else if (*(s->merge_in_progress)) {
1290
0
          assert(s->do_merge);
1291
1292
0
          if (s->value || s->columns) {
1293
            // `op_failure_scope` (an output parameter) is not provided (set
1294
            // to nullptr) since a failure must be propagated regardless of
1295
            // its value.
1296
0
            *(s->status) = MergeHelper::TimedFullMerge(
1297
0
                merge_operator, s->key->user_key(), MergeHelper::kWideBaseValue,
1298
0
                v, merge_context->GetOperands(), s->logger, s->statistics,
1299
0
                s->clock, /* update_num_ops_stats */ true,
1300
0
                /* op_failure_scope */ nullptr, s->value, s->columns);
1301
0
          }
1302
0
        } else if (s->value) {
1303
0
          Slice value_of_default;
1304
0
          *(s->status) = WideColumnSerialization::GetValueOfDefaultColumn(
1305
0
              v, value_of_default);
1306
0
          if (s->status->ok()) {
1307
0
            s->value->assign(value_of_default.data(), value_of_default.size());
1308
0
          }
1309
0
        } else if (s->columns) {
1310
0
          *(s->status) = s->columns->SetWideColumnValue(v);
1311
0
        }
1312
1313
0
        *(s->found_final_value) = true;
1314
1315
0
        if (s->is_blob_index != nullptr) {
1316
0
          *(s->is_blob_index) = false;
1317
0
        }
1318
1319
0
        return false;
1320
21.4k
      }
1321
365
      case kTypeDeletion:
1322
365
      case kTypeDeletionWithTimestamp:
1323
365
      case kTypeSingleDeletion:
1324
365
      case kTypeRangeDeletion: {
1325
365
        ReadOnlyMemTable::HandleTypeDeletion(
1326
365
            s->key->user_key(), *(s->merge_in_progress), s->merge_context,
1327
365
            s->merge_operator, s->clock, s->statistics, s->logger, s->status,
1328
365
            s->value, s->columns);
1329
365
        *(s->found_final_value) = true;
1330
365
        return false;
1331
365
      }
1332
0
      case kTypeMerge: {
1333
0
        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
1334
0
        *(s->merge_in_progress) = true;
1335
0
        *(s->found_final_value) = ReadOnlyMemTable::HandleTypeMerge(
1336
0
            s->key->user_key(), v, s->inplace_update_support == false,
1337
0
            s->do_merge, merge_context, s->merge_operator, s->clock,
1338
0
            s->statistics, s->logger, s->status, s->value, s->columns);
1339
0
        return !*(s->found_final_value);
1340
365
      }
1341
0
      default: {
1342
0
        std::string msg("Corrupted value not expected.");
1343
0
        if (s->allow_data_in_errors) {
1344
0
          msg.append("Unrecognized value type: " +
1345
0
                     std::to_string(static_cast<int>(type)) + ". ");
1346
0
          msg.append("User key: " + user_key_slice.ToString(/*hex=*/true) +
1347
0
                     ". ");
1348
0
          msg.append("seq: " + std::to_string(seq) + ".");
1349
0
        }
1350
0
        *(s->found_final_value) = true;
1351
0
        *(s->status) = Status::Corruption(msg.c_str());
1352
0
        return false;
1353
365
      }
1354
21.7k
    }
1355
21.7k
  }
1356
1357
  // s->status could be Corruption, MergeInProgress, or NotFound at this point
1358
1.05k
  return false;
1359
22.8k
}
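The switch above dispatches on the ValueType decoded from the entry's trailing 8-byte tag. A minimal sketch of that tag layout (illustrative only, assuming the usual dbformat.h packing: sequence number in the high 56 bits, type in the low 8):

  // Mirrors PackSequenceAndType()/UnPackSequenceAndType() in spirit.
  uint64_t PackTagSketch(uint64_t seq, uint8_t type) {
    assert(seq <= ((uint64_t{1} << 56) - 1));  // kMaxSequenceNumber
    return (seq << 8) | type;
  }
  // e.g. seq = 42, type = 0x1 (kTypeValue) -> tag = 0x2A01;
  // unpacking: seq = tag >> 8, type = tag & 0xff.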
1360
1361
bool MemTable::Get(const LookupKey& key, std::string* value,
1362
                   PinnableWideColumns* columns, std::string* timestamp,
1363
                   Status* s, MergeContext* merge_context,
1364
                   SequenceNumber* max_covering_tombstone_seq,
1365
                   SequenceNumber* seq, const ReadOptions& read_opts,
1366
                   bool immutable_memtable, ReadCallback* callback,
1367
24.7k
                   bool* is_blob_index, bool do_merge) {
1368
  // The sequence number is updated synchronously in version_set.h
1369
24.7k
  if (IsEmpty()) {
1370
    // Avoid recording stats, for speed.
1371
1.79k
    return false;
1372
1.79k
  }
1373
1374
22.9k
  PERF_TIMER_GUARD(get_from_memtable_time);
1375
1376
22.9k
  std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
1377
22.9k
      NewRangeTombstoneIterator(read_opts,
1378
22.9k
                                GetInternalKeySeqno(key.internal_key()),
1379
22.9k
                                immutable_memtable));
1380
22.9k
  if (range_del_iter != nullptr) {
1381
0
    SequenceNumber covering_seq =
1382
0
        range_del_iter->MaxCoveringTombstoneSeqnum(key.user_key());
1383
0
    if (covering_seq > *max_covering_tombstone_seq) {
1384
0
      *max_covering_tombstone_seq = covering_seq;
1385
0
      if (timestamp) {
1386
        // Will be overwritten in SaveValue() if there is a point key with
1387
        // a higher seqno.
1388
0
        timestamp->assign(range_del_iter->timestamp().data(),
1389
0
                          range_del_iter->timestamp().size());
1390
0
      }
1391
0
    }
1392
0
  }
1393
1394
22.9k
  bool found_final_value = false;
1395
22.9k
  bool merge_in_progress = s->IsMergeInProgress();
1396
22.9k
  bool may_contain = true;
1397
22.9k
  Slice user_key_without_ts = StripTimestampFromUserKey(key.user_key(), ts_sz_);
1398
22.9k
  bool bloom_checked = false;
1399
22.9k
  if (bloom_filter_) {
1400
    // when both memtable_whole_key_filtering and prefix_extractor_ are set,
1401
    // only do whole key filtering for Get() to save CPU
1402
0
    if (moptions_.memtable_whole_key_filtering) {
1403
0
      may_contain = bloom_filter_->MayContain(user_key_without_ts);
1404
0
      bloom_checked = true;
1405
0
    } else {
1406
0
      assert(prefix_extractor_);
1407
0
      if (prefix_extractor_->InDomain(user_key_without_ts)) {
1408
0
        may_contain = bloom_filter_->MayContain(
1409
0
            prefix_extractor_->Transform(user_key_without_ts));
1410
0
        bloom_checked = true;
1411
0
      }
1412
0
    }
1413
0
  }
1414
1415
22.9k
  if (bloom_filter_ && !may_contain) {
1416
    // The bloom filter says the key is definitely absent; skip the lookup.
1417
0
    PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
1418
0
    *seq = kMaxSequenceNumber;
1419
22.9k
  } else {
1420
22.9k
    if (bloom_checked) {
1421
0
      PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
1422
0
    }
1423
22.9k
    GetFromTable(key, *max_covering_tombstone_seq, do_merge, callback,
1424
22.9k
                 is_blob_index, value, columns, timestamp, s, merge_context,
1425
22.9k
                 seq, &found_final_value, &merge_in_progress);
1426
22.9k
  }
1427
1428
  // No change to value, since we have not yet found a Put/Delete
1429
  // Propagate corruption error
1430
22.9k
  if (!found_final_value && merge_in_progress) {
1431
0
    if (s->ok()) {
1432
0
      *s = Status::MergeInProgress();
1433
0
    } else {
1434
0
      assert(s->IsMergeInProgress());
1435
0
    }
1436
0
  }
1437
22.9k
  PERF_COUNTER_ADD(get_from_memtable_count, 1);
1438
22.9k
  return found_final_value;
1439
24.7k
}
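A hypothetical caller-side sketch of the lookup entry point above, not taken from the coverage run; it assumes the trailing callback/is_blob_index/do_merge parameters keep their header defaults:

  LookupKey lkey(Slice("some_key"), kMaxSequenceNumber);  // read latest version
  std::string value;
  Status s;
  MergeContext merge_context;
  SequenceNumber max_covering_tombstone_seq = 0;
  SequenceNumber seq = kMaxSequenceNumber;
  bool done = mem->Get(lkey, &value, /*columns=*/nullptr,
                       /*timestamp=*/nullptr, &s, &merge_context,
                       &max_covering_tombstone_seq, &seq, ReadOptions(),
                       /*immutable_memtable=*/false);
  // done == true once a final result is known, including deletions,
  // which surface through s.IsNotFound().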
1440
1441
void MemTable::GetFromTable(const LookupKey& key,
1442
                            SequenceNumber max_covering_tombstone_seq,
1443
                            bool do_merge, ReadCallback* callback,
1444
                            bool* is_blob_index, std::string* value,
1445
                            PinnableWideColumns* columns,
1446
                            std::string* timestamp, Status* s,
1447
                            MergeContext* merge_context, SequenceNumber* seq,
1448
22.9k
                            bool* found_final_value, bool* merge_in_progress) {
1449
22.9k
  Saver saver;
1450
22.9k
  saver.status = s;
1451
22.9k
  saver.found_final_value = found_final_value;
1452
22.9k
  saver.merge_in_progress = merge_in_progress;
1453
22.9k
  saver.key = &key;
1454
22.9k
  saver.value = value;
1455
22.9k
  saver.columns = columns;
1456
22.9k
  saver.timestamp = timestamp;
1457
22.9k
  saver.seq = kMaxSequenceNumber;
1458
22.9k
  saver.mem = this;
1459
22.9k
  saver.merge_context = merge_context;
1460
22.9k
  saver.max_covering_tombstone_seq = max_covering_tombstone_seq;
1461
22.9k
  saver.merge_operator = moptions_.merge_operator;
1462
22.9k
  saver.logger = moptions_.info_log;
1463
22.9k
  saver.inplace_update_support = moptions_.inplace_update_support;
1464
22.9k
  saver.statistics = moptions_.statistics;
1465
22.9k
  saver.clock = clock_;
1466
22.9k
  saver.callback_ = callback;
1467
22.9k
  saver.is_blob_index = is_blob_index;
1468
22.9k
  saver.do_merge = do_merge;
1469
22.9k
  saver.allow_data_in_errors = moptions_.allow_data_in_errors;
1470
22.9k
  saver.protection_bytes_per_key = moptions_.protection_bytes_per_key;
1471
1472
22.9k
  if (!moptions_.paranoid_memory_checks) {
1473
22.9k
    table_->Get(key, &saver, SaveValue);
1474
22.9k
  } else {
1475
0
    Status check_s = table_->GetAndValidate(key, &saver, SaveValue,
1476
0
                                            moptions_.allow_data_in_errors);
1477
0
    if (check_s.IsCorruption()) {
1478
0
      *(saver.status) = check_s;
1479
      // Should stop searching the LSM.
1480
0
      *(saver.found_final_value) = true;
1481
0
    }
1482
0
  }
1483
22.9k
  assert(s->ok() || s->IsMergeInProgress() || *found_final_value);
1484
22.9k
  *seq = saver.seq;
1485
22.9k
}
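The validating branch above is driven by moptions_.paranoid_memory_checks. A hedged configuration sketch, assuming the option carries the same spelling at the public Options level:

  Options options;
  // Routes memtable point lookups through GetAndValidate(), which surfaces
  // key-ordering problems as a Status::Corruption that stops the LSM search.
  options.paranoid_memory_checks = true;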
1486
1487
void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
1488
0
                        ReadCallback* callback, bool immutable_memtable) {
1489
  // The sequence number is updated synchronously in version_set.h
1490
0
  if (IsEmpty()) {
1491
    // Avoid recording stats, for speed.
1492
0
    return;
1493
0
  }
1494
0
  PERF_TIMER_GUARD(get_from_memtable_time);
1495
1496
  // For now, the memtable Bloom filter is effectively disabled if there are any
1497
  // range tombstones. This is the simplest way to ensure range tombstones are
1498
  // handled. TODO: allow Bloom checks where max_covering_tombstone_seq==0
1499
0
  bool no_range_del = read_options.ignore_range_deletions ||
1500
0
                      is_range_del_table_empty_.load(std::memory_order_relaxed);
1501
0
  MultiGetRange temp_range(*range, range->begin(), range->end());
1502
0
  if (bloom_filter_ && no_range_del) {
1503
0
    bool whole_key =
1504
0
        !prefix_extractor_ || moptions_.memtable_whole_key_filtering;
1505
0
    std::array<Slice, MultiGetContext::MAX_BATCH_SIZE> bloom_keys;
1506
0
    std::array<bool, MultiGetContext::MAX_BATCH_SIZE> may_match;
1507
0
    std::array<size_t, MultiGetContext::MAX_BATCH_SIZE> range_indexes;
1508
0
    int num_keys = 0;
1509
0
    for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) {
1510
0
      if (whole_key) {
1511
0
        bloom_keys[num_keys] = iter->ukey_without_ts;
1512
0
        range_indexes[num_keys++] = iter.index();
1513
0
      } else if (prefix_extractor_->InDomain(iter->ukey_without_ts)) {
1514
0
        bloom_keys[num_keys] =
1515
0
            prefix_extractor_->Transform(iter->ukey_without_ts);
1516
0
        range_indexes[num_keys++] = iter.index();
1517
0
      }
1518
0
    }
1519
0
    bloom_filter_->MayContain(num_keys, bloom_keys.data(), may_match.data());
1520
0
    for (int i = 0; i < num_keys; ++i) {
1521
0
      if (!may_match[i]) {
1522
0
        temp_range.SkipIndex(range_indexes[i]);
1523
0
        PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
1524
0
      } else {
1525
0
        PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
1526
0
      }
1527
0
    }
1528
0
  }
1529
0
  for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) {
1530
0
    bool found_final_value{false};
1531
0
    bool merge_in_progress = iter->s->IsMergeInProgress();
1532
0
    if (!no_range_del) {
1533
0
      std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
1534
0
          NewRangeTombstoneIteratorInternal(
1535
0
              read_options, GetInternalKeySeqno(iter->lkey->internal_key()),
1536
0
              immutable_memtable));
1537
0
      SequenceNumber covering_seq =
1538
0
          range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key());
1539
0
      if (covering_seq > iter->max_covering_tombstone_seq) {
1540
0
        iter->max_covering_tombstone_seq = covering_seq;
1541
0
        if (iter->timestamp) {
1542
          // Will be overwritten in SaveValue() if there is a point key with
1543
          // a higher seqno.
1544
0
          iter->timestamp->assign(range_del_iter->timestamp().data(),
1545
0
                                  range_del_iter->timestamp().size());
1546
0
        }
1547
0
      }
1548
0
    }
1549
0
    SequenceNumber dummy_seq;
1550
0
    GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true,
1551
0
                 callback, &iter->is_blob_index,
1552
0
                 iter->value ? iter->value->GetSelf() : nullptr, iter->columns,
1553
0
                 iter->timestamp, iter->s, &(iter->merge_context), &dummy_seq,
1554
0
                 &found_final_value, &merge_in_progress);
1555
1556
0
    if (!found_final_value && merge_in_progress) {
1557
0
      if (iter->s->ok()) {
1558
0
        *(iter->s) = Status::MergeInProgress();
1559
0
      } else {
1560
0
        assert(iter->s->IsMergeInProgress());
1561
0
      }
1562
0
    }
1563
1564
0
    if (found_final_value ||
1565
0
        (!iter->s->ok() && !iter->s->IsMergeInProgress())) {
1566
      // `found_final_value` should be set if an error/corruption occurs.
1567
      // The check on iter->s is just there in case GetFromTable() did not
1568
      // set `found_final_value` properly.
1569
0
      assert(found_final_value);
1570
0
      if (iter->value) {
1571
0
        iter->value->PinSelf();
1572
0
        range->AddValueSize(iter->value->size());
1573
0
      } else {
1574
0
        assert(iter->columns);
1575
0
        range->AddValueSize(iter->columns->serialized_size());
1576
0
      }
1577
1578
0
      range->MarkKeyDone(iter);
1579
0
      RecordTick(moptions_.statistics, MEMTABLE_HIT);
1580
0
      if (range->GetValueSize() > read_options.value_size_soft_limit) {
1581
        // Set all remaining keys in range to Abort
1582
0
        for (auto range_iter = range->begin(); range_iter != range->end();
1583
0
             ++range_iter) {
1584
0
          range->MarkKeyDone(range_iter);
1585
0
          *(range_iter->s) = Status::Aborted();
1586
0
        }
1587
0
        break;
1588
0
      }
1589
0
    }
1590
0
  }
1591
0
  PERF_COUNTER_ADD(get_from_memtable_count, 1);
1592
0
}
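The early abort at the end of MultiGet is governed by ReadOptions::value_size_soft_limit: once the value bytes accumulated by a batch cross it, every remaining key is marked done with Status::Aborted(). A small sketch:

  ReadOptions ro;
  ro.value_size_soft_limit = 1 << 20;  // cap a MultiGet batch at ~1 MiB
  // Keys processed after the cap is hit come back with
  // status.IsAborted() == true instead of a value.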
1593
1594
Status MemTable::Update(SequenceNumber seq, ValueType value_type,
1595
                        const Slice& key, const Slice& value,
1596
0
                        const ProtectionInfoKVOS64* kv_prot_info) {
1597
0
  LookupKey lkey(key, seq);
1598
0
  Slice mem_key = lkey.memtable_key();
1599
1600
0
  std::unique_ptr<MemTableRep::Iterator> iter(
1601
0
      table_->GetDynamicPrefixIterator());
1602
0
  iter->Seek(lkey.internal_key(), mem_key.data());
1603
1604
0
  if (iter->Valid()) {
1605
    // Refer to comments under MemTable::Add() for entry format.
1606
    // Check that it belongs to the same user key.  We do not check the
1607
    // sequence number since the Seek() call above should have skipped
1608
    // all entries with overly large sequence numbers.
1609
0
    const char* entry = iter->key();
1610
0
    uint32_t key_length = 0;
1611
0
    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
1612
0
    if (comparator_.comparator.user_comparator()->Equal(
1613
0
            Slice(key_ptr, key_length - 8), lkey.user_key())) {
1614
      // Correct user key
1615
0
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
1616
0
      ValueType type;
1617
0
      SequenceNumber existing_seq;
1618
0
      UnPackSequenceAndType(tag, &existing_seq, &type);
1619
0
      assert(existing_seq != seq);
1620
0
      if (type == value_type) {
1621
0
        Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
1622
0
        uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
1623
0
        uint32_t new_size = static_cast<uint32_t>(value.size());
1624
1625
        // Update the value in place if the new value size <= previous value size
1626
0
        if (new_size <= prev_size) {
1627
0
          WriteLock wl(GetLock(lkey.user_key()));
1628
0
          char* p =
1629
0
              EncodeVarint32(const_cast<char*>(key_ptr) + key_length, new_size);
1630
0
          memcpy(p, value.data(), value.size());
1631
0
          assert((unsigned)((p + value.size()) - entry) ==
1632
0
                 (unsigned)(VarintLength(key_length) + key_length +
1633
0
                            VarintLength(value.size()) + value.size()));
1634
0
          RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
1635
0
          if (kv_prot_info != nullptr) {
1636
0
            ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info);
1637
            // `seq` is swallowed and `existing_seq` prevails.
1638
0
            updated_kv_prot_info.UpdateS(seq, existing_seq);
1639
0
            UpdateEntryChecksum(&updated_kv_prot_info, key, value, type,
1640
0
                                existing_seq, p + value.size());
1641
0
            Slice encoded(entry, p + value.size() - entry);
1642
0
            return VerifyEncodedEntry(encoded, updated_kv_prot_info);
1643
0
          } else {
1644
0
            UpdateEntryChecksum(nullptr, key, value, type, existing_seq,
1645
0
                                p + value.size());
1646
0
          }
1647
0
          return Status::OK();
1648
0
        }
1649
0
      }
1650
0
    }
1651
0
  }
1652
1653
  // The latest entry is not of type `value_type`, or the key doesn't exist
1654
0
  return Add(seq, value_type, key, value, kv_prot_info);
1655
0
}
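A worked illustration of the in-place guard above. Each entry is laid out as varint32(key_size) | key | varint32(value_size) | value, so a replacement may reuse the slot only when it fits the old value's footprint (a sketch; the real path also rewrites the varint prefix):

  bool CanUpdateInPlace(size_t prev_size, size_t new_size) {
    // prev_size = 100, new_size = 64  -> true  (overwrite in place)
    // prev_size = 100, new_size = 200 -> false (fall through to Add())
    return new_size <= prev_size;
  }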
1656
1657
Status MemTable::UpdateCallback(SequenceNumber seq, const Slice& key,
1658
                                const Slice& delta,
1659
0
                                const ProtectionInfoKVOS64* kv_prot_info) {
1660
0
  LookupKey lkey(key, seq);
1661
0
  Slice memkey = lkey.memtable_key();
1662
1663
0
  std::unique_ptr<MemTableRep::Iterator> iter(
1664
0
      table_->GetDynamicPrefixIterator());
1665
0
  iter->Seek(lkey.internal_key(), memkey.data());
1666
1667
0
  if (iter->Valid()) {
1668
    // Refer to comments under MemTable::Add() for entry format.
1669
    // Check that it belongs to the same user key.  We do not check the
1670
    // sequence number since the Seek() call above should have skipped
1671
    // all entries with overly large sequence numbers.
1672
0
    const char* entry = iter->key();
1673
0
    uint32_t key_length = 0;
1674
0
    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
1675
0
    if (comparator_.comparator.user_comparator()->Equal(
1676
0
            Slice(key_ptr, key_length - 8), lkey.user_key())) {
1677
      // Correct user key
1678
0
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
1679
0
      ValueType type;
1680
0
      uint64_t existing_seq;
1681
0
      UnPackSequenceAndType(tag, &existing_seq, &type);
1682
0
      if (type == kTypeValue) {
1683
0
        Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
1684
0
        uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
1685
1686
0
        char* prev_buffer = const_cast<char*>(prev_value.data());
1687
0
        uint32_t new_prev_size = prev_size;
1688
1689
0
        std::string str_value;
1690
0
        WriteLock wl(GetLock(lkey.user_key()));
1691
0
        auto status = moptions_.inplace_callback(prev_buffer, &new_prev_size,
1692
0
                                                 delta, &str_value);
1693
0
        if (status == UpdateStatus::UPDATED_INPLACE) {
1694
          // Value already updated by callback.
1695
0
          assert(new_prev_size <= prev_size);
1696
0
          if (new_prev_size < prev_size) {
1697
            // Overwrite the length prefix with the new prev_size
1698
0
            char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
1699
0
                                     new_prev_size);
1700
0
            if (VarintLength(new_prev_size) < VarintLength(prev_size)) {
1701
              // shift the value buffer as well.
1702
0
              memcpy(p, prev_buffer, new_prev_size);
1703
0
              prev_buffer = p;
1704
0
            }
1705
0
          }
1706
0
          RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
1707
0
          UpdateFlushState();
1708
0
          Slice new_value(prev_buffer, new_prev_size);
1709
0
          if (kv_prot_info != nullptr) {
1710
0
            ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info);
1711
            // `seq` is swallowed and `existing_seq` prevails.
1712
0
            updated_kv_prot_info.UpdateS(seq, existing_seq);
1713
0
            updated_kv_prot_info.UpdateV(delta, new_value);
1714
0
            Slice encoded(entry, prev_buffer + new_prev_size - entry);
1715
0
            UpdateEntryChecksum(&updated_kv_prot_info, key, new_value, type,
1716
0
                                existing_seq, prev_buffer + new_prev_size);
1717
0
            return VerifyEncodedEntry(encoded, updated_kv_prot_info);
1718
0
          } else {
1719
0
            UpdateEntryChecksum(nullptr, key, new_value, type, existing_seq,
1720
0
                                prev_buffer + new_prev_size);
1721
0
          }
1722
0
          return Status::OK();
1723
0
        } else if (status == UpdateStatus::UPDATED) {
1724
0
          Status s;
1725
0
          if (kv_prot_info != nullptr) {
1726
0
            ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info);
1727
0
            updated_kv_prot_info.UpdateV(delta, str_value);
1728
0
            s = Add(seq, kTypeValue, key, Slice(str_value),
1729
0
                    &updated_kv_prot_info);
1730
0
          } else {
1731
0
            s = Add(seq, kTypeValue, key, Slice(str_value),
1732
0
                    nullptr /* kv_prot_info */);
1733
0
          }
1734
0
          RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN);
1735
0
          UpdateFlushState();
1736
0
          return s;
1737
0
        } else if (status == UpdateStatus::UPDATE_FAILED) {
1738
          // `UPDATE_FAILED` is named incorrectly. It indicates no update
1739
          // happened, not that a failure occurred.
1740
0
          UpdateFlushState();
1741
0
          return Status::OK();
1742
0
        }
1743
0
      }
1744
0
    }
1745
0
  }
1746
  // The latest entry is not `kTypeValue`, or the key doesn't exist
1747
0
  return Status::NotFound();
1748
0
}
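A minimal sketch of a user-supplied inplace_callback compatible with the three UpdateStatus outcomes handled above, using the signature visible at the call site (existing buffer, in/out size, delta, merged-value string):

  UpdateStatus ShrinkingReplace(char* existing_value, uint32_t* existing_size,
                                Slice delta, std::string* merged_value) {
    if (delta.size() <= *existing_size) {
      // Fits in the old slot: rewrite the bytes and shrink the size.
      memcpy(existing_value, delta.data(), delta.size());
      *existing_size = static_cast<uint32_t>(delta.size());
      return UpdateStatus::UPDATED_INPLACE;
    }
    // Too large for the slot: hand back a fresh value for the Add() path.
    merged_value->assign(delta.data(), delta.size());
    return UpdateStatus::UPDATED;
    // Returning UPDATE_FAILED instead would mean "no update performed".
  }
  // options.inplace_update_support = true;
  // options.inplace_callback = ShrinkingReplace;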
1749
1750
size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key,
1751
0
                                             size_t limit) {
1752
0
  Slice memkey = key.memtable_key();
1753
1754
  // A totally ordered iterator is costly for some memtablereps (prefix-aware
1755
  // reps). By passing in the user key, we allow efficient iterator creation.
1756
  // The iterator only needs to be ordered within the same user key.
1757
0
  std::unique_ptr<MemTableRep::Iterator> iter(
1758
0
      table_->GetDynamicPrefixIterator());
1759
0
  iter->Seek(key.internal_key(), memkey.data());
1760
1761
0
  size_t num_successive_merges = 0;
1762
1763
0
  for (; iter->Valid() && num_successive_merges < limit; iter->Next()) {
1764
0
    const char* entry = iter->key();
1765
0
    uint32_t key_length = 0;
1766
0
    const char* iter_key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
1767
0
    if (!comparator_.comparator.user_comparator()->Equal(
1768
0
            Slice(iter_key_ptr, key_length - 8), key.user_key())) {
1769
0
      break;
1770
0
    }
1771
1772
0
    const uint64_t tag = DecodeFixed64(iter_key_ptr + key_length - 8);
1773
0
    ValueType type;
1774
0
    uint64_t unused;
1775
0
    UnPackSequenceAndType(tag, &unused, &type);
1776
0
    if (type != kTypeMerge) {
1777
0
      break;
1778
0
    }
1779
1780
0
    ++num_successive_merges;
1781
0
  }
1782
1783
0
  return num_successive_merges;
1784
0
}
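A hedged usage sketch for the counter above, which backs the per-key cap on consecutive merge entries (the max_successive_merges option, by this editor's reading):

  Options options;
  options.merge_operator = MergeOperators::CreateStringAppendOperator();
  // After this many consecutive Merge() calls on one key, the write path
  // may resolve the operand chain eagerly instead of letting it grow.
  options.max_successive_merges = 5;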
1785
1786
void MemTableRep::Get(const LookupKey& k, void* callback_args,
1787
0
                      bool (*callback_func)(void* arg, const char* entry)) {
1788
0
  auto iter = GetDynamicPrefixIterator();
1789
0
  for (iter->Seek(k.internal_key(), k.memtable_key().data());
1790
0
       iter->Valid() && callback_func(callback_args, iter->key());
1791
0
       iter->Next()) {
1792
0
  }
1793
0
}
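A sketch of the callback contract this driver loop implies: each encoded entry for the sought key is handed to the callback in seek order, and the callback returns true to continue the scan or false to stop (as SaveValue does once it has a final value). A hypothetical callback that counts entries:

  bool CountEntry(void* arg, const char* /*entry*/) {
    ++*static_cast<size_t*>(arg);
    return true;  // keep scanning; return false to stop early
  }
  // size_t n = 0;
  // rep->Get(lookup_key, &n, &CountEntry);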
1794
1795
0
void MemTable::RefLogContainingPrepSection(uint64_t log) {
1796
0
  assert(log > 0);
1797
0
  auto cur = min_prep_log_referenced_.load();
1798
0
  while ((log < cur || cur == 0) &&
1799
0
         !min_prep_log_referenced_.compare_exchange_strong(cur, log)) {
1800
0
    cur = min_prep_log_referenced_.load();
1801
0
  }
1802
0
}
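The loop above is the classic compare-and-swap "keep the minimum" idiom, with 0 doubling as "unset". A standalone sketch of the same pattern:

  void KeepMinNonZero(std::atomic<uint64_t>& slot, uint64_t candidate) {
    uint64_t cur = slot.load();
    // Retry while `candidate` should win (slot unset or candidate smaller)
    // and another thread keeps racing us; compare_exchange_strong reloads
    // `cur` with the current value whenever it fails.
    while ((candidate < cur || cur == 0) &&
           !slot.compare_exchange_strong(cur, candidate)) {
    }
  }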
1803
1804
0
uint64_t MemTable::GetMinLogContainingPrepSection() {
1805
0
  return min_prep_log_referenced_.load();
1806
0
}
1807
1808
1.89M
void MemTable::MaybeUpdateNewestUDT(const Slice& user_key) {
1809
1.89M
  if (ts_sz_ == 0) {
1810
1.89M
    return;
1811
1.89M
  }
1812
0
  const Comparator* ucmp = GetInternalKeyComparator().user_comparator();
1813
0
  Slice udt = ExtractTimestampFromUserKey(user_key, ts_sz_);
1814
0
  if (newest_udt_.empty() || ucmp->CompareTimestamp(udt, newest_udt_) > 0) {
1815
0
    newest_udt_ = udt;
1816
0
  }
1817
0
}
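A hedged illustration of the comparison above: with a timestamp-aware comparator such as the built-in BytewiseComparatorWithU64Ts(), CompareTimestamp() orders the trailing timestamp bytes, so newest_udt_ tracks the largest user-defined timestamp written into this memtable:

  Options options;
  options.comparator = BytewiseComparatorWithU64Ts();  // 8-byte u64 timestamps
  // ucmp->CompareTimestamp(a, b) > 0  <=>  `a` is the newer timestamp.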
1818
1819
0
const Slice& MemTable::GetNewestUDT() const {
1820
0
  assert(ts_sz_ > 0);
1821
0
  return newest_udt_;
1822
0
}
1823
1824
}  // namespace ROCKSDB_NAMESPACE