Coverage Report

Created: 2026-04-10 07:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/table/get_context.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#include "table/get_context.h"
7
8
#include <vector>
9
10
#include "db/blob/blob_fetcher.h"
11
#include "db/blob/blob_index.h"
12
#include "db/merge_helper.h"
13
#include "db/pinned_iterators_manager.h"
14
#include "db/read_callback.h"
15
#include "db/wide/wide_column_serialization.h"
16
#include "db/wide/wide_columns_helper.h"
17
#include "monitoring/file_read_sample.h"
18
#include "monitoring/perf_context_imp.h"
19
#include "monitoring/statistics_impl.h"
20
#include "port/likely.h"
21
#include "rocksdb/merge_operator.h"
22
#include "rocksdb/statistics.h"
23
#include "rocksdb/status.h"
24
#include "rocksdb/system_clock.h"
25
26
namespace ROCKSDB_NAMESPACE {
27
28
GetContext::GetContext(
29
    const Comparator* ucmp, const MergeOperator* merge_operator, Logger* logger,
30
    Statistics* statistics, GetState init_state, const Slice& user_key,
31
    PinnableSlice* pinnable_val, PinnableWideColumns* columns,
32
    std::string* timestamp, bool* value_found, MergeContext* merge_context,
33
    bool do_merge, SequenceNumber* _max_covering_tombstone_seq,
34
    SystemClock* clock, SequenceNumber* seq,
35
    PinnedIteratorsManager* _pinned_iters_mgr, ReadCallback* callback,
36
    bool* is_blob_index, uint64_t tracing_get_id, BlobFetcher* blob_fetcher)
37
2.38k
    : ucmp_(ucmp),
38
2.38k
      merge_operator_(merge_operator),
39
2.38k
      logger_(logger),
40
2.38k
      statistics_(statistics),
41
2.38k
      state_(init_state),
42
2.38k
      user_key_(user_key),
43
2.38k
      pinnable_val_(pinnable_val),
44
2.38k
      columns_(columns),
45
2.38k
      timestamp_(timestamp),
46
2.38k
      value_found_(value_found),
47
2.38k
      merge_context_(merge_context),
48
2.38k
      max_covering_tombstone_seq_(_max_covering_tombstone_seq),
49
2.38k
      clock_(clock),
50
2.38k
      seq_(seq),
51
2.38k
      replay_log_(nullptr),
52
2.38k
      pinned_iters_mgr_(_pinned_iters_mgr),
53
2.38k
      callback_(callback),
54
2.38k
      do_merge_(do_merge),
55
2.38k
      is_blob_index_(is_blob_index),
56
2.38k
      tracing_get_id_(tracing_get_id),
57
2.38k
      blob_fetcher_(blob_fetcher) {
58
2.38k
  if (seq_) {
59
0
    *seq_ = kMaxSequenceNumber;
60
0
  }
61
2.38k
  sample_ = should_sample_file_read();
62
2.38k
}
63
64
GetContext::GetContext(const Comparator* ucmp,
65
                       const MergeOperator* merge_operator, Logger* logger,
66
                       Statistics* statistics, GetState init_state,
67
                       const Slice& user_key, PinnableSlice* pinnable_val,
68
                       PinnableWideColumns* columns, bool* value_found,
69
                       MergeContext* merge_context, bool do_merge,
70
                       SequenceNumber* _max_covering_tombstone_seq,
71
                       SystemClock* clock, SequenceNumber* seq,
72
                       PinnedIteratorsManager* _pinned_iters_mgr,
73
                       ReadCallback* callback, bool* is_blob_index,
74
                       uint64_t tracing_get_id, BlobFetcher* blob_fetcher)
75
0
    : GetContext(ucmp, merge_operator, logger, statistics, init_state, user_key,
76
0
                 pinnable_val, columns, /*timestamp=*/nullptr, value_found,
77
0
                 merge_context, do_merge, _max_covering_tombstone_seq, clock,
78
0
                 seq, _pinned_iters_mgr, callback, is_blob_index,
79
0
                 tracing_get_id, blob_fetcher) {}
80
81
994
// Appends one entry to replay_log_ when a replay log is attached; does
// nothing when replay_log_ is nullptr. Entry encoding (decoded by
// replayGetContextLog()):
//   1 byte       value type
//   varint+bytes length-prefixed value
//   varint+bytes length-prefixed timestamp, only when the comparator's
//                timestamp_size() > 0
void GetContext::appendToReplayLog(ValueType type, Slice value, Slice ts) {
  if (replay_log_ == nullptr) {
    return;
  }

  const size_t ts_sz = ucmp_->timestamp_size();
  if (replay_log_->empty()) {
    // Optimization: in the common case of only one operation in the log,
    // allocate exactly the space needed. The timestamp is part of the entry
    // when the CF enables timestamps, so it must be counted as well or the
    // "exact" reservation is too small and forces a reallocation.
    size_t needed = 1 + VarintLength(value.size()) + value.size();
    if (ts_sz > 0) {
      needed += VarintLength(ts.size()) + ts.size();
    }
    replay_log_->reserve(needed);
  }
  replay_log_->push_back(type);
  PutLengthPrefixedSlice(replay_log_, value);

  // If cf enables ts, there should always be a ts following each value
  if (ts_sz > 0) {
    assert(ts.size() == ts_sz);
    PutLengthPrefixedSlice(replay_log_, ts);
  }
}
98
99
// Called from TableCache::Get and Table::Get when file/block in which
100
// key may exist are not there in TableCache/BlockCache respectively. In this
101
// case we can't guarantee that key does not exist and are not permitted to do
102
// IO to be certain.Set the status=kFound and value_found=false to let the
103
// caller know that key may exist but is not there in memory
104
0
void GetContext::MarkKeyMayExist() {
105
0
  state_ = kFound;
106
0
  if (value_found_ != nullptr) {
107
0
    *value_found_ = false;
108
0
  }
109
0
}
110
111
// Stores the value of `entity`'s default column into pinnable_val_ (which
// must be non-null).
//
// First tries WideColumnSerialization::GetValueOfDefaultColumn, which handles
// V1 and V2 entities whose default column is stored inline without a full
// deserialization. It reports NotSupported only when the default column is a
// blob reference. In that case the blob is resolved directly into
// pinnable_val_. Any other error is returned unchanged.
//
// value_pinner: when non-null, the inline default value is pinned against it
// instead of copied into pinnable_val_'s own buffer.
Status GetContext::SaveWideColumnEntityToPinnable(const Slice& user_key,
                                                  const Slice& entity,
                                                  Cleanable* value_pinner) {
  assert(pinnable_val_ != nullptr);

  // GetValueOfDefaultColumn takes its input by non-const reference, so hand
  // it a copy and keep `entity` intact for the blob-resolving fallback.
  Slice cursor = entity;
  Slice default_value;
  Status s =
      WideColumnSerialization::GetValueOfDefaultColumn(cursor, default_value);

  if (s.IsNotSupported()) {
    // Default column is a blob reference, so resolve it into the output value.
    bool resolved = false;
    return WideColumnSerialization::GetValueOfDefaultColumnResolvingBlobs(
        entity, user_key, blob_fetcher_, *pinnable_val_, resolved);
  }
  if (!s.ok()) {
    return s;
  }

  if (LIKELY(value_pinner != nullptr)) {
    pinnable_val_->PinSlice(default_value, value_pinner);
  } else {
    pinnable_val_->PinSelf(default_value);
  }
  return s;
}
137
138
// Stores `entity` into columns_ (which must be non-null).
//
// The entity is decoded with DeserializeV2, which separates out columns whose
// values are blob references. If there are none, the serialized entity is
// handed to columns_ unchanged (pinned against value_pinner when provided).
// Otherwise every blob column is resolved: inlined blobs are copied, and the
// rest are fetched via blob_fetcher_. A new entity is then serialized with the
// resolved values substituted, and columns_ takes ownership of it. The first
// non-OK status from decoding, fetching or serializing is returned.
Status GetContext::SaveWideColumnEntityToColumns(const Slice& user_key,
                                                 const Slice& entity,
                                                 Cleanable* value_pinner) {
  assert(columns_ != nullptr);

  std::vector<WideColumn> decoded_columns;
  std::vector<std::pair<size_t, BlobIndex>> blob_refs;
  Slice input = entity;
  Status s =
      WideColumnSerialization::DeserializeV2(input, decoded_columns, blob_refs);
  if (!s.ok()) {
    return s;
  }

  if (LIKELY(blob_refs.empty())) {
    // Nothing to resolve: the serialized entity can be used as-is.
    return columns_->SetWideColumnValue(entity, value_pinner);
  }

  // TODO: Add lazy resolution support for GetEntity point lookups. This
  // requires SuperVersion pinning on PinnableWideColumns to keep the
  // Version* alive after GetImpl returns. Currently, lazy_column_resolution
  // only takes effect for iterators (DBIter path).
  //
  // Eager path: resolve blob columns inline, keeping fetched values as
  // PinnableSlice to avoid an intermediate std::string copy per blob.
  std::vector<PinnableSlice> blob_values(blob_refs.size());
  for (size_t i = 0; i < blob_refs.size(); ++i) {
    const BlobIndex& blob_index = blob_refs[i].second;
    if (blob_index.IsInlined()) {
      blob_values[i].PinSelf(blob_index.value());
      continue;
    }
    s = blob_fetcher_->FetchBlob(user_key, blob_index,
                                 nullptr /* prefetch_buffer */, &blob_values[i],
                                 nullptr /* bytes_read */);
    if (!s.ok()) {
      return s;
    }
  }

  // Rebuild the column list. blob_refs[k].first is the position of the k-th
  // blob column, so walking the columns in order with a cursor into
  // blob_refs picks up each resolved value.
  WideColumns merged;
  merged.reserve(decoded_columns.size());
  size_t next_blob = 0;
  for (size_t col = 0; col < decoded_columns.size(); ++col) {
    const WideColumn& column = decoded_columns[col];
    const bool is_blob_column =
        next_blob < blob_refs.size() && blob_refs[next_blob].first == col;
    if (is_blob_column) {
      merged.emplace_back(column.name(), Slice(blob_values[next_blob]));
      ++next_blob;
    } else {
      merged.emplace_back(column.name(), column.value());
    }
  }

  std::string serialized;
  s = WideColumnSerialization::Serialize(merged, serialized);
  if (!s.ok()) {
    return s;
  }
  // TODO: A combined SerializeAndBuildIndex method could avoid the
  // serialize + deserialize round trip inside
  // SetWideColumnValue -> CreateIndexForWideColumns.
  return columns_->SetWideColumnValue(std::move(serialized));
}
203
204
0
void GetContext::SaveValue(const Slice& value, SequenceNumber /*seq*/) {
205
0
  assert(state_ == kNotFound);
206
0
  assert(ucmp_->timestamp_size() == 0);
207
208
0
  appendToReplayLog(kTypeValue, value, Slice());
209
210
0
  state_ = kFound;
211
0
  if (LIKELY(pinnable_val_ != nullptr)) {
212
0
    pinnable_val_->PinSelf(value);
213
0
  }
214
0
}
215
216
0
void GetContext::ReportCounters() {
217
0
  if (get_context_stats_.num_cache_hit > 0) {
218
0
    RecordTick(statistics_, BLOCK_CACHE_HIT, get_context_stats_.num_cache_hit);
219
0
  }
220
0
  if (get_context_stats_.num_cache_index_hit > 0) {
221
0
    RecordTick(statistics_, BLOCK_CACHE_INDEX_HIT,
222
0
               get_context_stats_.num_cache_index_hit);
223
0
  }
224
0
  if (get_context_stats_.num_cache_data_hit > 0) {
225
0
    RecordTick(statistics_, BLOCK_CACHE_DATA_HIT,
226
0
               get_context_stats_.num_cache_data_hit);
227
0
  }
228
0
  if (get_context_stats_.num_cache_filter_hit > 0) {
229
0
    RecordTick(statistics_, BLOCK_CACHE_FILTER_HIT,
230
0
               get_context_stats_.num_cache_filter_hit);
231
0
  }
232
0
  if (get_context_stats_.num_cache_compression_dict_hit > 0) {
233
0
    RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_HIT,
234
0
               get_context_stats_.num_cache_compression_dict_hit);
235
0
  }
236
0
  if (get_context_stats_.num_cache_index_miss > 0) {
237
0
    RecordTick(statistics_, BLOCK_CACHE_INDEX_MISS,
238
0
               get_context_stats_.num_cache_index_miss);
239
0
  }
240
0
  if (get_context_stats_.num_cache_filter_miss > 0) {
241
0
    RecordTick(statistics_, BLOCK_CACHE_FILTER_MISS,
242
0
               get_context_stats_.num_cache_filter_miss);
243
0
  }
244
0
  if (get_context_stats_.num_cache_data_miss > 0) {
245
0
    RecordTick(statistics_, BLOCK_CACHE_DATA_MISS,
246
0
               get_context_stats_.num_cache_data_miss);
247
0
  }
248
0
  if (get_context_stats_.num_cache_compression_dict_miss > 0) {
249
0
    RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_MISS,
250
0
               get_context_stats_.num_cache_compression_dict_miss);
251
0
  }
