/src/rocksdb/db/merge_helper.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | #include "db/merge_helper.h" |
7 | | |
8 | | #include <string> |
9 | | |
10 | | #include "db/blob/blob_fetcher.h" |
11 | | #include "db/blob/blob_index.h" |
12 | | #include "db/blob/prefetch_buffer_collection.h" |
13 | | #include "db/compaction/compaction_iteration_stats.h" |
14 | | #include "db/dbformat.h" |
15 | | #include "db/wide/wide_columns_helper.h" |
16 | | #include "logging/logging.h" |
17 | | #include "monitoring/perf_context_imp.h" |
18 | | #include "monitoring/statistics_impl.h" |
19 | | #include "port/likely.h" |
20 | | #include "rocksdb/comparator.h" |
21 | | #include "rocksdb/db.h" |
22 | | #include "rocksdb/merge_operator.h" |
23 | | #include "rocksdb/system_clock.h" |
24 | | #include "table/format.h" |
25 | | #include "table/internal_iterator.h" |
26 | | #include "util/overload.h" |
27 | | |
28 | | namespace ROCKSDB_NAMESPACE { |
29 | | |
30 | | MergeHelper::MergeHelper(Env* env, const Comparator* user_comparator, |
31 | | const MergeOperator* user_merge_operator, |
32 | | const CompactionFilter* compaction_filter, |
33 | | Logger* logger, bool assert_valid_internal_key, |
34 | | SequenceNumber latest_snapshot, |
35 | | const SnapshotChecker* snapshot_checker, int level, |
36 | | Statistics* stats, |
37 | | const std::atomic<bool>* shutting_down) |
38 | | : env_(env), |
39 | | clock_(env->GetSystemClock().get()), |
40 | | user_comparator_(user_comparator), |
41 | | user_merge_operator_(user_merge_operator), |
42 | | compaction_filter_(compaction_filter), |
43 | | shutting_down_(shutting_down), |
44 | | logger_(logger), |
45 | | assert_valid_internal_key_(assert_valid_internal_key), |
46 | | allow_single_operand_(false), |
47 | | latest_snapshot_(latest_snapshot), |
48 | | snapshot_checker_(snapshot_checker), |
49 | | level_(level), |
50 | | keys_(), |
51 | | filter_timer_(clock_), |
52 | | total_filter_time_(0U), |
53 | 7.15k | stats_(stats) { |
54 | 7.15k | assert(user_comparator_ != nullptr); |
55 | 7.15k | if (user_merge_operator_) { |
56 | 0 | allow_single_operand_ = user_merge_operator_->AllowSingleOperand(); |
57 | 0 | } |
58 | 7.15k | } |
59 | | |
60 | | template <typename Visitor> |
61 | | Status MergeHelper::TimedFullMergeCommonImpl( |
62 | | const MergeOperator* merge_operator, const Slice& key, |
63 | | MergeOperator::MergeOperationInputV3::ExistingValue&& existing_value, |
64 | | const std::vector<Slice>& operands, Logger* logger, Statistics* statistics, |
65 | | SystemClock* clock, bool update_num_ops_stats, |
66 | 0 | MergeOperator::OpFailureScope* op_failure_scope, Visitor&& visitor) { |
67 | 0 | assert(merge_operator); |
68 | 0 | assert(!operands.empty()); |
69 | |
|
70 | 0 | if (update_num_ops_stats) { |
71 | 0 | RecordInHistogram(statistics, READ_NUM_MERGE_OPERANDS, |
72 | 0 | static_cast<uint64_t>(operands.size())); |
73 | 0 | } |
74 | |
|
75 | 0 | const MergeOperator::MergeOperationInputV3 merge_in( |
76 | 0 | key, std::move(existing_value), operands, logger); |
77 | 0 | MergeOperator::MergeOperationOutputV3 merge_out; |
78 | |
|
79 | 0 | bool success = false; |
80 | |
|
81 | 0 | { |
82 | 0 | StopWatchNano timer(clock, statistics != nullptr); |
83 | 0 | PERF_TIMER_GUARD(merge_operator_time_nanos); |
84 | |
|
85 | 0 | success = merge_operator->FullMergeV3(merge_in, &merge_out); |
86 | |
|
87 | 0 | RecordTick(statistics, MERGE_OPERATION_TOTAL_TIME, |
88 | 0 | statistics ? timer.ElapsedNanos() : 0); |
89 | 0 | } |
90 | |
|
91 | 0 | if (!success) { |
92 | 0 | RecordTick(statistics, NUMBER_MERGE_FAILURES); |
93 | |
|
94 | 0 | if (op_failure_scope) { |
95 | 0 | *op_failure_scope = merge_out.op_failure_scope; |
96 | | // Apply default per merge_operator.h |
97 | 0 | if (*op_failure_scope == MergeOperator::OpFailureScope::kDefault) { |
98 | 0 | *op_failure_scope = MergeOperator::OpFailureScope::kTryMerge; |
99 | 0 | } |
100 | 0 | } |
101 | |
|
102 | 0 | return Status::Corruption(Status::SubCode::kMergeOperatorFailed); |
103 | 0 | } |
104 | | |
105 | 0 | return std::visit(std::forward<Visitor>(visitor), |
106 | 0 | std::move(merge_out.new_value)); |
107 | 0 | } Unexecuted instantiation: merge_helper.cc:rocksdb::Status rocksdb::MergeHelper::TimedFullMergeCommonImpl<rocksdb::overload<rocksdb::MergeHelper::TimedFullMergeImpl(rocksdb::MergeOperator const*, rocksdb::Slice const&, std::__1::variant<std::__1::monostate, rocksdb::Slice, std::__1::vector<rocksdb::WideColumn, std::__1::allocator<rocksdb::WideColumn> > >&&, std::__1::vector<rocksdb::Slice, std::__1::allocator<rocksdb::Slice> > const&, rocksdb::Logger*, rocksdb::Statistics*, rocksdb::SystemClock*, bool, rocksdb::MergeOperator::OpFailureScope*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, rocksdb::Slice*, rocksdb::ValueType*)::$_0, rocksdb::MergeHelper::TimedFullMergeImpl(rocksdb::MergeOperator const*, rocksdb::Slice const&, std::__1::variant<std::__1::monostate, rocksdb::Slice, std::__1::vector<rocksdb::WideColumn, std::__1::allocator<rocksdb::WideColumn> > >&&, std::__1::vector<rocksdb::Slice, std::__1::allocator<rocksdb::Slice> > const&, rocksdb::Logger*, rocksdb::Statistics*, rocksdb::SystemClock*, bool, rocksdb::MergeOperator::OpFailureScope*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, rocksdb::Slice*, rocksdb::ValueType*)::$_1, rocksdb::MergeHelper::TimedFullMergeImpl(rocksdb::MergeOperator const*, rocksdb::Slice const&, std::__1::variant<std::__1::monostate, rocksdb::Slice, std::__1::vector<rocksdb::WideColumn, std::__1::allocator<rocksdb::WideColumn> > >&&, std::__1::vector<rocksdb::Slice, std::__1::allocator<rocksdb::Slice> > const&, rocksdb::Logger*, rocksdb::Statistics*, rocksdb::SystemClock*, bool, rocksdb::MergeOperator::OpFailureScope*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, rocksdb::Slice*, rocksdb::ValueType*)::$_2> >(rocksdb::MergeOperator const*, rocksdb::Slice const&, std::__1::variant<std::__1::monostate, rocksdb::Slice, std::__1::vector<rocksdb::WideColumn, std::__1::allocator<rocksdb::WideColumn> > 
>&&, std::__1::vector<rocksdb::Slice, std::__1::allocator<rocksdb::Slice> > const&, rocksdb::Logger*, rocksdb::Statistics*, rocksdb::SystemClock*, bool, rocksdb::MergeOperator::OpFailureScope*, rocksdb::overload<rocksdb::MergeHelper::TimedFullMergeImpl(rocksdb::MergeOperator const*, rocksdb::Slice const&, std::__1::variant<std::__1::monostate, rocksdb::Slice, std::__1::vector<rocksdb::WideColumn, std::__1::allocator<rocksdb::WideColumn> > >&&, std::__1::vector<rocksdb::Slice, std::__1::allocator<rocksdb::Slice> > const&, rocksdb::Logger*, rocksdb::Statistics*, rocksdb::SystemClock*, bool, rocksdb::MergeOperator::OpFailureScope*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, rocksdb::Slice*, rocksdb::ValueType*)::$_0, rocksdb::MergeHelper::TimedFullMergeImpl(rocksdb::MergeOperator const*, rocksdb::Slice const&, std::__1::variant<std::__1::monostate, rocksdb::Slice, std::__1::vector<rocksdb::WideColumn, std::__1::allocator<rocksdb::WideColumn> > >&&, std::__1::vector<rocksdb::Slice, std::__1::allocator<rocksdb::Slice> > const&, rocksdb::Logger*, rocksdb::Statistics*, rocksdb::SystemClock*, bool, rocksdb::MergeOperator::OpFailureScope*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, rocksdb::Slice*, rocksdb::ValueType*)::$_1, rocksdb::MergeHelper::TimedFullMergeImpl(rocksdb::MergeOperator const*, rocksdb::Slice const&, std::__1::variant<std::__1::monostate, rocksdb::Slice, std::__1::vector<rocksdb::WideColumn, std::__1::allocator<rocksdb::WideColumn> > >&&, std::__1::vector<rocksdb::Slice, std::__1::allocator<rocksdb::Slice> > const&, rocksdb::Logger*, rocksdb::Statistics*, rocksdb::SystemClock*, bool, rocksdb::MergeOperator::OpFailureScope*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, rocksdb::Slice*, rocksdb::ValueType*)::$_2>&&) Unexecuted instantiation: merge_helper.cc:rocksdb::Status 
rocksdb::MergeHelper::TimedFullMergeCommonImpl<rocksdb::overload<rocksdb::MergeHelper::TimedFullMergeImpl(rocksdb::MergeOperator const*, rocksdb::Slice const&, std::__1::variant<std::__1::monostate, rocksdb::Slice, std::__1::vector<rocksdb::WideColumn, std::__1::allocator<rocksdb::WideColumn> > >&&, std::__1::vector<rocksdb::Slice, std::__1::allocator<rocksdb::Slice> > const&, rocksdb::Logger*, rocksdb::Statistics*, rocksdb::SystemClock*, bool, rocksdb::MergeOperator::OpFailureScope*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, rocksdb::PinnableWideColumns*)::$_0, rocksdb::MergeHelper::TimedFullMergeImpl(rocksdb::MergeOperator const*, rocksdb::Slice const&, std::__1::variant<std::__1::monostate, rocksdb::Slice, std::__1::vector<rocksdb::WideColumn, std::__1::allocator<rocksdb::WideColumn> > >&&, std::__1::vector<rocksdb::Slice, std::__1::allocator<rocksdb::Slice> > const&, rocksdb::Logger*, rocksdb::Statistics*, rocksdb::SystemClock*, bool, rocksdb::MergeOperator::OpFailureScope*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, rocksdb::PinnableWideColumns*)::$_1, rocksdb::MergeHelper::TimedFullMergeImpl(rocksdb::MergeOperator const*, rocksdb::Slice const&, std::__1::variant<std::__1::monostate, rocksdb::Slice, std::__1::vector<rocksdb::WideColumn, std::__1::allocator<rocksdb::WideColumn> > >&&, std::__1::vector<rocksdb::Slice, std::__1::allocator<rocksdb::Slice> > const&, rocksdb::Logger*, rocksdb::Statistics*, rocksdb::SystemClock*, bool, rocksdb::MergeOperator::OpFailureScope*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, rocksdb::PinnableWideColumns*)::$_2> >(rocksdb::MergeOperator const*, rocksdb::Slice const&, std::__1::variant<std::__1::monostate, rocksdb::Slice, std::__1::vector<rocksdb::WideColumn, std::__1::allocator<rocksdb::WideColumn> > >&&, std::__1::vector<rocksdb::Slice, std::__1::allocator<rocksdb::Slice> > const&, 
rocksdb::Logger*, rocksdb::Statistics*, rocksdb::SystemClock*, bool, rocksdb::MergeOperator::OpFailureScope*, rocksdb::overload<rocksdb::MergeHelper::TimedFullMergeImpl(rocksdb::MergeOperator const*, rocksdb::Slice const&, std::__1::variant<std::__1::monostate, rocksdb::Slice, std::__1::vector<rocksdb::WideColumn, std::__1::allocator<rocksdb::WideColumn> > >&&, std::__1::vector<rocksdb::Slice, std::__1::allocator<rocksdb::Slice> > const&, rocksdb::Logger*, rocksdb::Statistics*, rocksdb::SystemClock*, bool, rocksdb::MergeOperator::OpFailureScope*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, rocksdb::PinnableWideColumns*)::$_0, rocksdb::MergeHelper::TimedFullMergeImpl(rocksdb::MergeOperator const*, rocksdb::Slice const&, std::__1::variant<std::__1::monostate, rocksdb::Slice, std::__1::vector<rocksdb::WideColumn, std::__1::allocator<rocksdb::WideColumn> > >&&, std::__1::vector<rocksdb::Slice, std::__1::allocator<rocksdb::Slice> > const&, rocksdb::Logger*, rocksdb::Statistics*, rocksdb::SystemClock*, bool, rocksdb::MergeOperator::OpFailureScope*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, rocksdb::PinnableWideColumns*)::$_1, rocksdb::MergeHelper::TimedFullMergeImpl(rocksdb::MergeOperator const*, rocksdb::Slice const&, std::__1::variant<std::__1::monostate, rocksdb::Slice, std::__1::vector<rocksdb::WideColumn, std::__1::allocator<rocksdb::WideColumn> > >&&, std::__1::vector<rocksdb::Slice, std::__1::allocator<rocksdb::Slice> > const&, rocksdb::Logger*, rocksdb::Statistics*, rocksdb::SystemClock*, bool, rocksdb::MergeOperator::OpFailureScope*, std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >*, rocksdb::PinnableWideColumns*)::$_2>&&) |
108 | | |
109 | | Status MergeHelper::TimedFullMergeImpl( |
110 | | const MergeOperator* merge_operator, const Slice& key, |
111 | | MergeOperator::MergeOperationInputV3::ExistingValue&& existing_value, |
112 | | const std::vector<Slice>& operands, Logger* logger, Statistics* statistics, |
113 | | SystemClock* clock, bool update_num_ops_stats, |
114 | | MergeOperator::OpFailureScope* op_failure_scope, std::string* result, |
115 | 0 | Slice* result_operand, ValueType* result_type) { |
116 | 0 | assert(result); |
117 | 0 | assert(result_type); |
118 | |
|
119 | 0 | auto visitor = overload{ |
120 | 0 | [&](std::string&& new_value) -> Status { |
121 | 0 | *result_type = kTypeValue; |
122 | |
|
123 | 0 | if (result_operand) { |
124 | 0 | *result_operand = Slice(nullptr, 0); |
125 | 0 | } |
126 | |
|
127 | 0 | *result = std::move(new_value); |
128 | |
|
129 | 0 | return Status::OK(); |
130 | 0 | }, |
131 | 0 | [&](MergeOperator::MergeOperationOutputV3::NewColumns&& new_columns) |
132 | 0 | -> Status { |
133 | 0 | *result_type = kTypeWideColumnEntity; |
134 | |
|
135 | 0 | if (result_operand) { |
136 | 0 | *result_operand = Slice(nullptr, 0); |
137 | 0 | } |
138 | |
|
139 | 0 | result->clear(); |
140 | |
|
141 | 0 | WideColumns sorted_columns; |
142 | 0 | sorted_columns.reserve(new_columns.size()); |
143 | |
|
144 | 0 | for (const auto& column : new_columns) { |
145 | 0 | sorted_columns.emplace_back(column.first, column.second); |
146 | 0 | } |
147 | |
|
148 | 0 | WideColumnsHelper::SortColumns(sorted_columns); |
149 | |
|
150 | 0 | return WideColumnSerialization::Serialize(sorted_columns, *result); |
151 | 0 | }, |
152 | 0 | [&](Slice&& operand) -> Status { |
153 | 0 | *result_type = kTypeValue; |
154 | |
|
155 | 0 | if (result_operand) { |
156 | 0 | *result_operand = operand; |
157 | 0 | result->clear(); |
158 | 0 | } else { |
159 | 0 | result->assign(operand.data(), operand.size()); |
160 | 0 | } |
161 | |
|
162 | 0 | return Status::OK(); |
163 | 0 | }}; |
164 | |
|
165 | 0 | return TimedFullMergeCommonImpl(merge_operator, key, |
166 | 0 | std::move(existing_value), operands, logger, |
167 | 0 | statistics, clock, update_num_ops_stats, |
168 | 0 | op_failure_scope, std::move(visitor)); |
169 | 0 | } |
170 | | |
171 | | Status MergeHelper::TimedFullMergeImpl( |
172 | | const MergeOperator* merge_operator, const Slice& key, |
173 | | MergeOperator::MergeOperationInputV3::ExistingValue&& existing_value, |
174 | | const std::vector<Slice>& operands, Logger* logger, Statistics* statistics, |
175 | | SystemClock* clock, bool update_num_ops_stats, |
176 | | MergeOperator::OpFailureScope* op_failure_scope, std::string* result_value, |
177 | 0 | PinnableWideColumns* result_entity) { |
178 | 0 | assert(result_value || result_entity); |
179 | 0 | assert(!result_value || !result_entity); |
180 | |
|
181 | 0 | auto visitor = overload{ |
182 | 0 | [&](std::string&& new_value) -> Status { |
183 | 0 | if (result_value) { |
184 | 0 | *result_value = std::move(new_value); |
185 | |
|
186 | 0 | return Status::OK(); |
187 | 0 | } |
188 | | |
189 | 0 | assert(result_entity); |
190 | 0 | result_entity->SetPlainValue(std::move(new_value)); |
191 | |
|
192 | 0 | return Status::OK(); |
193 | 0 | }, |
194 | 0 | [&](MergeOperator::MergeOperationOutputV3::NewColumns&& new_columns) |
195 | 0 | -> Status { |
196 | 0 | if (result_value) { |
197 | 0 | if (!new_columns.empty() && |
198 | 0 | new_columns.front().first == kDefaultWideColumnName) { |
199 | 0 | *result_value = std::move(new_columns.front().second); |
200 | 0 | } else { |
201 | 0 | result_value->clear(); |
202 | 0 | } |
203 | |
|
204 | 0 | return Status::OK(); |
205 | 0 | } |
206 | | |
207 | 0 | assert(result_entity); |
208 | |
|
209 | 0 | WideColumns sorted_columns; |
210 | 0 | sorted_columns.reserve(new_columns.size()); |
211 | |
|
212 | 0 | for (const auto& column : new_columns) { |
213 | 0 | sorted_columns.emplace_back(column.first, column.second); |
214 | 0 | } |
215 | |
|
216 | 0 | WideColumnsHelper::SortColumns(sorted_columns); |
217 | |
|
218 | 0 | std::string result; |
219 | 0 | const Status s = |
220 | 0 | WideColumnSerialization::Serialize(sorted_columns, result); |
221 | 0 | if (!s.ok()) { |
222 | 0 | result_entity->Reset(); |
223 | 0 | return s; |
224 | 0 | } |
225 | | |
226 | 0 | return result_entity->SetWideColumnValue(std::move(result)); |
227 | 0 | }, |
228 | 0 | [&](Slice&& operand) -> Status { |
229 | 0 | if (result_value) { |
230 | 0 | result_value->assign(operand.data(), operand.size()); |
231 | |
|
232 | 0 | return Status::OK(); |
233 | 0 | } |
234 | | |
235 | 0 | assert(result_entity); |
236 | 0 | result_entity->SetPlainValue(operand); |
237 | |
|
238 | 0 | return Status::OK(); |
239 | 0 | }}; |
240 | |
|
241 | 0 | return TimedFullMergeCommonImpl(merge_operator, key, |
242 | 0 | std::move(existing_value), operands, logger, |
243 | 0 | statistics, clock, update_num_ops_stats, |
244 | 0 | op_failure_scope, std::move(visitor)); |
245 | 0 | } |
246 | | |
247 | | // PRE: iter points to the first merge type entry |
248 | | // POST: iter points to the first entry beyond the merge process (or the end) |
249 | | // keys_, operands_ are updated to reflect the merge result. |
250 | | // keys_ stores the list of keys encountered while merging. |
251 | | // operands_ stores the list of merge operands encountered while merging. |
252 | | // keys_[i] corresponds to operands_[i] for each i. |
253 | | // |
254 | | // TODO: Avoid the snapshot stripe map lookup in CompactionRangeDelAggregator |
255 | | // and just pass the StripeRep corresponding to the stripe being merged. |
256 | | Status MergeHelper::MergeUntil(InternalIterator* iter, |
257 | | CompactionRangeDelAggregator* range_del_agg, |
258 | | const SequenceNumber stop_before, |
259 | | const bool at_bottom, |
260 | | const bool allow_data_in_errors, |
261 | | const BlobFetcher* blob_fetcher, |
262 | | const std::string* const full_history_ts_low, |
263 | | PrefetchBufferCollection* prefetch_buffers, |
264 | 0 | CompactionIterationStats* c_iter_stats) { |
265 | | // Get a copy of the internal key, before it's invalidated by iter->Next() |
266 | | // Also maintain the list of merge operands seen. |
267 | 0 | assert(HasOperator()); |
268 | 0 | keys_.clear(); |
269 | 0 | merge_context_.Clear(); |
270 | 0 | has_compaction_filter_skip_until_ = false; |
271 | 0 | assert(user_merge_operator_); |
272 | 0 | assert(user_comparator_); |
273 | 0 | const size_t ts_sz = user_comparator_->timestamp_size(); |
274 | 0 | if (full_history_ts_low) { |
275 | 0 | assert(ts_sz > 0); |
276 | 0 | assert(ts_sz == full_history_ts_low->size()); |
277 | 0 | } |
278 | 0 | bool first_key = true; |
279 | | |
280 | | // We need to parse the internal key again as the parsed key is |
281 | | // backed by the internal key! |
282 | | // Assume no internal key corruption as it has been successfully parsed |
283 | | // by the caller. |
284 | | // original_key_is_iter variable is just caching the information: |
285 | | // original_key_is_iter == (iter->key().ToString() == original_key) |
286 | 0 | bool original_key_is_iter = true; |
287 | 0 | std::string original_key = iter->key().ToString(); |
288 | | // Important: |
289 | | // orig_ikey is backed by original_key if keys_.empty() |
290 | | // orig_ikey is backed by keys_.back() if !keys_.empty() |
291 | 0 | ParsedInternalKey orig_ikey; |
292 | |
|
293 | 0 | Status s = ParseInternalKey(original_key, &orig_ikey, allow_data_in_errors); |
294 | 0 | assert(s.ok()); |
295 | 0 | if (!s.ok()) { |
296 | 0 | return s; |
297 | 0 | } |
298 | | |
299 | 0 | assert(kTypeMerge == orig_ikey.type); |
300 | |
|
301 | 0 | bool hit_the_next_user_key = false; |
302 | 0 | int cmp_with_full_history_ts_low = 0; |
303 | 0 | for (; iter->Valid(); iter->Next(), original_key_is_iter = false) { |
304 | 0 | if (IsShuttingDown()) { |
305 | 0 | s = Status::ShutdownInProgress(); |
306 | 0 | return s; |
307 | 0 | } |
308 | | // Skip range tombstones emitted by the compaction iterator. |
309 | 0 | if (iter->IsDeleteRangeSentinelKey()) { |
310 | 0 | continue; |
311 | 0 | } |
312 | | |
313 | 0 | ParsedInternalKey ikey; |
314 | 0 | assert(keys_.size() == merge_context_.GetNumOperands()); |
315 | |
|
316 | 0 | Status pik_status = |
317 | 0 | ParseInternalKey(iter->key(), &ikey, allow_data_in_errors); |
318 | 0 | Slice ts; |
319 | 0 | if (pik_status.ok()) { |
320 | 0 | ts = ExtractTimestampFromUserKey(ikey.user_key, ts_sz); |
321 | 0 | if (full_history_ts_low) { |
322 | 0 | cmp_with_full_history_ts_low = |
323 | 0 | user_comparator_->CompareTimestamp(ts, *full_history_ts_low); |
324 | 0 | } |
325 | 0 | } |
326 | 0 | if (!pik_status.ok()) { |
327 | | // stop at corrupted key |
328 | 0 | if (assert_valid_internal_key_) { |
329 | 0 | return pik_status; |
330 | 0 | } |
331 | 0 | break; |
332 | 0 | } else if (first_key) { |
333 | | // If user-defined timestamp is enabled, we expect both user key and |
334 | | // timestamps are equal, as a sanity check. |
335 | 0 | assert(user_comparator_->Equal(ikey.user_key, orig_ikey.user_key)); |
336 | 0 | first_key = false; |
337 | 0 | } else if (!user_comparator_->EqualWithoutTimestamp(ikey.user_key, |
338 | 0 | orig_ikey.user_key) || |
339 | 0 | (ts_sz > 0 && |
340 | 0 | !user_comparator_->Equal(ikey.user_key, orig_ikey.user_key) && |
341 | 0 | cmp_with_full_history_ts_low >= 0)) { |
342 | | // 1) hit a different user key, or |
343 | | // 2) user-defined timestamp is enabled, and hit a version of user key NOT |
344 | | // eligible for GC, then stop right here. |
345 | 0 | hit_the_next_user_key = true; |
346 | 0 | break; |
347 | 0 | } else if (stop_before > 0 && ikey.sequence <= stop_before && |
348 | 0 | LIKELY(snapshot_checker_ == nullptr || |
349 | 0 | snapshot_checker_->CheckInSnapshot(ikey.sequence, |
350 | 0 | stop_before) != |
351 | 0 | SnapshotCheckerResult::kNotInSnapshot)) { |
352 | | // hit an entry that's possibly visible by the previous snapshot, can't |
353 | | // touch that |
354 | 0 | break; |
355 | 0 | } |
356 | | |
357 | | // At this point we are guaranteed that we need to process this key. |
358 | | |
359 | 0 | assert(IsValueType(ikey.type)); |
360 | 0 | if (ikey.type != kTypeMerge) { |
361 | | // hit a put/delete/single delete |
362 | | // => merge the put value or a nullptr with operands_ |
363 | | // => store result in operands_.back() (and update keys_.back()) |
364 | | // => change the entry type for keys_.back() |
365 | | // We are done! Success! |
366 | | |
367 | | // If there are no operands, just return the Status::OK(). That will cause |
368 | | // the compaction iterator to write out the key we're currently at, which |
369 | | // is the put/delete we just encountered. |
370 | 0 | if (keys_.empty()) { |
371 | 0 | return s; |
372 | 0 | } |
373 | | |
374 | | // TODO: if we're in compaction and it's a put, it would be nice to run |
375 | | // compaction filter on it. |
376 | 0 | std::string merge_result; |
377 | 0 | ValueType merge_result_type; |
378 | 0 | MergeOperator::OpFailureScope op_failure_scope; |
379 | |
|
380 | 0 | if (range_del_agg && |
381 | 0 | range_del_agg->ShouldDelete( |
382 | 0 | ikey, RangeDelPositioningMode::kForwardTraversal)) { |
383 | 0 | s = TimedFullMerge(user_merge_operator_, ikey.user_key, kNoBaseValue, |
384 | 0 | merge_context_.GetOperands(), logger_, stats_, |
385 | 0 | clock_, /* update_num_ops_stats */ false, |
386 | 0 | &op_failure_scope, &merge_result, |
387 | 0 | /* result_operand */ nullptr, &merge_result_type); |
388 | 0 | } else if (ikey.type == kTypeValue) { |
389 | 0 | s = TimedFullMerge(user_merge_operator_, ikey.user_key, kPlainBaseValue, |
390 | 0 | iter->value(), merge_context_.GetOperands(), logger_, |
391 | 0 | stats_, clock_, /* update_num_ops_stats */ false, |
392 | 0 | &op_failure_scope, &merge_result, |
393 | 0 | /* result_operand */ nullptr, &merge_result_type); |
394 | 0 | } else if (ikey.type == kTypeValuePreferredSeqno) { |
395 | | // When a TimedPut is merged with some merge operands, its original |
396 | | // write time info is obsolete and removed, and the merge result is a |
397 | | // kTypeValue. |
398 | 0 | Slice unpacked_value = ParsePackedValueForValue(iter->value()); |
399 | 0 | s = TimedFullMerge(user_merge_operator_, ikey.user_key, kPlainBaseValue, |
400 | 0 | unpacked_value, merge_context_.GetOperands(), |
401 | 0 | logger_, stats_, clock_, |
402 | 0 | /* update_num_ops_stats */ false, &op_failure_scope, |
403 | 0 | &merge_result, |
404 | 0 | /* result_operand */ nullptr, &merge_result_type); |
405 | |
|
406 | 0 | } else if (ikey.type == kTypeBlobIndex) { |
407 | 0 | BlobIndex blob_index; |
408 | |
|
409 | 0 | s = blob_index.DecodeFrom(iter->value()); |
410 | 0 | if (!s.ok()) { |
411 | 0 | return s; |
412 | 0 | } |
413 | | |
414 | 0 | FilePrefetchBuffer* prefetch_buffer = |
415 | 0 | prefetch_buffers ? prefetch_buffers->GetOrCreatePrefetchBuffer( |
416 | 0 | blob_index.file_number()) |
417 | 0 | : nullptr; |
418 | |
|
419 | 0 | uint64_t bytes_read = 0; |
420 | |
|
421 | 0 | assert(blob_fetcher); |
422 | |
|
423 | 0 | PinnableSlice blob_value; |
424 | 0 | s = blob_fetcher->FetchBlob(ikey.user_key, blob_index, prefetch_buffer, |
425 | 0 | &blob_value, &bytes_read); |
426 | 0 | if (!s.ok()) { |
427 | 0 | return s; |
428 | 0 | } |
429 | | |
430 | 0 | if (c_iter_stats) { |
431 | 0 | ++c_iter_stats->num_blobs_read; |
432 | 0 | c_iter_stats->total_blob_bytes_read += bytes_read; |
433 | 0 | } |
434 | |
|
435 | 0 | s = TimedFullMerge(user_merge_operator_, ikey.user_key, kPlainBaseValue, |
436 | 0 | blob_value, merge_context_.GetOperands(), logger_, |
437 | 0 | stats_, clock_, /* update_num_ops_stats */ false, |
438 | 0 | &op_failure_scope, &merge_result, |
439 | 0 | /* result_operand */ nullptr, &merge_result_type); |
440 | 0 | } else if (ikey.type == kTypeWideColumnEntity) { |
441 | 0 | s = TimedFullMerge(user_merge_operator_, ikey.user_key, kWideBaseValue, |
442 | 0 | iter->value(), merge_context_.GetOperands(), logger_, |
443 | 0 | stats_, clock_, /* update_num_ops_stats */ false, |
444 | 0 | &op_failure_scope, &merge_result, |
445 | 0 | /* result_operand */ nullptr, &merge_result_type); |
446 | 0 | } else { |
447 | 0 | s = TimedFullMerge(user_merge_operator_, ikey.user_key, kNoBaseValue, |
448 | 0 | merge_context_.GetOperands(), logger_, stats_, |
449 | 0 | clock_, /* update_num_ops_stats */ false, |
450 | 0 | &op_failure_scope, &merge_result, |
451 | 0 | /* result_operand */ nullptr, &merge_result_type); |
452 | 0 | } |
453 | | |
454 | | // We store the result in keys_.back() and operands_.back() |
455 | | // if nothing went wrong (i.e.: no operand corruption on disk) |
456 | 0 | if (s.ok()) { |
457 | | // The original key encountered |
458 | 0 | original_key = std::move(keys_.back()); |
459 | |
|
460 | 0 | assert(merge_result_type == kTypeValue || |
461 | 0 | merge_result_type == kTypeWideColumnEntity); |
462 | 0 | orig_ikey.type = merge_result_type; |
463 | 0 | UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type); |
464 | |
|
465 | 0 | keys_.clear(); |
466 | 0 | merge_context_.Clear(); |
467 | 0 | keys_.emplace_front(std::move(original_key)); |
468 | 0 | merge_context_.PushOperand(merge_result); |
469 | | |
470 | | // move iter to the next entry |
471 | 0 | iter->Next(); |
472 | 0 | } else if (op_failure_scope == |
473 | 0 | MergeOperator::OpFailureScope::kMustMerge) { |
474 | | // Change to `Status::MergeInProgress()` to denote output consists of |
475 | | // merge operands only. Leave `iter` at the non-merge entry so it will |
476 | | // be output after. |
477 | 0 | s = Status::MergeInProgress(); |
478 | 0 | } |
479 | 0 | return s; |
480 | 0 | } else { |
481 | | // hit a merge |
482 | | // => if there is a compaction filter, apply it. |
483 | | // => check for range tombstones covering the operand |
484 | | // => merge the operand into the front of the operands_ list |
485 | | // if not filtered |
486 | | // => then continue because we haven't yet seen a Put/Delete. |
487 | | // |
488 | | // Keep queuing keys and operands until we either meet a put / delete |
489 | | // request or later did a partial merge. |
490 | |
|
491 | 0 | Slice value_slice = iter->value(); |
492 | | // add an operand to the list if: |
493 | | // 1) it's included in one of the snapshots. in that case we *must* write |
494 | | // it out, no matter what compaction filter says |
495 | | // 2) it's not filtered by a compaction filter |
496 | 0 | CompactionFilter::Decision filter = |
497 | 0 | ikey.sequence <= latest_snapshot_ |
498 | 0 | ? CompactionFilter::Decision::kKeep |
499 | 0 | : FilterMerge(orig_ikey.user_key, value_slice); |
500 | 0 | if (filter != CompactionFilter::Decision::kRemoveAndSkipUntil && |
501 | 0 | range_del_agg != nullptr && |
502 | 0 | range_del_agg->ShouldDelete( |
503 | 0 | iter->key(), RangeDelPositioningMode::kForwardTraversal)) { |
504 | 0 | filter = CompactionFilter::Decision::kRemove; |
505 | 0 | } |
506 | 0 | if (filter == CompactionFilter::Decision::kKeep || |
507 | 0 | filter == CompactionFilter::Decision::kChangeValue) { |
508 | 0 | if (original_key_is_iter) { |
509 | | // this is just an optimization that saves us one memcpy |
510 | 0 | keys_.emplace_front(original_key); |
511 | 0 | } else { |
512 | 0 | keys_.emplace_front(iter->key().ToString()); |
513 | 0 | } |
514 | 0 | if (keys_.size() == 1) { |
515 | | // we need to re-anchor the orig_ikey because it was anchored by |
516 | | // original_key before |
517 | 0 | pik_status = |
518 | 0 | ParseInternalKey(keys_.back(), &orig_ikey, allow_data_in_errors); |
519 | 0 | pik_status.PermitUncheckedError(); |
520 | 0 | assert(pik_status.ok()); |
521 | 0 | } |
522 | 0 | if (filter == CompactionFilter::Decision::kKeep) { |
523 | 0 | merge_context_.PushOperand( |
524 | 0 | value_slice, iter->IsValuePinned() /* operand_pinned */); |
525 | 0 | } else { |
526 | 0 | assert(filter == CompactionFilter::Decision::kChangeValue); |
527 | | // Compaction filter asked us to change the operand from value_slice |
528 | | // to compaction_filter_value_. |
529 | 0 | merge_context_.PushOperand(compaction_filter_value_, false); |
530 | 0 | } |
531 | 0 | } else if (filter == CompactionFilter::Decision::kRemoveAndSkipUntil) { |
532 | | // Compaction filter asked us to remove this key altogether |
533 | | // (not just this operand), along with some keys following it. |
534 | 0 | keys_.clear(); |
535 | 0 | merge_context_.Clear(); |
536 | 0 | has_compaction_filter_skip_until_ = true; |
537 | 0 | return s; |
538 | 0 | } |
539 | 0 | } |
540 | 0 | } |
541 | | |
542 | 0 | if (cmp_with_full_history_ts_low >= 0) { |
543 | 0 | size_t num_merge_operands = merge_context_.GetNumOperands(); |
544 | 0 | if (ts_sz && num_merge_operands > 1) { |
545 | | // We do not merge merge operands with different timestamps if they are |
546 | | // not eligible for GC. |
547 | 0 | ROCKS_LOG_ERROR(logger_, "ts_sz=%d, %d merge oprands", |
548 | 0 | static_cast<int>(ts_sz), |
549 | 0 | static_cast<int>(num_merge_operands)); |
550 | 0 | assert(false); |
551 | 0 | } |
552 | 0 | } |
553 | |
|
554 | 0 | if (merge_context_.GetNumOperands() == 0) { |
555 | | // we filtered out all the merge operands |
556 | 0 | return s; |
557 | 0 | } |
558 | | |
559 | | // We are sure we have seen this key's entire history if: |
560 | | // at_bottom == true (this does not necessarily mean it is the bottommost |
561 | | // layer, but rather that we are confident the key does not appear on any of |
562 | | // the lower layers, at_bottom == false doesn't mean it does appear, just |
563 | | // that we can't be sure, see Compaction::IsBottommostLevel for details) |
564 | | // AND |
565 | | // we have either encountered another key or end of key history on this |
566 | | // layer. |
567 | | // Note that if user-defined timestamp is enabled, we need some extra caution |
568 | | // here: if full_history_ts_low is nullptr, or it's not null but the key's |
569 | | // timestamp is greater than or equal to full_history_ts_low, it means this |
570 | | // key cannot be dropped. We may not have seen the beginning of the key. |
571 | | // |
572 | | // When these conditions are true we are able to merge all the keys |
573 | | // using full merge. |
574 | | // |
575 | | // For these cases we are not sure about, we simply miss the opportunity |
576 | | // to combine the keys. Since VersionSet::SetupOtherInputs() always makes |
577 | | // sure that all merge-operands on the same level get compacted together, |
578 | | // this will simply lead to these merge operands moving to the next level. |
579 | 0 | bool surely_seen_the_beginning = |
580 | 0 | (hit_the_next_user_key || !iter->Valid()) && at_bottom && |
581 | 0 | (ts_sz == 0 || cmp_with_full_history_ts_low < 0); |
582 | 0 | if (surely_seen_the_beginning) { |
583 | | // do a final merge with nullptr as the existing value and say |
584 | | // bye to the merge type (it's now converted to a Put) |
585 | 0 | assert(kTypeMerge == orig_ikey.type); |
586 | 0 | assert(merge_context_.GetNumOperands() >= 1); |
587 | 0 | assert(merge_context_.GetNumOperands() == keys_.size()); |
588 | 0 | std::string merge_result; |
589 | 0 | ValueType merge_result_type; |
590 | 0 | MergeOperator::OpFailureScope op_failure_scope; |
591 | 0 | s = TimedFullMerge(user_merge_operator_, orig_ikey.user_key, kNoBaseValue, |
592 | 0 | merge_context_.GetOperands(), logger_, stats_, clock_, |
593 | 0 | /* update_num_ops_stats */ false, &op_failure_scope, |
594 | 0 | &merge_result, |
595 | 0 | /* result_operand */ nullptr, &merge_result_type); |
596 | 0 | if (s.ok()) { |
597 | | // The original key encountered |
598 | | // We are certain that keys_ is not empty here (see assertions couple of |
599 | | // lines before). |
600 | 0 | original_key = std::move(keys_.back()); |
601 | |
|
602 | 0 | assert(merge_result_type == kTypeValue || |
603 | 0 | merge_result_type == kTypeWideColumnEntity); |
604 | 0 | orig_ikey.type = merge_result_type; |
605 | 0 | UpdateInternalKey(&original_key, orig_ikey.sequence, orig_ikey.type); |
606 | |
|
607 | 0 | keys_.clear(); |
608 | 0 | merge_context_.Clear(); |
609 | 0 | keys_.emplace_front(std::move(original_key)); |
610 | 0 | merge_context_.PushOperand(merge_result); |
611 | 0 | } else if (op_failure_scope == MergeOperator::OpFailureScope::kMustMerge) { |
612 | | // Change to `Status::MergeInProgress()` to denote output consists of |
613 | | // merge operands only. |
614 | 0 | s = Status::MergeInProgress(); |
615 | 0 | } |
616 | 0 | } else { |
617 | | // We haven't seen the beginning of the key nor a Put/Delete. |
618 | | // Attempt to use the user's associative merge function to |
619 | | // merge the stacked merge operands into a single operand. |
620 | 0 | s = Status::MergeInProgress(); |
621 | 0 | if (merge_context_.GetNumOperands() >= 2 || |
622 | 0 | (allow_single_operand_ && merge_context_.GetNumOperands() == 1)) { |
623 | 0 | bool merge_success = false; |
624 | 0 | std::string merge_result; |
625 | 0 | { |
626 | 0 | StopWatchNano timer(clock_, stats_ != nullptr); |
627 | 0 | PERF_TIMER_GUARD(merge_operator_time_nanos); |
628 | 0 | merge_success = user_merge_operator_->PartialMergeMulti( |
629 | 0 | orig_ikey.user_key, |
630 | 0 | std::deque<Slice>(merge_context_.GetOperands().begin(), |
631 | 0 | merge_context_.GetOperands().end()), |
632 | 0 | &merge_result, logger_); |
633 | 0 | RecordTick(stats_, MERGE_OPERATION_TOTAL_TIME, |
634 | 0 | stats_ ? timer.ElapsedNanosSafe() : 0); |
635 | 0 | } |
636 | 0 | if (merge_success) { |
637 | | // Merging of operands (associative merge) was successful. |
638 | | // Replace operands with the merge result |
639 | 0 | merge_context_.Clear(); |
640 | 0 | merge_context_.PushOperand(merge_result); |
641 | 0 | keys_.erase(keys_.begin(), keys_.end() - 1); |
642 | 0 | } |
643 | 0 | } |
644 | 0 | } |
645 | |
|
646 | 0 | return s; |
647 | 0 | } |
648 | | |
// Builds an output iterator over the keys and values exposed by
// `merge_helper`. The pointer is stored in `merge_helper_` and used later to
// re-read `keys()` / `values()`.
//
// Both cursors start at `rend()`, the past-the-end position of a reverse
// traversal, so the iterator has no current entry immediately after
// construction.
649 | | MergeOutputIterator::MergeOutputIterator(const MergeHelper* merge_helper) |
650 | 7.15k | : merge_helper_(merge_helper) { |
651 | 7.15k | it_keys_ = merge_helper_->keys().rend(); |
652 | 7.15k | it_values_ = merge_helper_->values().rend(); |
653 | 7.15k | } |
654 | | |
// Repositions the iterator at the start of its traversal.
//
// The key and value cursors are set to `rbegin()` of the helper's key and
// value containers, so iteration begins at the last stored element of each
// and proceeds towards the front. The assert checks that both containers
// hold the same number of entries, since the two cursors are moved together.
655 | 0 | void MergeOutputIterator::SeekToFirst() { |
656 | 0 | const auto& keys = merge_helper_->keys(); |
657 | 0 | const auto& values = merge_helper_->values(); |
658 | 0 | assert(keys.size() == values.size()); |
659 | 0 | it_keys_ = keys.rbegin(); |
660 | 0 | it_values_ = values.rbegin(); |
661 | 0 | } |
662 | | |
// Advances the key cursor and the value cursor by one position each, keeping
// them paired. No bounds check is performed in this function.
663 | 0 | void MergeOutputIterator::Next() { |
664 | 0 | ++it_keys_; |
665 | 0 | ++it_values_; |
666 | 0 | } |
667 | | |
// Runs the configured compaction filter on a single merge operand.
//
// Parameters:
//   user_key    - user key the operand belongs to; passed to the filter and
//                 compared against any returned skip-until key.
//   value_slice - the merge operand, passed to the filter as the existing
//                 value with value type kMergeOperand.
//
// Returns the filter's decision, with two adjustments:
//   * kKeep is returned immediately when no compaction filter is set.
//   * kRemoveAndSkipUntil is turned into kKeep when the returned skip-until
//     key does not compare strictly greater than `user_key` under
//     `user_comparator_`.
//
// Side effects visible in this function: `compaction_filter_value_` and
// `compaction_filter_skip_until_` are reset before the filter is called and
// serve as its output buffers. On an accepted kRemoveAndSkipUntil the
// skip-until key is converted from a user key using kMaxSequenceNumber and
// kValueTypeForSeek. When `stats_` is non-null and
// ShouldReportDetailedTime() is true, the elapsed filter time is added to
// `total_filter_time_`.
668 | | CompactionFilter::Decision MergeHelper::FilterMerge(const Slice& user_key, |
669 | 0 | const Slice& value_slice) { |
670 | 0 | if (compaction_filter_ == nullptr) { |
671 | 0 | return CompactionFilter::Decision::kKeep; |
672 | 0 | } |
673 | 0 | if (stats_ != nullptr && ShouldReportDetailedTime(env_, stats_)) { |
674 | 0 | filter_timer_.Start(); |
675 | 0 | } |
// Clear the output buffers so nothing from an earlier call is carried over.
676 | 0 | compaction_filter_value_.clear(); |
677 | 0 | compaction_filter_skip_until_.Clear(); |
// No wide-column input or output is supplied: both column arguments are
// nullptr.
678 | 0 | auto ret = compaction_filter_->FilterV3( |
679 | 0 | level_, user_key, CompactionFilter::ValueType::kMergeOperand, |
680 | 0 | &value_slice, /* existing_columns */ nullptr, &compaction_filter_value_, |
681 | 0 | /* new_columns */ nullptr, compaction_filter_skip_until_.rep()); |
682 | 0 | if (ret == CompactionFilter::Decision::kRemoveAndSkipUntil) { |
683 | 0 | if (user_comparator_->Compare(*compaction_filter_skip_until_.rep(), |
684 | 0 | user_key) <= 0) { |
685 | | // Invalid skip_until returned from compaction filter. |
686 | | // Keep the key as per FilterV2/FilterV3 documentation. |
687 | 0 | ret = CompactionFilter::Decision::kKeep; |
688 | 0 | } else { |
689 | 0 | compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber, |
690 | 0 | kValueTypeForSeek); |
691 | 0 | } |
692 | 0 | } |
// Same condition as the one guarding filter_timer_.Start() above.
693 | 0 | if (stats_ != nullptr && ShouldReportDetailedTime(env_, stats_)) { |
694 | 0 | total_filter_time_ += filter_timer_.ElapsedNanosSafe(); |
695 | 0 | } |
696 | 0 | return ret; |
697 | 0 | } |
698 | | |
699 | | } // namespace ROCKSDB_NAMESPACE |