/src/rocksdb/db/version_set_sync_and_async.h
Line | Count | Source |
1 | | // Copyright (c) Meta Platforms, Inc. and affiliates. |
2 | | // |
3 | | // This source code is licensed under both the GPLv2 (found in the |
4 | | // COPYING file in the root directory) and Apache 2.0 License |
5 | | // (found in the LICENSE.Apache file in the root directory). |
6 | | |
7 | | #include "util/coro_utils.h" |
8 | | |
9 | | #if defined(WITHOUT_COROUTINES) || \ |
10 | | (defined(USE_COROUTINES) && defined(WITH_COROUTINES)) |
11 | | |
12 | | namespace ROCKSDB_NAMESPACE { |
13 | | |
14 | | // Looks up the keys of file_range in the single SST file f. Calls table_cache_->MultiGet for the whole batch; if that fails, its status is copied to every key, each key is marked done, and the status is returned. Otherwise each key whose own status is still ok is counted in batch_size, has its GetContext index/filter/SST read stats added to num_index_read/num_filter_read/num_sst_read, and is handled according to its GetContext state; found blob indexes are queued in blob_ctxs keyed by blob file number. Returns Aborted once the accumulated value size exceeds read_options.value_size_soft_limit (the s.ok() loop condition then stops the per-key loop). |
15 | | DEFINE_SYNC_AND_ASYNC(Status, Version::MultiGetFromSST) |
16 | | (const ReadOptions& read_options, MultiGetRange file_range, int hit_file_level, |
17 | | bool skip_filters, bool skip_range_deletions, FdWithKeyRange* f, |
18 | | std::unordered_map<uint64_t, BlobReadContexts>& blob_ctxs, |
19 | | TableCache::TypedHandle* table_handle, uint64_t& num_filter_read, |
20 | 0 | uint64_t& num_index_read, uint64_t& num_sst_read) { |
21 | 0 | bool timer_enabled = GetPerfLevel() >= PerfLevel::kEnableTimeExceptForMutex && |
22 | 0 | get_perf_context()->per_level_perf_context_enabled; |
23 | |
|
24 | 0 | Status s; |
25 | 0 | StopWatchNano timer(clock_, timer_enabled /* auto_start */); |
26 | 0 | s = CO_AWAIT(table_cache_->MultiGet)( |
27 | 0 | read_options, *internal_comparator(), *f->file_metadata, &file_range, |
28 | 0 | mutable_cf_options_, |
29 | 0 | cfd_->internal_stats()->GetFileReadHist(hit_file_level), skip_filters, |
30 | 0 | skip_range_deletions, hit_file_level, table_handle); |
31 | | // TODO: examine the behavior for corrupted key |
32 | 0 | if (timer_enabled) { |
33 | 0 | PERF_COUNTER_BY_LEVEL_ADD(get_from_table_nanos, timer.ElapsedNanos(), |
34 | 0 | hit_file_level); |
35 | 0 | } |
36 | 0 | if (!s.ok()) { |
37 | | // TODO: Set status for individual keys appropriately; for now every key in the range receives the batch-level error and is marked done |
38 | 0 | for (auto iter = file_range.begin(); iter != file_range.end(); ++iter) { |
39 | 0 | *iter->s = s; |
40 | 0 | file_range.MarkKeyDone(iter); |
41 | 0 | } |
42 | 0 | CO_RETURN s; |
43 | 0 | } |
44 | 0 | uint64_t batch_size = 0; |
45 | 0 | for (auto iter = file_range.begin(); s.ok() && iter != file_range.end(); |
46 | 0 | ++iter) { |
47 | 0 | GetContext& get_context = *iter->get_context; |
48 | 0 | Status* status = iter->s; |
49 | | // The Status in the KeyContext takes precedence over GetContext state |
50 | | // Status may be an error if there were any IO errors in the table |
51 | | // reader. We never expect Status to be NotFound(), as that is |
52 | | // determined by get_context |
53 | 0 | assert(!status->IsNotFound()); |
54 | 0 | if (!status->ok()) { |
55 | 0 | file_range.MarkKeyDone(iter); |
56 | 0 | continue; |
57 | 0 | } |
58 | | |
59 | 0 | if (get_context.sample()) { |
60 | 0 | sample_file_read_inc(f->file_metadata); |
61 | 0 | if (get_context.State() == GetContext::kNotFound || |
62 | 0 | get_context.State() == GetContext::kMerge || |
63 | 0 | get_context.State() == GetContext::kDeleted) { |
64 | 0 | sample_collapsible_entry_file_read_inc(f->file_metadata); |
65 | 0 | } |
66 | 0 | } |
67 | 0 | batch_size++; |
68 | 0 | num_index_read += get_context.get_context_stats_.num_index_read; |
69 | 0 | num_filter_read += get_context.get_context_stats_.num_filter_read; |
70 | 0 | num_sst_read += get_context.get_context_stats_.num_sst_read; |
71 | | // Reset these stats now that they have been added to the out-params, since they're specific to a level |
72 | 0 | get_context.get_context_stats_.num_index_read = 0; |
73 | 0 | get_context.get_context_stats_.num_filter_read = 0; |
74 | 0 | get_context.get_context_stats_.num_sst_read = 0; |
75 | | |
76 | | // Report the counters before returning, unless the state is kNotFound or kMerge or db_statistics_ is null; in those cases the key is skipped (SkipKey) when max_covering_tombstone_seq > 0 |
77 | 0 | if (get_context.State() != GetContext::kNotFound && |
78 | 0 | get_context.State() != GetContext::kMerge && |
79 | 0 | db_statistics_ != nullptr) { |
80 | 0 | get_context.ReportCounters(); |
81 | 0 | } else { |
82 | 0 | if (iter->max_covering_tombstone_seq > 0) { |
83 | | // The remaining files we look at will only contain covered keys, so |
84 | | // we stop here for this key |
85 | 0 | file_range.SkipKey(iter); |
86 | 0 | } |
87 | 0 | } |
88 | 0 | switch (get_context.State()) { |
89 | 0 | case GetContext::kNotFound: |
90 | | // Keep searching in other files; break only leaves the switch and the loop advances to the next key |
91 | 0 | break; |
92 | 0 | case GetContext::kMerge: |
93 | | // TODO: update per-level perfcontext user_key_return_count for kMerge |
94 | 0 | break; |
95 | 0 | case GetContext::kFound: |
96 | 0 | if (hit_file_level == 0) { |
97 | 0 | RecordTick(db_statistics_, GET_HIT_L0); |
98 | 0 | } else if (hit_file_level == 1) { |
99 | 0 | RecordTick(db_statistics_, GET_HIT_L1); |
100 | 0 | } else if (hit_file_level >= 2) { |
101 | 0 | RecordTick(db_statistics_, GET_HIT_L2_AND_UP); |
102 | 0 | } |
103 | |
|
104 | 0 | PERF_COUNTER_BY_LEVEL_ADD(user_key_return_count, 1, hit_file_level); |
105 | |
|
106 | 0 | file_range.MarkKeyDone(iter); |
107 | |
|
108 | 0 | if (iter->is_blob_index) { |
109 | 0 | BlobIndex blob_index; |
110 | 0 | Status tmp_s; |
111 | |
|
112 | 0 | if (iter->value) { |
113 | 0 | TEST_SYNC_POINT_CALLBACK("Version::MultiGet::TamperWithBlobIndex", |
114 | 0 | &(*iter)); |
115 | |
|
116 | 0 | tmp_s = blob_index.DecodeFrom(*(iter->value)); |
117 | |
|
118 | 0 | } else { |
119 | 0 | assert(iter->columns); |
120 | |
|
121 | 0 | tmp_s = blob_index.DecodeFrom( |
122 | 0 | WideColumnsHelper::GetDefaultColumn(iter->columns->columns())); |
123 | 0 | } |
124 | |
|
125 | 0 | if (tmp_s.ok()) { |
126 | 0 | const uint64_t blob_file_num = blob_index.file_number(); |
127 | 0 | blob_ctxs[blob_file_num].emplace_back(blob_index, &*iter); |
128 | 0 | } else { |
129 | 0 | *(iter->s) = tmp_s; |
130 | 0 | } |
131 | 0 | } else { |
132 | 0 | if (iter->value) { |
133 | 0 | file_range.AddValueSize(iter->value->size()); |
134 | 0 | } else { |
135 | 0 | assert(iter->columns); |
136 | 0 | file_range.AddValueSize(iter->columns->serialized_size()); |
137 | 0 | } |
138 | |
|
139 | 0 | if (file_range.GetValueSize() > read_options.value_size_soft_limit) { |
140 | 0 | s = Status::Aborted(); |
141 | 0 | break; |
142 | 0 | } |
143 | 0 | } |
144 | 0 | continue; |
145 | 0 | case GetContext::kDeleted: |
146 | | // Use empty error message for speed |
147 | 0 | *status = Status::NotFound(); |
148 | 0 | file_range.MarkKeyDone(iter); |
149 | 0 | continue; |
150 | 0 | case GetContext::kCorrupt: |
151 | 0 | *status = |
152 | 0 | Status::Corruption("corrupted key for ", iter->lkey->user_key()); |
153 | 0 | file_range.MarkKeyDone(iter); |
154 | 0 | continue; |
155 | 0 | case GetContext::kUnexpectedBlobIndex: |
156 | 0 | ROCKS_LOG_ERROR(info_log_, "Encounter unexpected blob index."); |
157 | 0 | *status = Status::NotSupported( |
158 | 0 | "Encounter unexpected blob index. Please open DB with " |
159 | 0 | "ROCKSDB_NAMESPACE::blob_db::BlobDB instead."); |
160 | 0 | file_range.MarkKeyDone(iter); |
161 | 0 | continue; |
162 | 0 | case GetContext::kMergeOperatorFailed: |
163 | 0 | *status = Status::Corruption(Status::SubCode::kMergeOperatorFailed); |
164 | 0 | file_range.MarkKeyDone(iter); |
165 | 0 | continue; |
166 | 0 | } |
167 | 0 | } |
168 | | |
169 | 0 | RecordInHistogram(db_statistics_, SST_BATCH_SIZE, batch_size); |
170 | 0 | CO_RETURN s; |
171 | 0 | } |
172 | | } // namespace ROCKSDB_NAMESPACE |
173 | | #endif |