252
0
  if (get_context_stats_.num_cache_bytes_read > 0) {
253
0
    RecordTick(statistics_, BLOCK_CACHE_BYTES_READ,
254
0
               get_context_stats_.num_cache_bytes_read);
255
0
  }
256
0
  if (get_context_stats_.num_cache_miss > 0) {
257
0
    RecordTick(statistics_, BLOCK_CACHE_MISS,
258
0
               get_context_stats_.num_cache_miss);
259
0
  }
260
0
  if (get_context_stats_.num_cache_add > 0) {
261
0
    RecordTick(statistics_, BLOCK_CACHE_ADD, get_context_stats_.num_cache_add);
262
0
  }
263
0
  if (get_context_stats_.num_cache_add_redundant > 0) {
264
0
    RecordTick(statistics_, BLOCK_CACHE_ADD_REDUNDANT,
265
0
               get_context_stats_.num_cache_add_redundant);
266
0
  }
267
0
  if (get_context_stats_.num_cache_bytes_write > 0) {
268
0
    RecordTick(statistics_, BLOCK_CACHE_BYTES_WRITE,
269
0
               get_context_stats_.num_cache_bytes_write);
270
0
  }
271
0
  if (get_context_stats_.num_cache_index_add > 0) {
272
0
    RecordTick(statistics_, BLOCK_CACHE_INDEX_ADD,
273
0
               get_context_stats_.num_cache_index_add);
274
0
  }
