/src/rocksdb/table/get_context.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | #include "table/get_context.h" |
7 | | |
8 | | #include <vector> |
9 | | |
10 | | #include "db/blob/blob_fetcher.h" |
11 | | #include "db/blob/blob_index.h" |
12 | | #include "db/merge_helper.h" |
13 | | #include "db/pinned_iterators_manager.h" |
14 | | #include "db/read_callback.h" |
15 | | #include "db/wide/wide_column_serialization.h" |
16 | | #include "db/wide/wide_columns_helper.h" |
17 | | #include "monitoring/file_read_sample.h" |
18 | | #include "monitoring/perf_context_imp.h" |
19 | | #include "monitoring/statistics_impl.h" |
20 | | #include "port/likely.h" |
21 | | #include "rocksdb/merge_operator.h" |
22 | | #include "rocksdb/statistics.h" |
23 | | #include "rocksdb/status.h" |
24 | | #include "rocksdb/system_clock.h" |
25 | | |
26 | | namespace ROCKSDB_NAMESPACE { |
27 | | |
// Constructs a GetContext carrying all in/out state for a single point lookup.
// All pointer arguments are borrowed (not owned); most may be nullptr when the
// corresponding feature (timestamps, merge, blob index, tracing, callbacks) is
// not in use for this lookup.
GetContext::GetContext(
    const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger,
    Statistics* statistics, GetState init_state, const Slice& user_key,
    PinnableSlice* pinnable_val, PinnableWideColumns* columns,
    std::string* timestamp, bool* value_found, MergeContext* merge_context,
    bool do_merge, SequenceNumber* _max_covering_tombstone_seq,
    SystemClock* clock, SequenceNumber* seq,
    PinnedIteratorsManager* _pinned_iters_mgr, ReadCallback* callback,
    bool* is_blob_index, uint64_t tracing_get_id, BlobFetcher* blob_fetcher)
    : ucmp_(ucmp),
      merge_operator_(merge_operator),
      logger_(logger),
      statistics_(statistics),
      state_(init_state),
      user_key_(user_key),
      pinnable_val_(pinnable_val),
      columns_(columns),
      timestamp_(timestamp),
      value_found_(value_found),
      merge_context_(merge_context),
      max_covering_tombstone_seq_(_max_covering_tombstone_seq),
      clock_(clock),
      seq_(seq),
      replay_log_(nullptr),
      pinned_iters_mgr_(_pinned_iters_mgr),
      callback_(callback),
      do_merge_(do_merge),
      is_blob_index_(is_blob_index),
      tracing_get_id_(tracing_get_id),
      blob_fetcher_(blob_fetcher) {
  if (seq_) {
    // kMaxSequenceNumber acts as the "not yet set" sentinel; SaveValue()
    // assigns the real sequence number on the first matching key.
    *seq_ = kMaxSequenceNumber;
  }
  sample_ = should_sample_file_read();
}
63 | | |
// Convenience overload for column families without user-defined timestamps:
// forwards to the main constructor with `timestamp` set to nullptr.
GetContext::GetContext(const Comparator* ucmp,
                       const MergeOperator* merge_operator, Logger* logger,
                       Statistics* statistics, GetState init_state,
                       const Slice& user_key, PinnableSlice* pinnable_val,
                       PinnableWideColumns* columns, bool* value_found,
                       MergeContext* merge_context, bool do_merge,
                       SequenceNumber* _max_covering_tombstone_seq,
                       SystemClock* clock, SequenceNumber* seq,
                       PinnedIteratorsManager* _pinned_iters_mgr,
                       ReadCallback* callback, bool* is_blob_index,
                       uint64_t tracing_get_id, BlobFetcher* blob_fetcher)
    : GetContext(ucmp, merge_operator, logger, statistics, init_state, user_key,
                 pinnable_val, columns, /*timestamp=*/nullptr, value_found,
                 merge_context, do_merge, _max_covering_tombstone_seq, clock,
                 seq, _pinned_iters_mgr, callback, is_blob_index,
                 tracing_get_id, blob_fetcher) {}
80 | | |
81 | 994 | void GetContext::appendToReplayLog(ValueType type, Slice value, Slice ts) { |
82 | 994 | if (replay_log_) { |
83 | 0 | if (replay_log_->empty()) { |
84 | | // Optimization: in the common case of only one operation in the |
85 | | // log, we allocate the exact amount of space needed. |
86 | 0 | replay_log_->reserve(1 + VarintLength(value.size()) + value.size()); |
87 | 0 | } |
88 | 0 | replay_log_->push_back(type); |
89 | 0 | PutLengthPrefixedSlice(replay_log_, value); |
90 | | |
91 | | // If cf enables ts, there should always be a ts following each value |
92 | 0 | if (ucmp_->timestamp_size() > 0) { |
93 | 0 | assert(ts.size() == ucmp_->timestamp_size()); |
94 | 0 | PutLengthPrefixedSlice(replay_log_, ts); |
95 | 0 | } |
96 | 0 | } |
97 | 994 | } |
98 | | |
99 | | // Called from TableCache::Get and Table::Get when file/block in which |
100 | | // key may exist are not there in TableCache/BlockCache respectively. In this |
101 | | // case we can't guarantee that key does not exist and are not permitted to do |
102 | | // IO to be certain.Set the status=kFound and value_found=false to let the |
103 | | // caller know that key may exist but is not there in memory |
104 | 0 | void GetContext::MarkKeyMayExist() { |
105 | 0 | state_ = kFound; |
106 | 0 | if (value_found_ != nullptr) { |
107 | 0 | *value_found_ = false; |
108 | 0 | } |
109 | 0 | } |
110 | | |
111 | | Status GetContext::SaveWideColumnEntityToPinnable(const Slice& user_key, |
112 | | const Slice& entity, |
113 | 0 | Cleanable* value_pinner) { |
114 | 0 | assert(pinnable_val_ != nullptr); |
115 | | |
116 | | // Try the fast path first: GetValueOfDefaultColumn handles both V1 and V2 |
117 | | // entities with inline default column without full deserialization. It |
118 | | // returns NotSupported only when the default column is a blob reference. |
119 | 0 | Slice value_of_default; |
120 | 0 | Slice entity_ref = entity; |
121 | 0 | Status status = WideColumnSerialization::GetValueOfDefaultColumn( |
122 | 0 | entity_ref, value_of_default); |
123 | 0 | if (status.ok()) { |
124 | 0 | if (LIKELY(value_pinner != nullptr)) { |
125 | 0 | pinnable_val_->PinSlice(value_of_default, value_pinner); |
126 | 0 | } else { |
127 | 0 | pinnable_val_->PinSelf(value_of_default); |
128 | 0 | } |
129 | 0 | } else if (status.IsNotSupported()) { |
130 | | // Default column is a blob reference, so resolve it into the output value. |
131 | 0 | bool resolved = false; |
132 | 0 | status = WideColumnSerialization::GetValueOfDefaultColumnResolvingBlobs( |
133 | 0 | entity, user_key, blob_fetcher_, *pinnable_val_, resolved); |
134 | 0 | } |
135 | 0 | return status; |
136 | 0 | } |
137 | | |
// Stores a wide-column entity into columns_. If the entity (V2 format) has no
// blob-backed columns, the serialized entity is stored as-is; otherwise each
// blob column is resolved eagerly and the entity is re-serialized with the
// resolved values before being stored.
Status GetContext::SaveWideColumnEntityToColumns(const Slice& user_key,
                                                 const Slice& entity,
                                                 Cleanable* value_pinner) {
  assert(columns_ != nullptr);

  // blob_cols pairs each blob-backed column's index in entity_columns with its
  // decoded BlobIndex; DeserializeV2 produces them in ascending index order.
  std::vector<WideColumn> entity_columns;
  std::vector<std::pair<size_t, BlobIndex>> blob_cols;
  Slice entity_ref = entity;
  Status status = WideColumnSerialization::DeserializeV2(
      entity_ref, entity_columns, blob_cols);
  if (status.ok()) {
    if (LIKELY(blob_cols.empty())) {
      // No blob columns: the original serialized entity can be used directly.
      return columns_->SetWideColumnValue(entity, value_pinner);
    }

    // TODO: Add lazy resolution support for GetEntity point lookups. This
    // requires SuperVersion pinning on PinnableWideColumns to keep the
    // Version* alive after GetImpl returns. Currently, lazy_column_resolution
    // only takes effect for iterators (DBIter path).
    //
    // Eager path: resolve blob columns inline to avoid intermediate
    // std::string copies per blob value. Keep fetched blob values as
    // PinnableSlice.
    std::vector<PinnableSlice> resolved_blob_values(blob_cols.size());
    for (size_t bi = 0; bi < blob_cols.size() && status.ok(); ++bi) {
      const BlobIndex& blob_idx = blob_cols[bi].second;
      if (blob_idx.IsInlined()) {
        // Inlined blobs carry the value directly; no fetch needed.
        resolved_blob_values[bi].PinSelf(blob_idx.value());
        continue;
      }

      status = blob_fetcher_->FetchBlob(
          user_key, blob_idx, nullptr /* prefetch_buffer */,
          &resolved_blob_values[bi], nullptr /* bytes_read */);
    }

    if (status.ok()) {
      // Rebuild the column list, substituting resolved blob values. blob_cols
      // is walked with a cursor in lockstep with the ascending column index.
      WideColumns result_columns;
      result_columns.reserve(entity_columns.size());
      size_t blob_cursor = 0;
      for (size_t ci = 0; ci < entity_columns.size(); ++ci) {
        if (blob_cursor < blob_cols.size() &&
            blob_cols[blob_cursor].first == ci) {
          result_columns.emplace_back(entity_columns[ci].name(),
                                      Slice(resolved_blob_values[blob_cursor]));
          ++blob_cursor;
        } else {
          result_columns.emplace_back(entity_columns[ci].name(),
                                      entity_columns[ci].value());
        }
      }

      std::string resolved_entity;
      status =
          WideColumnSerialization::Serialize(result_columns, resolved_entity);
      if (status.ok()) {
        // TODO: A combined SerializeAndBuildIndex method could avoid the
        // serialize + deserialize round trip inside
        // SetWideColumnValue -> CreateIndexForWideColumns.
        return columns_->SetWideColumnValue(std::move(resolved_entity));
      }
    }
  }
  return status;
}
203 | | |
204 | 0 | void GetContext::SaveValue(const Slice& value, SequenceNumber /*seq*/) { |
205 | 0 | assert(state_ == kNotFound); |
206 | 0 | assert(ucmp_->timestamp_size() == 0); |
207 | |
|
208 | 0 | appendToReplayLog(kTypeValue, value, Slice()); |
209 | |
|
210 | 0 | state_ = kFound; |
211 | 0 | if (LIKELY(pinnable_val_ != nullptr)) { |
212 | 0 | pinnable_val_->PinSelf(value); |
213 | 0 | } |
214 | 0 | } |
215 | | |
216 | 0 | void GetContext::ReportCounters() { |
217 | 0 | if (get_context_stats_.num_cache_hit > 0) { |
218 | 0 | RecordTick(statistics_, BLOCK_CACHE_HIT, get_context_stats_.num_cache_hit); |
219 | 0 | } |
220 | 0 | if (get_context_stats_.num_cache_index_hit > 0) { |
221 | 0 | RecordTick(statistics_, BLOCK_CACHE_INDEX_HIT, |
222 | 0 | get_context_stats_.num_cache_index_hit); |
223 | 0 | } |
224 | 0 | if (get_context_stats_.num_cache_data_hit > 0) { |
225 | 0 | RecordTick(statistics_, BLOCK_CACHE_DATA_HIT, |
226 | 0 | get_context_stats_.num_cache_data_hit); |
227 | 0 | } |
228 | 0 | if (get_context_stats_.num_cache_filter_hit > 0) { |
229 | 0 | RecordTick(statistics_, BLOCK_CACHE_FILTER_HIT, |
230 | 0 | get_context_stats_.num_cache_filter_hit); |
231 | 0 | } |
232 | 0 | if (get_context_stats_.num_cache_compression_dict_hit > 0) { |
233 | 0 | RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_HIT, |
234 | 0 | get_context_stats_.num_cache_compression_dict_hit); |
235 | 0 | } |
236 | 0 | if (get_context_stats_.num_cache_index_miss > 0) { |
237 | 0 | RecordTick(statistics_, BLOCK_CACHE_INDEX_MISS, |
238 | 0 | get_context_stats_.num_cache_index_miss); |
239 | 0 | } |
240 | 0 | if (get_context_stats_.num_cache_filter_miss > 0) { |
241 | 0 | RecordTick(statistics_, BLOCK_CACHE_FILTER_MISS, |
242 | 0 | get_context_stats_.num_cache_filter_miss); |
243 | 0 | } |
244 | 0 | if (get_context_stats_.num_cache_data_miss > 0) { |
245 | 0 | RecordTick(statistics_, BLOCK_CACHE_DATA_MISS, |
246 | 0 | get_context_stats_.num_cache_data_miss); |
247 | 0 | } |
248 | 0 | if (get_context_stats_.num_cache_compression_dict_miss > 0) { |
249 | 0 | RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_MISS, |
250 | 0 | get_context_stats_.num_cache_compression_dict_miss); |
251 | 0 | } |
252 | 0 | if (get_context_stats_.num_cache_bytes_read > 0) { |
253 | 0 | RecordTick(statistics_, BLOCK_CACHE_BYTES_READ, |
254 | 0 | get_context_stats_.num_cache_bytes_read); |
255 | 0 | } |
256 | 0 | if (get_context_stats_.num_cache_miss > 0) { |
257 | 0 | RecordTick(statistics_, BLOCK_CACHE_MISS, |
258 | 0 | get_context_stats_.num_cache_miss); |
259 | 0 | } |
260 | 0 | if (get_context_stats_.num_cache_add > 0) { |
261 | 0 | RecordTick(statistics_, BLOCK_CACHE_ADD, get_context_stats_.num_cache_add); |
262 | 0 | } |
263 | 0 | if (get_context_stats_.num_cache_add_redundant > 0) { |
264 | 0 | RecordTick(statistics_, BLOCK_CACHE_ADD_REDUNDANT, |
265 | 0 | get_context_stats_.num_cache_add_redundant); |
266 | 0 | } |
267 | 0 | if (get_context_stats_.num_cache_bytes_write > 0) { |
268 | 0 | RecordTick(statistics_, BLOCK_CACHE_BYTES_WRITE, |
269 | 0 | get_context_stats_.num_cache_bytes_write); |
270 | 0 | } |
271 | 0 | if (get_context_stats_.num_cache_index_add > 0) { |
272 | 0 | RecordTick(statistics_, BLOCK_CACHE_INDEX_ADD, |
273 | 0 | get_context_stats_.num_cache_index_add); |
274 | 0 | } |
275 | 0 | if (get_context_stats_.num_cache_index_add_redundant > 0) { |
276 | 0 | RecordTick(statistics_, BLOCK_CACHE_INDEX_ADD_REDUNDANT, |
277 | 0 | get_context_stats_.num_cache_index_add_redundant); |
278 | 0 | } |
279 | 0 | if (get_context_stats_.num_cache_index_bytes_insert > 0) { |
280 | 0 | RecordTick(statistics_, BLOCK_CACHE_INDEX_BYTES_INSERT, |
281 | 0 | get_context_stats_.num_cache_index_bytes_insert); |
282 | 0 | } |
283 | 0 | if (get_context_stats_.num_cache_data_add > 0) { |
284 | 0 | RecordTick(statistics_, BLOCK_CACHE_DATA_ADD, |
285 | 0 | get_context_stats_.num_cache_data_add); |
286 | 0 | } |
287 | 0 | if (get_context_stats_.num_cache_data_add_redundant > 0) { |
288 | 0 | RecordTick(statistics_, BLOCK_CACHE_DATA_ADD_REDUNDANT, |
289 | 0 | get_context_stats_.num_cache_data_add_redundant); |
290 | 0 | } |
291 | 0 | if (get_context_stats_.num_cache_data_bytes_insert > 0) { |
292 | 0 | RecordTick(statistics_, BLOCK_CACHE_DATA_BYTES_INSERT, |
293 | 0 | get_context_stats_.num_cache_data_bytes_insert); |
294 | 0 | } |
295 | 0 | if (get_context_stats_.num_cache_filter_add > 0) { |
296 | 0 | RecordTick(statistics_, BLOCK_CACHE_FILTER_ADD, |
297 | 0 | get_context_stats_.num_cache_filter_add); |
298 | 0 | } |
299 | 0 | if (get_context_stats_.num_cache_filter_add_redundant > 0) { |
300 | 0 | RecordTick(statistics_, BLOCK_CACHE_FILTER_ADD_REDUNDANT, |
301 | 0 | get_context_stats_.num_cache_filter_add_redundant); |
302 | 0 | } |
303 | 0 | if (get_context_stats_.num_cache_filter_bytes_insert > 0) { |
304 | 0 | RecordTick(statistics_, BLOCK_CACHE_FILTER_BYTES_INSERT, |
305 | 0 | get_context_stats_.num_cache_filter_bytes_insert); |
306 | 0 | } |
307 | 0 | if (get_context_stats_.num_cache_compression_dict_add > 0) { |
308 | 0 | RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_ADD, |
309 | 0 | get_context_stats_.num_cache_compression_dict_add); |
310 | 0 | } |
311 | 0 | if (get_context_stats_.num_cache_compression_dict_add_redundant > 0) { |
312 | 0 | RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_ADD_REDUNDANT, |
313 | 0 | get_context_stats_.num_cache_compression_dict_add_redundant); |
314 | 0 | } |
315 | 0 | if (get_context_stats_.num_cache_compression_dict_bytes_insert > 0) { |
316 | 0 | RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT, |
317 | 0 | get_context_stats_.num_cache_compression_dict_bytes_insert); |
318 | 0 | } |
319 | 0 | } |
320 | | |
// Processes one internal key/value pair encountered during a lookup, walking
// entries from newest to oldest. Returns true to continue to the next (older)
// entry and false to stop the scan. *matched is set to true iff the user key
// equals user_key_ (ignoring timestamp). On merge-operator or deserialization
// failure, state_ becomes kCorrupt/kMergeOperatorFailed and false is returned.
bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
                           const Slice& value, bool* matched,
                           Status* read_status, Cleanable* value_pinner) {
  assert(matched);
  assert((state_ != kMerge && parsed_key.type != kTypeMerge) ||
         merge_context_ != nullptr);
  if (ucmp_->EqualWithoutTimestamp(parsed_key.user_key, user_key_)) {
    *matched = true;
    // If the value is not in the snapshot, skip it
    if (!CheckCallback(parsed_key.sequence)) {
      return true;  // to continue to the next seq
    }

    if (seq_ != nullptr) {
      // Set the sequence number if it is uninitialized (the constructor
      // initializes *seq_ to the kMaxSequenceNumber sentinel).
      if (*seq_ == kMaxSequenceNumber) {
        *seq_ = parsed_key.sequence;
      }
      if (max_covering_tombstone_seq_) {
        *seq_ = std::max(*seq_, *max_covering_tombstone_seq_);
      }
    }

    size_t ts_sz = ucmp_->timestamp_size();
    Slice ts;

    if (ts_sz > 0) {
      // ensure always have ts if cf enables ts.
      ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz);
      if (timestamp_ != nullptr) {
        if (!timestamp_->empty()) {
          assert(ts_sz == timestamp_->size());
          // `timestamp` can be set before `SaveValue` is ever called
          // when max_covering_tombstone_seq_ was set.
          // If this key has a higher sequence number than range tombstone,
          // then timestamp should be updated. `ts_from_rangetombstone_` is
          // set to false afterwards so that only the key with highest seqno
          // updates the timestamp.
          if (ts_from_rangetombstone_) {
            assert(max_covering_tombstone_seq_);
            if (parsed_key.sequence > *max_covering_tombstone_seq_) {
              timestamp_->assign(ts.data(), ts.size());
              ts_from_rangetombstone_ = false;
            }
          }
        }
        // TODO optimize for small size ts
        const std::string kMaxTs(ts_sz, '\xff');
        if (timestamp_->empty() ||
            ucmp_->CompareTimestamp(*timestamp_, kMaxTs) == 0) {
          timestamp_->assign(ts.data(), ts.size());
        }
      }
    }
    appendToReplayLog(parsed_key.type, value, ts);

    auto type = parsed_key.type;
    Slice unpacked_value = value;
    // Key matches. Process it
    if ((type == kTypeValue || type == kTypeValuePreferredSeqno ||
         type == kTypeMerge || type == kTypeBlobIndex ||
         type == kTypeWideColumnEntity || type == kTypeDeletion ||
         type == kTypeDeletionWithTimestamp || type == kTypeSingleDeletion) &&
        max_covering_tombstone_seq_ != nullptr &&
        *max_covering_tombstone_seq_ > parsed_key.sequence) {
      // Note that deletion types are also considered, this is for the case
      // when we need to return timestamp to user. If a range tombstone has a
      // higher seqno than point tombstone, its timestamp should be returned.
      type = kTypeRangeDeletion;
    }
    switch (type) {
      case kTypeValue:
      case kTypeValuePreferredSeqno:
      case kTypeBlobIndex:
      case kTypeWideColumnEntity:
        assert(state_ == kNotFound || state_ == kMerge);
        if (type == kTypeValuePreferredSeqno) {
          // Strip the packed preferred-seqno prefix to get the actual value.
          unpacked_value = ParsePackedValueForValue(value);
        }
        if (type == kTypeBlobIndex) {
          if (is_blob_index_ == nullptr) {
            // Blob value not supported. Stop.
            state_ = kUnexpectedBlobIndex;
            return false;
          }
        }

        if (is_blob_index_ != nullptr) {
          *is_blob_index_ = (type == kTypeBlobIndex);
        }

        if (kNotFound == state_) {
          state_ = kFound;
          if (do_merge_) {
            if (type == kTypeBlobIndex && ucmp_->timestamp_size() != 0) {
              // Remember the full user key (with ts) so the caller can
              // resolve the blob for the exact key version found.
              ukey_with_ts_found_.PinSelf(parsed_key.user_key);
            }
            if (LIKELY(pinnable_val_ != nullptr)) {
              Slice value_to_use = unpacked_value;

              if (type == kTypeWideColumnEntity) {
                const Status s = SaveWideColumnEntityToPinnable(
                    parsed_key.user_key, unpacked_value, value_pinner);
                if (!s.ok()) {
                  if (s.IsIncomplete()) {
                    // Could not resolve without IO; key may still exist.
                    MarkKeyMayExist();
                  } else {
                    state_ = kCorrupt;
                  }
                  return false;
                }
              } else {
                // Non-entity type
                if (LIKELY(value_pinner != nullptr)) {
                  // If the backing resources for the value are provided, pin
                  // them
                  pinnable_val_->PinSlice(value_to_use, value_pinner);
                } else {
                  TEST_SYNC_POINT_CALLBACK("GetContext::SaveValue::PinSelf",
                                           this);
                  // Otherwise copy the value
                  pinnable_val_->PinSelf(value_to_use);
                }
              }
            } else if (columns_ != nullptr) {
              if (type == kTypeWideColumnEntity) {
                const Status s = SaveWideColumnEntityToColumns(
                    parsed_key.user_key, unpacked_value, value_pinner);
                if (!s.ok()) {
                  if (s.IsIncomplete()) {
                    MarkKeyMayExist();
                  } else {
                    state_ = kCorrupt;
                  }
                  return false;
                }
              } else {
                columns_->SetPlainValue(unpacked_value, value_pinner);
              }
            }
          } else {
            // It means this function is called as part of DB GetMergeOperands
            // API and the current value should be part of
            // merge_context_->operand_list
            if (type == kTypeBlobIndex) {
              PinnableSlice pin_val;
              if (GetBlobValue(parsed_key.user_key, unpacked_value, &pin_val,
                               read_status) == false) {
                return false;
              }
              Slice blob_value(pin_val);
              push_operand(blob_value, nullptr);
            } else if (type == kTypeWideColumnEntity) {
              Slice value_copy = unpacked_value;
              Slice value_of_default;

              if (!WideColumnSerialization::GetValueOfDefaultColumn(
                       value_copy, value_of_default)
                       .ok()) {
                state_ = kCorrupt;
                return false;
              }

              push_operand(value_of_default, value_pinner);
            } else {
              assert(type == kTypeValue || type == kTypeValuePreferredSeqno);
              push_operand(unpacked_value, value_pinner);
            }
          }
        } else if (kMerge == state_) {
          // A base value terminates the merge operand chain collected so far.
          assert(merge_operator_ != nullptr);
          if (type == kTypeBlobIndex) {
            PinnableSlice pin_val;
            if (GetBlobValue(parsed_key.user_key, unpacked_value, &pin_val,
                             read_status) == false) {
              return false;
            }
            Slice blob_value(pin_val);
            state_ = kFound;
            if (do_merge_) {
              MergeWithPlainBaseValue(blob_value);
            } else {
              // It means this function is called as part of DB GetMergeOperands
              // API and the current value should be part of
              // merge_context_->operand_list
              push_operand(blob_value, nullptr);
            }
          } else if (type == kTypeWideColumnEntity) {
            state_ = kFound;

            if (do_merge_) {
              MergeWithWideColumnBaseValue(unpacked_value);
            } else {
              // It means this function is called as part of DB GetMergeOperands
              // API and the current value should be part of
              // merge_context_->operand_list
              Slice value_copy = unpacked_value;
              Slice value_of_default;

              if (!WideColumnSerialization::GetValueOfDefaultColumn(
                       value_copy, value_of_default)
                       .ok()) {
                state_ = kCorrupt;
                return false;
              }

              push_operand(value_of_default, value_pinner);
            }
          } else {
            assert(type == kTypeValue || type == kTypeValuePreferredSeqno);

            state_ = kFound;
            if (do_merge_) {
              MergeWithPlainBaseValue(unpacked_value);
            } else {
              // It means this function is called as part of DB GetMergeOperands
              // API and the current value should be part of
              // merge_context_->operand_list
              push_operand(unpacked_value, value_pinner);
            }
          }
        }
        return false;

      case kTypeDeletion:
      case kTypeDeletionWithTimestamp:
      case kTypeSingleDeletion:
      case kTypeRangeDeletion:
        // TODO(noetzli): Verify correctness once merge of single-deletes
        // is supported
        assert(state_ == kNotFound || state_ == kMerge);
        if (kNotFound == state_) {
          state_ = kDeleted;
        } else if (kMerge == state_) {
          // A tombstone terminates the merge chain: merge with no base value.
          state_ = kFound;
          if (do_merge_) {
            MergeWithNoBaseValue();
          }
          // If do_merge_ = false then the current value shouldn't be part of
          // merge_context_->operand_list
        }
        return false;

      case kTypeMerge:
        assert(state_ == kNotFound || state_ == kMerge);
        state_ = kMerge;
        // value_pinner is not set from plain_table_reader.cc for example.
        push_operand(value, value_pinner);
        PERF_COUNTER_ADD(internal_merge_point_lookup_count, 1);

        if (do_merge_ && merge_operator_ != nullptr &&
            merge_operator_->ShouldMerge(
                merge_context_->GetOperandsDirectionBackward())) {
          // Operator says it has enough operands; finish the merge early.
          state_ = kFound;
          MergeWithNoBaseValue();
          return false;
        }
        if (merge_context_->get_merge_operands_options != nullptr &&
            merge_context_->get_merge_operands_options->continue_cb !=
                nullptr &&
            !merge_context_->get_merge_operands_options->continue_cb(value)) {
          // User callback asked to stop collecting operands.
          state_ = kFound;
          return false;
        }
        return true;

      default:
        assert(false);
        break;
    }
  }

  // state_ could be Corrupt, merge or notfound
  return false;
}
596 | | |
597 | 0 | void GetContext::PostprocessMerge(const Status& merge_status) { |
598 | 0 | if (!merge_status.ok()) { |
599 | 0 | if (merge_status.subcode() == Status::SubCode::kMergeOperatorFailed) { |
600 | 0 | state_ = kMergeOperatorFailed; |
601 | 0 | } else { |
602 | 0 | state_ = kCorrupt; |
603 | 0 | } |
604 | 0 | return; |
605 | 0 | } |
606 | | |
607 | 0 | if (LIKELY(pinnable_val_ != nullptr)) { |
608 | 0 | pinnable_val_->PinSelf(); |
609 | 0 | } |
610 | 0 | } |
611 | | |
// Runs the full merge over the accumulated operands with no base value and
// stores the result in pinnable_val_ or columns_ (exactly one is non-null).
void GetContext::MergeWithNoBaseValue() {
  assert(do_merge_);
  assert(pinnable_val_ || columns_);
  assert(!pinnable_val_ || !columns_);

  // `op_failure_scope` (an output parameter) is not provided (set to nullptr)
  // since a failure must be propagated regardless of its value.
  const Status s = MergeHelper::TimedFullMerge(
      merge_operator_, user_key_, MergeHelper::kNoBaseValue,
      merge_context_->GetOperands(), logger_, statistics_, clock_,
      /* update_num_ops_stats */ true, /* op_failure_scope */ nullptr,
      pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_);
  PostprocessMerge(s);
}
626 | | |
// Runs the full merge over the accumulated operands with `value` as the plain
// base value and stores the result in pinnable_val_ or columns_.
void GetContext::MergeWithPlainBaseValue(const Slice& value) {
  assert(do_merge_);
  assert(pinnable_val_ || columns_);
  assert(!pinnable_val_ || !columns_);

  // `op_failure_scope` (an output parameter) is not provided (set to nullptr)
  // since a failure must be propagated regardless of its value.
  const Status s = MergeHelper::TimedFullMerge(
      merge_operator_, user_key_, MergeHelper::kPlainBaseValue, value,
      merge_context_->GetOperands(), logger_, statistics_, clock_,
      /* update_num_ops_stats */ true, /* op_failure_scope */ nullptr,
      pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_);
  PostprocessMerge(s);
}
641 | | |
// Runs the full merge over the accumulated operands with a wide-column entity
// as the base value. V2 entities with blob columns are first resolved into a
// V1-compatible representation, since TimedFullMerge only supports V1 format.
void GetContext::MergeWithWideColumnBaseValue(const Slice& entity) {
  assert(do_merge_);
  assert(pinnable_val_ || columns_);
  assert(!pinnable_val_ || !columns_);

  // Resolve V2 entity blob columns if present, since TimedFullMerge only
  // supports V1 format.
  std::string resolved_entity;
  Slice effective_entity;
  const Status s_resolve = WideColumnSerialization::ResolveEntityForMerge(
      entity, user_key_, blob_fetcher_, nullptr /* prefetch_buffers */,
      resolved_entity, effective_entity);
  if (!s_resolve.ok()) {
    if (s_resolve.IsIncomplete()) {
      // Could not resolve without IO; report "key may exist" instead of error.
      MarkKeyMayExist();
      return;
    }
    state_ = kCorrupt;
    return;
  }

  // `op_failure_scope` (an output parameter) is not provided (set to nullptr)
  // since a failure must be propagated regardless of its value.
  const Status s = MergeHelper::TimedFullMerge(
      merge_operator_, user_key_, MergeHelper::kWideBaseValue, effective_entity,
      merge_context_->GetOperands(), logger_, statistics_, clock_,
      /* update_num_ops_stats */ true, /* op_failure_scope */ nullptr,
      pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_);
  PostprocessMerge(s);
}
672 | | |
673 | | bool GetContext::GetBlobValue(const Slice& user_key, const Slice& blob_index, |
674 | 0 | PinnableSlice* blob_value, Status* read_status) { |
675 | 0 | constexpr FilePrefetchBuffer* prefetch_buffer = nullptr; |
676 | 0 | constexpr uint64_t* bytes_read = nullptr; |
677 | |
|
678 | 0 | *read_status = blob_fetcher_->FetchBlob(user_key, blob_index, prefetch_buffer, |
679 | 0 | blob_value, bytes_read); |
680 | 0 | if (!read_status->ok()) { |
681 | 0 | if (read_status->IsIncomplete()) { |
682 | | // FIXME: this code is not covered by unit tests |
683 | 0 | MarkKeyMayExist(); |
684 | 0 | return false; |
685 | 0 | } |
686 | 0 | state_ = kCorrupt; |
687 | 0 | return false; |
688 | 0 | } |
689 | 0 | *is_blob_index_ = false; |
690 | 0 | return true; |
691 | 0 | } |
692 | | |
693 | 0 | void GetContext::push_operand(const Slice& value, Cleanable* value_pinner) { |
694 | | // TODO(yanqin) preserve timestamps information in merge_context |
695 | 0 | if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() && |
696 | 0 | value_pinner != nullptr) { |
697 | 0 | value_pinner->DelegateCleanupsTo(pinned_iters_mgr()); |
698 | 0 | merge_context_->PushOperand(value, true /*value_pinned*/); |
699 | 0 | } else { |
700 | 0 | merge_context_->PushOperand(value, false); |
701 | 0 | } |
702 | 0 | } |
703 | | |
// Replays a log previously recorded via GetContext::appendToReplayLog(),
// feeding each (type, value[, ts]) entry back into `get_context` as if it had
// been read from a table file. Used to service a lookup from a cached replay
// instead of re-reading blocks. Returns the first blob-read error, if any.
Status replayGetContextLog(const Slice& replay_log, const Slice& user_key,
                           GetContext* get_context, Cleanable* value_pinner,
                           SequenceNumber seq_no) {
  Slice s = replay_log;
  Slice ts;
  size_t ts_sz = get_context->TimestampSize();
  bool ret = false;

  while (s.size()) {
    // Each entry is: 1-byte ValueType, length-prefixed value,
    // [length-prefixed timestamp when the cf enables timestamps].
    auto type = static_cast<ValueType>(*s.data());
    s.remove_prefix(1);
    Slice value;
    ret = GetLengthPrefixedSlice(&s, &value);
    assert(ret);

    bool dont_care __attribute__((__unused__));

    // Use a copy to prevent modifying user_key. Modification of user_key
    // could result to potential cache miss.
    // NOTE(review): the copy is re-made every iteration; presumably this is
    // deliberate in case a downstream consumer mutates the key bytes —
    // confirm before hoisting it out of the loop.
    std::string user_key_str = user_key.ToString();
    ParsedInternalKey ikey = ParsedInternalKey(user_key_str, seq_no, type);

    // If ts enabled for current cf, there will always be ts appended after each
    // piece of value.
    if (ts_sz > 0) {
      ret = GetLengthPrefixedSlice(&s, &ts);
      assert(ts_sz == ts.size());
      assert(ret);
      ikey.SetTimestamp(ts);
    }

    (void)ret;

    Status read_status;
    get_context->SaveValue(ikey, value, &dont_care, &read_status, value_pinner);
    if (!read_status.ok()) {
      return read_status;
    }
  }
  return Status::OK();
}
745 | | |
746 | | } // namespace ROCKSDB_NAMESPACE |