/src/rocksdb/memtable/wbwi_memtable.cc
Line | Count | Source |
1 | | // Copyright (c) Meta Platforms, Inc. and affiliates. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | #include "memtable/wbwi_memtable.h" |
7 | | |
8 | | #include "db/memtable.h" |
9 | | |
10 | | namespace ROCKSDB_NAMESPACE { |
11 | | |
// Static lookup table of WBWIMemTableIterator: maps a WriteType record kind
// to the ValueType paired with it.
// NOTE(review): presumably consulted by the iterator when it builds internal
// keys from WBWI entries; the use site is not in this file, confirm there.
12 | | const std::unordered_map<WriteType, ValueType> |
13 | | WBWIMemTableIterator::WriteTypeToValueTypeMap = { |
14 | | {kPutRecord, kTypeValue}, |
15 | | {kMergeRecord, kTypeMerge}, |
16 | | {kDeleteRecord, kTypeDeletion}, |
17 | | {kSingleDeleteRecord, kTypeSingleDeletion}, |
18 | | {kDeleteRangeRecord, kTypeRangeDeletion}, |
19 | | {kPutEntityRecord, kTypeWideColumnEntity}, |
20 | | // Only the record types listed above are added to a WBWI, so these have no |
21 | | // entry in the map: kLogDataRecord, kXIDRecord, kUnknownRecord. |
22 | | }; |
23 | | |
// Arena overload: constructs a WBWIMemTableIterator with placement new inside
// memory obtained from arena->AllocateAligned() and returns it as an
// InternalIterator*.
//
// The iterator wraps wbwi_->NewIterator(cf_id_) and is given assigned_seqno_,
// comparator_ and for_flush. The ReadOptions and SeqnoToTimeMapping arguments
// are unnamed and the prefix extractor name is commented out, so none of them
// is used by this function.
//
// Debug-build preconditions (asserts): arena is non-null and both bounds of
// assigned_seqno_ differ from kMaxSequenceNumber.
24 | | InternalIterator* WBWIMemTable::NewIterator( |
25 | | const ReadOptions&, UnownedPtr<const SeqnoToTimeMapping>, Arena* arena, |
26 | 0 | const SliceTransform* /* prefix_extractor */, bool for_flush) { |
27 | | // An ingested WBWIMemTable must already have its seqno range assigned. |
28 | 0 | assert(assigned_seqno_.upper_bound != kMaxSequenceNumber); |
29 | 0 | assert(assigned_seqno_.lower_bound != kMaxSequenceNumber); |
30 | 0 | assert(arena); |
31 | 0 | auto mem = arena->AllocateAligned(sizeof(WBWIMemTableIterator)); |
32 | 0 | return new (mem) WBWIMemTableIterator( |
33 | 0 | std::unique_ptr<WBWIIterator>(wbwi_->NewIterator(cf_id_)), |
34 | 0 | assigned_seqno_, comparator_, for_flush); |
35 | 0 | } |
36 | | |
// Arena-free overload: allocates the WBWIMemTableIterator with plain `new`
// and always passes for_flush=false. Since the object comes from `new`, the
// caller is responsible for releasing it; Get() in this file stores the
// result in a std::unique_ptr.
//
// Debug-build preconditions (asserts): both bounds of assigned_seqno_ differ
// from kMaxSequenceNumber.
37 | 0 | inline InternalIterator* WBWIMemTable::NewIterator() const { |
38 | 0 | assert(assigned_seqno_.upper_bound != kMaxSequenceNumber); |
39 | 0 | assert(assigned_seqno_.lower_bound != kMaxSequenceNumber); |
40 | 0 | return new WBWIMemTableIterator( |
41 | 0 | std::unique_ptr<WBWIIterator>(wbwi_->NewIterator(cf_id_)), |
42 | 0 | assigned_seqno_, comparator_, /*for_flush=*/false); |
43 | 0 | } |
44 | | |
// Point lookup of `key` in this WBWI-backed memtable.
//
// Seeks a fresh iterator to key.internal_key() and walks every entry whose
// user key equals key.user_key() (compared without timestamp). An entry is
// skipped when `callback` is set and callback->IsVisible(seq) is false.
// For each visible entry:
//   - kTypeValue: HandleTypeValue() is called and the lookup ends.
//   - kTypeDeletion, kTypeSingleDeletion, kTypeRangeDeletion:
//     HandleTypeDeletion() is called and the lookup ends. An entry whose
//     seqno is below *max_covering_tombstone_seq is handled as
//     kTypeRangeDeletion regardless of its own type.
//   - kTypeMerge: HandleTypeMerge() is called; the lookup ends if it returns
//     true, otherwise the walk moves on to the next entry.
//   - any other type: *s becomes Status::Corruption and the lookup ends.
//
// *out_seq is reset to kMaxSequenceNumber on entry and set, at the first
// visible entry, to max(entry seqno, *max_covering_tombstone_seq).
//
// Returns true when the lookup ended in one of the ways above or when a
// non-OK iterator status was copied into *s; the caller should then stop
// looking further. Returns false otherwise, with *s set to MergeInProgress
// if a merge operand was seen here or *s was MergeInProgress on entry.
//
// Debug-build preconditions (asserts): *s is OK or MergeInProgress,
// immutable_memtable is true, timestamp and blob_fetcher are null,
// merge_context is non-null, both bounds of assigned_seqno_ are assigned,
// and the underlying write batch has no DeleteRange.
45 | | bool WBWIMemTable::Get(const LookupKey& key, std::string* value, |
46 | | PinnableWideColumns* columns, std::string* timestamp, |
47 | | Status* s, MergeContext* merge_context, |
48 | | SequenceNumber* max_covering_tombstone_seq, |
49 | | SequenceNumber* out_seq, const ReadOptions&, |
50 | | bool immutable_memtable, ReadCallback* callback, |
51 | | bool* is_blob_index, bool do_merge, |
52 | 0 | const BlobFetcher* blob_fetcher) { |
53 | 0 | assert(s->ok() || s->IsMergeInProgress()); |
// The (void) casts keep these parameters referenced when the asserts below
// compile to nothing.
54 | 0 | (void)immutable_memtable; |
55 | 0 | (void)timestamp; |
56 | 0 | (void)columns; |
57 | 0 | (void)blob_fetcher; |
58 | 0 | assert(immutable_memtable); |
59 | 0 | assert(!timestamp); // TODO: support UDT |
60 | | // IngestWriteBatchWithIndex() is rejected while any live column family has |
61 | | // blob direct write enabled, so WBWI should never need blob resolution. |
62 | 0 | assert(blob_fetcher == nullptr); |
63 | 0 | assert(assigned_seqno_.upper_bound != kMaxSequenceNumber); |
64 | 0 | assert(assigned_seqno_.lower_bound != kMaxSequenceNumber); |
65 | | // WBWI does not support DeleteRange yet. |
66 | 0 | assert(!wbwi_->GetWriteBatch()->HasDeleteRange()); |
67 | 0 | assert(merge_context); |
68 | 

69 | 0 | *out_seq = kMaxSequenceNumber; |
70 | 0 | [[maybe_unused]] SequenceNumber read_seq = |
71 | 0 | GetInternalKeySeqno(key.internal_key()); |
72 | | // This memtable is a single write batch, so no snapshot can be taken within |
73 | | // the seqno range assigned to this memtable. |
74 | 0 | assert(read_seq >= assigned_seqno_.upper_bound || |
75 | 0 | read_seq < assigned_seqno_.lower_bound); |
76 | 0 | std::unique_ptr<InternalIterator> iter{NewIterator()}; |
77 | 0 | iter->Seek(key.internal_key()); |
78 | 0 | const Slice lookup_user_key = key.user_key(); |
79 | 0 | bool merge_in_progress = s->IsMergeInProgress(); |
80 | 

// Visit every entry that carries the looked-up user key.
81 | 0 | while (iter->Valid() && comparator_->EqualWithoutTimestamp( |
82 | 0 | ExtractUserKey(iter->key()), lookup_user_key)) { |
83 | 0 | uint64_t tag = ExtractInternalKeyFooter(iter->key()); |
84 | 0 | ValueType type; |
85 | 0 | SequenceNumber seq; |
86 | 0 | UnPackSequenceAndType(tag, &seq, &type); |
87 | | // Value types that this memtable does not support. |
88 | 0 | assert(type != kTypeBlobIndex); |
89 | 0 | assert(type != kTypeWideColumnEntity); |
90 | 0 | assert(type != kTypeValuePreferredSeqno); |
91 | 0 | assert(type != kTypeDeletionWithTimestamp); |
92 | 0 | if (!callback || callback->IsVisible(seq)) { |
// Only the first visible entry sets *out_seq.
93 | 0 | if (*out_seq == kMaxSequenceNumber) { |
94 | 0 | *out_seq = std::max(seq, *max_covering_tombstone_seq); |
95 | 0 | } |
// The covering tombstone seqno is larger than this entry seqno, so the
// entry is handled as a deletion.
96 | 0 | if (*max_covering_tombstone_seq > seq) { |
97 | 0 | type = kTypeRangeDeletion; |
98 | 0 | } |
99 | 0 | switch (type) { |
100 | 0 | case kTypeValue: { |
101 | 0 | HandleTypeValue(lookup_user_key, iter->value(), iter->IsValuePinned(), |
102 | 0 | do_merge, merge_in_progress, merge_context, |
103 | 0 | moptions_.merge_operator, clock_, |
104 | 0 | moptions_.statistics, moptions_.info_log, s, value, |
105 | 0 | columns, is_blob_index); |
106 | 0 | assert(seq <= read_seq); |
107 | 0 | return /*found_final_value=*/true; |
108 | 0 | } |
109 | 0 | case kTypeDeletion: |
110 | 0 | case kTypeSingleDeletion: |
111 | 0 | case kTypeRangeDeletion: { |
112 | 0 | HandleTypeDeletion(lookup_user_key, merge_in_progress, merge_context, |
113 | 0 | moptions_.merge_operator, clock_, |
114 | 0 | moptions_.statistics, moptions_.info_log, s, value, |
115 | 0 | columns); |
116 | 0 | assert(seq <= read_seq); |
117 | 0 | return /*found_final_value=*/true; |
118 | 0 | } |
119 | 0 | case kTypeMerge: { |
120 | 0 | merge_in_progress = true; |
121 | 0 | if (ReadOnlyMemTable::HandleTypeMerge( |
122 | 0 | lookup_user_key, iter->value(), iter->IsValuePinned(), |
123 | 0 | do_merge, merge_context, moptions_.merge_operator, clock_, |
124 | 0 | moptions_.statistics, moptions_.info_log, s, value, |
125 | 0 | columns)) { |
126 | 0 | return true; |
127 | 0 | } |
128 | 0 | break; |
129 | 0 | } |
130 | 0 | default: { |
131 | 0 | std::string msg( |
132 | 0 | "Unrecognized or unsupported value type for " |
133 | 0 | "WBWI-based memtable: " + |
134 | 0 | std::to_string(static_cast<int>(type)) + ". "); |
135 | 0 | msg.append("User key: " + |
136 | 0 | ExtractUserKey(iter->key()).ToString(/*hex=*/true) + ". "); |
137 | 0 | msg.append("seq: " + std::to_string(seq) + "."); |
138 | 0 | *s = Status::Corruption(msg.c_str()); |
139 | 0 | return /*found_final_value=*/true; |
140 | 0 | } |
141 | 0 | } |
142 | 0 | } |
143 | | // Reaching here means the entry was a merge operand or was not visible. |
144 | 0 | assert(merge_in_progress || (callback && !callback->IsVisible(seq))); |
145 | 0 | iter->Next(); |
146 | 0 | } |
// An iterator error replaces *s only when *s is OK, MergeInProgress or
// NotFound.
147 | 0 | if (!iter->status().ok() && |
148 | 0 | (s->ok() || s->IsMergeInProgress() || s->IsNotFound())) { |
149 | 0 | *s = iter->status(); |
150 | | // Returning true stops further lookup by the caller. |
151 | 0 | return true; |
152 | 0 | } |
153 | 0 | if (merge_in_progress) { |
154 | 0 | assert(s->ok() || s->IsMergeInProgress()); |
155 | 0 | *s = Status::MergeInProgress(); |
156 | 0 | } |
157 | 0 | return /*found_final_value=*/false; |
158 | 0 | } |
159 | | |
// Batched lookup: calls Get() once for each key that `range` iterates over.
//
// When Get() returns true for a key, that key is marked done in the range.
// If its status is OK or NotFound, the value size (or the serialized size of
// the columns when the key has no value slot) is added to the running value
// size of the range. As soon as that running size exceeds
// read_options.value_size_soft_limit, every key the range still iterates
// over is marked done with Status::Aborted and the loop stops.
//
// Debug-build preconditions (asserts): immutable_memtable is true and
// blob_fetcher is null.
160 | | void WBWIMemTable::MultiGet(const ReadOptions& read_options, |
161 | | MultiGetRange* range, ReadCallback* callback, |
162 | | bool immutable_memtable, |
163 | 0 | const BlobFetcher* blob_fetcher) { |
164 | 0 | (void)immutable_memtable; |
165 | 0 | (void)blob_fetcher; |
166 | | // Should only be used as immutable memtable. |
167 | 0 | assert(immutable_memtable); |
168 | | // IngestWriteBatchWithIndex() is rejected while any live column family has |
169 | | // blob direct write enabled, so WBWI should never need blob resolution. |
170 | 0 | assert(blob_fetcher == nullptr); |
171 | | // TODO: reuse the InternalIterator created in Get(). |
172 | 0 | for (auto iter = range->begin(); iter != range->end(); ++iter) { |
173 | 0 | SequenceNumber dummy_seq = 0; |
// Positional arguments after read_options: immutable_memtable=true,
// callback, is_blob_index=nullptr, do_merge=true. blob_fetcher is not
// passed here (NOTE(review): presumably defaulted in the header; confirm).
174 | 0 | bool found_final_value = |
175 | 0 | Get(*iter->lkey, iter->value ? iter->value->GetSelf() : nullptr, |
176 | 0 | iter->columns, iter->timestamp, iter->s, &(iter->merge_context), |
177 | 0 | &(iter->max_covering_tombstone_seq), &dummy_seq, read_options, true, |
178 | 0 | callback, nullptr, true); |
179 | 0 | if (found_final_value) { |
180 | 0 | if (iter->s->ok() || iter->s->IsNotFound()) { |
181 | 0 | if (iter->value) { |
182 | 0 | iter->value->PinSelf(); |
183 | 0 | range->AddValueSize(iter->value->size()); |
184 | 0 | } else { |
185 | 0 | assert(iter->columns); |
186 | 0 | range->AddValueSize(iter->columns->serialized_size()); |
187 | 0 | } |
188 | 0 | } |
189 | 0 | range->MarkKeyDone(iter); |
190 | 0 | if (range->GetValueSize() > read_options.value_size_soft_limit) { |
191 | | // Mark all remaining keys in the range as done with Status::Aborted. |
192 | 0 | for (auto range_iter = range->begin(); range_iter != range->end(); |
193 | 0 | ++range_iter) { |
194 | 0 | range->MarkKeyDone(range_iter); |
195 | 0 | *(range_iter->s) = Status::Aborted(); |
196 | 0 | } |
197 | 0 | break; |
198 | 0 | } |
199 | 0 | } |
200 | 0 | } |
201 | 0 | } |
202 | | } // namespace ROCKSDB_NAMESPACE |