275
0
  if (get_context_stats_.num_cache_index_add_redundant > 0) {
276
0
    RecordTick(statistics_, BLOCK_CACHE_INDEX_ADD_REDUNDANT,
277
0
               get_context_stats_.num_cache_index_add_redundant);
278
0
  }
279
0
  if (get_context_stats_.num_cache_index_bytes_insert > 0) {
280
0
    RecordTick(statistics_, BLOCK_CACHE_INDEX_BYTES_INSERT,
281
0
               get_context_stats_.num_cache_index_bytes_insert);
282
0
  }
283
0
  if (get_context_stats_.num_cache_data_add > 0) {
284
0
    RecordTick(statistics_, BLOCK_CACHE_DATA_ADD,
285
0
               get_context_stats_.num_cache_data_add);
286
0
  }
287
0
  if (get_context_stats_.num_cache_data_add_redundant > 0) {
288
0
    RecordTick(statistics_, BLOCK_CACHE_DATA_ADD_REDUNDANT,
289
0
               get_context_stats_.num_cache_data_add_redundant);
290
0
  }
291
0
  if (get_context_stats_.num_cache_data_bytes_insert > 0) {
292
0
    RecordTick(statistics_, BLOCK_CACHE_DATA_BYTES_INSERT,
293
0
               get_context_stats_.num_cache_data_bytes_insert);
294
0
  }
