/src/rocksdb/db/compaction/compaction_iterator.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | #include "db/compaction/compaction_iterator.h" |
7 | | |
8 | | #include <iterator> |
9 | | #include <limits> |
10 | | |
11 | | #include "db/blob/blob_fetcher.h" |
12 | | #include "db/blob/blob_file_builder.h" |
13 | | #include "db/blob/blob_index.h" |
14 | | #include "db/blob/prefetch_buffer_collection.h" |
15 | | #include "db/snapshot_checker.h" |
16 | | #include "db/wide/wide_column_serialization.h" |
17 | | #include "db/wide/wide_columns_helper.h" |
18 | | #include "logging/logging.h" |
19 | | #include "port/likely.h" |
20 | | #include "rocksdb/listener.h" |
21 | | #include "table/internal_iterator.h" |
22 | | #include "test_util/sync_point.h" |
23 | | |
24 | | namespace ROCKSDB_NAMESPACE { |
25 | | CompactionIterator::CompactionIterator( |
26 | | InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, |
27 | | SequenceNumber last_sequence, std::vector<SequenceNumber>* snapshots, |
28 | | SequenceNumber earliest_snapshot, |
29 | | SequenceNumber earliest_write_conflict_snapshot, |
30 | | SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker, |
31 | | Env* env, bool report_detailed_time, |
32 | | CompactionRangeDelAggregator* range_del_agg, |
33 | | BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, |
34 | | bool enforce_single_del_contracts, |
35 | | const std::atomic<bool>& manual_compaction_canceled, |
36 | | bool must_count_input_entries, const Compaction* compaction, |
37 | | const CompactionFilter* compaction_filter, |
38 | | const std::atomic<bool>* shutting_down, |
39 | | const std::shared_ptr<Logger> info_log, |
40 | | const std::string* full_history_ts_low, |
41 | | std::optional<SequenceNumber> preserve_seqno_min) |
42 | 22.3k | : CompactionIterator( |
43 | 22.3k | input, cmp, merge_helper, last_sequence, snapshots, earliest_snapshot, |
44 | 22.3k | earliest_write_conflict_snapshot, job_snapshot, snapshot_checker, env, |
45 | 22.3k | report_detailed_time, range_del_agg, blob_file_builder, |
46 | 22.3k | allow_data_in_errors, enforce_single_del_contracts, |
47 | 22.3k | manual_compaction_canceled, |
48 | 22.3k | compaction ? std::make_unique<RealCompaction>(compaction) : nullptr, |
49 | 22.3k | must_count_input_entries, compaction_filter, shutting_down, info_log, |
50 | 22.3k | full_history_ts_low, preserve_seqno_min) {} |
51 | | |
52 | | CompactionIterator::CompactionIterator( |
53 | | InternalIterator* input, const Comparator* cmp, MergeHelper* merge_helper, |
54 | | SequenceNumber /*last_sequence*/, std::vector<SequenceNumber>* snapshots, |
55 | | SequenceNumber earliest_snapshot, |
56 | | SequenceNumber earliest_write_conflict_snapshot, |
57 | | SequenceNumber job_snapshot, const SnapshotChecker* snapshot_checker, |
58 | | Env* env, bool report_detailed_time, |
59 | | CompactionRangeDelAggregator* range_del_agg, |
60 | | BlobFileBuilder* blob_file_builder, bool allow_data_in_errors, |
61 | | bool enforce_single_del_contracts, |
62 | | const std::atomic<bool>& manual_compaction_canceled, |
63 | | std::unique_ptr<CompactionProxy> compaction, bool must_count_input_entries, |
64 | | const CompactionFilter* compaction_filter, |
65 | | const std::atomic<bool>* shutting_down, |
66 | | const std::shared_ptr<Logger> info_log, |
67 | | const std::string* full_history_ts_low, |
68 | | std::optional<SequenceNumber> preserve_seqno_min) |
69 | 22.3k | : input_(input, cmp, must_count_input_entries), |
70 | 22.3k | cmp_(cmp), |
71 | 22.3k | merge_helper_(merge_helper), |
72 | 22.3k | snapshots_(snapshots), |
73 | 22.3k | earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot), |
74 | 22.3k | job_snapshot_(job_snapshot), |
75 | 22.3k | snapshot_checker_(snapshot_checker), |
76 | 22.3k | env_(env), |
77 | 22.3k | clock_(env_->GetSystemClock().get()), |
78 | 22.3k | report_detailed_time_(report_detailed_time), |
79 | 22.3k | range_del_agg_(range_del_agg), |
80 | 22.3k | blob_file_builder_(blob_file_builder), |
81 | 22.3k | compaction_(std::move(compaction)), |
82 | 22.3k | compaction_filter_(compaction_filter), |
83 | 22.3k | shutting_down_(shutting_down), |
84 | 22.3k | manual_compaction_canceled_(manual_compaction_canceled), |
85 | 22.3k | bottommost_level_(compaction_ && compaction_->bottommost_level() && |
86 | 3.68k | !compaction_->allow_ingest_behind()), |
87 | | // snapshots_ cannot be nullptr, but we will assert later in the body of |
88 | | // the constructor. |
89 | 22.3k | visible_at_tip_(snapshots_ ? snapshots_->empty() : false), |
90 | 22.3k | earliest_snapshot_(earliest_snapshot), |
91 | 22.3k | info_log_(info_log), |
92 | 22.3k | allow_data_in_errors_(allow_data_in_errors), |
93 | 22.3k | enforce_single_del_contracts_(enforce_single_del_contracts), |
94 | 22.3k | timestamp_size_(cmp_ ? cmp_->timestamp_size() : 0), |
95 | 22.3k | full_history_ts_low_(full_history_ts_low), |
96 | 22.3k | current_user_key_sequence_(0), |
97 | 22.3k | current_user_key_snapshot_(0), |
98 | 22.3k | merge_out_iter_(merge_helper_), |
99 | | blob_garbage_collection_cutoff_file_number_( |
100 | 22.3k | ComputeBlobGarbageCollectionCutoffFileNumber(compaction_.get())), |
101 | 22.3k | blob_fetcher_(CreateBlobFetcherIfNeeded(compaction_.get())), |
102 | | prefetch_buffers_( |
103 | 22.3k | CreatePrefetchBufferCollectionIfNeeded(compaction_.get())), |
104 | 22.3k | current_key_committed_(false), |
105 | 22.3k | cmp_with_history_ts_low_(0), |
106 | 22.3k | level_(compaction_ == nullptr ? 0 : compaction_->level()), |
107 | 22.3k | preserve_seqno_after_(preserve_seqno_min.value_or(earliest_snapshot)) { |
108 | 22.3k | assert(snapshots_ != nullptr); |
109 | 22.3k | assert(preserve_seqno_after_ <= earliest_snapshot_); |
110 | | |
111 | 22.3k | if (compaction_ != nullptr) { |
112 | 3.68k | level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0); |
113 | 3.68k | } |
114 | | #ifndef NDEBUG |
115 | | // findEarliestVisibleSnapshot assumes this ordering. |
116 | | for (size_t i = 1; i < snapshots_->size(); ++i) { |
117 | | assert(snapshots_->at(i - 1) < snapshots_->at(i)); |
118 | | } |
119 | | assert(timestamp_size_ == 0 || !full_history_ts_low_ || |
120 | | timestamp_size_ == full_history_ts_low_->size()); |
121 | | #endif |
122 | 22.3k | input_.SetPinnedItersMgr(&pinned_iters_mgr_); |
123 | | // The default `merge_until_status_` does not need to be checked since it is |
124 | | // overwritten as soon as `MergeUntil()` is called |
125 | 22.3k | merge_until_status_.PermitUncheckedError(); |
126 | 22.3k | TEST_SYNC_POINT_CALLBACK("CompactionIterator:AfterInit", compaction_.get()); |
127 | 22.3k | } |
128 | | |
129 | 22.3k | CompactionIterator::~CompactionIterator() { |
130 | | // input_ Iterator lifetime is longer than pinned_iters_mgr_ lifetime |
131 | 22.3k | input_.SetPinnedItersMgr(nullptr); |
132 | 22.3k | } |
133 | | |
134 | 3.68k | void CompactionIterator::ResetRecordCounts() { |
135 | 3.68k | iter_stats_.num_record_drop_user = 0; |
136 | 3.68k | iter_stats_.num_record_drop_hidden = 0; |
137 | 3.68k | iter_stats_.num_record_drop_obsolete = 0; |
138 | 3.68k | iter_stats_.num_record_drop_range_del = 0; |
139 | 3.68k | iter_stats_.num_range_del_drop_obsolete = 0; |
140 | 3.68k | iter_stats_.num_optimized_del_drop_obsolete = 0; |
141 | 3.68k | } |
142 | | |
143 | 22.3k | void CompactionIterator::SeekToFirst() { |
144 | 22.3k | NextFromInput(); |
145 | 22.3k | PrepareOutput(); |
146 | 22.3k | } |
147 | | |
148 | 55.0k | void CompactionIterator::Next() { |
149 | | // If there is a merge output, return it before continuing to process the |
150 | | // input. |
151 | 55.0k | if (merge_out_iter_.Valid()) { |
152 | 0 | merge_out_iter_.Next(); |
153 | | |
154 | | // Check if we returned all records of the merge output. |
155 | 0 | if (merge_out_iter_.Valid()) { |
156 | 0 | key_ = merge_out_iter_.key(); |
157 | 0 | value_ = merge_out_iter_.value(); |
158 | 0 | Status s = ParseInternalKey(key_, &ikey_, allow_data_in_errors_); |
159 | | // MergeUntil stops when it encounters a corrupt key and does not |
160 | | // include it in the result, so we expect the keys here to be valid. |
161 | 0 | if (!s.ok()) { |
162 | | // FIXME: should fail compaction after this fatal logging. |
163 | 0 | ROCKS_LOG_FATAL( |
164 | 0 | info_log_, "Invalid ikey %s in compaction. %s", |
165 | 0 | allow_data_in_errors_ ? key_.ToString(true).c_str() : "hidden", |
166 | 0 | s.getState()); |
167 | 0 | assert(false); |
168 | 0 | } |
169 | | |
170 | | // Keep current_key_ in sync. |
171 | 0 | if (0 == timestamp_size_) { |
172 | 0 | current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); |
173 | 0 | } else { |
174 | 0 | Slice ts = ikey_.GetTimestamp(timestamp_size_); |
175 | 0 | current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type, &ts); |
176 | 0 | } |
177 | 0 | key_ = current_key_.GetInternalKey(); |
178 | 0 | ikey_.user_key = current_key_.GetUserKey(); |
179 | 0 | validity_info_.SetValid(ValidContext::kMerge1); |
180 | 0 | } else { |
181 | 0 | if (merge_until_status_.IsMergeInProgress()) { |
182 | | // `Status::MergeInProgress()` tells us that the previous `MergeUntil()` |
183 | | // produced only merge operands. Those merge operands were accessed and |
184 | | // written out using `merge_out_iter_`. Since `merge_out_iter_` is |
185 | | // exhausted at this point, all merge operands have been written out. |
186 | | // |
187 | | // Still, there may be a base value (PUT, DELETE, SINGLEDEL, etc.) that |
188 | | // needs to be written out. Normally, `CompactionIterator` would skip it |
189 | | // on the basis that it has already output something in the same |
190 | | // snapshot stripe. To prevent this, we reset `has_current_user_key_` so |
191 | | // that the next iteration does not treat the snapshot stripe as |
192 | | // unchanged. |
193 | 0 | has_current_user_key_ = false; |
194 | 0 | } |
195 | | // We consumed all pinned merge operands, release pinned iterators |
196 | 0 | pinned_iters_mgr_.ReleasePinnedData(); |
197 | | // MergeHelper moves the iterator to the first record after the merged |
198 | | // records, so even though we reached the end of the merge output, we do |
199 | | // not want to advance the iterator. |
200 | 0 | NextFromInput(); |
201 | 0 | } |
202 | 55.0k | } else { |
203 | | // Only advance the input iterator if there is no merge output and the |
204 | | // iterator is not already at the next record. |
205 | 55.0k | if (!at_next_) { |
206 | 55.0k | AdvanceInputIter(); |
207 | 55.0k | } |
208 | 55.0k | NextFromInput(); |
209 | 55.0k | } |
210 | | |
211 | 55.0k | if (Valid()) { |
212 | | // Record that we've outputted a record for the current key. |
213 | 35.6k | has_outputted_key_ = true; |
214 | 35.6k | } |
215 | | |
216 | 55.0k | PrepareOutput(); |
217 | 55.0k | } |
218 | | |
219 | | bool CompactionIterator::InvokeFilterIfNeeded(bool* need_skip, |
220 | 113k | Slice* skip_until) { |
221 | 113k | if (!compaction_filter_) { |
222 | 113k | return true; |
223 | 113k | } |
224 | | |
225 | 0 | if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex && |
226 | 0 | ikey_.type != kTypeWideColumnEntity) { |
227 | 0 | return true; |
228 | 0 | } |
229 | | |
230 | 0 | CompactionFilter::Decision decision = |
231 | 0 | CompactionFilter::Decision::kUndetermined; |
232 | 0 | CompactionFilter::ValueType value_type = |
233 | 0 | ikey_.type == kTypeValue ? CompactionFilter::ValueType::kValue |
234 | 0 | : ikey_.type == kTypeBlobIndex |
235 | 0 | ? CompactionFilter::ValueType::kBlobIndex |
236 | 0 | : CompactionFilter::ValueType::kWideColumnEntity; |
237 | | |
238 | | // Hack: pass internal key to BlobIndexCompactionFilter since it needs |
239 | | // to get sequence number. |
240 | 0 | assert(compaction_filter_); |
241 | 0 | const Slice& filter_key = |
242 | 0 | (ikey_.type != kTypeBlobIndex || |
243 | 0 | !compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) |
244 | 0 | ? ikey_.user_key |
245 | 0 | : key_; |
246 | |
247 | 0 | compaction_filter_value_.clear(); |
248 | 0 | compaction_filter_skip_until_.Clear(); |
249 | |
250 | 0 | std::vector<std::pair<std::string, std::string>> new_columns; |
251 | |
252 | 0 | { |
253 | 0 | StopWatchNano timer(clock_, report_detailed_time_); |
254 | |
255 | 0 | if (ikey_.type == kTypeBlobIndex) { |
256 | 0 | decision = compaction_filter_->FilterBlobByKey( |
257 | 0 | level_, filter_key, &compaction_filter_value_, |
258 | 0 | compaction_filter_skip_until_.rep()); |
259 | 0 | if (decision == CompactionFilter::Decision::kUndetermined && |
260 | 0 | !compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) { |
261 | 0 | if (!compaction_) { |
262 | 0 | status_ = |
263 | 0 | Status::Corruption("Unexpected blob index outside of compaction"); |
264 | 0 | validity_info_.Invalidate(); |
265 | 0 | return false; |
266 | 0 | } |
267 | | |
268 | 0 | TEST_SYNC_POINT_CALLBACK( |
269 | 0 | "CompactionIterator::InvokeFilterIfNeeded::TamperWithBlobIndex", |
270 | 0 | &value_); |
271 | | |
272 | | // For integrated BlobDB impl, CompactionIterator reads blob value. |
273 | | // For Stacked BlobDB impl, the corresponding CompactionFilter's |
274 | | // FilterV2 method should read the blob value. |
275 | 0 | BlobIndex blob_index; |
276 | 0 | Status s = blob_index.DecodeFrom(value_); |
277 | 0 | if (!s.ok()) { |
278 | 0 | status_ = s; |
279 | 0 | validity_info_.Invalidate(); |
280 | 0 | return false; |
281 | 0 | } |
282 | | |
283 | 0 | FilePrefetchBuffer* prefetch_buffer = |
284 | 0 | prefetch_buffers_ ? prefetch_buffers_->GetOrCreatePrefetchBuffer( |
285 | 0 | blob_index.file_number()) |
286 | 0 | : nullptr; |
287 | |
288 | 0 | uint64_t bytes_read = 0; |
289 | |
290 | 0 | assert(blob_fetcher_); |
291 | |
292 | 0 | s = blob_fetcher_->FetchBlob(ikey_.user_key, blob_index, |
293 | 0 | prefetch_buffer, &blob_value_, |
294 | 0 | &bytes_read); |
295 | 0 | if (!s.ok()) { |
296 | 0 | status_ = s; |
297 | 0 | validity_info_.Invalidate(); |
298 | 0 | return false; |
299 | 0 | } |
300 | | |
301 | 0 | ++iter_stats_.num_blobs_read; |
302 | 0 | iter_stats_.total_blob_bytes_read += bytes_read; |
303 | |
304 | 0 | value_type = CompactionFilter::ValueType::kValue; |
305 | 0 | } |
306 | 0 | } |
307 | | |
308 | 0 | if (decision == CompactionFilter::Decision::kUndetermined) { |
309 | 0 | const Slice* existing_val = nullptr; |
310 | 0 | const WideColumns* existing_col = nullptr; |
311 | |
312 | 0 | WideColumns existing_columns; |
313 | |
314 | 0 | if (ikey_.type != kTypeWideColumnEntity) { |
315 | 0 | if (!blob_value_.empty()) { |
316 | 0 | existing_val = &blob_value_; |
317 | 0 | } else { |
318 | 0 | existing_val = &value_; |
319 | 0 | } |
320 | 0 | } else { |
321 | 0 | Slice value_copy = value_; |
322 | 0 | const Status s = |
323 | 0 | WideColumnSerialization::Deserialize(value_copy, existing_columns); |
324 | |
325 | 0 | if (!s.ok()) { |
326 | 0 | status_ = s; |
327 | 0 | validity_info_.Invalidate(); |
328 | 0 | return false; |
329 | 0 | } |
330 | | |
331 | 0 | existing_col = &existing_columns; |
332 | 0 | } |
333 | | |
334 | 0 | decision = compaction_filter_->FilterV3( |
335 | 0 | level_, filter_key, value_type, existing_val, existing_col, |
336 | 0 | &compaction_filter_value_, &new_columns, |
337 | 0 | compaction_filter_skip_until_.rep()); |
338 | 0 | } |
339 | | |
340 | 0 | iter_stats_.total_filter_time += |
341 | 0 | env_ != nullptr && report_detailed_time_ ? timer.ElapsedNanos() : 0; |
342 | 0 | } |
343 | | |
344 | 0 | if (decision == CompactionFilter::Decision::kUndetermined) { |
345 | | // Should not reach here, since FilterV2/FilterV3 should never return |
346 | | // kUndetermined. |
347 | 0 | status_ = Status::NotSupported( |
348 | 0 | "FilterV2/FilterV3 should never return kUndetermined"); |
349 | 0 | validity_info_.Invalidate(); |
350 | 0 | return false; |
351 | 0 | } |
352 | | |
353 | 0 | if (decision == CompactionFilter::Decision::kRemoveAndSkipUntil && |
354 | 0 | cmp_->Compare(*compaction_filter_skip_until_.rep(), ikey_.user_key) <= |
355 | 0 | 0) { |
356 | | // Can't skip to a key smaller than the current one. |
357 | | // Keep the key as per FilterV2/FilterV3 documentation. |
358 | 0 | decision = CompactionFilter::Decision::kKeep; |
359 | 0 | } |
360 | |
361 | 0 | if (decision == CompactionFilter::Decision::kRemove) { |
362 | | // convert the current key to a delete; key_ is pointing into |
363 | | // current_key_ at this point, so updating current_key_ updates key() |
364 | 0 | ikey_.type = kTypeDeletion; |
365 | 0 | current_key_.UpdateInternalKey(ikey_.sequence, kTypeDeletion); |
366 | | // no value associated with delete |
367 | 0 | value_.clear(); |
368 | 0 | iter_stats_.num_record_drop_user++; |
369 | 0 | } else if (decision == CompactionFilter::Decision::kPurge) { |
370 | | // convert the current key to a single delete; key_ is pointing into |
371 | | // current_key_ at this point, so updating current_key_ updates key() |
372 | 0 | ikey_.type = kTypeSingleDeletion; |
373 | 0 | current_key_.UpdateInternalKey(ikey_.sequence, kTypeSingleDeletion); |
374 | | // no value associated with single delete |
375 | 0 | value_.clear(); |
376 | 0 | iter_stats_.num_record_drop_user++; |
377 | 0 | } else if (decision == CompactionFilter::Decision::kChangeValue) { |
378 | 0 | if (ikey_.type != kTypeValue) { |
379 | 0 | ikey_.type = kTypeValue; |
380 | 0 | current_key_.UpdateInternalKey(ikey_.sequence, kTypeValue); |
381 | 0 | } |
382 | |
383 | 0 | value_ = compaction_filter_value_; |
384 | 0 | } else if (decision == CompactionFilter::Decision::kRemoveAndSkipUntil) { |
385 | 0 | *need_skip = true; |
386 | 0 | compaction_filter_skip_until_.ConvertFromUserKey(kMaxSequenceNumber, |
387 | 0 | kValueTypeForSeek); |
388 | 0 | *skip_until = compaction_filter_skip_until_.Encode(); |
389 | 0 | } else if (decision == CompactionFilter::Decision::kChangeBlobIndex) { |
390 | | // Only the StackableDB-based BlobDB impl's compaction filter should return |
391 | | // kChangeBlobIndex. Decision about rewriting blob and changing blob index |
392 | | // in the integrated BlobDB impl is made in subsequent call to |
393 | | // PrepareOutput() and its callees. |
394 | 0 | if (!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) { |
395 | 0 | status_ = Status::NotSupported( |
396 | 0 | "Only stacked BlobDB's internal compaction filter can return " |
397 | 0 | "kChangeBlobIndex."); |
398 | 0 | validity_info_.Invalidate(); |
399 | 0 | return false; |
400 | 0 | } |
401 | | |
402 | 0 | if (ikey_.type != kTypeBlobIndex) { |
403 | 0 | ikey_.type = kTypeBlobIndex; |
404 | 0 | current_key_.UpdateInternalKey(ikey_.sequence, kTypeBlobIndex); |
405 | 0 | } |
406 | |
407 | 0 | value_ = compaction_filter_value_; |
408 | 0 | } else if (decision == CompactionFilter::Decision::kIOError) { |
409 | 0 | if (!compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) { |
410 | 0 | status_ = Status::NotSupported( |
411 | 0 | "CompactionFilter for integrated BlobDB should not return kIOError"); |
412 | 0 | validity_info_.Invalidate(); |
413 | 0 | return false; |
414 | 0 | } |
415 | | |
416 | 0 | status_ = Status::IOError("Failed to access blob during compaction filter"); |
417 | 0 | validity_info_.Invalidate(); |
418 | 0 | return false; |
419 | 0 | } else if (decision == CompactionFilter::Decision::kChangeWideColumnEntity) { |
420 | 0 | WideColumns sorted_columns; |
421 | 0 | sorted_columns.reserve(new_columns.size()); |
422 | |
423 | 0 | for (const auto& column : new_columns) { |
424 | 0 | sorted_columns.emplace_back(column.first, column.second); |
425 | 0 | } |
426 | |
427 | 0 | WideColumnsHelper::SortColumns(sorted_columns); |
428 | |
429 | 0 | { |
430 | 0 | const Status s = WideColumnSerialization::Serialize( |
431 | 0 | sorted_columns, compaction_filter_value_); |
432 | 0 | if (!s.ok()) { |
433 | 0 | status_ = s; |
434 | 0 | validity_info_.Invalidate(); |
435 | 0 | return false; |
436 | 0 | } |
437 | 0 | } |
438 | | |
439 | 0 | if (ikey_.type != kTypeWideColumnEntity) { |
440 | 0 | ikey_.type = kTypeWideColumnEntity; |
441 | 0 | current_key_.UpdateInternalKey(ikey_.sequence, kTypeWideColumnEntity); |
442 | 0 | } |
443 | |
444 | 0 | value_ = compaction_filter_value_; |
445 | 0 | } |
446 | | |
447 | 0 | return true; |
448 | 0 | } |
449 | | |
450 | 77.4k | void CompactionIterator::NextFromInput() { |
451 | 77.4k | at_next_ = false; |
452 | 77.4k | validity_info_.Invalidate(); |
453 | | |
454 | 600k | while (!Valid() && input_.Valid() && !IsPausingManualCompaction() && |
455 | 524k | !IsShuttingDown()) { |
456 | 523k | key_ = input_.key(); |
457 | 523k | value_ = input_.value(); |
458 | 523k | blob_value_.Reset(); |
459 | 523k | iter_stats_.num_input_records++; |
460 | 523k | is_range_del_ = input_.IsDeleteRangeSentinelKey(); |
461 | | |
462 | 523k | Status pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_); |
463 | 523k | if (!pik_status.ok()) { |
464 | 0 | iter_stats_.num_input_corrupt_records++; |
465 | | |
466 | | // Always fail compaction when encountering corrupted internal keys |
467 | 0 | status_ = pik_status; |
468 | 0 | return; |
469 | 0 | } |
470 | 523k | TEST_SYNC_POINT_CALLBACK("CompactionIterator:ProcessKV", &ikey_); |
471 | 523k | if (is_range_del_) { |
472 | 0 | validity_info_.SetValid(kRangeDeletion); |
473 | 0 | break; |
474 | 0 | } |
475 | | // Update input statistics |
476 | 523k | if (ikey_.type == kTypeDeletion || ikey_.type == kTypeSingleDeletion || |
477 | 397k | ikey_.type == kTypeDeletionWithTimestamp) { |
478 | 125k | iter_stats_.num_input_deletion_records++; |
479 | 397k | } else if (ikey_.type == kTypeValuePreferredSeqno) { |
480 | 0 | iter_stats_.num_input_timed_put_records++; |
481 | 0 | } |
482 | 523k | iter_stats_.total_input_raw_key_bytes += key_.size(); |
483 | 523k | iter_stats_.total_input_raw_value_bytes += value_.size(); |
484 | | |
485 | | // If need_skip is true, we should seek the input iterator |
486 | | // to internal key skip_until and continue from there. |
487 | 523k | bool need_skip = false; |
488 | | // Points either into compaction_filter_skip_until_ or into |
489 | | // merge_helper_->compaction_filter_skip_until_. |
490 | 523k | Slice skip_until; |
491 | | |
492 | 523k | bool user_key_equal_without_ts = false; |
493 | 523k | int cmp_ts = 0; |
494 | 523k | if (has_current_user_key_) { |
495 | 501k | user_key_equal_without_ts = |
496 | 501k | cmp_->EqualWithoutTimestamp(ikey_.user_key, current_user_key_); |
497 | | // if timestamp_size_ > 0, then curr_ts_ has been initialized by a |
498 | | // previous key. |
499 | 501k | cmp_ts = timestamp_size_ ? cmp_->CompareTimestamp( |
500 | 0 | ExtractTimestampFromUserKey( |
501 | 0 | ikey_.user_key, timestamp_size_), |
502 | 0 | curr_ts_) |
503 | 501k | : 0; |
504 | 501k | } |
505 | | |
506 | | // Check whether the user key changed. After this if statement current_key_ |
507 | | // is a copy of the current input key (maybe converted to a delete by the |
508 | | // compaction filter). ikey_.user_key is pointing to the copy. |
509 | 523k | if (!has_current_user_key_ || !user_key_equal_without_ts || cmp_ts != 0) { |
510 | | // First occurrence of this user key |
511 | | // Copy key for output |
512 | 113k | key_ = current_key_.SetInternalKey(key_, &ikey_); |
513 | | |
514 | 113k | int prev_cmp_with_ts_low = |
515 | 113k | !full_history_ts_low_ ? 0 |
516 | 113k | : curr_ts_.empty() |
517 | 0 | ? 0 |
518 | 0 | : cmp_->CompareTimestamp(curr_ts_, *full_history_ts_low_); |
519 | | |
520 | | // If timestamp_size_ > 0, then copy from ikey_ to curr_ts_ for the use |
521 | | // in next iteration to compare with the timestamp of next key. |
522 | 113k | UpdateTimestampAndCompareWithFullHistoryLow(); |
523 | | |
524 | | // If |
525 | | // (1) !has_current_user_key_, OR |
526 | | // (2) timestamp is disabled, OR |
527 | | // (3) all history will be preserved, OR |
528 | | // (4) user key (excluding timestamp) is different from previous key, OR |
529 | | // (5) timestamp is NO older than *full_history_ts_low_, OR |
530 | | // (6) timestamp is the largest one older than full_history_ts_low_, |
531 | | // then current_user_key_ must be treated as a different user key. |
532 | | // This means, if a user key (excluding ts) is the same as the previous |
533 | | // user key, and its ts is older than *full_history_ts_low_, then we |
534 | | // consider this key for GC, e.g. it may be dropped if certain conditions |
535 | | // match. |
536 | 113k | if (!has_current_user_key_ || !timestamp_size_ || !full_history_ts_low_ || |
537 | 0 | !user_key_equal_without_ts || cmp_with_history_ts_low_ >= 0 || |
538 | 113k | prev_cmp_with_ts_low >= 0) { |
539 | | // Initialize for future comparison for rule (A), etc. |
540 | 113k | current_user_key_sequence_ = kMaxSequenceNumber; |
541 | 113k | current_user_key_snapshot_ = 0; |
542 | 113k | has_current_user_key_ = true; |
543 | 113k | } |
544 | 113k | current_user_key_ = ikey_.user_key; |
545 | | |
546 | 113k | has_outputted_key_ = false; |
547 | | |
548 | 113k | last_key_seq_zeroed_ = false; |
549 | | |
550 | 113k | current_key_committed_ = KeyCommitted(ikey_.sequence); |
551 | | |
552 | | // Apply the compaction filter to the first committed version of the user |
553 | | // key. |
554 | 113k | if (current_key_committed_ && |
555 | 113k | !InvokeFilterIfNeeded(&need_skip, &skip_until)) { |
556 | 0 | break; |
557 | 0 | } |
558 | 409k | } else { |
559 | | // Update the current key to reflect the new sequence number/type without |
560 | | // copying the user key. |
561 | | // TODO(rven): Compaction filter does not process keys in this path |
562 | | // Need to have the compaction filter process multiple versions |
563 | | // if we have versions on both sides of a snapshot |
564 | 409k | current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); |
565 | 409k | key_ = current_key_.GetInternalKey(); |
566 | 409k | ikey_.user_key = current_key_.GetUserKey(); |
567 | | |
568 | | // Note that a newer version of a key is ordered before older versions. If a |
569 | | // newer version of a key is committed, so is the older version. No need |
570 | | // to query snapshot_checker_ in that case. |
571 | 409k | if (UNLIKELY(!current_key_committed_)) { |
572 | 0 | assert(snapshot_checker_ != nullptr); |
573 | 0 | current_key_committed_ = KeyCommitted(ikey_.sequence); |
574 | | // Apply the compaction filter to the first committed version of the |
575 | | // user key. |
576 | 0 | if (current_key_committed_ && |
577 | 0 | !InvokeFilterIfNeeded(&need_skip, &skip_until)) { |
578 | 0 | break; |
579 | 0 | } |
580 | 0 | } |
581 | 409k | } |
582 | | |
583 | 523k | if (UNLIKELY(!current_key_committed_)) { |
584 | 0 | assert(snapshot_checker_ != nullptr); |
585 | 0 | validity_info_.SetValid(ValidContext::kCurrentKeyUncommitted); |
586 | 0 | break; |
587 | 0 | } |
588 | | |
589 | | // If there are no snapshots, then this kv affects visibility at tip. |
590 | | // Otherwise, search through all existing snapshots to find the earliest |
591 | | // snapshot that is affected by this kv. |
592 | 523k | SequenceNumber last_sequence = current_user_key_sequence_; |
593 | 523k | current_user_key_sequence_ = ikey_.sequence; |
594 | 523k | SequenceNumber last_snapshot = current_user_key_snapshot_; |
595 | 523k | SequenceNumber prev_snapshot = 0; // 0 means no previous snapshot |
596 | 523k | current_user_key_snapshot_ = |
597 | 523k | visible_at_tip_ |
598 | 523k | ? earliest_snapshot_ |
599 | 523k | : findEarliestVisibleSnapshot(ikey_.sequence, &prev_snapshot); |
600 | | |
601 | 523k | if (need_skip) { |
602 | | // This case is handled below. |
603 | 523k | } else if (clear_and_output_next_key_) { |
604 | | // In the previous iteration we encountered a single delete that we could |
605 | | // not compact out. We will keep this Put, but can drop its data. |
606 | | // (See Optimization 3, below.) |
607 | 0 | if (ikey_.type != kTypeValue && ikey_.type != kTypeBlobIndex && |
608 | 0 | ikey_.type != kTypeWideColumnEntity && |
609 | 0 | ikey_.type != kTypeValuePreferredSeqno) { |
610 | 0 | ROCKS_LOG_FATAL(info_log_, "Unexpected key %s for compaction output", |
611 | 0 | ikey_.DebugString(allow_data_in_errors_, true).c_str()); |
612 | 0 | assert(false); |
613 | 0 | } |
614 | 0 | if (current_user_key_snapshot_ < last_snapshot) { |
615 | 0 | ROCKS_LOG_FATAL(info_log_, |
616 | 0 | "key %s, current_user_key_snapshot_ (%" PRIu64 |
617 | 0 | ") < last_snapshot (%" PRIu64 ")", |
618 | 0 | ikey_.DebugString(allow_data_in_errors_, true).c_str(), |
619 | 0 | current_user_key_snapshot_, last_snapshot); |
620 | 0 | assert(false); |
621 | 0 | } |
622 | |
623 | 0 | if (ikey_.type == kTypeBlobIndex || ikey_.type == kTypeWideColumnEntity || |
624 | 0 | ikey_.type == kTypeValuePreferredSeqno) { |
625 | 0 | ikey_.type = kTypeValue; |
626 | 0 | current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); |
627 | 0 | } |
628 | |
629 | 0 | value_.clear(); |
630 | 0 | validity_info_.SetValid(ValidContext::kKeepSDAndClearPut); |
631 | 0 | clear_and_output_next_key_ = false; |
632 | 523k | } else if (ikey_.type == kTypeSingleDeletion) { |
633 | | // We can compact out a SingleDelete if: |
634 | | // 1) We encounter the corresponding PUT -OR- we know that this key |
635 | | // doesn't appear past this output level and we are not in |
636 | | // ingest_behind mode. |
637 | | // =AND= |
638 | | // 2) We've already returned a record in this snapshot -OR- |
639 | | // there is no earlier earliest_write_conflict_snapshot. |
640 | | // |
641 | | // A note about 2) above: |
642 | | // we try to determine whether there is any earlier write conflict |
643 | | // checking snapshot by calling DefinitelyInSnapshot() with seq and |
644 | | // earliest_write_conflict_snapshot as arguments. For write-prepared |
645 | | // and write-unprepared transactions, if earliest_write_conflict_snapshot |
646 | | // is evicted from WritePreparedTxnDB::commit_cache, then |
647 | | // DefinitelyInSnapshot(seq, earliest_write_conflict_snapshot) returns |
648 | | // false, even if the seq is actually visible within |
649 | | // earliest_write_conflict_snapshot. Consequently, CompactionIterator |
650 | | // may try to zero out its sequence number, thus hitting assertion error |
651 | | // in debug mode or cause incorrect DBIter return result. |
652 | | // We observe that earliest_write_conflict_snapshot >= earliest_snapshot, |
653 | | // and the seq zeroing logic depends on |
654 | | // DefinitelyInSnapshot(seq, earliest_snapshot). Therefore, if we cannot |
655 | | // determine whether seq is **definitely** in |
656 | | // earliest_write_conflict_snapshot, then we can additionally check if |
657 | | // seq is definitely in earliest_snapshot. If the latter holds, then the |
658 | | // former holds too. |
659 | | // |
660 | | // Rule 1 is needed for SingleDelete correctness. Rule 2 is needed to |
661 | | // allow Transactions to do write-conflict checking (if we compacted away |
662 | | // all keys, then we wouldn't know that a write happened in this |
663 | | // snapshot). If there is no earlier snapshot, then we know that there |
664 | | // are no active transactions that need to know about any writes. |
665 | | // |
666 | | // Optimization 3: |
667 | | // If we encounter a SingleDelete followed by a PUT and Rule 2 is NOT |
668 | | // true, then we must output a SingleDelete. In this case, we will decide |
669 | | // to also output the PUT. While we are compacting less by outputting the |
670 | | // PUT now, hopefully this will lead to better compaction in the future |
671 | | // when Rule 2 is later true (Ie, We are hoping we can later compact out |
672 | | // both the SingleDelete and the Put, while we couldn't if we only |
673 | | // outputted the SingleDelete now). |
674 | | // In this case, we can save space by removing the PUT's value as it will |
675 | | // never be read. |
676 | | // |
677 | | // Deletes and Merges are not supported on the same key that has a |
678 | | // SingleDelete as it is not possible to correctly do any partial |
679 | | // compaction of such a combination of operations. The result of mixing |
680 | | // those operations for a given key is documented as being undefined. So |
681 | | // we can choose how to handle such a combinations of operations. We will |
682 | | // we can choose how to handle such a combination of operations. We will |
683 | | // We will report counts on these anomalous cases. |
684 | | // |
685 | | // Note: If timestamp is enabled, the record is eligible for deletion |
686 | | // only if, in addition to the conditions above (Rule 1 and Rule 2), |
687 | | // full_history_ts_low_ is specified and the timestamp for that key is |
688 | | // less than *full_history_ts_low_. If it is not eligible for deletion, |
689 | | // we will output the SingleDelete. Similarly, Optimization 3 is applied |
690 | | // only if full_history_ts_low_ is specified and the timestamp for the |
691 | | // key is less than *full_history_ts_low_. |
692 | | |
693 | | // The easiest way to process a SingleDelete during iteration is to peek |
694 | | // ahead at the next key. |
695 | 0 | const bool is_timestamp_eligible_for_gc = |
696 | 0 | (timestamp_size_ == 0 || |
697 | 0 | (full_history_ts_low_ && cmp_with_history_ts_low_ < 0)); |
698 | |
699 | 0 | ParsedInternalKey next_ikey; |
700 | 0 | AdvanceInputIter(); |
701 | 0 | while (input_.Valid() && input_.IsDeleteRangeSentinelKey() && |
702 | 0 | ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_) |
703 | 0 | .ok() && |
704 | 0 | cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) { |
705 | | // skip range tombstone start keys with the same user key |
706 | | // since they are not "real" point keys. |
707 | 0 | AdvanceInputIter(); |
708 | 0 | } |
709 | | |
710 | | // Check whether the next key exists, is not corrupt, and is the same key |
711 | | // as the single delete. |
712 | 0 | if (input_.Valid() && |
713 | 0 | ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_) |
714 | 0 | .ok() && |
715 | 0 | cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) { |
716 | 0 | assert(!input_.IsDeleteRangeSentinelKey()); |
717 | | #ifndef NDEBUG |
718 | | const Compaction* c = |
719 | | compaction_ ? compaction_->real_compaction() : nullptr; |
720 | | #endif |
721 | 0 | TEST_SYNC_POINT_CALLBACK( |
722 | 0 | "CompactionIterator::NextFromInput:SingleDelete:1", |
723 | 0 | const_cast<Compaction*>(c)); |
724 | 0 | if (last_key_seq_zeroed_) { |
725 | | // Drop SD and the next key since they are both in the last |
726 | | // snapshot (since last key has seqno zeroed). |
727 | 0 | ++iter_stats_.num_record_drop_hidden; |
728 | 0 | ++iter_stats_.num_record_drop_obsolete; |
729 | 0 | assert(bottommost_level_); |
730 | 0 | AdvanceInputIter(); |
731 | 0 | } else if (prev_snapshot == 0 || |
732 | 0 | DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot)) { |
733 | | // Check whether the next key belongs to the same snapshot as the |
734 | | // SingleDelete. |
735 | |
736 | 0 | TEST_SYNC_POINT_CALLBACK( |
737 | 0 | "CompactionIterator::NextFromInput:SingleDelete:2", nullptr); |
738 | 0 | if (next_ikey.type == kTypeSingleDeletion) { |
739 | | // We encountered two SingleDeletes for the same key in a row. This |
740 | | // could be due to unexpected user input. If write-(un)prepared |
741 | | // transaction is used, this could also be due to releasing an old |
742 | | // snapshot between a Put and its matching SingleDelete. |
743 | | // Skip the first SingleDelete and let the next iteration decide |
744 | | // how to handle the second SingleDelete. |
745 | | |
746 | | // First SingleDelete has been skipped since we already called |
747 | | // input_.Next(). |
748 | 0 | ++iter_stats_.num_record_drop_obsolete; |
749 | 0 | ++iter_stats_.num_single_del_mismatch; |
750 | 0 | } else if (next_ikey.type == kTypeDeletion) { |
751 | 0 | std::ostringstream oss; |
752 | 0 | oss << "Found SD and type: " << static_cast<int>(next_ikey.type) |
753 | 0 | << " on the same key, violating the contract " |
754 | 0 | "of SingleDelete. Check your application to make sure the " |
755 | 0 | "application does not mix SingleDelete and Delete for " |
756 | 0 | "the same key. If you are using " |
757 | 0 | "write-prepared/write-unprepared transactions, and use " |
758 | 0 | "SingleDelete to delete certain keys, then make sure " |
759 | 0 | "TransactionDBOptions::rollback_deletion_type_callback is " |
760 | 0 | "configured properly. Mixing SD and DEL can lead to " |
761 | 0 | "undefined behaviors"; |
762 | 0 | ++iter_stats_.num_record_drop_obsolete; |
763 | 0 | ++iter_stats_.num_single_del_mismatch; |
764 | 0 | if (enforce_single_del_contracts_) { |
765 | 0 | ROCKS_LOG_ERROR(info_log_, "%s", oss.str().c_str()); |
766 | 0 | validity_info_.Invalidate(); |
767 | 0 | status_ = Status::Corruption(oss.str()); |
768 | 0 | return; |
769 | 0 | } |
770 | 0 | ROCKS_LOG_WARN(info_log_, "%s", oss.str().c_str()); |
771 | 0 | } else if (!is_timestamp_eligible_for_gc) { |
772 | | // We cannot drop the SingleDelete as timestamp is enabled, and |
773 | | // timestamp of this key is greater than or equal to |
774 | | // *full_history_ts_low_. We will output the SingleDelete. |
775 | 0 | validity_info_.SetValid(ValidContext::kKeepTsHistory); |
776 | 0 | } else if (has_outputted_key_ || |
777 | 0 | DefinitelyInSnapshot(ikey_.sequence, |
778 | 0 | earliest_write_conflict_snapshot_) || |
779 | 0 | (earliest_snapshot_ < earliest_write_conflict_snapshot_ && |
780 | 0 | DefinitelyInSnapshot(ikey_.sequence, |
781 | 0 | earliest_snapshot_))) { |
782 | | // Found a matching value, we can drop the single delete and the |
783 | | // value. It is safe to drop both records since we've already |
784 | | // outputted a key in this snapshot, or there is no earlier |
785 | | // snapshot (Rule 2 above). |
786 | | |
787 | | // Note: it doesn't matter whether the second key is a Put or if it |
788 | | // is an unexpected Merge or Delete. We will compact it out |
789 | | // either way. We will maintain counts of how many mismatches |
790 | | // happened |
791 | 0 | if (next_ikey.type != kTypeValue && |
792 | 0 | next_ikey.type != kTypeBlobIndex && |
793 | 0 | next_ikey.type != kTypeWideColumnEntity && |
794 | 0 | next_ikey.type != kTypeValuePreferredSeqno) { |
795 | 0 | ++iter_stats_.num_single_del_mismatch; |
796 | 0 | } |
797 | |
798 | 0 | ++iter_stats_.num_record_drop_hidden; |
799 | 0 | ++iter_stats_.num_record_drop_obsolete; |
800 | | // Already called input_.Next() once. Call it a second time to |
801 | | // skip past the second key. |
802 | 0 | AdvanceInputIter(); |
803 | 0 | } else { |
804 | | // Found a matching value, but we cannot drop both keys since |
805 | | // there is an earlier snapshot and we need to leave behind a record |
806 | | // to know that a write happened in this snapshot (Rule 2 above). |
807 | | // Clear the value and output the SingleDelete. (The value will be |
808 | | // outputted on the next iteration.) |
809 | | |
810 | | // Setting valid_ to true will output the current SingleDelete |
811 | 0 | validity_info_.SetValid(ValidContext::kKeepSDForConflictCheck); |
812 | | |
813 | | // Set up the Put to be outputted in the next iteration. |
814 | | // (Optimization 3). |
815 | 0 | clear_and_output_next_key_ = true; |
816 | 0 | TEST_SYNC_POINT_CALLBACK( |
817 | 0 | "CompactionIterator::NextFromInput:KeepSDForWW", |
818 | 0 | /*arg=*/nullptr); |
819 | 0 | } |
820 | 0 | } else { |
821 | | // We hit the next snapshot without hitting a put, so the iterator |
822 | | // returns the single delete. |
823 | 0 | validity_info_.SetValid(ValidContext::kKeepSDForSnapshot); |
824 | 0 | TEST_SYNC_POINT_CALLBACK( |
825 | 0 | "CompactionIterator::NextFromInput:SingleDelete:3", |
826 | 0 | const_cast<Compaction*>(c)); |
827 | 0 | } |
828 | 0 | } else { |
829 | | // We are at the end of the input, could not parse the next key, or hit |
830 | | // a different key. The iterator returns the single delete if the key |
831 | | // possibly exists beyond the current output level. We set |
832 | | // has_current_user_key to false so that if the iterator is at the next |
833 | | // key, we do not compare it again against the previous key at the next |
834 | | // iteration. If the next key is corrupt, we return before the |
835 | | // comparison, so the value of has_current_user_key does not matter. |
836 | 0 | has_current_user_key_ = false; |
837 | 0 | if (compaction_ != nullptr && !compaction_->allow_ingest_behind() && |
838 | 0 | DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) && |
839 | 0 | compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key, |
840 | 0 | &level_ptrs_) && |
841 | 0 | is_timestamp_eligible_for_gc) { |
842 | | // Key doesn't exist outside of this range. |
843 | | // Can compact out this SingleDelete. |
844 | 0 | ++iter_stats_.num_record_drop_obsolete; |
845 | 0 | ++iter_stats_.num_single_del_fallthru; |
846 | 0 | if (!bottommost_level_) { |
847 | 0 | ++iter_stats_.num_optimized_del_drop_obsolete; |
848 | 0 | } |
849 | 0 | } else if (last_key_seq_zeroed_) { |
850 | | // Sequence number zeroing requires bottommost_level_, which is |
851 | | // false with ingest_behind. |
852 | 0 | assert(!compaction_->allow_ingest_behind()); |
853 | | // Skip. |
854 | 0 | ++iter_stats_.num_record_drop_hidden; |
855 | 0 | ++iter_stats_.num_record_drop_obsolete; |
856 | 0 | assert(bottommost_level_); |
857 | 0 | } else { |
858 | | // Output SingleDelete |
859 | 0 | validity_info_.SetValid(ValidContext::kKeepSD); |
860 | 0 | } |
861 | 0 | } |
862 | | |
863 | 0 | if (Valid()) { |
864 | 0 | at_next_ = true; |
865 | 0 | } |
866 | 523k | } else if (last_sequence != kMaxSequenceNumber && |
867 | 409k | (last_snapshot == current_user_key_snapshot_ || |
868 | 409k | last_snapshot < current_user_key_snapshot_)) { |
869 | | // rule (A): |
870 | | // If the earliest snapshot in which this key is visible |
871 | | // is the same as the visibility of a previous instance of the |
872 | | // same key, then this kv is not visible in any snapshot. |
873 | | // It is hidden by a newer entry for the same user key. |
874 | | // |
875 | | // Note: Dropping this key will not affect TransactionDB write-conflict |
876 | | // checking since there has already been a record returned for this key |
877 | | // in this snapshot. |
878 | | // When ingest_behind is enabled, it's ok that we drop an overwritten |
879 | | // Delete here. The overwriting key still covers whatever will be |
880 | | // ingested. Note that we will not drop SingleDelete here as SingleDelete |
881 | | // is handled entirely in its own if clause. This is important, see |
882 | | // example: from new to old: SingleDelete_1, PUT_1, SingleDelete_2, PUT_2, |
883 | | // where all operations are on the same key and PUT_2 is ingested with |
884 | | // ingest_behind=true. If SingleDelete_2 is dropped due to being compacted |
885 | | // together with PUT_1, and then PUT_1 is compacted away together with |
886 | | // SingleDelete_1, PUT_2 can incorrectly become visible. |
887 | 409k | if (last_sequence < current_user_key_sequence_) { |
888 | 0 | ROCKS_LOG_FATAL(info_log_, |
889 | 0 | "key %s, last_sequence (%" PRIu64 |
890 | 0 | ") < current_user_key_sequence_ (%" PRIu64 ")", |
891 | 0 | ikey_.DebugString(allow_data_in_errors_, true).c_str(), |
892 | 0 | last_sequence, current_user_key_sequence_); |
893 | 0 | assert(false); |
894 | 0 | } |
895 | | |
896 | 409k | ++iter_stats_.num_record_drop_hidden; |
897 | 409k | AdvanceInputIter(); |
898 | 409k | } else if (compaction_ != nullptr && |
899 | 4.08k | (ikey_.type == kTypeDeletion || |
900 | 2.27k | (ikey_.type == kTypeDeletionWithTimestamp && |
901 | 0 | cmp_with_history_ts_low_ < 0)) && |
902 | 1.80k | !compaction_->allow_ingest_behind() && |
903 | 1.80k | DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) && |
904 | 1.80k | compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key, |
905 | 1.80k | &level_ptrs_)) { |
906 | | // TODO(noetzli): This is the only place where we use compaction_ |
907 | | // (besides the constructor). We should probably get rid of this |
908 | | // dependency and find a way to do similar filtering during flushes. |
909 | | // |
910 | | // For this user key: |
911 | | // (1) there is no data in higher levels |
912 | | // (2) data in lower levels will have larger sequence numbers |
913 | | // (3) data in layers that are being compacted here and have |
914 | | // smaller sequence numbers will be dropped in the next |
915 | | // few iterations of this loop (by rule (A) above). |
916 | | // Therefore this deletion marker is obsolete and can be dropped. |
917 | | // |
918 | | // Note: Dropping this Delete will not affect TransactionDB |
919 | | // write-conflict checking since it is earlier than any snapshot. |
920 | | // |
921 | | // It seems that we can also drop deletions later than the earliest snapshot |
922 | | // given that: |
923 | | // (1) The deletion is earlier than earliest_write_conflict_snapshot, and |
924 | | // (2) No value exist earlier than the deletion. |
925 | | // |
926 | | // Note also that a deletion marker of type kTypeDeletionWithTimestamp |
927 | | // will be treated as a different user key unless the timestamp is older |
928 | | // than *full_history_ts_low_. |
929 | 1.80k | ++iter_stats_.num_record_drop_obsolete; |
930 | 1.80k | if (!bottommost_level_) { |
931 | 0 | ++iter_stats_.num_optimized_del_drop_obsolete; |
932 | 0 | } |
933 | 1.80k | AdvanceInputIter(); |
934 | 111k | } else if ((ikey_.type == kTypeDeletion || |
935 | 88.2k | (ikey_.type == kTypeDeletionWithTimestamp && |
936 | 0 | cmp_with_history_ts_low_ < 0)) && |
937 | 23.5k | bottommost_level_) { |
938 | 0 | assert(compaction_); |
939 | 0 | assert(!compaction_->allow_ingest_behind()); // bottommost_level_ is true |
940 | | // Handle the case where we have a delete key at the bottommost level. |
941 | | // We can skip outputting the key iff there are no subsequent puts for |
942 | | // this key |
943 | 0 | assert(compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key, |
944 | 0 | &level_ptrs_)); |
945 | 0 | ParsedInternalKey next_ikey; |
946 | 0 | AdvanceInputIter(); |
947 | | #ifndef NDEBUG |
948 | | const Compaction* c = |
949 | | compaction_ ? compaction_->real_compaction() : nullptr; |
950 | | #endif |
951 | 0 | TEST_SYNC_POINT_CALLBACK( |
952 | 0 | "CompactionIterator::NextFromInput:BottommostDelete:1", |
953 | 0 | const_cast<Compaction*>(c)); |
954 | | // Skip over all versions of this key that happen to occur in the same |
955 | | // snapshot range as the delete. |
956 | | // |
957 | | // Note that a deletion marker of type kTypeDeletionWithTimestamp will be |
958 | | // considered to have a different user key unless the timestamp is older |
959 | | // than *full_history_ts_low_. |
960 | | // |
961 | | // Range tombstone start keys are skipped as they are not "real" keys. |
962 | 0 | while (!IsPausingManualCompaction() && !IsShuttingDown() && |
963 | 0 | input_.Valid() && |
964 | 0 | (ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_) |
965 | 0 | .ok()) && |
966 | 0 | cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key) && |
967 | 0 | (prev_snapshot == 0 || input_.IsDeleteRangeSentinelKey() || |
968 | 0 | DefinitelyNotInSnapshot(next_ikey.sequence, prev_snapshot))) { |
969 | 0 | AdvanceInputIter(); |
970 | 0 | } |
971 | | // If we still need to output a row with this key, we need to |
972 | | // output the delete too. |
973 | 0 | if (input_.Valid() && |
974 | 0 | (ParseInternalKey(input_.key(), &next_ikey, allow_data_in_errors_) |
975 | 0 | .ok()) && |
976 | 0 | cmp_->EqualWithoutTimestamp(ikey_.user_key, next_ikey.user_key)) { |
977 | 0 | validity_info_.SetValid(ValidContext::kKeepDel); |
978 | 0 | at_next_ = true; |
979 | 0 | } |
980 | 111k | } else if (ikey_.type == kTypeValuePreferredSeqno && |
981 | 0 | DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) && |
982 | 0 | (bottommost_level_ || |
983 | 0 | (compaction_ != nullptr && |
984 | 0 | compaction_->KeyNotExistsBeyondOutputLevel(ikey_.user_key, |
985 | 0 | &level_ptrs_)))) { |
986 | | // FIXME: it's possible that we are setting sequence number to 0 as |
987 | | // preferred sequence number here. If cf_ingest_behind is enabled, this |
988 | | // may fail ingestions since they expect all keys above the last level |
989 | | // to have non-zero sequence number. We should probably not allow seqno |
990 | | // zeroing here. |
991 | | // |
992 | | // This section that attempts to swap preferred sequence number will not |
993 | | // be invoked if this is a CompactionIterator created for flush, since |
994 | | // `compaction_` will be nullptr and it's not bottommost either. |
995 | | // |
996 | | // The entries with the same user key and smaller sequence numbers are |
997 | | // all in this earliest snapshot range to be iterated. Since those entries |
998 | | // will be hidden by this entry [rule A], it's safe to swap in the |
999 | | // preferred seqno now. |
1000 | | // |
1001 | | // It's otherwise not safe to swap in the preferred seqno since it's |
1002 | | // possible for entries in earlier snapshots to have sequence number that |
1003 | | // is smaller than this entry's sequence number but bigger than this |
1004 | | // entry's preferred sequence number. Swapping in the preferred sequence |
1005 | | // number will break the internal key ordering invariant for this key. |
1006 | | // |
1007 | | // A special case involving range deletion is handled separately below. |
1008 | 0 | auto [unpacked_value, preferred_seqno] = |
1009 | 0 | ParsePackedValueWithSeqno(value_); |
1010 | 0 | assert(preferred_seqno < ikey_.sequence || ikey_.sequence == 0); |
1011 | 0 | if (range_del_agg_->ShouldDelete( |
1012 | 0 | key_, RangeDelPositioningMode::kForwardTraversal)) { |
1013 | 0 | ++iter_stats_.num_record_drop_hidden; |
1014 | 0 | ++iter_stats_.num_record_drop_range_del; |
1015 | 0 | AdvanceInputIter(); |
1016 | 0 | } else { |
1017 | 0 | InternalKey ikey_after_swap(ikey_.user_key, |
1018 | 0 | std::min(preferred_seqno, ikey_.sequence), |
1019 | 0 | kTypeValue); |
1020 | 0 | Slice ikey_after_swap_slice(*ikey_after_swap.rep()); |
1021 | 0 | if (range_del_agg_->ShouldDelete( |
1022 | 0 | ikey_after_swap_slice, |
1023 | 0 | RangeDelPositioningMode::kForwardTraversal)) { |
1024 | | // A range tombstone that doesn't cover this kTypeValuePreferredSeqno |
1025 | | // entry now would cover it after the seqno swap, so it's not safe to swap |
1026 | | // in the preferred sequence number. In this case, we output the entry as is. |
1027 | 0 | validity_info_.SetValid(ValidContext::kNewUserKey); |
1028 | 0 | } else { |
1029 | 0 | if (ikey_.sequence != 0) { |
1030 | 0 | iter_stats_.num_timed_put_swap_preferred_seqno++; |
1031 | 0 | ikey_.sequence = preferred_seqno; |
1032 | 0 | } |
1033 | 0 | ikey_.type = kTypeValue; |
1034 | 0 | current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); |
1035 | 0 | key_ = current_key_.GetInternalKey(); |
1036 | 0 | ikey_.user_key = current_key_.GetUserKey(); |
1037 | 0 | value_ = unpacked_value; |
1038 | 0 | validity_info_.SetValid(ValidContext::kSwapPreferredSeqno); |
1039 | 0 | } |
1040 | 0 | } |
1041 | 111k | } else if (ikey_.type == kTypeMerge) { |
1042 | 0 | if (!merge_helper_->HasOperator()) { |
1043 | 0 | status_ = Status::InvalidArgument( |
1044 | 0 | "merge_operator is not properly initialized."); |
1045 | 0 | return; |
1046 | 0 | } |
1047 | | |
1048 | 0 | pinned_iters_mgr_.StartPinning(); |
1049 | | |
1050 | | // We know the merge type entry is not hidden, otherwise we would |
1051 | | // have hit (A) |
1052 | | // We encapsulate the merge related state machine in a different |
1053 | | // object to minimize change to the existing flow. |
1054 | 0 | merge_until_status_ = merge_helper_->MergeUntil( |
1055 | 0 | &input_, range_del_agg_, prev_snapshot, bottommost_level_, |
1056 | 0 | allow_data_in_errors_, blob_fetcher_.get(), full_history_ts_low_, |
1057 | 0 | prefetch_buffers_.get(), &iter_stats_); |
1058 | 0 | merge_out_iter_.SeekToFirst(); |
1059 | |
1060 | 0 | if (!merge_until_status_.ok() && |
1061 | 0 | !merge_until_status_.IsMergeInProgress()) { |
1062 | 0 | status_ = merge_until_status_; |
1063 | 0 | return; |
1064 | 0 | } else if (merge_out_iter_.Valid()) { |
1065 | | // NOTE: key, value, and ikey_ refer to old entries. |
1066 | | // These will be correctly set below. |
1067 | 0 | key_ = merge_out_iter_.key(); |
1068 | 0 | value_ = merge_out_iter_.value(); |
1069 | 0 | pik_status = ParseInternalKey(key_, &ikey_, allow_data_in_errors_); |
1070 | | // MergeUntil stops when it encounters a corrupt key and does not |
1071 | | // include it in the result, so we expect the keys here to be valid. |
1072 | 0 | if (!pik_status.ok()) { |
1073 | 0 | ROCKS_LOG_FATAL( |
1074 | 0 | info_log_, "Invalid key %s in compaction. %s", |
1075 | 0 | allow_data_in_errors_ ? key_.ToString(true).c_str() : "hidden", |
1076 | 0 | pik_status.getState()); |
1077 | 0 | assert(false); |
1078 | 0 | } |
1079 | | // Keep current_key_ in sync. |
1080 | 0 | current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); |
1081 | 0 | key_ = current_key_.GetInternalKey(); |
1082 | 0 | ikey_.user_key = current_key_.GetUserKey(); |
1083 | 0 | validity_info_.SetValid(ValidContext::kMerge2); |
1084 | 0 | } else { |
1085 | | // All merge operands were filtered out. Reset the user key, since the |
1086 | | // batch consumed by the merge operator should not shadow any keys |
1087 | | // coming after the merges |
1088 | 0 | has_current_user_key_ = false; |
1089 | 0 | pinned_iters_mgr_.ReleasePinnedData(); |
1090 | |
1091 | 0 | if (merge_helper_->FilteredUntil(&skip_until)) { |
1092 | 0 | need_skip = true; |
1093 | 0 | } |
1094 | 0 | } |
1095 | 111k | } else { |
1096 | | // 1. new user key -OR- |
1097 | | // 2. different snapshot stripe |
1098 | | // If user-defined timestamp is enabled, we consider keys for GC if they |
1099 | | // are below history_ts_low_. CompactionRangeDelAggregator::ShouldDelete() |
1100 | | // only considers range deletions that are at or below history_ts_low_ and |
1101 | | // trim_ts_. We drop keys here that are below history_ts_low_ and are |
1102 | | // covered by a range tombstone that is at or below history_ts_low_ and |
1103 | | // trim_ts. |
1104 | 111k | bool should_delete = false; |
1105 | 111k | if (!timestamp_size_ || cmp_with_history_ts_low_ < 0) { |
1106 | 111k | should_delete = range_del_agg_->ShouldDelete( |
1107 | 111k | key_, RangeDelPositioningMode::kForwardTraversal); |
1108 | 111k | } |
1109 | 111k | if (should_delete) { |
1110 | 56.8k | ++iter_stats_.num_record_drop_hidden; |
1111 | 56.8k | ++iter_stats_.num_record_drop_range_del; |
1112 | 56.8k | AdvanceInputIter(); |
1113 | 56.8k | } else { |
1114 | 55.0k | validity_info_.SetValid(ValidContext::kNewUserKey); |
1115 | 55.0k | } |
1116 | 111k | } |
1117 | | |
1118 | 523k | if (need_skip) { |
1119 | 0 | SkipUntil(skip_until); |
1120 | 0 | } |
1121 | 523k | } |
1122 | | |
1123 | 77.4k | if (status_.ok()) { |
1124 | 77.4k | if (!Valid() && IsShuttingDown()) { |
1125 | 1.32k | status_ = Status::ShutdownInProgress(); |
1126 | 76.0k | } else if (IsPausingManualCompaction()) { |
1127 | 0 | status_ = Status::Incomplete(Status::SubCode::kManualCompactionPaused); |
1128 | 76.0k | } else if (!input_.Valid() && input_.status().IsCorruption()) { |
1129 | | // Propagate corruption status from memtable iterator |
1130 | 0 | status_ = input_.status(); |
1131 | 0 | } |
1132 | 77.4k | } |
1133 | 77.4k | } |
1134 | | |
1135 | 41.9k | bool CompactionIterator::ExtractLargeValueIfNeededImpl() { |
1136 | 41.9k | if (!blob_file_builder_) { |
1137 | 41.9k | return false; |
1138 | 41.9k | } |
1139 | | |
1140 | 0 | blob_index_.clear(); |
1141 | 0 | const Status s = blob_file_builder_->Add(user_key(), value_, &blob_index_); |
1142 | |
1143 | 0 | if (!s.ok()) { |
1144 | 0 | status_ = s; |
1145 | 0 | validity_info_.Invalidate(); |
1146 | |
1147 | 0 | return false; |
1148 | 0 | } |
1149 | | |
1150 | 0 | if (blob_index_.empty()) { |
1151 | 0 | return false; |
1152 | 0 | } |
1153 | | |
1154 | 0 | value_ = blob_index_; |
1155 | |
1156 | 0 | return true; |
1157 | 0 | } |
1158 | | |
1159 | 41.9k | void CompactionIterator::ExtractLargeValueIfNeeded() { |
1160 | 41.9k | assert(ikey_.type == kTypeValue); |
1161 | | |
1162 | 41.9k | if (!ExtractLargeValueIfNeededImpl()) { |
1163 | 41.9k | return; |
1164 | 41.9k | } |
1165 | | |
1166 | 0 | ikey_.type = kTypeBlobIndex; |
1167 | 0 | current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); |
1168 | 0 | } |
1169 | | |
1170 | 0 | void CompactionIterator::GarbageCollectBlobIfNeeded() { |
1171 | 0 | assert(ikey_.type == kTypeBlobIndex); |
1172 | |
1173 | 0 | if (!compaction_) { |
1174 | 0 | return; |
1175 | 0 | } |
1176 | | |
1177 | | // GC for integrated BlobDB |
1178 | 0 | if (compaction_->enable_blob_garbage_collection()) { |
1179 | 0 | TEST_SYNC_POINT_CALLBACK( |
1180 | 0 | "CompactionIterator::GarbageCollectBlobIfNeeded::TamperWithBlobIndex", |
1181 | 0 | &value_); |
1182 | |
1183 | 0 | BlobIndex blob_index; |
1184 | |
1185 | 0 | { |
1186 | 0 | const Status s = blob_index.DecodeFrom(value_); |
1187 | |
1188 | 0 | if (!s.ok()) { |
1189 | 0 | status_ = s; |
1190 | 0 | validity_info_.Invalidate(); |
1191 | |
1192 | 0 | return; |
1193 | 0 | } |
1194 | 0 | } |
1195 | | |
1196 | 0 | if (blob_index.file_number() >= |
1197 | 0 | blob_garbage_collection_cutoff_file_number_) { |
1198 | 0 | return; |
1199 | 0 | } |
1200 | | |
1201 | 0 | FilePrefetchBuffer* prefetch_buffer = |
1202 | 0 | prefetch_buffers_ ? prefetch_buffers_->GetOrCreatePrefetchBuffer( |
1203 | 0 | blob_index.file_number()) |
1204 | 0 | : nullptr; |
1205 | |
1206 | 0 | uint64_t bytes_read = 0; |
1207 | |
1208 | 0 | { |
1209 | 0 | assert(blob_fetcher_); |
1210 | |
1211 | 0 | const Status s = blob_fetcher_->FetchBlob( |
1212 | 0 | user_key(), blob_index, prefetch_buffer, &blob_value_, &bytes_read); |
1213 | |
1214 | 0 | if (!s.ok()) { |
1215 | 0 | status_ = s; |
1216 | 0 | validity_info_.Invalidate(); |
1217 | |
1218 | 0 | return; |
1219 | 0 | } |
1220 | 0 | } |
1221 | | |
1222 | 0 | ++iter_stats_.num_blobs_read; |
1223 | 0 | iter_stats_.total_blob_bytes_read += bytes_read; |
1224 | |
1225 | 0 | ++iter_stats_.num_blobs_relocated; |
1226 | 0 | iter_stats_.total_blob_bytes_relocated += blob_index.size(); |
1227 | |
1228 | 0 | value_ = blob_value_; |
1229 | |
1230 | 0 | if (ExtractLargeValueIfNeededImpl()) { |
1231 | 0 | return; |
1232 | 0 | } |
1233 | | |
1234 | 0 | ikey_.type = kTypeValue; |
1235 | 0 | current_key_.UpdateInternalKey(ikey_.sequence, ikey_.type); |
1236 | |
1237 | 0 | return; |
1238 | 0 | } |
1239 | | |
1240 | | // GC for stacked BlobDB |
1241 | 0 | if (compaction_filter_ && |
1242 | 0 | compaction_filter_->IsStackedBlobDbInternalCompactionFilter()) { |
1243 | 0 | const auto blob_decision = compaction_filter_->PrepareBlobOutput( |
1244 | 0 | user_key(), value_, &compaction_filter_value_); |
1245 | |
1246 | 0 | if (blob_decision == CompactionFilter::BlobDecision::kCorruption) { |
1247 | 0 | status_ = |
1248 | 0 | Status::Corruption("Corrupted blob reference encountered during GC"); |
1249 | 0 | validity_info_.Invalidate(); |
1250 | |
1251 | 0 | return; |
1252 | 0 | } |
1253 | | |
1254 | 0 | if (blob_decision == CompactionFilter::BlobDecision::kIOError) { |
1255 | 0 | status_ = Status::IOError("Could not relocate blob during GC"); |
1256 | 0 | validity_info_.Invalidate(); |
1257 | |
1258 | 0 | return; |
1259 | 0 | } |
1260 | | |
1261 | 0 | if (blob_decision == CompactionFilter::BlobDecision::kChangeValue) { |
1262 | 0 | value_ = compaction_filter_value_; |
1263 | |
1264 | 0 | return; |
1265 | 0 | } |
1266 | 0 | } |
1267 | 0 | } |
1268 | | |
1269 | 77.4k | void CompactionIterator::PrepareOutput() { |
1270 | 77.4k | if (Valid()) { |
1271 | 55.0k | if (LIKELY(!is_range_del_)) { |
1272 | 55.0k | if (ikey_.type == kTypeValue) { |
1273 | 41.9k | ExtractLargeValueIfNeeded(); |
1274 | 41.9k | } else if (ikey_.type == kTypeBlobIndex) { |
1275 | 0 | GarbageCollectBlobIfNeeded(); |
1276 | 0 | } |
1277 | 55.0k | } |
1278 | | |
1279 | | // Zeroing out the sequence number leads to better compression. |
1280 | | // If this is the bottommost level (no files in lower levels) |
1281 | | // and the earliest snapshot is larger than this seqno |
1282 | | // and the userkey differs from the last userkey in compaction |
1283 | | // then we can squash the seqno to zero. |
1284 | | // |
1285 | | // This is safe for TransactionDB write-conflict checking since transactions |
1286 | | // only care about sequence numbers larger than any active snapshot. |
1287 | | // |
1288 | | // Can we do the same for levels above the bottom level as long as |
1289 | | // KeyNotExistsBeyondOutputLevel() returns true? |
1290 | 55.0k | if (Valid() && bottommost_level_ && |
1291 | 2.27k | DefinitelyInSnapshot(ikey_.sequence, earliest_snapshot_) && |
1292 | 2.27k | ikey_.type != kTypeMerge && current_key_committed_ && |
1293 | 2.27k | ikey_.sequence <= preserve_seqno_after_ && !is_range_del_) { |
1294 | 2.27k | assert(compaction_ != nullptr && !compaction_->allow_ingest_behind()); |
1295 | 2.27k | if (ikey_.type == kTypeDeletion || |
1296 | 2.27k | (ikey_.type == kTypeSingleDeletion && timestamp_size_ == 0)) { |
1297 | 0 | ROCKS_LOG_FATAL( |
1298 | 0 | info_log_, |
1299 | 0 | "Unexpected key %s for seq-zero optimization. " |
1300 | 0 | "earliest_snapshot %" PRIu64 |
1301 | 0 | ", earliest_write_conflict_snapshot %" PRIu64 |
1302 | 0 | " job_snapshot %" PRIu64 |
1303 | 0 | ". timestamp_size: %d full_history_ts_low_ %s. validity %x", |
1304 | 0 | ikey_.DebugString(allow_data_in_errors_, true).c_str(), |
1305 | 0 | earliest_snapshot_, earliest_write_conflict_snapshot_, |
1306 | 0 | job_snapshot_, static_cast<int>(timestamp_size_), |
1307 | 0 | full_history_ts_low_ != nullptr |
1308 | 0 | ? Slice(*full_history_ts_low_).ToString(true).c_str() |
1309 | 0 | : "null", |
1310 | 0 | validity_info_.rep); |
1311 | 0 | assert(false); |
1312 | 0 | } |
1313 | 2.27k | ikey_.sequence = 0; |
1314 | 2.27k | last_key_seq_zeroed_ = true; |
1315 | 2.27k | TEST_SYNC_POINT_CALLBACK("CompactionIterator::PrepareOutput:ZeroingSeq", |
1316 | 2.27k | &ikey_); |
1317 | 2.27k | if (!timestamp_size_) { |
1318 | 2.27k | current_key_.UpdateInternalKey(0, ikey_.type); |
1319 | 2.27k | } else if (full_history_ts_low_ && cmp_with_history_ts_low_ < 0) { |
1320 | | // We can also zero out timestamp for better compression. |
1321 | | // For the same user key (excluding timestamp), the timestamp-based |
1322 | | // history can be collapsed to save some space if the timestamp is |
1323 | | // older than *full_history_ts_low_. |
1324 | 0 | const std::string kTsMin(timestamp_size_, static_cast<char>(0)); |
1325 | 0 | const Slice ts_slice = kTsMin; |
1326 | 0 | ikey_.SetTimestamp(ts_slice); |
1327 | 0 | current_key_.UpdateInternalKey(0, ikey_.type, &ts_slice); |
1328 | 0 | } |
1329 | 2.27k | } |
1330 | 55.0k | } |
1331 | 77.4k | } |
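Ignoring user-defined timestamps and range deletions, the guard that lets PrepareOutput() zero out a sequence number is a conjunction of a handful of conditions. The predicate below is a hypothetical distillation of that guard, with plain integers standing in for SequenceNumber and DefinitelyInSnapshot() simplified to a comparison; it is not how the member state is actually threaded through CompactionIterator.

#include <cassert>
#include <cstdint>

enum class ValueType { kValue, kMerge, kDeletion, kSingleDeletion };

// Hypothetical condensation of the seqno-zeroing guard: bottommost output,
// visible to every snapshot, not a merge operand, committed, and not pinned
// by the preserve_seqno setting.
bool CanZeroOutSequence(bool bottommost_level, uint64_t seq,
                        uint64_t earliest_snapshot, ValueType type,
                        bool committed, uint64_t preserve_seqno_after) {
  return bottommost_level && seq <= earliest_snapshot &&
         type != ValueType::kMerge && committed &&
         seq <= preserve_seqno_after;
}

int main() {
  // A committed Put at seq 42 on the bottommost level, older than the
  // earliest snapshot (100), can have its seqno rewritten to 0.
  assert(CanZeroOutSequence(true, 42, 100, ValueType::kValue, true, 100));
  // A merge operand must keep its sequence number.
  assert(!CanZeroOutSequence(true, 42, 100, ValueType::kMerge, true, 100));
  return 0;
}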
1332 | | |
1333 | | inline SequenceNumber CompactionIterator::findEarliestVisibleSnapshot( |
1334 | 13 | SequenceNumber in, SequenceNumber* prev_snapshot) { |
1335 | 13 | assert(snapshots_->size()); |
1336 | 13 | if (snapshots_->size() == 0) { |
1337 | 0 | ROCKS_LOG_FATAL(info_log_, |
1338 | 0 | "No snapshot left in findEarliestVisibleSnapshot"); |
1339 | 0 | } |
1340 | 13 | auto snapshots_iter = |
1341 | 13 | std::lower_bound(snapshots_->begin(), snapshots_->end(), in); |
1342 | 13 | assert(prev_snapshot != nullptr); |
1343 | 13 | if (snapshots_iter == snapshots_->begin()) { |
1344 | 13 | *prev_snapshot = 0; |
1345 | 13 | } else { |
1346 | 0 | *prev_snapshot = *std::prev(snapshots_iter); |
1347 | 0 | if (*prev_snapshot >= in) { |
1348 | 0 | ROCKS_LOG_FATAL(info_log_, |
1349 | 0 | "*prev_snapshot (%" PRIu64 ") >= in (%" PRIu64 |
1350 | 0 | ") in findEarliestVisibleSnapshot", |
1351 | 0 | *prev_snapshot, in); |
1352 | 0 | assert(false); |
1353 | 0 | } |
1354 | 0 | } |
1355 | 13 | if (snapshot_checker_ == nullptr) { |
1356 | 13 | return snapshots_iter != snapshots_->end() ? *snapshots_iter |
1357 | 13 | : kMaxSequenceNumber; |
1358 | 13 | } |
1359 | 0 | bool has_released_snapshot = !released_snapshots_.empty(); |
1360 | 0 | for (; snapshots_iter != snapshots_->end(); ++snapshots_iter) { |
1361 | 0 | auto cur = *snapshots_iter; |
1362 | 0 | if (in > cur) { |
1363 | 0 | ROCKS_LOG_FATAL(info_log_, |
1364 | 0 | "in (%" PRIu64 ") > cur (%" PRIu64 |
1365 | 0 | ") in findEarliestVisibleSnapshot", |
1366 | 0 | in, cur); |
1367 | 0 | assert(false); |
1368 | 0 | } |
1369 | | // Skip if cur is in released_snapshots. |
1370 | 0 | if (has_released_snapshot && released_snapshots_.count(cur) > 0) { |
1371 | 0 | continue; |
1372 | 0 | } |
1373 | 0 | auto res = snapshot_checker_->CheckInSnapshot(in, cur); |
1374 | 0 | if (res == SnapshotCheckerResult::kInSnapshot) { |
1375 | 0 | return cur; |
1376 | 0 | } else if (res == SnapshotCheckerResult::kSnapshotReleased) { |
1377 | 0 | released_snapshots_.insert(cur); |
1378 | 0 | } |
1379 | 0 | *prev_snapshot = cur; |
1380 | 0 | } |
1381 | 0 | return kMaxSequenceNumber; |
1382 | 0 | } |
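findEarliestVisibleSnapshot() relies on snapshots_ being sorted in ascending order: std::lower_bound finds the first snapshot at or above the key's sequence number, and the element just before it (if any) becomes *prev_snapshot. The sketch below reproduces only that binary-search core, with plain uint64_t in place of SequenceNumber, no SnapshotChecker, and a hypothetical function name; it is an illustration, not the RocksDB implementation.

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <iterator>
#include <limits>
#include <vector>

// Returns the earliest snapshot that can see sequence number `in`, and sets
// *prev to the next lower snapshot (0 if there is none). UINT64_MAX plays
// the role of kMaxSequenceNumber when no snapshot is at or above `in`.
uint64_t EarliestVisibleSnapshot(const std::vector<uint64_t>& snapshots,
                                 uint64_t in, uint64_t* prev) {
  assert(std::is_sorted(snapshots.begin(), snapshots.end()));
  auto it = std::lower_bound(snapshots.begin(), snapshots.end(), in);
  *prev = (it == snapshots.begin()) ? 0 : *std::prev(it);
  return it != snapshots.end() ? *it : std::numeric_limits<uint64_t>::max();
}

int main() {
  uint64_t prev = 0;
  // Snapshots at sequence numbers 10, 20, 30: a key written at seq 15 is
  // first visible to snapshot 20, and snapshot 10 bounds the previous stripe.
  std::vector<uint64_t> snapshots{10, 20, 30};
  assert(EarliestVisibleSnapshot(snapshots, 15, &prev) == 20 && prev == 10);
  // Nothing at or above seq 35: the key is visible only to future readers.
  assert(EarliestVisibleSnapshot(snapshots, 35, &prev) ==
             std::numeric_limits<uint64_t>::max() &&
         prev == 30);
  return 0;
}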
1383 | | |
1384 | | uint64_t CompactionIterator::ComputeBlobGarbageCollectionCutoffFileNumber( |
1385 | 22.3k | const CompactionProxy* compaction) { |
1386 | 22.3k | if (!compaction) { |
1387 | 18.7k | return 0; |
1388 | 18.7k | } |
1389 | | |
1390 | 3.68k | if (!compaction->enable_blob_garbage_collection()) { |
1391 | 3.68k | return 0; |
1392 | 3.68k | } |
1393 | | |
1394 | 0 | const Version* const version = compaction->input_version(); |
1395 | 0 | assert(version); |
1396 | |
1397 | 0 | const VersionStorageInfo* const storage_info = version->storage_info(); |
1398 | 0 | assert(storage_info); |
1399 | |
1400 | 0 | const auto& blob_files = storage_info->GetBlobFiles(); |
1401 | |
1402 | 0 | const size_t cutoff_index = static_cast<size_t>( |
1403 | 0 | compaction->blob_garbage_collection_age_cutoff() * blob_files.size()); |
1404 | |
1405 | 0 | if (cutoff_index >= blob_files.size()) { |
1406 | 0 | return std::numeric_limits<uint64_t>::max(); |
1407 | 0 | } |
1408 | | |
1409 | 0 | const auto& meta = blob_files[cutoff_index]; |
1410 | 0 | assert(meta); |
1411 | |
1412 | 0 | return meta->GetBlobFileNumber(); |
1413 | 0 | } |
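Assuming GetBlobFiles() yields blob files ordered oldest-first (ascending file number), the cutoff computation above is plain index arithmetic: age_cutoff * count selects the first file young enough to keep, and an index past the end means every blob file is old enough to collect. The standalone sketch below shows just that arithmetic, with a hypothetical name and a vector of file numbers in place of the blob file metadata.

#include <cassert>
#include <cstdint>
#include <limits>
#include <vector>

// Simplified sketch of the age-cutoff -> file-number mapping; the vector is
// assumed to hold blob file numbers sorted oldest (smallest) first.
uint64_t GcCutoffFileNumber(const std::vector<uint64_t>& blob_file_numbers,
                            double age_cutoff) {
  const size_t cutoff_index =
      static_cast<size_t>(age_cutoff * blob_file_numbers.size());
  if (cutoff_index >= blob_file_numbers.size()) {
    // A cutoff of 1.0 (or an empty list): every blob file is old enough.
    return std::numeric_limits<uint64_t>::max();
  }
  return blob_file_numbers[cutoff_index];
}

int main() {
  // With four blob files and a 0.5 cutoff, the older half (files 7 and 9)
  // falls below the returned cutoff number 11.
  assert(GcCutoffFileNumber({7, 9, 11, 14}, 0.5) == 11);
  assert(GcCutoffFileNumber({7, 9, 11, 14}, 1.0) ==
         std::numeric_limits<uint64_t>::max());
  return 0;
}

In GarbageCollectBlobIfNeeded() above, a blob whose file number is below the returned cutoff is fetched and relocated, while one at or above it keeps its existing blob reference.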
1414 | | |
1415 | | std::unique_ptr<BlobFetcher> CompactionIterator::CreateBlobFetcherIfNeeded( |
1416 | 22.3k | const CompactionProxy* compaction) { |
1417 | 22.3k | if (!compaction) { |
1418 | 18.7k | return nullptr; |
1419 | 18.7k | } |
1420 | | |
1421 | 3.68k | const Version* const version = compaction->input_version(); |
1422 | 3.68k | if (!version) { |
1423 | 0 | return nullptr; |
1424 | 0 | } |
1425 | | |
1426 | 3.68k | ReadOptions read_options; |
1427 | 3.68k | read_options.io_activity = Env::IOActivity::kCompaction; |
1428 | 3.68k | read_options.fill_cache = false; |
1429 | | |
1430 | 3.68k | return std::unique_ptr<BlobFetcher>(new BlobFetcher(version, read_options)); |
1431 | 3.68k | } |
1432 | | |
1433 | | std::unique_ptr<PrefetchBufferCollection> |
1434 | | CompactionIterator::CreatePrefetchBufferCollectionIfNeeded( |
1435 | 22.3k | const CompactionProxy* compaction) { |
1436 | 22.3k | if (!compaction) { |
1437 | 18.7k | return nullptr; |
1438 | 18.7k | } |
1439 | | |
1440 | 3.68k | if (!compaction->input_version()) { |
1441 | 0 | return nullptr; |
1442 | 0 | } |
1443 | | |
1444 | 3.68k | if (compaction->allow_mmap_reads()) { |
1445 | 0 | return nullptr; |
1446 | 0 | } |
1447 | | |
1448 | 3.68k | const uint64_t readahead_size = compaction->blob_compaction_readahead_size(); |
1449 | 3.68k | if (!readahead_size) { |
1450 | 3.68k | return nullptr; |
1451 | 3.68k | } |
1452 | | |
1453 | 0 | return std::unique_ptr<PrefetchBufferCollection>( |
1454 | 0 | new PrefetchBufferCollection(readahead_size)); |
1455 | 3.68k | } |
1456 | | |
1457 | | } // namespace ROCKSDB_NAMESPACE |