Coverage Report

Created: 2026-05-16 07:18

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/version_set_sync_and_async.h
Line
Count
Source
1
//  Copyright (c) Meta Platforms, Inc. and affiliates.
2
//
3
//  This source code is licensed under both the GPLv2 (found in the
4
//  COPYING file in the root directory) and Apache 2.0 License
5
//  (found in the LICENSE.Apache file in the root directory).
6
7
#include "util/coro_utils.h"
8
9
#if defined(WITHOUT_COROUTINES) || \
10
    (defined(USE_COROUTINES) && defined(WITH_COROUTINES))
11
12
namespace ROCKSDB_NAMESPACE {
13
14
// Lookup a batch of keys in a single SST file
15
DEFINE_SYNC_AND_ASYNC(Status, Version::MultiGetFromSST)
16
(const ReadOptions& read_options, MultiGetRange file_range, int hit_file_level,
17
 bool skip_filters, bool skip_range_deletions, FdWithKeyRange* f,
18
 std::unordered_map<uint64_t, BlobReadContexts>& blob_ctxs,
19
 TableCache::TypedHandle* table_handle, uint64_t& num_filter_read,
20
0
 uint64_t& num_index_read, uint64_t& num_sst_read) {
21
0
  bool timer_enabled = GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex &&
22
0
                       get_perf_context()->per_level_perf_context_enabled;
23
24
0
  Status s;
25
0
  StopWatchNano timer(clock_, timer_enabled /* auto_start */);
26
0
  s = CO_AWAIT(table_cache_->MultiGet)(
27
0
      read_options, *internal_comparator(), *f->file_metadata, &file_range,
28
0
      mutable_cf_options_,
29
0
      cfd_->internal_stats()->GetFileReadHist(hit_file_level), skip_filters,
30
0
      skip_range_deletions, hit_file_level, table_handle);
31
  // TODO: examine the behavior for corrupted key
32
0
  if (timer_enabled) {
33
0
    PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(),
34
0
                              hit_file_level);
35
0
  }
36
0
  if (!s.ok()) {
37
    // TODO: Set status for individual keys appropriately
38
0
    for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) {
39
0
      *iter->s = s;
40
0
      file_range.MarkKeyDone(iter);
41
0
    }
42
0
    CO_RETURN s;
43
0
  }
44
0
  uint64_t batch_size = 0;