295
0
  if (get_context_stats_.num_cache_filter_add > 0) {
296
0
    RecordTick(statistics_, BLOCK_CACHE_FILTER_ADD,
297
0
               get_context_stats_.num_cache_filter_add);
298
0
  }
299
0
  if (get_context_stats_.num_cache_filter_add_redundant > 0) {
300
0
    RecordTick(statistics_, BLOCK_CACHE_FILTER_ADD_REDUNDANT,
301
0
               get_context_stats_.num_cache_filter_add_redundant);
302
0
  }
303
0
  if (get_context_stats_.num_cache_filter_bytes_insert > 0) {
304
0
    RecordTick(statistics_, BLOCK_CACHE_FILTER_BYTES_INSERT,
305
0
               get_context_stats_.num_cache_filter_bytes_insert);
306
0
  }
307
0
  if (get_context_stats_.num_cache_compression_dict_add > 0) {
308
0
    RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_ADD,
309
0
               get_context_stats_.num_cache_compression_dict_add);
310
0
  }
311
0
  if (get_context_stats_.num_cache_compression_dict_add_redundant > 0) {
312
0
    RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_ADD_REDUNDANT,
313
0
               get_context_stats_.num_cache_compression_dict_add_redundant);
314
0
  }
315
0
  if (get_context_stats_.num_cache_compression_dict_bytes_insert > 0) {
316
0
    RecordTick(statistics_, BLOCK_CACHE_COMPRESSION_DICT_BYTES_INSERT,
317
0
               get_context_stats_.num_cache_compression_dict_bytes_insert);
318
0
  }
