Coverage Report

Created: 2026-05-16 07:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/memtable/wbwi_memtable.cc
Line
Count
Source
1
//  Copyright (c) Meta Platforms, Inc. and affiliates.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#include "memtable/wbwi_memtable.h"
7
8
#include "db/memtable.h"
9
10
namespace ROCKSDB_NAMESPACE {
11
12
const std::unordered_map<WriteType, ValueType>
13
    WBWIMemTableIterator::WriteTypeToValueTypeMap = {
14
        {kPutRecord, kTypeValue},
15
        {kMergeRecord, kTypeMerge},
16
        {kDeleteRecord, kTypeDeletion},
17
        {kSingleDeleteRecord, kTypeSingleDeletion},
18
        {kDeleteRangeRecord, kTypeRangeDeletion},
19
        {kPutEntityRecord, kTypeWideColumnEntity},
20
        // Only the above record types are added to WBWI.
21
        // kLogDataRecord, kXIDRecord, kUnknownRecord
22
};
23
24
// Creates an iterator over this column family's entries in the underlying
// WriteBatchWithIndex. The WBWIMemTableIterator object itself is placement-
// constructed in storage obtained from `arena`. The ReadOptions, seqno-to-time
// mapping and prefix extractor arguments are not used.
// Requires that both bounds of assigned_seqno_ have already been set.
InternalIterator* WBWIMemTable::NewIterator(
    const ReadOptions&, UnownedPtr<const SeqnoToTimeMapping>, Arena* arena,
    const SliceTransform* /* prefix_extractor */, bool for_flush) {
  // An ingested WBWIMemTable is only readable once its seqnos are assigned.
  assert(assigned_seqno_.upper_bound != kMaxSequenceNumber);
  assert(assigned_seqno_.lower_bound != kMaxSequenceNumber);
  assert(arena);
  auto storage = arena->AllocateAligned(sizeof(WBWIMemTableIterator));
  auto* const result = new (storage) WBWIMemTableIterator(
      std::unique_ptr<WBWIIterator>(wbwi_->NewIterator(cf_id_)),
      assigned_seqno_, comparator_, for_flush);
  return result;
}
36
37
0
// Heap-allocating counterpart of the arena-based NewIterator(); Get() in this
// file uses it for point lookups. The caller owns the returned iterator.
// The iterator is never created in flush mode.
inline InternalIterator* WBWIMemTable::NewIterator() const {
  assert(assigned_seqno_.upper_bound != kMaxSequenceNumber);
  assert(assigned_seqno_.lower_bound != kMaxSequenceNumber);
  constexpr bool kForFlush = false;
  return new WBWIMemTableIterator(
      std::unique_ptr<WBWIIterator>{wbwi_->NewIterator(cf_id_)},
      assigned_seqno_, comparator_, kForFlush);
}
44
45
bool WBWIMemTable::Get(const LookupKey& key, std::string* value,
46
                       PinnableWideColumns* columns, std::string* timestamp,
47
                       Status* s, MergeContext* merge_context,
48
                       SequenceNumber* max_covering_tombstone_seq,
49
                       SequenceNumber* out_seq, const ReadOptions&,
50
                       bool immutable_memtable, ReadCallback* callback,
51
                       bool* is_blob_index, bool do_merge,
52
0
                       const BlobFetcher* blob_fetcher) {
53
0
  assert(s->ok() || s->IsMergeInProgress());
54
0
  (void)immutable_memtable;
55
0
  (void)timestamp;
56
0
  (void)columns;
57
0
  (void)blob_fetcher;
58
0
  assert(immutable_memtable);
59
0
  assert(!timestamp);  // TODO: support UDT
60
  // IngestWriteBatchWithIndex() is rejected while any live column family has
61
  // blob direct write enabled, so WBWI should never need blob resolution.
62
0
  assert(blob_fetcher == nullptr);
63
0
  assert(assigned_seqno_.upper_bound != kMaxSequenceNumber);
64
0
  assert(assigned_seqno_.lower_bound != kMaxSequenceNumber);
65
  // WBWI does not support DeleteRange yet.
66
0
  assert(!wbwi_->GetWriteBatch()->HasDeleteRange());
67
0
  assert(merge_context);
68
69
0
  *out_seq = kMaxSequenceNumber;
70
0
  [[maybe_unused]] SequenceNumber read_seq =
71
0
      GetInternalKeySeqno(key.internal_key());
72
  // This is memtable is a single write batch, no snapshot can be taken within
73
  // assigned seqnos for this memtable.
74
0
  assert(read_seq >= assigned_seqno_.upper_bound ||
75
0
         read_seq < assigned_seqno_.lower_bound);
76
0
  std::unique_ptr<InternalIterator> iter{NewIterator()};
77
0
  iter->Seek(key.internal_key());
78
0
  const Slice lookup_user_key = key.user_key();
79
0
  bool merge_in_progress = s->IsMergeInProgress();
80
81
0
  while (iter->Valid() && comparator_->EqualWithoutTimestamp(
82
0
                              ExtractUserKey(iter->key()), lookup_user_key)) {
83
0
    uint64_t tag = ExtractInternalKeyFooter(iter->key());
84
0
    ValueType type;
85
0
    SequenceNumber seq;
86
0
    UnPackSequenceAndType(tag, &seq, &type);
87
    // Unsupported operations.
88
0
    assert(type != kTypeBlobIndex);
89
0
    assert(type != kTypeWideColumnEntity);
90
0
    assert(type != kTypeValuePreferredSeqno);
91
0
    assert(type != kTypeDeletionWithTimestamp);
92
0
    if (!callback || callback->IsVisible(seq)) {
93
0
      if (*out_seq == kMaxSequenceNumber) {
94
0
        *out_seq = std::max(seq, *max_covering_tombstone_seq);
95
0
      }
96
0
      if (*max_covering_tombstone_seq > seq) {
97
0
        type = kTypeRangeDeletion;
98
0
      }
99
0
      switch (type) {
100
0
        case kTypeValue: {
101
0
          HandleTypeValue(lookup_user_key, iter->value(), iter->IsValuePinned(),
102
0
                          do_merge, merge_in_progress, merge_context,
103
0
                          moptions_.merge_operator, clock_,
104
0
                          moptions_.statistics, moptions_.info_log, s, value,
105
0
                          columns, is_blob_index);
106
0
          assert(seq <= read_seq);
107
0
          return /*found_final_value=*/true;
108
0
        }
109
0
        case kTypeDeletion:
110
0
        case kTypeSingleDeletion:
111
0
        case kTypeRangeDeletion: {
112
0
          HandleTypeDeletion(lookup_user_key, merge_in_progress, merge_context,
113
0
                             moptions_.merge_operator, clock_,
114
0
                             moptions_.statistics, moptions_.info_log, s, value,
115
0
                             columns);
116
0
          assert(seq <= read_seq);
117
0
          return /*found_final_value=*/true;
118
0
        }
119
0
        case kTypeMerge: {
120
0
          merge_in_progress = true;
121
0
          if (ReadOnlyMemTable::HandleTypeMerge(
122
0
                  lookup_user_key, iter->value(), iter->IsValuePinned(),
123
0
                  do_merge, merge_context, moptions_.merge_operator, clock_,
124
0
                  moptions_.statistics, moptions_.info_log, s, value,
125
0
                  columns)) {
126
0
            return true;
127
0
          }
128
0
          break;
129
0
        }
130
0
        default: {
131
0
          std::string msg(
132
0
              "Unrecognized or unsupported value type for "
133
0
              "WBWI-based memtable: " +
134
0
              std::to_string(static_cast<int>(type)) + ". ");
135
0
          msg.append("User key: " +
136
0
                     ExtractUserKey(iter->key()).ToString(/*hex=*/true) + ". ");
137
0
          msg.append("seq: " + std::to_string(seq) + ".");
138
0
          *s = Status::Corruption(msg.c_str());
139
0
          return /*found_final_value=*/true;
140
0
        }
141
0
      }
142
0
    }
143
    // Current key is a merge key or not visible
144
0
    assert(merge_in_progress || (callback && !callback->IsVisible(seq)));
145
0
    iter->Next();
146
0
  }
147
0
  if (!iter->status().ok() &&
148
0
      (s->ok() || s->IsMergeInProgress() || s->IsNotFound())) {
149
0
    *s = iter->status();
150
    // stop further look up
151
0
    return true;
152
0
  }
153
0
  if (merge_in_progress) {
154
0
    assert(s->ok() || s->IsMergeInProgress());
155
0
    *s = Status::MergeInProgress();
156
0
  }
157
0
  return /*found_final_value=*/false;
158
0
}
159
160
void WBWIMemTable::MultiGet(const ReadOptions& read_options,
161
                            MultiGetRange* range, ReadCallback* callback,
162
                            bool immutable_memtable,
163
0
                            const BlobFetcher* blob_fetcher) {
164
0
  (void)immutable_memtable;
165
0
  (void)blob_fetcher;
166
  // Should only be used as immutable memtable.
167
0
  assert(immutable_memtable);
168
  // IngestWriteBatchWithIndex() is rejected while any live column family has
169
  // blob direct write enabled, so WBWI should never need blob resolution.
170
0
  assert(blob_fetcher == nullptr);
171
  // TODO: reuse the InternalIterator created in Get().
172
0
  for (auto iter = range->begin(); iter != range->end(); ++iter) {
173
0
    SequenceNumber dummy_seq = 0;
174
0
    bool found_final_value =
175
0
        Get(*iter->lkey, iter->value ? iter->value->GetSelf() : nullptr,
176
0
            iter->columns, iter->timestamp, iter->s, &(iter->merge_context),
177
0
            &(iter->max_covering_tombstone_seq), &dummy_seq, read_options, true,
178
0
            callback, nullptr, true);
179
0
    if (found_final_value) {
180
0
      if (iter->s->ok() || iter->s->IsNotFound()) {
181
0
        if (iter->value) {
182
0
          iter->value->PinSelf();
183
0
          range->AddValueSize(iter->value->size());
184
0
        } else {
185
0
          assert(iter->columns);
186
0
          range->AddValueSize(iter->columns->serialized_size());
187
0
        }
188
0
      }
189
0
      range->MarkKeyDone(iter);
190
0
      if (range->GetValueSize() > read_options.value_size_soft_limit) {
191
        // Set all remaining keys in range to Abort
192
0
        for (auto range_iter = range->begin(); range_iter != range->end();
193
0
             ++range_iter) {
194
0
          range->MarkKeyDone(range_iter);
195
0
          *(range_iter->s) = Status::Aborted();
196
0
        }
197
0
        break;
198
0
      }
199
0
    }
200
0
  }
201
0
}
202
}  // namespace ROCKSDB_NAMESPACE