45
0
  for (auto iter = file_range.begin(); s.ok() && iter != file_range.end();
46
0
       ++iter) {
47
0
    GetContext& get_context = *iter->get_context;
48
0
    Status* status = iter->s;
49
    // The Status in the KeyContext takes precedence over GetContext state
50
    // Status may be an error if there were any IO errors in the table
51
    // reader. We never expect Status to be NotFound(), as that is
52
    // determined by get_context
53
0
    assert(!status->IsNotFound());
54
0
    if (!status->ok()) {
55
0
      file_range.MarkKeyDone(iter);
56
0
      continue;
57
0
    }
58
59
0
    if (get_context.sample()) {
60
0
      sample_file_read_inc(f->file_metadata);
61
0
      if (get_context.State() == GetContext::kNotFound ||
62
0
          get_context.State() == GetContext::kMerge ||
63
0
          get_context.State() == GetContext::kDeleted) {
64
0
        sample_collapsible_entry_file_read_inc(f->file_metadata);
65
0
      }
66
0
    }
67
0
    batch_size++;
68
0
    num_index_read += get_context.get_context_stats_.num_index_read;
69
0
    num_filter_read += get_context.get_context_stats_.num_filter_read;
70
0
    num_sst_read += get_context.get_context_stats_.num_sst_read;
71
    // Reset these stats since they're specific to a level
72
0
    get_context.get_context_stats_.num_index_read = 0;
73
0
    get_context.get_context_stats_.num_filter_read = 0;
74
0
    get_context.get_context_stats_.num_sst_read = 0;
75
76
    // report the counters before returning
77
0
    if (get_context.State() != GetContext::kNotFound &&
78
0
        get_context.State() != GetContext::kMerge &&
79
0
        db_statistics_ != nullptr) {
80
0
      get_context.ReportCounters();
81
0
    } else {
82
0
      if (iter->max_covering_tombstone_seq > 0) {
83
        // The remaining files we look at will only contain covered keys, so
84
        // we stop here for this key
85
0
        file_range.SkipKey(iter);
86
0
      }
87
0
    }
88
0
    switch (get_context.State()) {
89
0
      case GetContext::kNotFound:
90
        // Keep searching in other files
91
0
        break;
92
0
      case GetContext::kMerge:
93
        // TODO: update per-level perfcontext user_key_return_count for kMerge
94
0
        break;
95
0
      case GetContext::kFound:
96
0
        if (hit_file_level == 0) {
97
0
          RecordTick(db_statistics_, GET_HIT_L0);
98
0
        } else if (hit_file_level == 1) {
99
0
          RecordTick(db_statistics_, GET_HIT_L1);
100
0
        } else if (hit_file_level >= 2) {
101
0
          RecordTick(db_statistics_, GET_HIT_L2_AND_UP);
102
0
        }
103
104
0
        PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1, hit_file_level);
105
106
0
        file_range.MarkKeyDone(iter);
107
108
0
        if (iter->is_blob_index) {
109
0
          BlobIndex blob_index;
110
0
          Status tmp_s;
111
112
0
          if (iter->value) {
113
0
            TEST_SYNC_POINT_CALLBACK("Version::MultiGet::TamperWithBlobIndex",
114
0
                                     &(*iter));
115
116
0
            tmp_s = blob_index.DecodeFrom(*(iter->value));
117
118
0
          } else {
119
0
            assert(iter->columns);
120
121
0
            tmp_s = blob_index.DecodeFrom(
122
0
                WideColumnsHelper::GetDefaultColumn(iter->columns->columns()));
123
0
          }
124
125
0
          if (tmp_s.ok()) {
126
0
            const uint64_t blob_file_num = blob_index.file_number();
127
0
            blob_ctxs[blob_file_num].emplace_back(blob_index, &*iter);
128
0
          } else {
129
0
            *(iter->s) = tmp_s;
130
0
          }
131
0
        } else {
132
0
          if (iter->value) {
133
0
            file_range.AddValueSize(iter->value->size());
134
0
          } else {
135
0
            assert(iter->columns);
136
0
            file_range.AddValueSize(iter->columns->serialized_size());
137
0
          }
138
139
0
          if (file_range.GetValueSize() > read_options.value_size_soft_limit) {
140
0
            s = Status::Aborted();
141
0
            break;
142
0
          }
143
0
        }
144
0
        continue;
145
0
      case GetContext::kDeleted:
146
        // Use empty error message for speed
147
0
        *status = Status::NotFound();
148
0
        file_range.MarkKeyDone(iter);
149
0
        continue;
150
0
      case GetContext::kCorrupt:
151
0
        *status =
152
0
            Status::Corruption("corrupted key for ", iter->lkey->user_key());
153
0
        file_range.MarkKeyDone(iter);
154
0
        continue;
155
0
      case GetContext::kUnexpectedBlobIndex:
156
0
        ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index.");
157
0
        *status = Status::NotSupported(
158
0
            "Encounter unexpected blob index. Please open DB with "
159
0
            "ROCKSDB_NAMESPACE::blob_db::BlobDB instead.");
160
0
        file_range.MarkKeyDone(iter);
161
0
        continue;
162
0
      case GetContext::kMergeOperatorFailed:
163
0
        *status = Status::Corruption(Status::SubCode::kMergeOperatorFailed);
164
0
        file_range.MarkKeyDone(iter);
165
0
        continue;
166
0
    }
167
0
  }
168
169
0
  RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size);
170
0
  CO_RETURN s;
171
0
}
172
}  // namespace ROCKSDB_NAMESPACE
173
#endif