Coverage Report

Created: 2024-09-08 07:17

/src/rocksdb/db/memtable.cc
Line | Count | Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10
#include "db/memtable.h"
11
12
#include <algorithm>
13
#include <array>
14
#include <limits>
15
#include <memory>
16
#include <optional>
17
18
#include "db/dbformat.h"
19
#include "db/kv_checksum.h"
20
#include "db/merge_context.h"
21
#include "db/merge_helper.h"
22
#include "db/pinned_iterators_manager.h"
23
#include "db/range_tombstone_fragmenter.h"
24
#include "db/read_callback.h"
25
#include "db/wide/wide_column_serialization.h"
26
#include "logging/logging.h"
27
#include "memory/arena.h"
28
#include "memory/memory_usage.h"
29
#include "monitoring/perf_context_imp.h"
30
#include "monitoring/statistics_impl.h"
31
#include "port/lang.h"
32
#include "port/port.h"
33
#include "rocksdb/comparator.h"
34
#include "rocksdb/env.h"
35
#include "rocksdb/iterator.h"
36
#include "rocksdb/merge_operator.h"
37
#include "rocksdb/slice_transform.h"
38
#include "rocksdb/types.h"
39
#include "rocksdb/write_buffer_manager.h"
40
#include "table/internal_iterator.h"
41
#include "table/iterator_wrapper.h"
42
#include "table/merging_iterator.h"
43
#include "util/autovector.h"
44
#include "util/coding.h"
45
#include "util/mutexlock.h"
46
47
namespace ROCKSDB_NAMESPACE {
48
49
ImmutableMemTableOptions::ImmutableMemTableOptions(
50
    const ImmutableOptions& ioptions,
51
    const MutableCFOptions& mutable_cf_options)
52
    : arena_block_size(mutable_cf_options.arena_block_size),
53
      memtable_prefix_bloom_bits(
54
          static_cast<uint32_t>(
55
              static_cast<double>(mutable_cf_options.write_buffer_size) *
56
              mutable_cf_options.memtable_prefix_bloom_size_ratio) *
57
          8u),
58
      memtable_huge_page_size(mutable_cf_options.memtable_huge_page_size),
59
      memtable_whole_key_filtering(
60
          mutable_cf_options.memtable_whole_key_filtering),
61
      inplace_update_support(ioptions.inplace_update_support),
62
      inplace_update_num_locks(mutable_cf_options.inplace_update_num_locks),
63
      inplace_callback(ioptions.inplace_callback),
64
      max_successive_merges(mutable_cf_options.max_successive_merges),
65
      strict_max_successive_merges(
66
          mutable_cf_options.strict_max_successive_merges),
67
      statistics(ioptions.stats),
68
      merge_operator(ioptions.merge_operator.get()),
69
      info_log(ioptions.logger),
70
      protection_bytes_per_key(
71
          mutable_cf_options.memtable_protection_bytes_per_key),
72
      allow_data_in_errors(ioptions.allow_data_in_errors),
73
17.3k
      paranoid_memory_checks(mutable_cf_options.paranoid_memory_checks) {}
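
// [Editor's note] A minimal sketch of the prefix-bloom sizing arithmetic in
// the initializer list above; the numbers below are hypothetical, not from
// this report. The ratio yields a byte budget relative to the write buffer,
// and the trailing "* 8u" converts that budget from bytes to bits for
// DynamicBloom.
#include <cstddef>
#include <cstdint>
uint32_t PrefixBloomBits(size_t write_buffer_size, double ratio) {
  // e.g. a 64 MiB buffer with ratio 0.1 -> ~6.4 MiB of filter ~= 53.7M bits
  return static_cast<uint32_t>(
             static_cast<double>(write_buffer_size) * ratio) *
         8u;
}
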
74
75
MemTable::MemTable(const InternalKeyComparator& cmp,
76
                   const ImmutableOptions& ioptions,
77
                   const MutableCFOptions& mutable_cf_options,
78
                   WriteBufferManager* write_buffer_manager,
79
                   SequenceNumber latest_seq, uint32_t column_family_id)
80
    : comparator_(cmp),
81
      moptions_(ioptions, mutable_cf_options),
82
      refs_(0),
83
      kArenaBlockSize(Arena::OptimizeBlockSize(moptions_.arena_block_size)),
84
      mem_tracker_(write_buffer_manager),
85
      arena_(moptions_.arena_block_size,
86
             (write_buffer_manager != nullptr &&
87
              (write_buffer_manager->enabled() ||
88
               write_buffer_manager->cost_to_cache()))
89
                 ? &mem_tracker_
90
                 : nullptr,
91
             mutable_cf_options.memtable_huge_page_size),
92
      table_(ioptions.memtable_factory->CreateMemTableRep(
93
          comparator_, &arena_, mutable_cf_options.prefix_extractor.get(),
94
          ioptions.logger, column_family_id)),
95
      range_del_table_(SkipListFactory().CreateMemTableRep(
96
          comparator_, &arena_, nullptr /* transform */, ioptions.logger,
97
          column_family_id)),
98
      is_range_del_table_empty_(true),
99
      data_size_(0),
100
      num_entries_(0),
101
      num_deletes_(0),
102
      num_range_deletes_(0),
103
      write_buffer_size_(mutable_cf_options.write_buffer_size),
104
      flush_in_progress_(false),
105
      flush_completed_(false),
106
      file_number_(0),
107
      first_seqno_(0),
108
      earliest_seqno_(latest_seq),
109
      creation_seq_(latest_seq),
110
      mem_next_logfile_number_(0),
111
      min_prep_log_referenced_(0),
112
      locks_(moptions_.inplace_update_support
113
                 ? moptions_.inplace_update_num_locks
114
                 : 0),
115
      prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
116
      flush_state_(FLUSH_NOT_REQUESTED),
117
      clock_(ioptions.clock),
118
      insert_with_hint_prefix_extractor_(
119
          ioptions.memtable_insert_with_hint_prefix_extractor.get()),
120
      oldest_key_time_(std::numeric_limits<uint64_t>::max()),
121
      atomic_flush_seqno_(kMaxSequenceNumber),
122
      approximate_memory_usage_(0),
123
      memtable_max_range_deletions_(
124
17.3k
          mutable_cf_options.memtable_max_range_deletions) {
125
17.3k
  UpdateFlushState();
126
  // something went wrong if we need to flush before inserting anything
127
17.3k
  assert(!ShouldScheduleFlush());
128
129
  // use bloom_filter_ for both whole key and prefix bloom filter
130
17.3k
  if ((prefix_extractor_ || moptions_.memtable_whole_key_filtering) &&
131
17.3k
      moptions_.memtable_prefix_bloom_bits > 0) {
132
0
    bloom_filter_.reset(
133
0
        new DynamicBloom(&arena_, moptions_.memtable_prefix_bloom_bits,
134
0
                         6 /* hard coded 6 probes */,
135
0
                         moptions_.memtable_huge_page_size, ioptions.logger));
136
0
  }
137
  // Initialize cached_range_tombstone_ here since it could
138
  // be read before it is constructed in MemTable::Add(), which could also lead
139
  // to a data race on the global mutex table backing atomic shared_ptr.
140
17.3k
  auto new_cache = std::make_shared<FragmentedRangeTombstoneListCache>();
141
17.3k
  size_t size = cached_range_tombstone_.Size();
142
572k
  for (size_t i = 0; i < size; ++i) {
143
555k
    std::shared_ptr<FragmentedRangeTombstoneListCache>* local_cache_ref_ptr =
144
555k
        cached_range_tombstone_.AccessAtCore(i);
145
555k
    auto new_local_cache_ref = std::make_shared<
146
555k
        const std::shared_ptr<FragmentedRangeTombstoneListCache>>(new_cache);
147
555k
    std::atomic_store_explicit(
148
555k
        local_cache_ref_ptr,
149
555k
        std::shared_ptr<FragmentedRangeTombstoneListCache>(new_local_cache_ref,
150
555k
                                                           new_cache.get()),
151
555k
        std::memory_order_relaxed);
152
555k
  }
153
17.3k
  const Comparator* ucmp = cmp.user_comparator();
154
17.3k
  assert(ucmp);
155
17.3k
  ts_sz_ = ucmp->timestamp_size();
156
17.3k
  persist_user_defined_timestamps_ = ioptions.persist_user_defined_timestamps;
157
17.3k
}
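
// [Editor's note] A self-contained sketch of the per-core aliasing-shared_ptr
// trick used in the loop above; `Cache` here is a stand-in type, not RocksDB
// code. Each core's slot holds a shared_ptr whose get() points at the shared
// cache but whose control block is a per-core wrapper, so readers bump a
// per-core reference count instead of contending on a single global one. The
// std::atomic_(load|store)_explicit free functions are the pre-C++20 atomic
// shared_ptr API that the constructor above also uses.
#include <atomic>
#include <memory>

struct Cache {
  int payload = 0;
};

int main() {
  auto shared = std::make_shared<Cache>();
  std::shared_ptr<Cache> slot;  // one per-core slot shown; MemTable has many
  auto local_ref = std::make_shared<const std::shared_ptr<Cache>>(shared);
  std::atomic_store_explicit(
      &slot,
      std::shared_ptr<Cache>(local_ref, shared.get()),  // aliasing constructor
      std::memory_order_relaxed);
  auto reader = std::atomic_load_explicit(&slot, std::memory_order_relaxed);
  return reader->payload;  // dereferences the shared Cache via the alias
}
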
158
159
17.3k
MemTable::~MemTable() {
160
17.3k
  mem_tracker_.FreeMem();
161
17.3k
  assert(refs_ == 0);
162
17.3k
}
163
164
532
size_t MemTable::ApproximateMemoryUsage() {
165
532
  autovector<size_t> usages = {
166
532
      arena_.ApproximateMemoryUsage(), table_->ApproximateMemoryUsage(),
167
532
      range_del_table_->ApproximateMemoryUsage(),
168
532
      ROCKSDB_NAMESPACE::ApproximateMemoryUsage(insert_hints_)};
169
532
  size_t total_usage = 0;
170
2.12k
  for (size_t usage : usages) {
171
    // If usage + total_usage would overflow size_t, return the max size_t.
172
    // The comparison below is ordered this way to avoid numeric overflow.
173
2.12k
    if (usage >= std::numeric_limits<size_t>::max() - total_usage) {
174
0
      return std::numeric_limits<size_t>::max();
175
0
    }
176
2.12k
    total_usage += usage;
177
2.12k
  }
178
532
  approximate_memory_usage_.store(total_usage, std::memory_order_relaxed);
179
  // otherwise, return the actual usage
180
532
  return total_usage;
181
532
}
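
// [Editor's note] The loop above computes a saturating sum; equivalently, for
// two operands (a sketch, not RocksDB code):
#include <cstddef>
#include <limits>
size_t SaturatingAdd(size_t a, size_t b) {
  // Mirrors the check above: if a + b would overflow, pin at the max.
  return (b >= std::numeric_limits<size_t>::max() - a)
             ? std::numeric_limits<size_t>::max()
             : a + b;
}
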
182
183
2.20M
bool MemTable::ShouldFlushNow() {
184
  // Flush if memtable_max_range_deletions is set (> 0)
185
  // and at least that many range deletions have been performed.
186
2.20M
  if (memtable_max_range_deletions_ > 0 &&
187
2.20M
      num_range_deletes_.load(std::memory_order_relaxed) >=
188
0
          static_cast<uint64_t>(memtable_max_range_deletions_)) {
189
0
    return true;
190
0
  }
191
192
2.20M
  size_t write_buffer_size = write_buffer_size_.load(std::memory_order_relaxed);
193
  // Often we cannot allocate arena blocks that exactly match the
194
  // buffer size, so we must decide whether to over-allocate or
195
  // under-allocate.
196
  // This constant variable can be interpreted as: if we still have more than
197
  // "kAllowOverAllocationRatio * kArenaBlockSize" space left, we'd try to over
198
  // over-allocate one more block.
199
2.20M
  const double kAllowOverAllocationRatio = 0.6;
200
201
  // If the arena still has room for a new block allocation, we can safely say it
202
  // shouldn't flush.
203
2.20M
  auto allocated_memory = table_->ApproximateMemoryUsage() +
204
2.20M
                          range_del_table_->ApproximateMemoryUsage() +
205
2.20M
                          arena_.MemoryAllocatedBytes();
206
207
2.20M
  approximate_memory_usage_.store(allocated_memory, std::memory_order_relaxed);
208
209
  // if we can still allocate one more block without exceeding the
210
  // over-allocation ratio, then we should not flush.
211
2.20M
  if (allocated_memory + kArenaBlockSize <
212
2.20M
      write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
213
2.20M
    return false;
214
2.20M
  }
215
216
  // If the user keeps adding entries that exceed write_buffer_size, we need to
217
  // flush earlier even though we still have much available memory left.
218
0
  if (allocated_memory >
219
0
      write_buffer_size + kArenaBlockSize * kAllowOverAllocationRatio) {
220
0
    return true;
221
0
  }
222
223
  // In this code path, Arena has already allocated its "last block", which
224
  // means the total allocated memory size is either:
225
  //  (1) "moderately" over allocated the memory (no more than `0.6 * arena
226
  // block size`). Or,
227
  //  (2) the allocated memory is less than write buffer size, but we'll stop
228
  // here since if we allocate a new arena block, we'll over allocate too much
229
  // more (half of the arena block size) memory.
230
  //
231
  // In either case, to avoid over-allocate, the last block will stop allocation
232
  // when its usage reaches a certain ratio, which we carefully choose "0.75
233
  // full" as the stop condition because it addresses the following issue with
234
  // great simplicity: What if the next inserted entry's size is
235
  // bigger than AllocatedAndUnused()?
236
  //
237
  // The answer is: if the entry size is also bigger than 0.25 *
238
  // kArenaBlockSize, a dedicated block will be allocated for it; otherwise
239
  // arena will anyway skip the AllocatedAndUnused() and allocate a new, empty
240
  // and regular block. In either case, we *overly* over-allocated.
241
  //
242
  // Therefore, setting the last block to be at most "0.75 full" avoids both
243
  // cases.
244
  //
245
  // NOTE: the average percentage of wasted space of this approach can be estimated
246
  // as: "arena block size * 0.25 / write buffer size". User who specify a small
247
  // write buffer size and/or big arena block size may suffer.
248
0
  return arena_.AllocatedAndUnused() < kArenaBlockSize / 4;
249
0
}
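
// [Editor's note] A worked example of the three thresholds above, with
// hypothetical numbers (not from this report). Take write_buffer_size = 64 MiB
// and kArenaBlockSize = 8 MiB, so the over-allocation allowance is
// 0.6 * 8 MiB = 4.8 MiB and the upper bound is 68.8 MiB:
//   - allocated 58 MiB: 58 + 8 = 66 < 68.8            -> don't flush yet
//   - allocated 70 MiB: 70 > 68.8                     -> flush
//   - allocated 67 MiB: neither bound fires, so flush only once the last
//     block is over 3/4 full (AllocatedAndUnused() < 8 MiB / 4 = 2 MiB).
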
250
251
2.20M
void MemTable::UpdateFlushState() {
252
2.20M
  auto state = flush_state_.load(std::memory_order_relaxed);
253
2.20M
  if (state == FLUSH_NOT_REQUESTED && ShouldFlushNow()) {
254
    // ignore CAS failure, because that means somebody else requested
255
    // a flush
256
0
    flush_state_.compare_exchange_strong(state, FLUSH_REQUESTED,
257
0
                                         std::memory_order_relaxed,
258
0
                                         std::memory_order_relaxed);
259
0
  }
260
2.20M
}
261
262
2.18M
void MemTable::UpdateOldestKeyTime() {
263
2.18M
  uint64_t oldest_key_time = oldest_key_time_.load(std::memory_order_relaxed);
264
2.18M
  if (oldest_key_time == std::numeric_limits<uint64_t>::max()) {
265
10.9k
    int64_t current_time = 0;
266
10.9k
    auto s = clock_->GetCurrentTime(&current_time);
267
10.9k
    if (s.ok()) {
268
10.9k
      assert(current_time >= 0);
269
      // If fail, the timestamp is already set.
270
10.9k
      oldest_key_time_.compare_exchange_strong(
271
10.9k
          oldest_key_time, static_cast<uint64_t>(current_time),
272
10.9k
          std::memory_order_relaxed, std::memory_order_relaxed);
273
10.9k
    }
274
10.9k
  }
275
2.18M
}
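
// [Editor's note] This is a set-once pattern: only threads that still observe
// the uint64_t max() sentinel attempt the CAS, and a losing CAS simply means
// another thread already recorded a (no later) time, so the failure can be
// ignored.
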
276
277
Status MemTable::VerifyEntryChecksum(const char* entry,
278
                                     uint32_t protection_bytes_per_key,
279
0
                                     bool allow_data_in_errors) {
280
0
  if (protection_bytes_per_key == 0) {
281
0
    return Status::OK();
282
0
  }
283
0
  uint32_t key_length;
284
0
  const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
285
0
  if (key_ptr == nullptr) {
286
0
    return Status::Corruption("Unable to parse internal key length");
287
0
  }
288
0
  if (key_length < 8) {
289
0
    return Status::Corruption("Memtable entry internal key length too short.");
290
0
  }
291
0
  Slice user_key = Slice(key_ptr, key_length - 8);
292
293
0
  const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
294
0
  ValueType type;
295
0
  SequenceNumber seq;
296
0
  UnPackSequenceAndType(tag, &seq, &type);
297
298
0
  uint32_t value_length = 0;
299
0
  const char* value_ptr = GetVarint32Ptr(
300
0
      key_ptr + key_length, key_ptr + key_length + 5, &value_length);
301
0
  if (value_ptr == nullptr) {
302
0
    return Status::Corruption("Unable to parse internal key value");
303
0
  }
304
0
  Slice value = Slice(value_ptr, value_length);
305
306
0
  const char* checksum_ptr = value_ptr + value_length;
307
0
  bool match =
308
0
      ProtectionInfo64()
309
0
          .ProtectKVO(user_key, value, type)
310
0
          .ProtectS(seq)
311
0
          .Verify(static_cast<uint8_t>(protection_bytes_per_key), checksum_ptr);
312
0
  if (!match) {
313
0
    std::string msg(
314
0
        "Corrupted memtable entry, per key-value checksum verification "
315
0
        "failed.");
316
0
    if (allow_data_in_errors) {
317
0
      msg.append("Unrecognized value type: " +
318
0
                 std::to_string(static_cast<int>(type)) + ". ");
319
0
      msg.append("User key: " + user_key.ToString(/*hex=*/true) + ". ");
320
0
      msg.append("seq: " + std::to_string(seq) + ".");
321
0
    }
322
0
    return Status::Corruption(msg.c_str());
323
0
  }
324
0
  return Status::OK();
325
0
}
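
// [Editor's note] For reference, the entry layout this function walks
// (documented in MemTable::Add() below) is:
//   varint32 internal_key_size | user key | 8-byte packed (seq, type) tag |
//   varint32 value_size | value | checksum[protection_bytes_per_key]
// The "entry + 5" bounds passed to GetVarint32Ptr work because a varint32
// occupies at most 5 bytes (7 payload bits per byte).
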
326
327
int MemTable::KeyComparator::operator()(const char* prefix_len_key1,
328
0
                                        const char* prefix_len_key2) const {
329
  // Internal keys are encoded as length-prefixed strings.
330
0
  Slice k1 = GetLengthPrefixedSlice(prefix_len_key1);
331
0
  Slice k2 = GetLengthPrefixedSlice(prefix_len_key2);
332
0
  return comparator.CompareKeySeq(k1, k2);
333
0
}
334
335
int MemTable::KeyComparator::operator()(
336
22.8M
    const char* prefix_len_key, const KeyComparator::DecodedType& key) const {
337
  // Internal keys are encoded as length-prefixed strings.
338
22.8M
  Slice a = GetLengthPrefixedSlice(prefix_len_key);
339
22.8M
  return comparator.CompareKeySeq(a, key);
340
22.8M
}
341
342
0
void MemTableRep::InsertConcurrently(KeyHandle /*handle*/) {
343
0
  throw std::runtime_error("concurrent insert not supported");
344
0
}
345
346
0
Slice MemTableRep::UserKey(const char* key) const {
347
0
  Slice slice = GetLengthPrefixedSlice(key);
348
0
  return Slice(slice.data(), slice.size() - 8);
349
0
}
350
351
0
KeyHandle MemTableRep::Allocate(const size_t len, char** buf) {
352
0
  *buf = allocator_->Allocate(len);
353
0
  return static_cast<KeyHandle>(*buf);
354
0
}
355
356
// Encode a suitable internal key target for "target" and return it.
357
// Uses *scratch as scratch space, and the returned pointer will point
358
// into this scratch space.
359
870
const char* EncodeKey(std::string* scratch, const Slice& target) {
360
870
  scratch->clear();
361
870
  PutVarint32(scratch, static_cast<uint32_t>(target.size()));
362
870
  scratch->append(target.data(), target.size());
363
870
  return scratch->data();
364
870
}
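
// [Editor's note] For example, EncodeKey(&scratch, "abc") produces the bytes
// "\x03abc": a varint32 length (one byte here) followed by the key itself,
// matching the length-prefixed form that MemTable::KeyComparator expects.
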
365
366
class MemTableIterator : public InternalIterator {
367
 public:
368
  MemTableIterator(const MemTable& mem, const ReadOptions& read_options,
369
                   UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping,
370
                   Arena* arena, bool use_range_del_table = false)
371
      : bloom_(nullptr),
372
        prefix_extractor_(mem.prefix_extractor_),
373
        comparator_(mem.comparator_),
374
        seqno_to_time_mapping_(seqno_to_time_mapping),
375
        status_(Status::OK()),
376
        logger_(mem.moptions_.info_log),
377
        ts_sz_(mem.ts_sz_),
378
        protection_bytes_per_key_(mem.moptions_.protection_bytes_per_key),
379
        valid_(false),
380
        value_pinned_(
381
            !mem.GetImmutableMemTableOptions()->inplace_update_support),
382
        arena_mode_(arena != nullptr),
383
        paranoid_memory_checks_(mem.moptions_.paranoid_memory_checks),
384
14.8k
        allow_data_in_error(mem.moptions_.allow_data_in_errors) {
385
14.8k
    if (use_range_del_table) {
386
3.12k
      iter_ = mem.range_del_table_->GetIterator(arena);
387
11.7k
    } else if (prefix_extractor_ != nullptr && !read_options.total_order_seek &&
388
11.7k
               !read_options.auto_prefix_mode) {
389
      // Auto prefix mode is not implemented in memtable yet.
390
0
      bloom_ = mem.bloom_filter_.get();
391
0
      iter_ = mem.table_->GetDynamicPrefixIterator(arena);
392
11.7k
    } else {
393
11.7k
      iter_ = mem.table_->GetIterator(arena);
394
11.7k
    }
395
14.8k
    status_.PermitUncheckedError();
396
14.8k
  }
397
  // No copying allowed
398
  MemTableIterator(const MemTableIterator&) = delete;
399
  void operator=(const MemTableIterator&) = delete;
400
401
14.8k
  ~MemTableIterator() override {
402
#ifndef NDEBUG
403
    // Assert that the MemTableIterator is never deleted while
404
    // pinning is enabled.
405
    assert(!pinned_iters_mgr_ || !pinned_iters_mgr_->PinningEnabled());
406
#endif
407
14.8k
    if (arena_mode_) {
408
11.7k
      iter_->~Iterator();
409
11.7k
    } else {
410
3.12k
      delete iter_;
411
3.12k
    }
412
14.8k
    status_.PermitUncheckedError();
413
14.8k
  }
414
415
#ifndef NDEBUG
416
  void SetPinnedItersMgr(PinnedIteratorsManager* pinned_iters_mgr) override {
417
    pinned_iters_mgr_ = pinned_iters_mgr;
418
  }
419
  PinnedIteratorsManager* pinned_iters_mgr_ = nullptr;
420
#endif
421
422
1.53M
  bool Valid() const override {
423
    // If inner iter_ is not valid, then this iter should also not be valid.
424
1.53M
    assert(iter_->Valid() || !(valid_ && status_.ok()));
425
1.53M
    return valid_ && status_.ok();
426
1.53M
  }
427
428
381
  void Seek(const Slice& k) override {
429
381
    PERF_TIMER_GUARD(seek_on_memtable_time);
430
381
    PERF_COUNTER_ADD(seek_on_memtable_count, 1);
431
381
    status_ = Status::OK();
432
381
    if (bloom_) {
433
      // iterator should only use prefix bloom filter
434
0
      Slice user_k_without_ts(ExtractUserKeyAndStripTimestamp(k, ts_sz_));
435
0
      if (prefix_extractor_->InDomain(user_k_without_ts)) {
436
0
        if (!bloom_->MayContain(
437
0
                prefix_extractor_->Transform(user_k_without_ts))) {
438
0
          PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
439
0
          valid_ = false;
440
0
          return;
441
0
        } else {
442
0
          PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
443
0
        }
444
0
      }
445
0
    }
446
381
    if (paranoid_memory_checks_) {
447
0
      status_ = iter_->SeekAndValidate(k, nullptr, allow_data_in_error);
448
381
    } else {
449
381
      iter_->Seek(k, nullptr);
450
381
    }
451
381
    valid_ = iter_->Valid();
452
381
    VerifyEntryChecksum();
453
381
  }
454
489
  void SeekForPrev(const Slice& k) override {
455
489
    PERF_TIMER_GUARD(seek_on_memtable_time);
456
489
    PERF_COUNTER_ADD(seek_on_memtable_count, 1);
457
489
    status_ = Status::OK();
458
489
    if (bloom_) {
459
0
      Slice user_k_without_ts(ExtractUserKeyAndStripTimestamp(k, ts_sz_));
460
0
      if (prefix_extractor_->InDomain(user_k_without_ts)) {
461
0
        if (!bloom_->MayContain(
462
0
                prefix_extractor_->Transform(user_k_without_ts))) {
463
0
          PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
464
0
          valid_ = false;
465
0
          return;
466
0
        } else {
467
0
          PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
468
0
        }
469
0
      }
470
0
    }
471
489
    if (paranoid_memory_checks_) {
472
0
      status_ = iter_->SeekAndValidate(k, nullptr, allow_data_in_error);
473
489
    } else {
474
489
      iter_->Seek(k, nullptr);
475
489
    }
476
489
    valid_ = iter_->Valid();
477
489
    VerifyEntryChecksum();
478
489
    if (!Valid() && status().ok()) {
479
467
      SeekToLast();
480
467
    }
481
511
    while (Valid() && comparator_.comparator.Compare(k, key()) < 0) {
482
22
      Prev();
483
22
    }
484
489
  }
485
16.7k
  void SeekToFirst() override {
486
16.7k
    status_ = Status::OK();
487
16.7k
    iter_->SeekToFirst();
488
16.7k
    valid_ = iter_->Valid();
489
16.7k
    VerifyEntryChecksum();
490
16.7k
  }
491
467
  void SeekToLast() override {
492
467
    status_ = Status::OK();
493
467
    iter_->SeekToLast();
494
467
    valid_ = iter_->Valid();
495
467
    VerifyEntryChecksum();
496
467
  }
497
1.46M
  void Next() override {
498
1.46M
    PERF_COUNTER_ADD(next_on_memtable_count, 1);
499
1.46M
    assert(Valid());
500
1.46M
    if (paranoid_memory_checks_) {
501
0
      status_ = iter_->NextAndValidate(allow_data_in_error);
502
1.46M
    } else {
503
1.46M
      iter_->Next();
504
1.46M
      TEST_SYNC_POINT_CALLBACK("MemTableIterator::Next:0", iter_);
505
1.46M
    }
506
1.46M
    valid_ = iter_->Valid();
507
1.46M
    VerifyEntryChecksum();
508
1.46M
  }
509
189
  bool NextAndGetResult(IterateResult* result) override {
510
189
    Next();
511
189
    bool is_valid = Valid();
512
189
    if (is_valid) {
513
105
      result->key = key();
514
105
      result->bound_check_result = IterBoundCheck::kUnknown;
515
105
      result->value_prepared = true;
516
105
    }
517
189
    return is_valid;
518
189
  }
519
582
  void Prev() override {
520
582
    PERF_COUNTER_ADD(prev_on_memtable_count, 1);
521
582
    assert(Valid());
522
582
    if (paranoid_memory_checks_) {
523
0
      status_ = iter_->PrevAndValidate(allow_data_in_error);
524
582
    } else {
525
582
      iter_->Prev();
526
582
    }
527
582
    valid_ = iter_->Valid();
528
582
    VerifyEntryChecksum();
529
582
  }
530
2.21M
  Slice key() const override {
531
2.21M
    assert(Valid());
532
2.21M
    return GetLengthPrefixedSlice(iter_->key());
533
2.21M
  }
534
535
411
  uint64_t write_unix_time() const override {
536
411
    assert(Valid());
537
411
    ParsedInternalKey pikey;
538
411
    Status s = ParseInternalKey(key(), &pikey, /*log_err_key=*/false);
539
411
    if (!s.ok()) {
540
0
      return std::numeric_limits<uint64_t>::max();
541
411
    } else if (kTypeValuePreferredSeqno == pikey.type) {
542
0
      return ParsePackedValueForWriteTime(value());
543
411
    } else if (!seqno_to_time_mapping_ || seqno_to_time_mapping_->Empty()) {
544
411
      return std::numeric_limits<uint64_t>::max();
545
411
    }
546
0
    return seqno_to_time_mapping_->GetProximalTimeBeforeSeqno(pikey.sequence);
547
411
  }
548
549
1.46M
  Slice value() const override {
550
1.46M
    assert(Valid());
551
1.46M
    Slice key_slice = GetLengthPrefixedSlice(iter_->key());
552
1.46M
    return GetLengthPrefixedSlice(key_slice.data() + key_slice.size());
553
1.46M
  }
554
555
17.9k
  Status status() const override { return status_; }
556
557
750k
  bool IsKeyPinned() const override {
558
    // memtable data is always pinned
559
750k
    return true;
560
750k
  }
561
562
375k
  bool IsValuePinned() const override {
563
    // memtable value is always pinned, except if we allow inplace update.
564
375k
    return value_pinned_;
565
375k
  }
566
567
 private:
568
  DynamicBloom* bloom_;
569
  const SliceTransform* const prefix_extractor_;
570
  const MemTable::KeyComparator comparator_;
571
  MemTableRep::Iterator* iter_;
572
  // The seqno to time mapping is owned by the SuperVersion.
573
  UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping_;
574
  Status status_;
575
  Logger* logger_;
576
  size_t ts_sz_;
577
  uint32_t protection_bytes_per_key_;
578
  bool valid_;
579
  bool value_pinned_;
580
  bool arena_mode_;
581
  const bool paranoid_memory_checks_;
582
  const bool allow_data_in_error;
583
584
1.48M
  void VerifyEntryChecksum() {
585
1.48M
    if (protection_bytes_per_key_ > 0 && Valid()) {
586
0
      status_ = MemTable::VerifyEntryChecksum(iter_->key(),
587
0
                                              protection_bytes_per_key_);
588
0
      if (!status_.ok()) {
589
0
        ROCKS_LOG_ERROR(logger_, "In MemtableIterator: %s", status_.getState());
590
0
      }
591
0
    }
592
1.48M
  }
593
};
594
595
InternalIterator* MemTable::NewIterator(
596
    const ReadOptions& read_options,
597
11.7k
    UnownedPtr<const SeqnoToTimeMapping> seqno_to_time_mapping, Arena* arena) {
598
11.7k
  assert(arena != nullptr);
599
11.7k
  auto mem = arena->AllocateAligned(sizeof(MemTableIterator));
600
11.7k
  return new (mem)
601
11.7k
      MemTableIterator(*this, read_options, seqno_to_time_mapping, arena);
602
11.7k
}
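
// [Editor's note] Because the iterator is placement-new'd into the arena, the
// caller must not `delete` it: it is torn down by an explicit destructor call
// (see the arena_mode_ branch of ~MemTableIterator above) and its memory is
// reclaimed with the arena. A usage sketch, with the surrounding scaffolding
// assumed rather than taken from this file:
//
//   Arena arena;
//   InternalIterator* iter = mem->NewIterator(
//       ReadOptions(), /*seqno_to_time_mapping=*/nullptr, &arena);
//   for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { /* ... */ }
//   iter->~InternalIterator();  // no delete: the arena owns the bytes
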
603
604
FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIterator(
605
    const ReadOptions& read_options, SequenceNumber read_seq,
606
12.1k
    bool immutable_memtable) {
607
12.1k
  if (read_options.ignore_range_deletions ||
608
12.1k
      is_range_del_table_empty_.load(std::memory_order_relaxed)) {
609
9.00k
    return nullptr;
610
9.00k
  }
611
3.12k
  return NewRangeTombstoneIteratorInternal(read_options, read_seq,
612
3.12k
                                           immutable_memtable);
613
12.1k
}
614
615
FragmentedRangeTombstoneIterator* MemTable::NewRangeTombstoneIteratorInternal(
616
    const ReadOptions& read_options, SequenceNumber read_seq,
617
3.12k
    bool immutable_memtable) {
618
3.12k
  if (immutable_memtable) {
619
    // Note that caller should already have verified that
620
    // !is_range_del_table_empty_
621
0
    assert(IsFragmentedRangeTombstonesConstructed());
622
0
    return new FragmentedRangeTombstoneIterator(
623
0
        fragmented_range_tombstone_list_.get(), comparator_.comparator,
624
0
        read_seq, read_options.timestamp);
625
0
  }
626
627
  // takes current cache
628
3.12k
  std::shared_ptr<FragmentedRangeTombstoneListCache> cache =
629
3.12k
      std::atomic_load_explicit(cached_range_tombstone_.Access(),
630
3.12k
                                std::memory_order_relaxed);
631
  // construct fragmented tombstone list if necessary
632
3.12k
  if (!cache->initialized.load(std::memory_order_acquire)) {
633
3.12k
    cache->reader_mutex.lock();
634
3.12k
    if (!cache->tombstones) {
635
3.12k
      auto* unfragmented_iter = new MemTableIterator(
636
3.12k
          *this, read_options, nullptr /* seqno_to_time_mapping= */,
637
3.12k
          nullptr /* arena */, true /* use_range_del_table */);
638
3.12k
      cache->tombstones.reset(new FragmentedRangeTombstoneList(
639
3.12k
          std::unique_ptr<InternalIterator>(unfragmented_iter),
640
3.12k
          comparator_.comparator));
641
3.12k
      cache->initialized.store(true, std::memory_order_release);
642
3.12k
    }
643
3.12k
    cache->reader_mutex.unlock();
644
3.12k
  }
645
646
3.12k
  auto* fragmented_iter = new FragmentedRangeTombstoneIterator(
647
3.12k
      cache, comparator_.comparator, read_seq, read_options.timestamp);
648
3.12k
  return fragmented_iter;
649
3.12k
}
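
// [Editor's note] The cache construction above is double-checked locking: an
// acquire load of `initialized` on the fast path, then a mutex and a re-check,
// then a release store. A generic, self-contained sketch of the same pattern
// (the Lazy type and its int payload are stand-ins, not RocksDB code):
#include <atomic>
#include <memory>
#include <mutex>

struct Lazy {
  std::atomic<bool> initialized{false};
  std::mutex reader_mutex;
  std::unique_ptr<int> payload;

  int* Get() {
    if (!initialized.load(std::memory_order_acquire)) {  // fast path
      std::lock_guard<std::mutex> guard(reader_mutex);
      if (!payload) {  // re-check under the lock
        payload.reset(new int(42));  // expensive construction goes here
        initialized.store(true, std::memory_order_release);
      }
    }
    return payload.get();
  }
};
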
650
651
133
void MemTable::ConstructFragmentedRangeTombstones() {
652
  // There should be no concurrent construction.
653
  // We could also check fragmented_range_tombstone_list_ to avoid repeated
654
  // constructions. We just construct them here again to be safe.
655
133
  if (!is_range_del_table_empty_.load(std::memory_order_relaxed)) {
656
    // TODO: plumb Env::IOActivity, Env::IOPriority
657
0
    auto* unfragmented_iter = new MemTableIterator(
658
0
        *this, ReadOptions(), nullptr /*seqno_to_time_mapping=*/,
659
0
        nullptr /* arena */, true /* use_range_del_table */);
660
661
0
    fragmented_range_tombstone_list_ =
662
0
        std::make_unique<FragmentedRangeTombstoneList>(
663
0
            std::unique_ptr<InternalIterator>(unfragmented_iter),
664
0
            comparator_.comparator);
665
0
  }
666
133
}
667
668
0
port::RWMutex* MemTable::GetLock(const Slice& key) {
669
0
  return &locks_[GetSliceRangedNPHash(key, locks_.size())];
670
0
}
671
672
MemTable::MemTableStats MemTable::ApproximateStats(const Slice& start_ikey,
673
0
                                                   const Slice& end_ikey) {
674
0
  uint64_t entry_count = table_->ApproximateNumEntries(start_ikey, end_ikey);
675
0
  entry_count += range_del_table_->ApproximateNumEntries(start_ikey, end_ikey);
676
0
  if (entry_count == 0) {
677
0
    return {0, 0};
678
0
  }
679
0
  uint64_t n = num_entries_.load(std::memory_order_relaxed);
680
0
  if (n == 0) {
681
0
    return {0, 0};
682
0
  }
683
0
  if (entry_count > n) {
684
    // (range_del_)table_->ApproximateNumEntries() is just an estimate so it can
685
    // be larger than actual entries we have. Cap it to entries we have to limit
686
    // the inaccuracy.
687
0
    entry_count = n;
688
0
  }
689
0
  uint64_t data_size = data_size_.load(std::memory_order_relaxed);
690
0
  return {entry_count * (data_size / n), entry_count};
691
0
}
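
// [Editor's note] The size estimate above is entry_count * (average entry
// size) with integer division; e.g. n = 1000 entries totaling data_size =
// 123456 bytes gives 123 bytes/entry, so a range estimated at 40 entries
// reports 40 * 123 = 4920 bytes.
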
692
693
Status MemTable::VerifyEncodedEntry(Slice encoded,
694
1.09M
                                    const ProtectionInfoKVOS64& kv_prot_info) {
695
1.09M
  uint32_t ikey_len = 0;
696
1.09M
  if (!GetVarint32(&encoded, &ikey_len)) {
697
0
    return Status::Corruption("Unable to parse internal key length");
698
0
  }
699
1.09M
  if (ikey_len < 8 + ts_sz_) {
700
0
    return Status::Corruption("Internal key length too short");
701
0
  }
702
1.09M
  if (ikey_len > encoded.size()) {
703
0
    return Status::Corruption("Internal key length too long");
704
0
  }
705
1.09M
  uint32_t value_len = 0;
706
1.09M
  const size_t user_key_len = ikey_len - 8;
707
1.09M
  Slice key(encoded.data(), user_key_len);
708
1.09M
  encoded.remove_prefix(user_key_len);
709
710
1.09M
  uint64_t packed = DecodeFixed64(encoded.data());
711
1.09M
  ValueType value_type = kMaxValue;
712
1.09M
  SequenceNumber sequence_number = kMaxSequenceNumber;
713
1.09M
  UnPackSequenceAndType(packed, &sequence_number, &value_type);
714
1.09M
  encoded.remove_prefix(8);
715
716
1.09M
  if (!GetVarint32(&encoded, &value_len)) {
717
0
    return Status::Corruption("Unable to parse value length");
718
0
  }
719
1.09M
  if (value_len < encoded.size()) {
720
0
    return Status::Corruption("Value length too short");
721
0
  }
722
1.09M
  if (value_len > encoded.size()) {
723
0
    return Status::Corruption("Value length too long");
724
0
  }
725
1.09M
  Slice value(encoded.data(), value_len);
726
727
1.09M
  return kv_prot_info.StripS(sequence_number)
728
1.09M
      .StripKVO(key, value, value_type)
729
1.09M
      .GetStatus();
730
1.09M
}
731
732
void MemTable::UpdateEntryChecksum(const ProtectionInfoKVOS64* kv_prot_info,
733
                                   const Slice& key, const Slice& value,
734
                                   ValueType type, SequenceNumber s,
735
2.18M
                                   char* checksum_ptr) {
736
2.18M
  if (moptions_.protection_bytes_per_key == 0) {
737
2.18M
    return;
738
2.18M
  }
739
740
0
  if (kv_prot_info == nullptr) {
741
0
    ProtectionInfo64()
742
0
        .ProtectKVO(key, value, type)
743
0
        .ProtectS(s)
744
0
        .Encode(static_cast<uint8_t>(moptions_.protection_bytes_per_key),
745
0
                checksum_ptr);
746
0
  } else {
747
0
    kv_prot_info->Encode(
748
0
        static_cast<uint8_t>(moptions_.protection_bytes_per_key), checksum_ptr);
749
0
  }
750
0
}
751
752
Status MemTable::Add(SequenceNumber s, ValueType type,
753
                     const Slice& key, /* user key */
754
                     const Slice& value,
755
                     const ProtectionInfoKVOS64* kv_prot_info,
756
                     bool allow_concurrent,
757
2.18M
                     MemTablePostProcessInfo* post_process_info, void** hint) {
758
  // Format of an entry is concatenation of:
759
  //  key_size     : varint32 of internal_key.size()
760
  //  key bytes    : char[internal_key.size()]
761
  //  value_size   : varint32 of value.size()
762
  //  value bytes  : char[value.size()]
763
  //  checksum     : char[moptions_.protection_bytes_per_key]
764
2.18M
  uint32_t key_size = static_cast<uint32_t>(key.size());
765
2.18M
  uint32_t val_size = static_cast<uint32_t>(value.size());
766
2.18M
  uint32_t internal_key_size = key_size + 8;
767
2.18M
  const uint32_t encoded_len = VarintLength(internal_key_size) +
768
2.18M
                               internal_key_size + VarintLength(val_size) +
769
2.18M
                               val_size + moptions_.protection_bytes_per_key;
770
2.18M
  char* buf = nullptr;
771
2.18M
  std::unique_ptr<MemTableRep>& table =
772
2.18M
      type == kTypeRangeDeletion ? range_del_table_ : table_;
773
2.18M
  KeyHandle handle = table->Allocate(encoded_len, &buf);
774
775
2.18M
  char* p = EncodeVarint32(buf, internal_key_size);
776
2.18M
  memcpy(p, key.data(), key_size);
777
2.18M
  Slice key_slice(p, key_size);
778
2.18M
  p += key_size;
779
2.18M
  uint64_t packed = PackSequenceAndType(s, type);
780
2.18M
  EncodeFixed64(p, packed);
781
2.18M
  p += 8;
782
2.18M
  p = EncodeVarint32(p, val_size);
783
2.18M
  memcpy(p, value.data(), val_size);
784
2.18M
  assert((unsigned)(p + val_size - buf + moptions_.protection_bytes_per_key) ==
785
2.18M
         (unsigned)encoded_len);
786
787
2.18M
  UpdateEntryChecksum(kv_prot_info, key, value, type, s,
788
2.18M
                      buf + encoded_len - moptions_.protection_bytes_per_key);
789
2.18M
  Slice encoded(buf, encoded_len - moptions_.protection_bytes_per_key);
790
2.18M
  if (kv_prot_info != nullptr) {
791
1.09M
    TEST_SYNC_POINT_CALLBACK("MemTable::Add:Encoded", &encoded);
792
1.09M
    Status status = VerifyEncodedEntry(encoded, *kv_prot_info);
793
1.09M
    if (!status.ok()) {
794
0
      return status;
795
0
    }
796
1.09M
  }
797
798
2.18M
  Slice key_without_ts = StripTimestampFromUserKey(key, ts_sz_);
799
800
2.18M
  if (!allow_concurrent) {
801
    // Extract prefix for insert with hint. Hints are for point key table
802
    // (`table_`) only, not `range_del_table_`.
803
2.18M
    if (table == table_ && insert_with_hint_prefix_extractor_ != nullptr &&
804
2.18M
        insert_with_hint_prefix_extractor_->InDomain(key_slice)) {
805
0
      Slice prefix = insert_with_hint_prefix_extractor_->Transform(key_slice);
806
0
      bool res = table->InsertKeyWithHint(handle, &insert_hints_[prefix]);
807
0
      if (UNLIKELY(!res)) {
808
0
        return Status::TryAgain("key+seq exists");
809
0
      }
810
2.18M
    } else {
811
2.18M
      bool res = table->InsertKey(handle);
812
2.18M
      if (UNLIKELY(!res)) {
813
0
        return Status::TryAgain("key+seq exists");
814
0
      }
815
2.18M
    }
816
817
    // This is a bit ugly, but it is the way to avoid locked instructions
818
    // when incrementing an atomic
819
2.18M
    num_entries_.store(num_entries_.load(std::memory_order_relaxed) + 1,
820
2.18M
                       std::memory_order_relaxed);
821
2.18M
    data_size_.store(data_size_.load(std::memory_order_relaxed) + encoded_len,
822
2.18M
                     std::memory_order_relaxed);
823
2.18M
    if (type == kTypeDeletion || type == kTypeSingleDeletion ||
824
2.18M
        type == kTypeDeletionWithTimestamp) {
825
256k
      num_deletes_.store(num_deletes_.load(std::memory_order_relaxed) + 1,
826
256k
                         std::memory_order_relaxed);
827
1.92M
    } else if (type == kTypeRangeDeletion) {
828
750k
      uint64_t val = num_range_deletes_.load(std::memory_order_relaxed) + 1;
829
750k
      num_range_deletes_.store(val, std::memory_order_relaxed);
830
750k
    }
831
832
2.18M
    if (bloom_filter_ && prefix_extractor_ &&
833
2.18M
        prefix_extractor_->InDomain(key_without_ts)) {
834
0
      bloom_filter_->Add(prefix_extractor_->Transform(key_without_ts));
835
0
    }
836
2.18M
    if (bloom_filter_ && moptions_.memtable_whole_key_filtering) {
837
0
      bloom_filter_->Add(key_without_ts);
838
0
    }
839
840
    // The first sequence number inserted into the memtable
841
2.18M
    assert(first_seqno_ == 0 || s >= first_seqno_);
842
2.18M
    if (first_seqno_ == 0) {
843
10.9k
      first_seqno_.store(s, std::memory_order_relaxed);
844
845
10.9k
      if (earliest_seqno_ == kMaxSequenceNumber) {
846
0
        earliest_seqno_.store(GetFirstSequenceNumber(),
847
0
                              std::memory_order_relaxed);
848
0
      }
849
10.9k
      assert(first_seqno_.load() >= earliest_seqno_.load());
850
10.9k
    }
851
2.18M
    assert(post_process_info == nullptr);
852
    // TODO(yuzhangyu): support updating newest UDT for when `allow_concurrent`
853
    // is true.
854
2.18M
    MaybeUpdateNewestUDT(key_slice);
855
2.18M
    UpdateFlushState();
856
2.18M
  } else {
857
0
    bool res = (hint == nullptr)
858
0
                   ? table->InsertKeyConcurrently(handle)
859
0
                   : table->InsertKeyWithHintConcurrently(handle, hint);
860
0
    if (UNLIKELY(!res)) {
861
0
      return Status::TryAgain("key+seq exists");
862
0
    }
863
864
0
    assert(post_process_info != nullptr);
865
0
    post_process_info->num_entries++;
866
0
    post_process_info->data_size += encoded_len;
867
0
    if (type == kTypeDeletion) {
868
0
      post_process_info->num_deletes++;
869
0
    }
870
871
0
    if (bloom_filter_ && prefix_extractor_ &&
872
0
        prefix_extractor_->InDomain(key_without_ts)) {
873
0
      bloom_filter_->AddConcurrently(
874
0
          prefix_extractor_->Transform(key_without_ts));
875
0
    }
876
0
    if (bloom_filter_ && moptions_.memtable_whole_key_filtering) {
877
0
      bloom_filter_->AddConcurrently(key_without_ts);
878
0
    }
879
880
    // atomically update first_seqno_ and earliest_seqno_.
881
0
    uint64_t cur_seq_num = first_seqno_.load(std::memory_order_relaxed);
882
0
    while ((cur_seq_num == 0 || s < cur_seq_num) &&
883
0
           !first_seqno_.compare_exchange_weak(cur_seq_num, s)) {
884
0
    }
885
0
    uint64_t cur_earliest_seqno =
886
0
        earliest_seqno_.load(std::memory_order_relaxed);
887
0
    while (
888
0
        (cur_earliest_seqno == kMaxSequenceNumber || s < cur_earliest_seqno) &&
889
0
        !earliest_seqno_.compare_exchange_weak(cur_earliest_seqno, s)) {
890
0
    }
891
0
  }
892
2.18M
  if (type == kTypeRangeDeletion) {
893
750k
    auto new_cache = std::make_shared<FragmentedRangeTombstoneListCache>();
894
750k
    size_t size = cached_range_tombstone_.Size();
895
750k
    if (allow_concurrent) {
896
0
      post_process_info->num_range_deletes++;
897
0
      range_del_mutex_.lock();
898
0
    }
899
24.7M
    for (size_t i = 0; i < size; ++i) {
900
24.0M
      std::shared_ptr<FragmentedRangeTombstoneListCache>* local_cache_ref_ptr =
901
24.0M
          cached_range_tombstone_.AccessAtCore(i);
902
24.0M
      auto new_local_cache_ref = std::make_shared<
903
24.0M
          const std::shared_ptr<FragmentedRangeTombstoneListCache>>(new_cache);
904
      // It is okay for some reader to load old cache during invalidation as
905
      // the new sequence number is not published yet.
906
      // Each core will have a shared_ptr to a shared_ptr to the cached
907
      // fragmented range tombstones, so that the ref count is maintained locally
908
      // per-core using the per-core shared_ptr.
909
24.0M
      std::atomic_store_explicit(
910
24.0M
          local_cache_ref_ptr,
911
24.0M
          std::shared_ptr<FragmentedRangeTombstoneListCache>(
912
24.0M
              new_local_cache_ref, new_cache.get()),
913
24.0M
          std::memory_order_relaxed);
914
24.0M
    }
915
916
750k
    if (allow_concurrent) {
917
0
      range_del_mutex_.unlock();
918
0
    }
919
750k
    is_range_del_table_empty_.store(false, std::memory_order_relaxed);
920
750k
  }
921
2.18M
  UpdateOldestKeyTime();
922
923
2.18M
  TEST_SYNC_POINT_CALLBACK("MemTable::Add:BeforeReturn:Encoded", &encoded);
924
2.18M
  return Status::OK();
925
2.18M
}
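
// [Editor's note] A standalone sketch of the entry encoding performed above,
// with simplified varint/fixed64 helpers standing in for util/coding.h and
// the checksum tail omitted; the tag packing (seq << 8 | type) mirrors
// PackSequenceAndType. A little-endian host is assumed in PutFixed64.
#include <cstdint>
#include <cstring>
#include <string>

static void PutVarint32(std::string* dst, uint32_t v) {
  while (v >= 0x80) {  // 7 payload bits per byte, high bit = continuation
    dst->push_back(static_cast<char>((v & 0x7f) | 0x80));
    v >>= 7;
  }
  dst->push_back(static_cast<char>(v));
}

static void PutFixed64(std::string* dst, uint64_t v) {
  char buf[8];
  std::memcpy(buf, &v, sizeof(v));  // assumes a little-endian host
  dst->append(buf, sizeof(buf));
}

std::string EncodeMemTableEntry(const std::string& user_key,
                                const std::string& value, uint64_t seq,
                                uint8_t type) {
  std::string buf;
  PutVarint32(&buf, static_cast<uint32_t>(user_key.size() + 8));  // ikey size
  buf.append(user_key);
  PutFixed64(&buf, (seq << 8) | type);  // 8-byte (seq, type) tag
  PutVarint32(&buf, static_cast<uint32_t>(value.size()));
  buf.append(value);
  return buf;
}
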
926
927
// Callback from MemTable::Get()
928
namespace {
929
930
struct Saver {
931
  Status* status;
932
  const LookupKey* key;
933
  bool* found_final_value;  // Is value set correctly? Used by KeyMayExist
934
  bool* merge_in_progress;
935
  std::string* value;
936
  PinnableWideColumns* columns;
937
  SequenceNumber seq;
938
  std::string* timestamp;
939
  const MergeOperator* merge_operator;
940
  // The merge operations encountered.
941
  MergeContext* merge_context;
942
  SequenceNumber max_covering_tombstone_seq;
943
  MemTable* mem;
944
  Logger* logger;
945
  Statistics* statistics;
946
  bool inplace_update_support;
947
  bool do_merge;
948
  SystemClock* clock;
949
950
  ReadCallback* callback_;
951
  bool* is_blob_index;
952
  bool allow_data_in_errors;
953
  uint32_t protection_bytes_per_key;
954
399
  bool CheckCallback(SequenceNumber _seq) {
955
399
    if (callback_) {
956
0
      return callback_->IsVisible(_seq);
957
0
    }
958
399
    return true;
959
399
  }
960
};
961
}  // anonymous namespace
962
963
429
static bool SaveValue(void* arg, const char* entry) {
964
429
  Saver* s = static_cast<Saver*>(arg);
965
429
  assert(s != nullptr);
966
429
  assert(!s->value || !s->columns);
967
429
  assert(!*(s->found_final_value));
968
429
  assert(s->status->ok() || s->status->IsMergeInProgress());
969
970
429
  MergeContext* merge_context = s->merge_context;
971
429
  SequenceNumber max_covering_tombstone_seq = s->max_covering_tombstone_seq;
972
429
  const MergeOperator* merge_operator = s->merge_operator;
973
974
429
  assert(merge_context != nullptr);
975
976
  // Refer to comments under MemTable::Add() for entry format.
977
  // Check that it belongs to same user key.
978
429
  uint32_t key_length = 0;
979
429
  const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
980
429
  assert(key_length >= 8);
981
429
  Slice user_key_slice = Slice(key_ptr, key_length - 8);
982
429
  const Comparator* user_comparator =
983
429
      s->mem->GetInternalKeyComparator().user_comparator();
984
429
  size_t ts_sz = user_comparator->timestamp_size();
985
429
  if (ts_sz && s->timestamp && max_covering_tombstone_seq > 0) {
986
    // timestamp should already be set to range tombstone timestamp
987
0
    assert(s->timestamp->size() == ts_sz);
988
0
  }
989
429
  if (user_comparator->EqualWithoutTimestamp(user_key_slice,
990
429
                                             s->key->user_key())) {
991
    // Correct user key
992
399
    TEST_SYNC_POINT_CALLBACK("Memtable::SaveValue:Found:entry", &entry);
993
399
    std::optional<ReadLock> read_lock;
994
399
    if (s->inplace_update_support) {
995
0
      read_lock.emplace(s->mem->GetLock(s->key->user_key()));
996
0
    }
997
998
399
    if (s->protection_bytes_per_key > 0) {
999
0
      *(s->status) = MemTable::VerifyEntryChecksum(
1000
0
          entry, s->protection_bytes_per_key, s->allow_data_in_errors);
1001
0
      if (!s->status->ok()) {
1002
0
        *(s->found_final_value) = true;
1003
0
        ROCKS_LOG_ERROR(s->logger, "In SaveValue: %s", s->status->getState());
1004
        // Memtable entry corrupted
1005
0
        return false;
1006
0
      }
1007
0
    }
1008
1009
399
    const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
1010
399
    ValueType type;
1011
399
    SequenceNumber seq;
1012
399
    UnPackSequenceAndType(tag, &seq, &type);
1013
    // If the value is not in the snapshot, skip it
1014
399
    if (!s->CheckCallback(seq)) {
1015
0
      return true;  // to continue to the next seq
1016
0
    }
1017
1018
399
    if (s->seq == kMaxSequenceNumber) {
1019
399
      s->seq = seq;
1020
399
      if (s->seq > max_covering_tombstone_seq) {
1021
399
        if (ts_sz && s->timestamp != nullptr) {
1022
          // `timestamp` was set to range tombstone's timestamp before
1023
          // `SaveValue` is ever called. This key has a higher sequence number
1024
          // than range tombstone, and is the key with the highest seqno across
1025
          // all keys with this user_key, so we update timestamp here.
1026
0
          Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz);
1027
0
          s->timestamp->assign(ts.data(), ts_sz);
1028
0
        }
1029
399
      } else {
1030
0
        s->seq = max_covering_tombstone_seq;
1031
0
      }
1032
399
    }
1033
1034
399
    if (ts_sz > 0 && s->timestamp != nullptr) {
1035
0
      if (!s->timestamp->empty()) {
1036
0
        assert(ts_sz == s->timestamp->size());
1037
0
      }
1038
      // TODO optimize for smaller size ts
1039
0
      const std::string kMaxTs(ts_sz, '\xff');
1040
0
      if (s->timestamp->empty() ||
1041
0
          user_comparator->CompareTimestamp(*(s->timestamp), kMaxTs) == 0) {
1042
0
        Slice ts = ExtractTimestampFromUserKey(user_key_slice, ts_sz);
1043
0
        s->timestamp->assign(ts.data(), ts_sz);
1044
0
      }
1045
0
    }
1046
1047
399
    if ((type == kTypeValue || type == kTypeMerge || type == kTypeBlobIndex ||
1048
399
         type == kTypeWideColumnEntity || type == kTypeDeletion ||
1049
399
         type == kTypeSingleDeletion || type == kTypeDeletionWithTimestamp ||
1050
399
         type == kTypeValuePreferredSeqno) &&
1051
399
        max_covering_tombstone_seq > seq) {
1052
0
      type = kTypeRangeDeletion;
1053
0
    }
1054
399
    switch (type) {
1055
0
      case kTypeBlobIndex: {
1056
0
        if (!s->do_merge) {
1057
0
          *(s->status) = Status::NotSupported(
1058
0
              "GetMergeOperands not supported by stacked BlobDB");
1059
0
          *(s->found_final_value) = true;
1060
0
          return false;
1061
0
        }
1062
1063
0
        if (*(s->merge_in_progress)) {
1064
0
          *(s->status) = Status::NotSupported(
1065
0
              "Merge operator not supported by stacked BlobDB");
1066
0
          *(s->found_final_value) = true;
1067
0
          return false;
1068
0
        }
1069
1070
0
        if (s->is_blob_index == nullptr) {
1071
0
          ROCKS_LOG_ERROR(s->logger, "Encountered unexpected blob index.");
1072
0
          *(s->status) = Status::NotSupported(
1073
0
              "Encountered unexpected blob index. Please open DB with "
1074
0
              "ROCKSDB_NAMESPACE::blob_db::BlobDB.");
1075
0
          *(s->found_final_value) = true;
1076
0
          return false;
1077
0
        }
1078
1079
0
        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
1080
1081
0
        *(s->status) = Status::OK();
1082
1083
0
        if (s->value) {
1084
0
          s->value->assign(v.data(), v.size());
1085
0
        } else if (s->columns) {
1086
0
          s->columns->SetPlainValue(v);
1087
0
        }
1088
1089
0
        *(s->found_final_value) = true;
1090
0
        *(s->is_blob_index) = true;
1091
1092
0
        return false;
1093
0
      }
1094
385
      case kTypeValue:
1095
385
      case kTypeValuePreferredSeqno: {
1096
385
        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
1097
1098
385
        if (type == kTypeValuePreferredSeqno) {
1099
0
          v = ParsePackedValueForValue(v);
1100
0
        }
1101
1102
385
        *(s->status) = Status::OK();
1103
1104
385
        if (!s->do_merge) {
1105
          // Preserve the value with the goal of returning it as part of
1106
          // raw merge operands to the user
1107
          // TODO(yanqin) update MergeContext so that timestamps information
1108
          // can also be retained.
1109
1110
0
          merge_context->PushOperand(
1111
0
              v, s->inplace_update_support == false /* operand_pinned */);
1112
385
        } else if (*(s->merge_in_progress)) {
1113
0
          assert(s->do_merge);
1114
1115
0
          if (s->value || s->columns) {
1116
            // `op_failure_scope` (an output parameter) is not provided (set to
1117
            // nullptr) since a failure must be propagated regardless of its
1118
            // value.
1119
0
            *(s->status) = MergeHelper::TimedFullMerge(
1120
0
                merge_operator, s->key->user_key(),
1121
0
                MergeHelper::kPlainBaseValue, v, merge_context->GetOperands(),
1122
0
                s->logger, s->statistics, s->clock,
1123
0
                /* update_num_ops_stats */ true, /* op_failure_scope */ nullptr,
1124
0
                s->value, s->columns);
1125
0
          }
1126
385
        } else if (s->value) {
1127
385
          s->value->assign(v.data(), v.size());
1128
385
        } else if (s->columns) {
1129
0
          s->columns->SetPlainValue(v);
1130
0
        }
1131
1132
385
        *(s->found_final_value) = true;
1133
1134
385
        if (s->is_blob_index != nullptr) {
1135
0
          *(s->is_blob_index) = false;
1136
0
        }
1137
1138
385
        return false;
1139
385
      }
1140
0
      case kTypeWideColumnEntity: {
1141
0
        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
1142
1143
0
        *(s->status) = Status::OK();
1144
1145
0
        if (!s->do_merge) {
1146
          // Preserve the value with the goal of returning it as part of
1147
          // raw merge operands to the user
1148
1149
0
          Slice value_of_default;
1150
0
          *(s->status) = WideColumnSerialization::GetValueOfDefaultColumn(
1151
0
              v, value_of_default);
1152
1153
0
          if (s->status->ok()) {
1154
0
            merge_context->PushOperand(
1155
0
                value_of_default,
1156
0
                s->inplace_update_support == false /* operand_pinned */);
1157
0
          }
1158
0
        } else if (*(s->merge_in_progress)) {
1159
0
          assert(s->do_merge);
1160
1161
0
          if (s->value || s->columns) {
1162
            // `op_failure_scope` (an output parameter) is not provided (set
1163
            // to nullptr) since a failure must be propagated regardless of
1164
            // its value.
1165
0
            *(s->status) = MergeHelper::TimedFullMerge(
1166
0
                merge_operator, s->key->user_key(), MergeHelper::kWideBaseValue,
1167
0
                v, merge_context->GetOperands(), s->logger, s->statistics,
1168
0
                s->clock, /* update_num_ops_stats */ true,
1169
0
                /* op_failure_scope */ nullptr, s->value, s->columns);
1170
0
          }
1171
0
        } else if (s->value) {
1172
0
          Slice value_of_default;
1173
0
          *(s->status) = WideColumnSerialization::GetValueOfDefaultColumn(
1174
0
              v, value_of_default);
1175
0
          if (s->status->ok()) {
1176
0
            s->value->assign(value_of_default.data(), value_of_default.size());
1177
0
          }
1178
0
        } else if (s->columns) {
1179
0
          *(s->status) = s->columns->SetWideColumnValue(v);
1180
0
        }
1181
1182
0
        *(s->found_final_value) = true;
1183
1184
0
        if (s->is_blob_index != nullptr) {
1185
0
          *(s->is_blob_index) = false;
1186
0
        }
1187
1188
0
        return false;
1189
385
      }
1190
14
      case kTypeDeletion:
1191
14
      case kTypeDeletionWithTimestamp:
1192
14
      case kTypeSingleDeletion:
1193
14
      case kTypeRangeDeletion: {
1194
14
        if (*(s->merge_in_progress)) {
1195
0
          if (s->value || s->columns) {
1196
            // `op_failure_scope` (an output parameter) is not provided (set to
1197
            // nullptr) since a failure must be propagated regardless of its
1198
            // value.
1199
0
            *(s->status) = MergeHelper::TimedFullMerge(
1200
0
                merge_operator, s->key->user_key(), MergeHelper::kNoBaseValue,
1201
0
                merge_context->GetOperands(), s->logger, s->statistics,
1202
0
                s->clock, /* update_num_ops_stats */ true,
1203
0
                /* op_failure_scope */ nullptr, s->value, s->columns);
1204
0
          } else {
1205
            // We have found a final value (a base deletion) and have newer
1206
            // merge operands that we do not intend to merge. Nothing remains
1207
            // to be done so assign status to OK.
1208
0
            *(s->status) = Status::OK();
1209
0
          }
1210
14
        } else {
1211
14
          *(s->status) = Status::NotFound();
1212
14
        }
1213
14
        *(s->found_final_value) = true;
1214
14
        return false;
1215
14
      }
1216
0
      case kTypeMerge: {
1217
0
        if (!merge_operator) {
1218
0
          *(s->status) = Status::InvalidArgument(
1219
0
              "merge_operator is not properly initialized.");
1220
          // Normally we continue the loop (return true) when we see a merge
1221
          // operand.  But in case of an error, we should stop the loop
1222
          // immediately and pretend we have found the value to stop further
1223
          // seek.  Otherwise, the later call will override this error status.
1224
0
          *(s->found_final_value) = true;
1225
0
          return false;
1226
0
        }
1227
0
        Slice v = GetLengthPrefixedSlice(key_ptr + key_length);
1228
0
        *(s->merge_in_progress) = true;
1229
0
        merge_context->PushOperand(
1230
0
            v, s->inplace_update_support == false /* operand_pinned */);
1231
0
        PERF_COUNTER_ADD(internal_merge_point_lookup_count, 1);
1232
1233
0
        if (s->do_merge && merge_operator->ShouldMerge(
1234
0
                               merge_context->GetOperandsDirectionBackward())) {
1235
0
          if (s->value || s->columns) {
1236
            // `op_failure_scope` (an output parameter) is not provided (set to
1237
            // nullptr) since a failure must be propagated regardless of its
1238
            // value.
1239
0
            *(s->status) = MergeHelper::TimedFullMerge(
1240
0
                merge_operator, s->key->user_key(), MergeHelper::kNoBaseValue,
1241
0
                merge_context->GetOperands(), s->logger, s->statistics,
1242
0
                s->clock, /* update_num_ops_stats */ true,
1243
0
                /* op_failure_scope */ nullptr, s->value, s->columns);
1244
0
          }
1245
1246
0
          *(s->found_final_value) = true;
1247
0
          return false;
1248
0
        }
1249
0
        if (merge_context->get_merge_operands_options != nullptr &&
1250
0
            merge_context->get_merge_operands_options->continue_cb != nullptr &&
1251
0
            !merge_context->get_merge_operands_options->continue_cb(v)) {
1252
          // We were told not to continue.
1253
0
          *(s->found_final_value) = true;
1254
0
          return false;
1255
0
        }
1256
1257
0
        return true;
1258
0
      }
1259
0
      default: {
1260
0
        std::string msg("Corrupted value not expected.");
1261
0
        if (s->allow_data_in_errors) {
1262
0
          msg.append("Unrecognized value type: " +
1263
0
                     std::to_string(static_cast<int>(type)) + ". ");
1264
0
          msg.append("User key: " + user_key_slice.ToString(/*hex=*/true) +
1265
0
                     ". ");
1266
0
          msg.append("seq: " + std::to_string(seq) + ".");
1267
0
        }
1268
0
        *(s->found_final_value) = true;
1269
0
        *(s->status) = Status::Corruption(msg.c_str());
1270
0
        return false;
1271
0
      }
1272
399
    }
1273
399
  }
1274
1275
  // s->status could be Corruption, MergeInProgress, or NotFound.
1276
30
  return false;
1277
429
}
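
// [Editor's note] SaveValue is the per-entry callback that MemTable::Get()
// drives over matching entries, which arrive ordered by (user key ascending,
// seqno descending). Returning true means "keep scanning" (entry not visible
// to the read snapshot, or a merge operand was collected); returning false
// stops the scan (final value found, a different user key was reached, or an
// error occurred).
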
1278
1279
bool MemTable::Get(const LookupKey& key, std::string* value,
1280
                   PinnableWideColumns* columns, std::string* timestamp,
1281
                   Status* s, MergeContext* merge_context,
1282
                   SequenceNumber* max_covering_tombstone_seq,
1283
                   SequenceNumber* seq, const ReadOptions& read_opts,
1284
                   bool immutable_memtable, ReadCallback* callback,
1285
584
                   bool* is_blob_index, bool do_merge) {
1286
  // The sequence number is updated synchronously in version_set.h
1287
584
  if (IsEmpty()) {
1288
    // Avoid recording stats, for speed.
1289
155
    return false;
1290
155
  }
1291
1292
429
  PERF_TIMER_GUARD(get_from_memtable_time);
1293
1294
429
  std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
1295
429
      NewRangeTombstoneIterator(read_opts,
1296
429
                                GetInternalKeySeqno(key.internal_key()),
1297
429
                                immutable_memtable));
1298
429
  if (range_del_iter != nullptr) {
1299
0
    SequenceNumber covering_seq =
1300
0
        range_del_iter->MaxCoveringTombstoneSeqnum(key.user_key());
1301
0
    if (covering_seq > *max_covering_tombstone_seq) {
1302
0
      *max_covering_tombstone_seq = covering_seq;
1303
0
      if (timestamp) {
1304
        // Will be overwritten in SaveValue() if there is a point key with
1305
        // a higher seqno.
1306
0
        timestamp->assign(range_del_iter->timestamp().data(),
1307
0
                          range_del_iter->timestamp().size());
1308
0
      }
1309
0
    }
1310
0
  }
1311
1312
429
  bool found_final_value = false;
1313
429
  bool merge_in_progress = s->IsMergeInProgress();
1314
429
  bool may_contain = true;
1315
429
  Slice user_key_without_ts = StripTimestampFromUserKey(key.user_key(), ts_sz_);
1316
429
  bool bloom_checked = false;
1317
429
  if (bloom_filter_) {
1318
    // When both memtable_whole_key_filtering and prefix_extractor_ are set,
1319
    // only do whole-key filtering for Get() to save CPU.
1320
0
    if (moptions_.memtable_whole_key_filtering) {
1321
0
      may_contain = bloom_filter_->MayContain(user_key_without_ts);
1322
0
      bloom_checked = true;
1323
0
    } else {
1324
0
      assert(prefix_extractor_);
1325
0
      if (prefix_extractor_->InDomain(user_key_without_ts)) {
1326
0
        may_contain = bloom_filter_->MayContain(
1327
0
            prefix_extractor_->Transform(user_key_without_ts));
1328
0
        bloom_checked = true;
1329
0
      }
1330
0
    }
1331
0
  }
1332
1333
429
  if (bloom_filter_ && !may_contain) {
1334
    // Skip the table lookup: the bloom filter says the key does not exist.
1335
0
    PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
1336
0
    *seq = kMaxSequenceNumber;
1337
429
  } else {
1338
429
    if (bloom_checked) {
1339
0
      PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
1340
0
    }
1341
429
    GetFromTable(key, *max_covering_tombstone_seq, do_merge, callback,
1342
429
                 is_blob_index, value, columns, timestamp, s, merge_context,
1343
429
                 seq, &found_final_value, &merge_in_progress);
1344
429
  }
1345
1346
  // No change to the value, since we have not yet found a Put/Delete.
1348
  // Propagate any corruption error.
1348
429
  if (!found_final_value && merge_in_progress) {
1349
0
    if (s->ok()) {
1350
0
      *s = Status::MergeInProgress();
1351
0
    } else {
1352
0
      assert(s->IsMergeInProgress());
1353
0
    }
1354
0
  }
1355
429
  PERF_COUNTER_ADD(get_from_memtable_count, 1);
1356
429
  return found_final_value;
1357
584
}
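
A hedged, caller-side sketch of the contract Get() implements (all names below are invented): a true return means this data source resolved the key, whether with a value, a deletion, or an error, and the read path stops; false means fall through to older sources (immutable memtables, then SST files).

#include <string>
#include <vector>

struct DataSource {  // stand-in for a memtable or SST level
  virtual ~DataSource() = default;
  // Returns true if this source resolves the key (sets *value or *deleted).
  virtual bool Get(const std::string& key, std::string* value,
                   bool* deleted) = 0;
};

// Search newest to oldest; the first source that resolves the key wins.
bool LookupNewestFirst(const std::vector<DataSource*>& newest_first,
                       const std::string& key, std::string* value) {
  for (DataSource* src : newest_first) {
    bool deleted = false;
    if (src->Get(key, value, &deleted)) {
      return !deleted;  // a newer deletion hides any older value
    }
  }
  return false;  // not found in any source
}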
1358
1359
void MemTable::GetFromTable(const LookupKey& key,
1360
                            SequenceNumber max_covering_tombstone_seq,
1361
                            bool do_merge, ReadCallback* callback,
1362
                            bool* is_blob_index, std::string* value,
1363
                            PinnableWideColumns* columns,
1364
                            std::string* timestamp, Status* s,
1365
                            MergeContext* merge_context, SequenceNumber* seq,
1366
429
                            bool* found_final_value, bool* merge_in_progress) {
1367
429
  Saver saver;
1368
429
  saver.status = s;
1369
429
  saver.found_final_value = found_final_value;
1370
429
  saver.merge_in_progress = merge_in_progress;
1371
429
  saver.key = &key;
1372
429
  saver.value = value;
1373
429
  saver.columns = columns;
1374
429
  saver.timestamp = timestamp;
1375
429
  saver.seq = kMaxSequenceNumber;
1376
429
  saver.mem = this;
1377
429
  saver.merge_context = merge_context;
1378
429
  saver.max_covering_tombstone_seq = max_covering_tombstone_seq;
1379
429
  saver.merge_operator = moptions_.merge_operator;
1380
429
  saver.logger = moptions_.info_log;
1381
429
  saver.inplace_update_support = moptions_.inplace_update_support;
1382
429
  saver.statistics = moptions_.statistics;
1383
429
  saver.clock = clock_;
1384
429
  saver.callback_ = callback;
1385
429
  saver.is_blob_index = is_blob_index;
1386
429
  saver.do_merge = do_merge;
1387
429
  saver.allow_data_in_errors = moptions_.allow_data_in_errors;
1388
429
  saver.protection_bytes_per_key = moptions_.protection_bytes_per_key;
1389
1390
429
  if (!moptions_.paranoid_memory_checks) {
1391
429
    table_->Get(key, &saver, SaveValue);
1392
429
  } else {
1393
0
    Status check_s = table_->GetAndValidate(key, &saver, SaveValue,
1394
0
                                            moptions_.allow_data_in_errors);
1395
0
    if (check_s.IsCorruption()) {
1396
0
      *(saver.status) = check_s;
1397
      // Should stop searching the LSM.
1398
0
      *(saver.found_final_value) = true;
1399
0
    }
1400
0
  }
1401
429
  assert(s->ok() || s->IsMergeInProgress() || *found_final_value);
1402
429
  *seq = saver.seq;
1403
429
}
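
The Saver populated above is an instance of the classic C-callback context pattern: bundle every output pointer and option into one struct, pass it through a void* argument, and let the callback cast it back. A self-contained toy version with the same callback shape as SaveValue:

#include <cstring>

struct CountSaver {
  int entries_seen = 0;
  size_t total_bytes = 0;
};

// Cast the void* context back to the concrete saver and record per-entry
// results into it; return true to keep scanning.
static bool CountEntry(void* arg, const char* entry) {
  auto* saver = static_cast<CountSaver*>(arg);
  ++saver->entries_seen;
  saver->total_bytes += std::strlen(entry);
  return true;
}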
1404
1405
void MemTable::MultiGet(const ReadOptions& read_options, MultiGetRange* range,
1406
0
                        ReadCallback* callback, bool immutable_memtable) {
1407
  // The sequence number is updated synchronously in version_set.h
1408
0
  if (IsEmpty()) {
1409
    // Avoid recording stats, for speed.
1410
0
    return;
1411
0
  }
1412
0
  PERF_TIMER_GUARD(get_from_memtable_time);
1413
1414
  // For now, the memtable Bloom filter is effectively disabled if there are any
1415
  // range tombstones. This is the simplest way to ensure range tombstones are
1416
  // handled. TODO: allow Bloom checks where max_covering_tombstone_seq==0
1417
0
  bool no_range_del = read_options.ignore_range_deletions ||
1418
0
                      is_range_del_table_empty_.load(std::memory_order_relaxed);
1419
0
  MultiGetRange temp_range(*range, range->begin(), range->end());
1420
0
  if (bloom_filter_ && no_range_del) {
1421
0
    bool whole_key =
1422
0
        !prefix_extractor_ || moptions_.memtable_whole_key_filtering;
1423
0
    std::array<Slice, MultiGetContext::MAX_BATCH_SIZE> bloom_keys;
1424
0
    std::array<bool, MultiGetContext::MAX_BATCH_SIZE> may_match;
1425
0
    std::array<size_t, MultiGetContext::MAX_BATCH_SIZE> range_indexes;
1426
0
    int num_keys = 0;
1427
0
    for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) {
1428
0
      if (whole_key) {
1429
0
        bloom_keys[num_keys] = iter->ukey_without_ts;
1430
0
        range_indexes[num_keys++] = iter.index();
1431
0
      } else if (prefix_extractor_->InDomain(iter->ukey_without_ts)) {
1432
0
        bloom_keys[num_keys] =
1433
0
            prefix_extractor_->Transform(iter->ukey_without_ts);
1434
0
        range_indexes[num_keys++] = iter.index();
1435
0
      }
1436
0
    }
1437
0
    bloom_filter_->MayContain(num_keys, bloom_keys.data(), may_match.data());
1438
0
    for (int i = 0; i < num_keys; ++i) {
1439
0
      if (!may_match[i]) {
1440
0
        temp_range.SkipIndex(range_indexes[i]);
1441
0
        PERF_COUNTER_ADD(bloom_memtable_miss_count, 1);
1442
0
      } else {
1443
0
        PERF_COUNTER_ADD(bloom_memtable_hit_count, 1);
1444
0
      }
1445
0
    }
1446
0
  }
1447
0
  for (auto iter = temp_range.begin(); iter != temp_range.end(); ++iter) {
1448
0
    bool found_final_value{false};
1449
0
    bool merge_in_progress = iter->s->IsMergeInProgress();
1450
0
    if (!no_range_del) {
1451
0
      std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter(
1452
0
          NewRangeTombstoneIteratorInternal(
1453
0
              read_options, GetInternalKeySeqno(iter->lkey->internal_key()),
1454
0
              immutable_memtable));
1455
0
      SequenceNumber covering_seq =
1456
0
          range_del_iter->MaxCoveringTombstoneSeqnum(iter->lkey->user_key());
1457
0
      if (covering_seq > iter->max_covering_tombstone_seq) {
1458
0
        iter->max_covering_tombstone_seq = covering_seq;
1459
0
        if (iter->timestamp) {
1460
          // Will be overwritten in SaveValue() if there is a point key with
1461
          // a higher seqno.
1462
0
          iter->timestamp->assign(range_del_iter->timestamp().data(),
1463
0
                                  range_del_iter->timestamp().size());
1464
0
        }
1465
0
      }
1466
0
    }
1467
0
    SequenceNumber dummy_seq;
1468
0
    GetFromTable(*(iter->lkey), iter->max_covering_tombstone_seq, true,
1469
0
                 callback, &iter->is_blob_index,
1470
0
                 iter->value ? iter->value->GetSelf() : nullptr, iter->columns,
1471
0
                 iter->timestamp, iter->s, &(iter->merge_context), &dummy_seq,
1472
0
                 &found_final_value, &merge_in_progress);
1473
1474
0
    if (!found_final_value && merge_in_progress) {
1475
0
      if (iter->s->ok()) {
1476
0
        *(iter->s) = Status::MergeInProgress();
1477
0
      } else {
1478
0
        assert(iter->s->IsMergeInProgress());
1479
0
      }
1480
0
    }
1481
1482
0
    if (found_final_value ||
1483
0
        (!iter->s->ok() && !iter->s->IsMergeInProgress())) {
1484
      // `found_final_value` should be set if an error/corruption occurs.
1485
      // The check on iter->s is just there in case GetFromTable() did not
1486
      // set `found_final_value` properly.
1487
0
      assert(found_final_value);
1488
0
      if (iter->value) {
1489
0
        iter->value->PinSelf();
1490
0
        range->AddValueSize(iter->value->size());
1491
0
      } else {
1492
0
        assert(iter->columns);
1493
0
        range->AddValueSize(iter->columns->serialized_size());
1494
0
      }
1495
1496
0
      range->MarkKeyDone(iter);
1497
0
      RecordTick(moptions_.statistics, MEMTABLE_HIT);
1498
0
      if (range->GetValueSize() > read_options.value_size_soft_limit) {
1499
        // Set the status of all remaining keys in the range to Aborted
1500
0
        for (auto range_iter = range->begin(); range_iter != range->end();
1501
0
             ++range_iter) {
1502
0
          range->MarkKeyDone(range_iter);
1503
0
          *(range_iter->s) = Status::Aborted();
1504
0
        }
1505
0
        break;
1506
0
      }
1507
0
    }
1508
0
  }
1509
0
  PERF_COUNTER_ADD(get_from_memtable_count, 1);
1510
0
}
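
A toy version of the batched bloom pre-filter used by MultiGet above (FakeBloom is a stand-in, not a RocksDB type): probe the filter for every key first, then do the expensive per-key lookup only for possible matches.

#include <string>
#include <unordered_set>
#include <vector>

// A set is a degenerate "bloom filter" with no false positives; only the
// MayContain() shape matters for the structure of the algorithm.
struct FakeBloom {
  std::unordered_set<std::string> keys;
  bool MayContain(const std::string& k) const { return keys.count(k) > 0; }
};

std::vector<std::string> FilterBatch(const FakeBloom& bloom,
                                     const std::vector<std::string>& batch) {
  std::vector<std::string> survivors;
  for (const std::string& k : batch) {
    if (bloom.MayContain(k)) {
      survivors.push_back(k);  // only these go on to the real lookup
    }
  }
  return survivors;
}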
1511
1512
Status MemTable::Update(SequenceNumber seq, ValueType value_type,
1513
                        const Slice& key, const Slice& value,
1514
0
                        const ProtectionInfoKVOS64* kv_prot_info) {
1515
0
  LookupKey lkey(key, seq);
1516
0
  Slice mem_key = lkey.memtable_key();
1517
1518
0
  std::unique_ptr<MemTableRep::Iterator> iter(
1519
0
      table_->GetDynamicPrefixIterator());
1520
0
  iter->Seek(lkey.internal_key(), mem_key.data());
1521
1522
0
  if (iter->Valid()) {
1523
    // Refer to comments under MemTable::Add() for entry format.
1524
    // Check that it belongs to the same user key. We do not check the
1525
    // sequence number since the Seek() call above should have skipped
1526
    // all entries with overly large sequence numbers.
1527
0
    const char* entry = iter->key();
1528
0
    uint32_t key_length = 0;
1529
0
    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
1530
0
    if (comparator_.comparator.user_comparator()->Equal(
1531
0
            Slice(key_ptr, key_length - 8), lkey.user_key())) {
1532
      // Correct user key
1533
0
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
1534
0
      ValueType type;
1535
0
      SequenceNumber existing_seq;
1536
0
      UnPackSequenceAndType(tag, &existing_seq, &type);
1537
0
      assert(existing_seq != seq);
1538
0
      if (type == value_type) {
1539
0
        Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
1540
0
        uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
1541
0
        uint32_t new_size = static_cast<uint32_t>(value.size());
1542
1543
        // Update the value in place if new value size <= previous value size
1544
0
        if (new_size <= prev_size) {
1545
0
          WriteLock wl(GetLock(lkey.user_key()));
1546
0
          char* p =
1547
0
              EncodeVarint32(const_cast<char*>(key_ptr) + key_length, new_size);
1548
0
          memcpy(p, value.data(), value.size());
1549
0
          assert((unsigned)((p + value.size()) - entry) ==
1550
0
                 (unsigned)(VarintLength(key_length) + key_length +
1551
0
                            VarintLength(value.size()) + value.size()));
1552
0
          RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
1553
0
          if (kv_prot_info != nullptr) {
1554
0
            ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info);
1555
            // `seq` is swallowed and `existing_seq` prevails.
1556
0
            updated_kv_prot_info.UpdateS(seq, existing_seq);
1557
0
            UpdateEntryChecksum(&updated_kv_prot_info, key, value, type,
1558
0
                                existing_seq, p + value.size());
1559
0
            Slice encoded(entry, p + value.size() - entry);
1560
0
            return VerifyEncodedEntry(encoded, updated_kv_prot_info);
1561
0
          } else {
1562
0
            UpdateEntryChecksum(nullptr, key, value, type, existing_seq,
1563
0
                                p + value.size());
1564
0
          }
1565
0
          return Status::OK();
1566
0
        }
1567
0
      }
1568
0
    }
1569
0
  }
1570
1571
  // The latest value is not of value_type, or the key doesn't exist
1572
0
  return Add(seq, value_type, key, value, kv_prot_info);
1573
0
}
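
Update() may overwrite a value in place only when the new value is no larger than the old one, because the value sits as a length prefix plus payload inside an arena slot that cannot grow. A toy illustration of that constraint using a 1-byte length prefix (the real code uses a varint32):

#include <cstddef>
#include <cstring>

// Slot layout: [1-byte length][payload...]. Returns false when the new
// value does not fit, mirroring Update()'s fallback to appending via Add().
bool ToyUpdateInPlace(unsigned char* slot, const char* new_val,
                      size_t new_size) {
  const size_t prev_size = slot[0];
  if (new_size > prev_size) {
    return false;  // cannot grow in place; caller must write a new entry
  }
  slot[0] = static_cast<unsigned char>(new_size);
  std::memcpy(slot + 1, new_val, new_size);
  return true;
}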
1574
1575
Status MemTable::UpdateCallback(SequenceNumber seq, const Slice& key,
1576
                                const Slice& delta,
1577
0
                                const ProtectionInfoKVOS64* kv_prot_info) {
1578
0
  LookupKey lkey(key, seq);
1579
0
  Slice memkey = lkey.memtable_key();
1580
1581
0
  std::unique_ptr<MemTableRep::Iterator> iter(
1582
0
      table_->GetDynamicPrefixIterator());
1583
0
  iter->Seek(lkey.internal_key(), memkey.data());
1584
1585
0
  if (iter->Valid()) {
1586
    // Refer to comments under MemTable::Add() for entry format.
1587
    // Check that it belongs to the same user key. We do not check the
1588
    // sequence number since the Seek() call above should have skipped
1589
    // all entries with overly large sequence numbers.
1590
0
    const char* entry = iter->key();
1591
0
    uint32_t key_length = 0;
1592
0
    const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
1593
0
    if (comparator_.comparator.user_comparator()->Equal(
1594
0
            Slice(key_ptr, key_length - 8), lkey.user_key())) {
1595
      // Correct user key
1596
0
      const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8);
1597
0
      ValueType type;
1598
0
      uint64_t existing_seq;
1599
0
      UnPackSequenceAndType(tag, &existing_seq, &type);
1600
0
      if (type == kTypeValue) {
1601
0
        Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
1602
0
        uint32_t prev_size = static_cast<uint32_t>(prev_value.size());
1603
1604
0
        char* prev_buffer = const_cast<char*>(prev_value.data());
1605
0
        uint32_t new_prev_size = prev_size;
1606
1607
0
        std::string str_value;
1608
0
        WriteLock wl(GetLock(lkey.user_key()));
1609
0
        auto status = moptions_.inplace_callback(prev_buffer, &new_prev_size,
1610
0
                                                 delta, &str_value);
1611
0
        if (status == UpdateStatus::UPDATED_INPLACE) {
1612
          // Value already updated by callback.
1613
0
          assert(new_prev_size <= prev_size);
1614
0
          if (new_prev_size < prev_size) {
1615
            // overwrite the old size with the new prev_size
1616
0
            char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
1617
0
                                     new_prev_size);
1618
0
            if (VarintLength(new_prev_size) < VarintLength(prev_size)) {
1619
              // shift the value buffer as well.
1620
0
              memcpy(p, prev_buffer, new_prev_size);
1621
0
              prev_buffer = p;
1622
0
            }
1623
0
          }
1624
0
          RecordTick(moptions_.statistics, NUMBER_KEYS_UPDATED);
1625
0
          UpdateFlushState();
1626
0
          Slice new_value(prev_buffer, new_prev_size);
1627
0
          if (kv_prot_info != nullptr) {
1628
0
            ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info);
1629
            // `seq` is swallowed and `existing_seq` prevails.
1630
0
            updated_kv_prot_info.UpdateS(seq, existing_seq);
1631
0
            updated_kv_prot_info.UpdateV(delta, new_value);
1632
0
            Slice encoded(entry, prev_buffer + new_prev_size - entry);
1633
0
            UpdateEntryChecksum(&updated_kv_prot_info, key, new_value, type,
1634
0
                                existing_seq, prev_buffer + new_prev_size);
1635
0
            return VerifyEncodedEntry(encoded, updated_kv_prot_info);
1636
0
          } else {
1637
0
            UpdateEntryChecksum(nullptr, key, new_value, type, existing_seq,
1638
0
                                prev_buffer + new_prev_size);
1639
0
          }
1640
0
          return Status::OK();
1641
0
        } else if (status == UpdateStatus::UPDATED) {
1642
0
          Status s;
1643
0
          if (kv_prot_info != nullptr) {
1644
0
            ProtectionInfoKVOS64 updated_kv_prot_info(*kv_prot_info);
1645
0
            updated_kv_prot_info.UpdateV(delta, str_value);
1646
0
            s = Add(seq, kTypeValue, key, Slice(str_value),
1647
0
                    &updated_kv_prot_info);
1648
0
          } else {
1649
0
            s = Add(seq, kTypeValue, key, Slice(str_value),
1650
0
                    nullptr /* kv_prot_info */);
1651
0
          }
1652
0
          RecordTick(moptions_.statistics, NUMBER_KEYS_WRITTEN);
1653
0
          UpdateFlushState();
1654
0
          return s;
1655
0
        } else if (status == UpdateStatus::UPDATE_FAILED) {
1656
          // `UPDATE_FAILED` is misleadingly named: it indicates that no
1657
          // update happened, not that a failure occurred.
1658
0
          UpdateFlushState();
1659
0
          return Status::OK();
1660
0
        }
1661
0
      }
1662
0
    }
1663
0
  }
1664
  // The latest value is not `kTypeValue`, or the key doesn't exist
1665
0
  return Status::NotFound();
1666
0
}
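
A hypothetical inplace_callback sketch matching the three outcomes handled above. The enum mirrors the semantics this function relies on; CounterAppend, its capacity parameter, and the string-based signature are invented simplifications of the real Slice-based API:

#include <cstdint>
#include <cstring>
#include <string>

enum class ToyUpdateStatus { UPDATE_FAILED, UPDATED_INPLACE, UPDATED };

// Append `delta` to the existing value in place when it fits; otherwise
// produce a full replacement in *merged and ask the caller to re-add it.
ToyUpdateStatus CounterAppend(char* existing, uint32_t* existing_size,
                              uint32_t capacity, const std::string& delta,
                              std::string* merged) {
  if (*existing_size + delta.size() <= capacity) {
    std::memcpy(existing + *existing_size, delta.data(), delta.size());
    *existing_size += static_cast<uint32_t>(delta.size());
    return ToyUpdateStatus::UPDATED_INPLACE;
  }
  merged->assign(existing, *existing_size);
  merged->append(delta);
  return ToyUpdateStatus::UPDATED;  // caller writes *merged as a new entry
}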
1667
1668
size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key,
1669
0
                                             size_t limit) {
1670
0
  Slice memkey = key.memtable_key();
1671
1672
  // A totally ordered iterator is costly for some memtable reps (prefix-aware
1673
  // reps). By passing in the user key, we allow efficient iterator creation.
1674
  // The iterator only needs to be ordered within the same user key.
1675
0
  std::unique_ptr<MemTableRep::Iterator> iter(
1676
0
      table_->GetDynamicPrefixIterator());
1677
0
  iter->Seek(key.internal_key(), memkey.data());
1678
1679
0
  size_t num_successive_merges = 0;
1680
1681
0
  for (; iter->Valid() && num_successive_merges < limit; iter->Next()) {
1682
0
    const char* entry = iter->key();
1683
0
    uint32_t key_length = 0;
1684
0
    const char* iter_key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length);
1685
0
    if (!comparator_.comparator.user_comparator()->Equal(
1686
0
            Slice(iter_key_ptr, key_length - 8), key.user_key())) {
1687
0
      break;
1688
0
    }
1689
1690
0
    const uint64_t tag = DecodeFixed64(iter_key_ptr + key_length - 8);
1691
0
    ValueType type;
1692
0
    uint64_t unused;
1693
0
    UnPackSequenceAndType(tag, &unused, &type);
1694
0
    if (type != kTypeMerge) {
1695
0
      break;
1696
0
    }
1697
1698
0
    ++num_successive_merges;
1699
0
  }
1700
1701
0
  return num_successive_merges;
1702
0
}
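
The 8-byte tag decoded throughout this file packs the sequence number and value type into one word: the sequence number in the high 56 bits and the type in the low 8 bits. A minimal sketch of that packing convention:

#include <cstdint>

uint64_t PackTag(uint64_t seq, uint8_t type) {
  return (seq << 8) | type;  // seq in bits 8..63, type in bits 0..7
}

void UnpackTag(uint64_t tag, uint64_t* seq, uint8_t* type) {
  *seq = tag >> 8;
  *type = static_cast<uint8_t>(tag & 0xff);
}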
1703
1704
void MemTableRep::Get(const LookupKey& k, void* callback_args,
1705
0
                      bool (*callback_func)(void* arg, const char* entry)) {
1706
0
  auto iter = GetDynamicPrefixIterator();
1707
0
  for (iter->Seek(k.internal_key(), k.memtable_key().data());
1708
0
       iter->Valid() && callback_func(callback_args, iter->key());
1709
0
       iter->Next()) {
1710
0
  }
1711
0
}
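
This default loop defines the whole rep/callback contract: seek to the key, then hand consecutive entries to the callback until it returns false or the rep is exhausted. A toy rep exercising that contract (ToyRep and its flat vector are invented; a real rep would be a skiplist or similar):

#include <vector>

// Entries are assumed already ordered; Seek is reduced to "start at 0".
struct ToyRep {
  std::vector<const char*> entries;

  void Get(void* arg, bool (*callback)(void* arg, const char* entry)) {
    for (const char* e : entries) {
      if (!callback(arg, e)) {
        break;  // callback found a final value (or an error): stop
      }
    }
  }
};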
1712
1713
0
void MemTable::RefLogContainingPrepSection(uint64_t log) {
1714
0
  assert(log > 0);
1715
0
  auto cur = min_prep_log_referenced_.load();
1716
0
  while ((log < cur || cur == 0) &&
1717
0
         !min_prep_log_referenced_.compare_exchange_strong(cur, log)) {
1718
0
    cur = min_prep_log_referenced_.load();
1719
0
  }
1720
0
}
1721
1722
0
uint64_t MemTable::GetMinLogContainingPrepSection() {
1723
0
  return min_prep_log_referenced_.load();
1724
0
}
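
RefLogContainingPrepSection above uses a lock-free "lower the minimum" loop. The same pattern in isolation (UpdateMinNonZero is an invented name): retry compare_exchange until either our candidate is installed or the stored value is already a valid minimum.

#include <atomic>
#include <cstdint>

// Install `candidate` if the current value is 0 (unset) or larger than
// `candidate`; compare_exchange_strong reloads `cur` on failure, so the
// loop condition is re-evaluated against the freshest value.
void UpdateMinNonZero(std::atomic<uint64_t>& min_val, uint64_t candidate) {
  uint64_t cur = min_val.load();
  while ((candidate < cur || cur == 0) &&
         !min_val.compare_exchange_strong(cur, candidate)) {
  }
}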
1725
1726
2.18M
void MemTable::MaybeUpdateNewestUDT(const Slice& user_key) {
1727
2.18M
  if (ts_sz_ == 0 || persist_user_defined_timestamps_) {
1728
2.18M
    return;
1729
2.18M
  }
1730
0
  const Comparator* ucmp = GetInternalKeyComparator().user_comparator();
1731
0
  Slice udt = ExtractTimestampFromUserKey(user_key, ts_sz_);
1732
0
  if (newest_udt_.empty() || ucmp->CompareTimestamp(udt, newest_udt_) > 0) {
1733
0
    newest_udt_ = udt;
1734
0
  }
1735
0
}
1736
1737
0
const Slice& MemTable::GetNewestUDT() const {
1738
  // This path should not be invoked for MemTables that do not enable the
1739
  // UDT-in-memtable-only feature.
1740
0
  assert(ts_sz_ > 0 && !persist_user_defined_timestamps_);
1741
0
  return newest_udt_;
1742
0
}
1743
1744
}  // namespace ROCKSDB_NAMESPACE