319
0
}
320
321
// Offers one internal-key entry to this lookup and advances the lookup state
// machine (state_). The entry comes from a table reader or is replayed by
// replayGetContextLog().
//
// parsed_key   internal key of the entry (user key, sequence, type).
// value        raw value bytes of the entry.
// matched      out-param, must be non-null. Set to true when the entry's user
//              key equals user_key_ ignoring timestamps. Left untouched
//              otherwise.
// read_status  out-param, written only when a blob value has to be fetched
//              (through GetBlobValue) and receives that fetch status.
// value_pinner optional owner of the memory behind `value`. When non-null the
//              value may be pinned instead of copied.
//
// Returns true when the caller should keep feeding older entries for the same
// key. That happens when the read callback hides this entry, or when a merge
// operand was collected and more are needed. Returns false once the lookup is
// resolved or has failed, and also when the user key does not match.
bool GetContext::SaveValue(const ParsedInternalKey& parsed_key,
                           const Slice& value, bool* matched,
                           Status* read_status, Cleanable* value_pinner) {
  assert(matched);
  assert((state_ != kMerge && parsed_key.type != kTypeMerge) ||
         merge_context_ != nullptr);
  if (ucmp_->EqualWithoutTimestamp(parsed_key.user_key, user_key_)) {
    *matched = true;
    // If the value is not in the snapshot, skip it
    if (!CheckCallback(parsed_key.sequence)) {
      return true;  // to continue to the next seq
    }

    if (seq_ != nullptr) {
      // Set the sequence number if it is uninitialized
      // (kMaxSequenceNumber is the sentinel written by the constructor).
      if (*seq_ == kMaxSequenceNumber) {
        *seq_ = parsed_key.sequence;
      }
      // A covering range tombstone newer than this entry wins.
      if (max_covering_tombstone_seq_) {
        *seq_ = std::max(*seq_, *max_covering_tombstone_seq_);
      }
    }

    size_t ts_sz = ucmp_->timestamp_size();
    Slice ts;

    if (ts_sz > 0) {
      // ensure always have ts if cf enables ts.
      ts = ExtractTimestampFromUserKey(parsed_key.user_key, ts_sz);
      if (timestamp_ != nullptr) {
        if (!timestamp_->empty()) {
          assert(ts_sz == timestamp_->size());
          // `timestamp` can be set before `SaveValue` is ever called
          // when max_covering_tombstone_seq_ was set.
          // If this key has a higher sequence number than range tombstone,
          // then timestamp should be updated. `ts_from_rangetombstone_` is
          // set to false afterwards so that only the key with highest seqno
          // updates the timestamp.
          if (ts_from_rangetombstone_) {
            assert(max_covering_tombstone_seq_);
            if (parsed_key.sequence > *max_covering_tombstone_seq_) {
              timestamp_->assign(ts.data(), ts.size());
              ts_from_rangetombstone_ = false;
            }
          }
        }
        // An empty timestamp_, or one equal to the maximum timestamp (all
        // 0xff bytes), is overwritten with this entry's timestamp.
        // TODO optimize for small size ts
        const std::string kMaxTs(ts_sz, '\xff');
        if (timestamp_->empty() ||
            ucmp_->CompareTimestamp(*timestamp_, kMaxTs) == 0) {
          timestamp_->assign(ts.data(), ts.size());
        }
      }
    }
    // Record the entry before processing so a replay sees the same sequence
    // of entries (no-op when no replay log is attached).
    appendToReplayLog(parsed_key.type, value, ts);

    auto type = parsed_key.type;
    Slice unpacked_value = value;
    // Key matches. Process it
    // An entry older than the newest covering range tombstone is treated as
    // deleted by that tombstone.
    if ((type == kTypeValue || type == kTypeValuePreferredSeqno ||
         type == kTypeMerge || type == kTypeBlobIndex ||
         type == kTypeWideColumnEntity || type == kTypeDeletion ||
         type == kTypeDeletionWithTimestamp || type == kTypeSingleDeletion) &&
        max_covering_tombstone_seq_ != nullptr &&
        *max_covering_tombstone_seq_ > parsed_key.sequence) {
      // Note that deletion types are also considered, this is for the case
      // when we need to return timestamp to user. If a range tombstone has a
      // higher seqno than point tombstone, its timestamp should be returned.
      type = kTypeRangeDeletion;
    }
    switch (type) {
      // Base values: they terminate the lookup, either directly (kNotFound)
      // or as the base of a pending merge (kMerge).
      case kTypeValue:
      case kTypeValuePreferredSeqno:
      case kTypeBlobIndex:
      case kTypeWideColumnEntity:
        assert(state_ == kNotFound || state_ == kMerge);
        if (type == kTypeValuePreferredSeqno) {
          unpacked_value = ParsePackedValueForValue(value);
        }
        if (type == kTypeBlobIndex) {
          if (is_blob_index_ == nullptr) {
            // Blob value not supported. Stop.
            state_ = kUnexpectedBlobIndex;
            return false;
          }
        }

        if (is_blob_index_ != nullptr) {
          *is_blob_index_ = (type == kTypeBlobIndex);
        }

        if (kNotFound == state_) {
          state_ = kFound;
          if (do_merge_) {
            // Regular Get/GetEntity: hand the value to the output object.
            if (type == kTypeBlobIndex && ucmp_->timestamp_size() != 0) {
              ukey_with_ts_found_.PinSelf(parsed_key.user_key);
            }
            if (LIKELY(pinnable_val_ != nullptr)) {
              Slice value_to_use = unpacked_value;

              if (type == kTypeWideColumnEntity) {
                const Status s = SaveWideColumnEntityToPinnable(
                    parsed_key.user_key, unpacked_value, value_pinner);
                if (!s.ok()) {
                  if (s.IsIncomplete()) {
                    MarkKeyMayExist();
                  } else {
                    state_ = kCorrupt;
                  }
                  return false;
                }
              } else {
                // Non-entity type
                if (LIKELY(value_pinner != nullptr)) {
                  // If the backing resources for the value are provided, pin
                  // them
                  pinnable_val_->PinSlice(value_to_use, value_pinner);
                } else {
                  TEST_SYNC_POINT_CALLBACK("GetContext::SaveValue::PinSelf",
                                           this);
                  // Otherwise copy the value
                  pinnable_val_->PinSelf(value_to_use);
                }
              }
            } else if (columns_ != nullptr) {
              if (type == kTypeWideColumnEntity) {
                const Status s = SaveWideColumnEntityToColumns(
                    parsed_key.user_key, unpacked_value, value_pinner);
                if (!s.ok()) {
                  if (s.IsIncomplete()) {
                    MarkKeyMayExist();
                  } else {
                    state_ = kCorrupt;
                  }
                  return false;
                }
              } else {
                columns_->SetPlainValue(unpacked_value, value_pinner);
              }
            }
          } else {
            // It means this function is called as part of DB GetMergeOperands
            // API and the current value should be part of
            // merge_context_->operand_list
            if (type == kTypeBlobIndex) {
              PinnableSlice pin_val;
              if (GetBlobValue(parsed_key.user_key, unpacked_value, &pin_val,
                               read_status) == false) {
                return false;
              }
              Slice blob_value(pin_val);
              push_operand(blob_value, nullptr);
            } else if (type == kTypeWideColumnEntity) {
              Slice value_copy = unpacked_value;
              Slice value_of_default;

              if (!WideColumnSerialization::GetValueOfDefaultColumn(
                       value_copy, value_of_default)
                       .ok()) {
                state_ = kCorrupt;
                return false;
              }

              push_operand(value_of_default, value_pinner);
            } else {
              assert(type == kTypeValue || type == kTypeValuePreferredSeqno);
              push_operand(unpacked_value, value_pinner);
            }
          }
        } else if (kMerge == state_) {
          // Merge operands were collected earlier; this value is their base.
          assert(merge_operator_ != nullptr);
          if (type == kTypeBlobIndex) {
            PinnableSlice pin_val;
            if (GetBlobValue(parsed_key.user_key, unpacked_value, &pin_val,
                             read_status) == false) {
              return false;
            }
            Slice blob_value(pin_val);
            state_ = kFound;
            if (do_merge_) {
              MergeWithPlainBaseValue(blob_value);
            } else {
              // It means this function is called as part of DB GetMergeOperands
              // API and the current value should be part of
              // merge_context_->operand_list
              push_operand(blob_value, nullptr);
            }
          } else if (type == kTypeWideColumnEntity) {
            state_ = kFound;

            if (do_merge_) {
              MergeWithWideColumnBaseValue(unpacked_value);
            } else {
              // It means this function is called as part of DB GetMergeOperands
              // API and the current value should be part of
              // merge_context_->operand_list
              Slice value_copy = unpacked_value;
              Slice value_of_default;

              if (!WideColumnSerialization::GetValueOfDefaultColumn(
                       value_copy, value_of_default)
                       .ok()) {
                state_ = kCorrupt;
                return false;
              }

              push_operand(value_of_default, value_pinner);
            }
          } else {
            assert(type == kTypeValue || type == kTypeValuePreferredSeqno);

            state_ = kFound;
            if (do_merge_) {
              MergeWithPlainBaseValue(unpacked_value);
            } else {
              // It means this function is called as part of DB GetMergeOperands
              // API and the current value should be part of
              // merge_context_->operand_list
              push_operand(unpacked_value, value_pinner);
            }
          }
        }
        return false;

      // Tombstones (including the range-tombstone override above): the key
      // is deleted, or any pending merge operands are merged without a base.
      case kTypeDeletion:
      case kTypeDeletionWithTimestamp:
      case kTypeSingleDeletion:
      case kTypeRangeDeletion:
        // TODO(noetzli): Verify correctness once merge of single-deletes
        // is supported
        assert(state_ == kNotFound || state_ == kMerge);
        if (kNotFound == state_) {
          state_ = kDeleted;
        } else if (kMerge == state_) {
          state_ = kFound;
          if (do_merge_) {
            MergeWithNoBaseValue();
          }
          // If do_merge_ = false then the current value shouldn't be part of
          // merge_context_->operand_list
        }
        return false;

      // Merge operand: collect it and usually ask for older entries. The
      // lookup stops early if the merge operator's ShouldMerge() says it has
      // enough operands, or if the GetMergeOperands continue_cb rejects the
      // operand.
      case kTypeMerge:
        assert(state_ == kNotFound || state_ == kMerge);
        state_ = kMerge;
        // value_pinner is not set from plain_table_reader.cc for example.
        push_operand(value, value_pinner);
        PERF_COUNTER_ADD(internal_merge_point_lookup_count, 1);

        if (do_merge_ && merge_operator_ != nullptr &&
            merge_operator_->ShouldMerge(
                merge_context_->GetOperandsDirectionBackward())) {
          state_ = kFound;
          MergeWithNoBaseValue();
          return false;
        }
        if (merge_context_->get_merge_operands_options != nullptr &&
            merge_context_->get_merge_operands_options->continue_cb !=
                nullptr &&
            !merge_context_->get_merge_operands_options->continue_cb(value)) {
          state_ = kFound;
          return false;
        }
        return true;

      default:
        assert(false);
        break;
    }
  }

  // state_ could be Corrupt, merge or notfound
  return false;
}
596
597
0
void GetContext::PostprocessMerge(const Status& merge_status) {
598
0
  if (!merge_status.ok()) {
599
0
    if (merge_status.subcode() == Status::SubCode::kMergeOperatorFailed) {
600
0
      state_ = kMergeOperatorFailed;
601
0
    } else {
602
0
      state_ = kCorrupt;
603
0
    }
604
0
    return;
605
0
  }
606
607
0
  if (LIKELY(pinnable_val_ != nullptr)) {
608
0
    pinnable_val_->PinSelf();
609
0
  }
610
0
}
611
612
0
void GetContext::MergeWithNoBaseValue() {
613
0
  assert(do_merge_);
614
0
  assert(pinnable_val_ || columns_);
615
0
  assert(!pinnable_val_ || !columns_);
616
617
  // `op_failure_scope` (an output parameter) is not provided (set to nullptr)
618
  // since a failure must be propagated regardless of its value.
619
0
  const Status s = MergeHelper::TimedFullMerge(
620
0
      merge_operator_, user_key_, MergeHelper::kNoBaseValue,
621
0
      merge_context_->GetOperands(), logger_, statistics_, clock_,
622
0
      /* update_num_ops_stats */ true, /* op_failure_scope */ nullptr,
623
0
      pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_);
624
0
  PostprocessMerge(s);
625
0
}
626
627
0
void GetContext::MergeWithPlainBaseValue(const Slice& value) {
628
0
  assert(do_merge_);
629
0
  assert(pinnable_val_ || columns_);
630
0
  assert(!pinnable_val_ || !columns_);
631
632
  // `op_failure_scope` (an output parameter) is not provided (set to nullptr)
633
  // since a failure must be propagated regardless of its value.
634
0
  const Status s = MergeHelper::TimedFullMerge(
635
0
      merge_operator_, user_key_, MergeHelper::kPlainBaseValue, value,
636
0
      merge_context_->GetOperands(), logger_, statistics_, clock_,
637
0
      /* update_num_ops_stats */ true, /* op_failure_scope */ nullptr,
638
0
      pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_);
639
0
  PostprocessMerge(s);
640
0
}
641
642
0
void GetContext::MergeWithWideColumnBaseValue(const Slice& entity) {
643
0
  assert(do_merge_);
644
0
  assert(pinnable_val_ || columns_);
645
0
  assert(!pinnable_val_ || !columns_);
646
647
  // Resolve V2 entity blob columns if present, since TimedFullMerge only
648
  // supports V1 format.
649
0
  std::string resolved_entity;
650
0
  Slice effective_entity;
651
0
  const Status s_resolve = WideColumnSerialization::ResolveEntityForMerge(
652
0
      entity, user_key_, blob_fetcher_, nullptr /* prefetch_buffers */,
653
0
      resolved_entity, effective_entity);
654
0
  if (!s_resolve.ok()) {
655
0
    if (s_resolve.IsIncomplete()) {
656
0
      MarkKeyMayExist();
657
0
      return;
658
0
    }
659
0
    state_ = kCorrupt;
660
0
    return;
661
0
  }
662
663
  // `op_failure_scope` (an output parameter) is not provided (set to nullptr)
664
  // since a failure must be propagated regardless of its value.
665
0
  const Status s = MergeHelper::TimedFullMerge(
666
0
      merge_operator_, user_key_, MergeHelper::kWideBaseValue, effective_entity,
667
0
      merge_context_->GetOperands(), logger_, statistics_, clock_,
668
0
      /* update_num_ops_stats */ true, /* op_failure_scope */ nullptr,
669
0
      pinnable_val_ ? pinnable_val_->GetSelf() : nullptr, columns_);
670
0
  PostprocessMerge(s);
671
0
}
672
673
bool GetContext::GetBlobValue(const Slice& user_key, const Slice& blob_index,
674
0
                              PinnableSlice* blob_value, Status* read_status) {
675
0
  constexpr FilePrefetchBuffer* prefetch_buffer = nullptr;
676
0
  constexpr uint64_t* bytes_read = nullptr;
677
678
0
  *read_status = blob_fetcher_->FetchBlob(user_key, blob_index, prefetch_buffer,
679
0
                                          blob_value, bytes_read);
680
0
  if (!read_status->ok()) {
681
0
    if (read_status->IsIncomplete()) {
682
      // FIXME: this code is not covered by unit tests
683
0
      MarkKeyMayExist();
684
0
      return false;
685
0
    }
686
0
    state_ = kCorrupt;
687
0
    return false;
688
0
  }
689
0
  *is_blob_index_ = false;
690
0
  return true;
691
0
}
692
693
0
void GetContext::push_operand(const Slice& value, Cleanable* value_pinner) {
694
  // TODO(yanqin) preserve timestamps information in merge_context
695
0
  if (pinned_iters_mgr() && pinned_iters_mgr()->PinningEnabled() &&
696
0
      value_pinner != nullptr) {
697
0
    value_pinner->DelegateCleanupsTo(pinned_iters_mgr());
698
0
    merge_context_->PushOperand(value, true /*value_pinned*/);
699
0
  } else {
700
0
    merge_context_->PushOperand(value, false);
701
0
  }
702
0
}
703
704
// Replays a log written by GetContext::appendToReplayLog() into `get_context`.
// Each recorded entry is fed to SaveValue() as if it had been read for
// `user_key` with sequence number `seq_no`. Entry format: a 1-byte value type,
// a length-prefixed value, and (when the CF has timestamps) a length-prefixed
// timestamp.
//
// Returns:
// - the first non-OK read status reported by SaveValue();
// - Corruption if the log is truncated or carries a timestamp of the wrong
//   size (asserts in debug builds);
// - OK otherwise.
Status replayGetContextLog(const Slice& replay_log, const Slice& user_key,
                           GetContext* get_context, Cleanable* value_pinner,
                           SequenceNumber seq_no) {
  Slice s = replay_log;
  const size_t ts_sz = get_context->TimestampSize();

  // Use a copy to prevent modifying user_key: ParsedInternalKey::SetTimestamp
  // overwrites the key's trailing timestamp bytes in place, and modifying
  // user_key could result in potential cache misses. One copy serves the
  // whole log, because each iteration rewrites those bytes before use (and
  // never touches them when ts_sz == 0).
  std::string user_key_str = user_key.ToString();

  while (!s.empty()) {
    const auto type = static_cast<ValueType>(*s.data());
    s.remove_prefix(1);

    Slice value;
    if (!GetLengthPrefixedSlice(&s, &value)) {
      assert(false);
      return Status::Corruption("Malformed value in GetContext replay log");
    }

    ParsedInternalKey ikey(user_key_str, seq_no, type);

    // If ts enabled for current cf, there will always be ts appended after each
    // piece of value. Validate its size before SetTimestamp() copies it into
    // the key buffer.
    if (ts_sz > 0) {
      Slice ts;
      if (!GetLengthPrefixedSlice(&s, &ts) || ts.size() != ts_sz) {
        assert(false);
        return Status::Corruption(
            "Malformed timestamp in GetContext replay log");
      }
      ikey.SetTimestamp(ts);
    }

    bool dont_care __attribute__((__unused__));
    Status read_status;
    get_context->SaveValue(ikey, value, &dont_care, &read_status, value_pinner);
    if (!read_status.ok()) {
      return read_status;
    }
  }
  return Status::OK();
}
745
746
}  // namespace ROCKSDB_NAMESPACE