/src/rocksdb/db/compaction/compaction_outputs.cc
Line | Count | Source
1 | | // Copyright (c) Meta Platforms, Inc. and affiliates. |
2 | | // |
3 | | // This source code is licensed under both the GPLv2 (found in the |
4 | | // COPYING file in the root directory) and Apache 2.0 License |
5 | | // (found in the LICENSE.Apache file in the root directory). |
6 | | // |
7 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
8 | | // Use of this source code is governed by a BSD-style license that can be |
9 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
10 | | |
11 | | #include "db/compaction/compaction_outputs.h" |
12 | | |
13 | | #include "db/builder.h" |
14 | | |
15 | | namespace ROCKSDB_NAMESPACE { |
16 | | |
17 | 1.29k | void CompactionOutputs::NewBuilder(const TableBuilderOptions& tboptions) { |
18 | 1.29k | builder_.reset(NewTableBuilder(tboptions, file_writer_.get())); |
19 | 1.29k | } |
20 | | |
21 | | Status CompactionOutputs::Finish( |
22 | | const Status& input_status,
23 | 1.29k | const SeqnoToTimeMapping& seqno_to_time_mapping) { |
24 | 1.29k | FileMetaData* meta = GetMetaData(); |
25 | 1.29k | assert(meta != nullptr); |
26 | 1.29k | Status s = input_status;
27 | 1.29k | if (s.ok()) { |
28 | 1.08k | SeqnoToTimeMapping relevant_mapping; |
29 | 1.08k | relevant_mapping.CopyFromSeqnoRange( |
30 | 1.08k | seqno_to_time_mapping, |
31 | 1.08k | std::min(smallest_preferred_seqno_, meta->fd.smallest_seqno), |
32 | 1.08k | meta->fd.largest_seqno); |
33 | 1.08k | relevant_mapping.SetCapacity(kMaxSeqnoTimePairsPerSST); |
34 | 1.08k | builder_->SetSeqnoTimeTableProperties(relevant_mapping, |
35 | 1.08k | meta->oldest_ancester_time); |
36 | 1.08k | s = builder_->Finish(); |
37 | | |
38 | 1.08k | } else { |
39 | 213 | builder_->Abandon(); |
40 | 213 | } |
41 | 1.29k | Status io_s = builder_->io_status(); |
42 | 1.29k | if (s.ok()) { |
43 | 1.08k | s = io_s; |
44 | 1.08k | } else { |
45 | 213 | io_s.PermitUncheckedError(); |
46 | 213 | } |
47 | 1.29k | const uint64_t current_bytes = builder_->FileSize(); |
48 | 1.29k | if (s.ok()) { |
49 | 1.08k | meta->fd.file_size = current_bytes; |
50 | 1.08k | meta->tail_size = builder_->GetTailSize(); |
51 | 1.08k | meta->marked_for_compaction = builder_->NeedCompact(); |
52 | 1.08k | meta->user_defined_timestamps_persisted = static_cast<bool>( |
53 | 1.08k | builder_->GetTableProperties().user_defined_timestamps_persisted); |
54 | 1.08k | } |
55 | 1.29k | current_output().finished = true; |
56 | 1.29k | stats_.bytes_written += current_bytes; |
57 | 1.29k | stats_.num_output_files = outputs_.size(); |
58 | | |
59 | 1.29k | return s; |
60 | 1.29k | } |
61 | | |
62 | | IOStatus CompactionOutputs::WriterSyncClose(const Status& input_status, |
63 | | SystemClock* clock, |
64 | | Statistics* statistics, |
65 | 1.29k | bool use_fsync) { |
66 | 1.29k | IOStatus io_s; |
67 | 1.29k | IOOptions opts; |
68 | 1.29k | io_s = WritableFileWriter::PrepareIOOptions( |
69 | 1.29k | WriteOptions(Env::IOActivity::kCompaction), opts); |
70 | 1.29k | if (input_status.ok() && io_s.ok()) { |
71 | 1.08k | StopWatch sw(clock, statistics, COMPACTION_OUTFILE_SYNC_MICROS); |
72 | 1.08k | io_s = file_writer_->Sync(opts, use_fsync); |
73 | 1.08k | } |
74 | 1.29k | if (input_status.ok() && io_s.ok()) { |
75 | 1.08k | io_s = file_writer_->Close(opts); |
76 | 1.08k | } |
77 | | |
78 | 1.29k | if (input_status.ok() && io_s.ok()) { |
79 | 1.08k | FileMetaData* meta = GetMetaData(); |
80 | 1.08k | meta->file_checksum = file_writer_->GetFileChecksum(); |
81 | 1.08k | meta->file_checksum_func_name = file_writer_->GetFileChecksumFuncName(); |
82 | 1.08k | } |
83 | | |
84 | 1.29k | file_writer_.reset(); |
85 | | |
86 | 1.29k | return io_s; |
87 | 1.29k | } |
88 | | |
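 | | // Illustrative sketch of the state machine below (hypothetical files and
 | | // keys, not from this code): suppose files_to_cut_for_ttl_ holds two old
 | | // grandparent-level files F0 = [b, d] and F1 = [h, k]. For incoming keys:
 | | // - key c enters F0's range: cur_files_to_cut_for_ttl_ = 0, returns
 | | //   true (cut at the start of an old file);
 | | // - key e passes F0's largest: cur resets to -1, next advances to 1,
 | | //   returns true (cut at the end of an old file);
 | | // - key f falls in the gap before F1: returns false.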
89 | | bool CompactionOutputs::UpdateFilesToCutForTTLStates( |
90 | 1.05k | const Slice& internal_key) { |
91 | 1.05k | if (!files_to_cut_for_ttl_.empty()) { |
92 | 0 | const InternalKeyComparator* icmp = |
93 | 0 | &compaction_->column_family_data()->internal_comparator(); |
94 | 0 | if (cur_files_to_cut_for_ttl_ != -1) { |
95 | | // Previous key is inside the range of a file |
96 | 0 | if (icmp->Compare(internal_key, |
97 | 0 | files_to_cut_for_ttl_[cur_files_to_cut_for_ttl_] |
98 | 0 | ->largest.Encode()) > 0) { |
99 | 0 | next_files_to_cut_for_ttl_ = cur_files_to_cut_for_ttl_ + 1; |
100 | 0 | cur_files_to_cut_for_ttl_ = -1; |
101 | 0 | return true; |
102 | 0 | } |
103 | 0 | } else { |
104 | | // Look for the key position |
105 | 0 | while (next_files_to_cut_for_ttl_ < |
106 | 0 | static_cast<int>(files_to_cut_for_ttl_.size())) { |
107 | 0 | if (icmp->Compare(internal_key, |
108 | 0 | files_to_cut_for_ttl_[next_files_to_cut_for_ttl_] |
109 | 0 | ->smallest.Encode()) >= 0) { |
110 | 0 | if (icmp->Compare(internal_key, |
111 | 0 | files_to_cut_for_ttl_[next_files_to_cut_for_ttl_] |
112 | 0 | ->largest.Encode()) <= 0) { |
113 | | // Within the current file
114 | 0 | cur_files_to_cut_for_ttl_ = next_files_to_cut_for_ttl_; |
115 | 0 | return true; |
116 | 0 | } |
117 | | // Beyond the current file |
118 | 0 | next_files_to_cut_for_ttl_++; |
119 | 0 | } else { |
120 | | // Still falls into the gap
121 | 0 | break; |
122 | 0 | } |
123 | 0 | } |
124 | 0 | } |
125 | 0 | } |
126 | 1.05k | return false; |
127 | 1.05k | } |
128 | | |
129 | | size_t CompactionOutputs::UpdateGrandparentBoundaryInfo( |
130 | 1.05k | const Slice& internal_key) { |
131 | 1.05k | size_t curr_key_boundary_switched_num = 0; |
132 | 1.05k | const std::vector<FileMetaData*>& grandparents = compaction_->grandparents(); |
133 | | |
134 | 1.05k | if (grandparents.empty()) { |
135 | 1.05k | return curr_key_boundary_switched_num; |
136 | 1.05k | } |
137 | 0 | const Comparator* ucmp = compaction_->column_family_data()->user_comparator(); |
138 | | |
139 | | // Move the grandparent_index_ to the file containing the current user_key. |
140 | | // If there are multiple files containing the same user_key, make sure the |
141 | | // index points to the last file containing the key. |
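 | | // Illustrative example (hypothetical keys): with grandparents
 | | // [a, c] [e, g], grandparent_index_ == 0, inside [a, c] (not a gap),
 | | // and seen_key_ already true, an incoming key f first passes [a, c]'s
 | | // largest (one boundary switch, now in the gap), then reaches
 | | // [e, g]'s smallest (a second switch, adding [e, g]'s size to
 | | // grandparent_overlapped_bytes_), so 2 is returned for this key.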
142 | 0 | while (grandparent_index_ < grandparents.size()) { |
143 | 0 | if (being_grandparent_gap_) { |
144 | 0 | if (sstableKeyCompare(ucmp, internal_key, |
145 | 0 | grandparents[grandparent_index_]->smallest) < 0) { |
146 | 0 | break; |
147 | 0 | } |
148 | 0 | if (seen_key_) { |
149 | 0 | curr_key_boundary_switched_num++; |
150 | 0 | grandparent_overlapped_bytes_ += |
151 | 0 | grandparents[grandparent_index_]->fd.GetFileSize(); |
152 | 0 | grandparent_boundary_switched_num_++; |
153 | 0 | } |
154 | 0 | being_grandparent_gap_ = false; |
155 | 0 | } else { |
156 | 0 | int cmp_result = sstableKeyCompare( |
157 | 0 | ucmp, internal_key, grandparents[grandparent_index_]->largest); |
158 | | // If it's the same key, make sure grandparent_index_ points to the
159 | | // last one.
160 | 0 | if (cmp_result < 0 || |
161 | 0 | (cmp_result == 0 && |
162 | 0 | (grandparent_index_ == grandparents.size() - 1 || |
163 | 0 | sstableKeyCompare(ucmp, internal_key, |
164 | 0 | grandparents[grandparent_index_ + 1]->smallest) < |
165 | 0 | 0))) { |
166 | 0 | break; |
167 | 0 | } |
168 | 0 | if (seen_key_) { |
169 | 0 | curr_key_boundary_switched_num++; |
170 | 0 | grandparent_boundary_switched_num_++; |
171 | 0 | } |
172 | 0 | being_grandparent_gap_ = true; |
173 | 0 | grandparent_index_++; |
174 | 0 | } |
175 | 0 | } |
176 | | |
177 | | // If the first key is in the middle of a grandparent file, add it to
178 | | // the overlap
179 | 0 | if (!seen_key_ && !being_grandparent_gap_) { |
180 | 0 | assert(grandparent_overlapped_bytes_ == 0); |
181 | 0 | grandparent_overlapped_bytes_ = |
182 | 0 | GetCurrentKeyGrandparentOverlappedBytes(internal_key); |
183 | 0 | } |
184 | |
185 | 0 | seen_key_ = true; |
186 | 0 | return curr_key_boundary_switched_num; |
187 | 1.05k | } |
188 | | |
189 | | uint64_t CompactionOutputs::GetCurrentKeyGrandparentOverlappedBytes( |
190 | 0 | const Slice& internal_key) const { |
191 | | // no overlap with any grandparent file |
192 | 0 | if (being_grandparent_gap_) { |
193 | 0 | return 0; |
194 | 0 | } |
195 | 0 | uint64_t overlapped_bytes = 0; |
196 | |
197 | 0 | const std::vector<FileMetaData*>& grandparents = compaction_->grandparents(); |
198 | 0 | const Comparator* ucmp = compaction_->column_family_data()->user_comparator(); |
199 | 0 | InternalKey ikey; |
200 | 0 | ikey.DecodeFrom(internal_key); |
201 | | #ifndef NDEBUG |
202 | | // make sure the grandparent_index_ is pointing to the last file
203 | | // containing the current key.
204 | | int cmp_result = |
205 | | sstableKeyCompare(ucmp, ikey, grandparents[grandparent_index_]->largest); |
206 | | assert( |
207 | | cmp_result < 0 || |
208 | | (cmp_result == 0 && |
209 | | (grandparent_index_ == grandparents.size() - 1 || |
210 | | sstableKeyCompare( |
211 | | ucmp, ikey, grandparents[grandparent_index_ + 1]->smallest) < 0))); |
212 | | assert(sstableKeyCompare(ucmp, ikey, |
213 | | grandparents[grandparent_index_]->smallest) >= 0); |
214 | | #endif |
215 | 0 | overlapped_bytes += grandparents[grandparent_index_]->fd.GetFileSize(); |
216 | | |
217 | | // Go backwards to find all overlapped files; one key can overlap multiple
218 | | // files. In the following example, if the current output key is `c`, and one
219 | | // compaction file was cut before `c`, the current `c` can overlap with 3 files:
220 | | // [a b] [c... |
221 | | // [b, b] [c, c] [c, c] [c, d] |
222 | 0 | for (int64_t i = static_cast<int64_t>(grandparent_index_) - 1; |
223 | 0 | i >= 0 && sstableKeyCompare(ucmp, ikey, grandparents[i]->largest) == 0; |
224 | 0 | i--) { |
225 | 0 | overlapped_bytes += grandparents[i]->fd.GetFileSize(); |
226 | 0 | } |
227 | |
228 | 0 | return overlapped_bytes; |
229 | 0 | } |
230 | | |
231 | 1.48k | bool CompactionOutputs::ShouldStopBefore(const CompactionIterator& c_iter) { |
232 | 1.48k | assert(c_iter.Valid()); |
233 | 1.48k | const Slice& internal_key = c_iter.key(); |
234 | | #ifndef NDEBUG |
235 | | bool should_stop = false; |
236 | | std::pair<bool*, const Slice> p{&should_stop, internal_key}; |
237 | | TEST_SYNC_POINT_CALLBACK( |
238 | | "CompactionOutputs::ShouldStopBefore::manual_decision", (void*)&p); |
239 | | if (should_stop) { |
240 | | return true; |
241 | | } |
242 | | #endif // NDEBUG |
243 | 1.48k | const uint64_t previous_overlapped_bytes = grandparent_overlapped_bytes_; |
244 | 1.48k | const InternalKeyComparator* icmp = |
245 | 1.48k | &compaction_->column_family_data()->internal_comparator(); |
246 | 1.48k | size_t num_grandparent_boundaries_crossed = 0; |
247 | 1.48k | bool should_stop_for_ttl = false; |
248 | | // Always update grandparent information like overlapped file number, size,
249 | | // etc., and TTL states.
250 | | // If compaction_->output_level() == 0, there is no need to update grandparent
251 | | // info, and `grandparents` should be empty.
252 | 1.48k | if (compaction_->output_level() > 0) { |
253 | 1.05k | num_grandparent_boundaries_crossed = |
254 | 1.05k | UpdateGrandparentBoundaryInfo(internal_key); |
255 | 1.05k | should_stop_for_ttl = UpdateFilesToCutForTTLStates(internal_key); |
256 | 1.05k | } |
257 | | |
258 | 1.48k | if (!HasBuilder()) { |
259 | 1.29k | return false; |
260 | 1.29k | } |
261 | | |
262 | 187 | if (should_stop_for_ttl) { |
263 | 0 | return true; |
264 | 0 | } |
265 | | |
266 | | // If there's user defined partitioner, check that first |
267 | 187 | if (partitioner_ && partitioner_->ShouldPartition(PartitionerRequest( |
268 | 0 | last_key_for_partitioner_, c_iter.user_key(), |
269 | 0 | current_output_file_size_)) == kRequired) { |
270 | 0 | return true; |
271 | 0 | } |
272 | | |
273 | | // files output to Level 0 won't be split |
274 | 187 | if (compaction_->output_level() == 0) { |
275 | 47 | return false; |
276 | 47 | } |
277 | | |
278 | | // reached the max file size
279 | 140 | if (current_output_file_size_ >= compaction_->max_output_file_size()) { |
280 | 0 | return true; |
281 | 0 | } |
282 | | |
283 | | // Check if it needs to split for RoundRobin.
284 | | // An invalid local_output_split_key indicates that we do not need to split.
285 | 140 | if (local_output_split_key_ != nullptr && !is_split_) { |
286 | | // Split occurs when the next key is larger than/equal to the cursor |
287 | 0 | if (icmp->Compare(internal_key, local_output_split_key_->Encode()) >= 0) { |
288 | 0 | is_split_ = true; |
289 | 0 | return true; |
290 | 0 | } |
291 | 0 | } |
292 | | |
293 | | // Only check if the current key is going to cross a grandparent file
294 | | // boundary (either the file beginning or ending).
295 | 140 | if (num_grandparent_boundaries_crossed > 0) { |
296 | | // Cut the file before the current key if the size of the current output
297 | | // file plus its overlapped grandparent files is bigger than
298 | | // max_compaction_bytes. This prevents a future compaction from the
299 | | // current output level from growing bigger than max_compaction_bytes.
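 | | // E.g. (hypothetical numbers): with max_compaction_bytes = 100MB, a
 | | // 30MB output that already overlaps 80MB of grandparent files is cut
 | | // here, since 80MB + 30MB > 100MB would otherwise seed a future
 | | // compaction bigger than max_compaction_bytes.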
300 | 0 | if (grandparent_overlapped_bytes_ + current_output_file_size_ > |
301 | 0 | compaction_->max_compaction_bytes()) { |
302 | 0 | return true; |
303 | 0 | } |
304 | | |
305 | | // Cut the file if including the key is going to add a skippable file on |
306 | | // the grandparent level AND its size is reasonably big (1/8 of target file |
307 | | // size). For example, if it's compacting the files L0 + L1: |
308 | | // L0: [1, 21] |
309 | | // L1: [3, 23] |
310 | | // L2: [2, 4] [11, 15] [22, 24] |
311 | | // Without this break, it will output as: |
312 | | // L1: [1,3, 21,23] |
313 | | // With this break, it will output as (assuming [11, 15] at L2 is bigger |
314 | | // than 1/8 of target size): |
315 | | // L1: [1,3] [21,23] |
316 | | // Then for the future compactions, [11,15] won't be included. |
317 | | // For random datasets (either evenly distributed or skewed), it rarely
318 | | // triggers this condition, but if the user is adding 2 different datasets
319 | | // without any overlap, it is likely to happen.
320 | | // For more details, check PR #1963.
321 | 0 | const size_t num_skippable_boundaries_crossed = |
322 | 0 | being_grandparent_gap_ ? 2 : 3; |
323 | 0 | if (compaction_->immutable_options()->compaction_style == |
324 | 0 | kCompactionStyleLevel && |
325 | 0 | num_grandparent_boundaries_crossed >= |
326 | 0 | num_skippable_boundaries_crossed && |
327 | 0 | grandparent_overlapped_bytes_ - previous_overlapped_bytes > |
328 | 0 | compaction_->target_output_file_size() / 8) { |
329 | 0 | return true; |
330 | 0 | } |
331 | | |
332 | | // Pre-cut the output file if it's reaching a certain size AND it's at the
333 | | // boundary of a grandparent file. This can reduce the future compaction
334 | | // size; the cost is having smaller files.
335 | | // The pre-cut size threshold is based on how many grandparent boundaries
336 | | // it has seen before. Basically, if it has seen no boundary at all, it
337 | | // will pre-cut at 50% of the target file size. Every boundary it has seen
338 | | // increases the threshold by 5%, capped at 90%, at which it will always
339 | | // cut. The idea is that if it has seen more boundaries before, it is more
340 | | // likely to see another boundary (a file-cutting opportunity) before
341 | | // reaching the target file size. Testing shows it can generate larger
342 | | // files than a static threshold like 75% with a similar write
343 | | // amplification improvement.
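 | | // Worked example (hypothetical numbers): with a 64MB target file size,
 | | // (64MB + 99) / 100 is roughly 0.64MB per percentage point. With no
 | | // boundary seen, the cut threshold is 50% (~32MB); after 4 boundary
 | | // switches it is 50 + 4 * 5 = 70% (~44.8MB); from 8 switches onward it
 | | // stays capped at 50 + 40 = 90% (~57.6MB).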
344 | 0 | if (compaction_->immutable_options()->compaction_style == |
345 | 0 | kCompactionStyleLevel && |
346 | 0 | current_output_file_size_ >= |
347 | 0 | ((compaction_->target_output_file_size() + 99) / 100) * |
348 | 0 | (50 + std::min(grandparent_boundary_switched_num_ * 5, |
349 | 0 | size_t{40}))) { |
350 | 0 | return true; |
351 | 0 | } |
352 | 0 | } |
353 | | |
354 | 140 | return false; |
355 | 140 | } |
356 | | |
357 | | Status CompactionOutputs::AddToOutput( |
358 | | const CompactionIterator& c_iter, |
359 | | const CompactionFileOpenFunc& open_file_func, |
360 | 1.48k | const CompactionFileCloseFunc& close_file_func) { |
361 | 1.48k | Status s; |
362 | 1.48k | bool is_range_del = c_iter.IsDeleteRangeSentinelKey(); |
363 | 1.48k | if (is_range_del && compaction_->bottommost_level()) { |
364 | | // We don't consider range tombstones for the bottommost level since:
365 | | // 1. there is no grandparent and hence no overlap to consider
366 | | // 2. range tombstones may be dropped at the bottommost level.
367 | 0 | return s; |
368 | 0 | } |
369 | 1.48k | const Slice& key = c_iter.key(); |
370 | 1.48k | if (ShouldStopBefore(c_iter) && HasBuilder()) { |
371 | 0 | s = close_file_func(*this, c_iter.InputStatus(), key); |
372 | 0 | if (!s.ok()) { |
373 | 0 | return s; |
374 | 0 | } |
375 | | // reset grandparent information |
376 | 0 | grandparent_boundary_switched_num_ = 0; |
377 | 0 | grandparent_overlapped_bytes_ = |
378 | 0 | GetCurrentKeyGrandparentOverlappedBytes(key); |
379 | 0 | if (UNLIKELY(is_range_del)) { |
380 | | // Lower bound for this new output file; this is needed as the lower bound
381 | | // does not come from the smallest point key in this case.
382 | 0 | range_tombstone_lower_bound_.DecodeFrom(key); |
383 | 0 | } else { |
384 | 0 | range_tombstone_lower_bound_.Clear(); |
385 | 0 | } |
386 | 0 | } |
387 | | |
388 | | // Open output file if necessary |
389 | 1.48k | if (!HasBuilder()) { |
390 | 1.29k | s = open_file_func(*this); |
391 | 1.29k | if (!s.ok()) { |
392 | 0 | return s; |
393 | 0 | } |
394 | 1.29k | } |
395 | | |
396 | | // c_iter may emit range deletion keys, so update `last_key_for_partitioner_` |
397 | | // here before returning below when `is_range_del` is true |
398 | 1.48k | if (partitioner_) { |
399 | 0 | last_key_for_partitioner_.assign(c_iter.user_key().data_, |
400 | 0 | c_iter.user_key().size_); |
401 | 0 | } |
402 | | |
403 | 1.48k | if (UNLIKELY(is_range_del)) { |
404 | 0 | return s; |
405 | 0 | } |
406 | | |
407 | 1.48k | assert(builder_ != nullptr); |
408 | 1.48k | const Slice& value = c_iter.value(); |
409 | 1.48k | s = current_output().validator.Add(key, value); |
410 | 1.48k | if (!s.ok()) { |
411 | 0 | return s; |
412 | 0 | } |
413 | 1.48k | builder_->Add(key, value); |
414 | | |
415 | 1.48k | stats_.num_output_records++; |
416 | 1.48k | current_output_file_size_ = builder_->EstimatedFileSize(); |
417 | | |
418 | 1.48k | if (blob_garbage_meter_) { |
419 | 0 | s = blob_garbage_meter_->ProcessOutFlow(key, value); |
420 | 0 | } |
421 | | |
422 | 1.48k | if (!s.ok()) { |
423 | 0 | return s; |
424 | 0 | } |
425 | | |
426 | 1.48k | const ParsedInternalKey& ikey = c_iter.ikey(); |
427 | 1.48k | if (ikey.type == kTypeValuePreferredSeqno) { |
428 | 0 | SequenceNumber preferred_seqno = ParsePackedValueForSeqno(value); |
429 | 0 | smallest_preferred_seqno_ = |
430 | 0 | std::min(smallest_preferred_seqno_, preferred_seqno); |
431 | 0 | } |
432 | 1.48k | s = current_output().meta.UpdateBoundaries(key, value, ikey.sequence, |
433 | 1.48k | ikey.type); |
434 | | |
435 | 1.48k | return s; |
436 | 1.48k | } |
437 | | |
438 | | namespace { |
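 | | // The helper below sets `internal_key` to
 | | // user_key@kMaxSequenceNumber,kTypeRangeDeletion, padding any
 | | // user-defined timestamp with 0xff bytes (the maximum timestamp), so
 | | // the result compares less than every real version of the same user
 | | // key in internal key order. E.g. (hypothetical), with ts_sz == 4 the
 | | // key carries the timestamp "\xff\xff\xff\xff".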
439 | | void SetMaxSeqAndTs(InternalKey& internal_key, const Slice& user_key, |
440 | 0 | const size_t ts_sz) { |
441 | 0 | if (ts_sz) { |
442 | 0 | static constexpr char kTsMax[] = "\xff\xff\xff\xff\xff\xff\xff\xff\xff"; |
443 | 0 | if (ts_sz <= strlen(kTsMax)) { |
444 | 0 | internal_key = InternalKey(user_key, kMaxSequenceNumber, |
445 | 0 | kTypeRangeDeletion, Slice(kTsMax, ts_sz)); |
446 | 0 | } else { |
447 | 0 | internal_key = |
448 | 0 | InternalKey(user_key, kMaxSequenceNumber, kTypeRangeDeletion, |
449 | 0 | std::string(ts_sz, '\xff')); |
450 | 0 | } |
451 | 0 | } else { |
452 | 0 | internal_key.Set(user_key, kMaxSequenceNumber, kTypeRangeDeletion); |
453 | 0 | } |
454 | 0 | } |
455 | | } // namespace |
456 | | |
457 | | Status CompactionOutputs::AddRangeDels( |
458 | | const Slice* comp_start_user_key, const Slice* comp_end_user_key, |
459 | | CompactionIterationStats& range_del_out_stats, bool bottommost_level, |
460 | | const InternalKeyComparator& icmp, SequenceNumber earliest_snapshot, |
461 | 0 | const Slice& next_table_min_key, const std::string& full_history_ts_low) { |
462 | | // The following example does not happen since |
463 | | // CompactionOutputs::ShouldStopBefore() always returns false for the first
464 | | // point key. But we should consider removing this dependency. Suppose for the |
465 | | // first compaction output file, |
466 | | // - next_table_min_key.user_key == comp_start_user_key |
467 | | // - no point key is in the output file |
468 | | // - there is a range tombstone @seqno to be added that covers |
469 | | // comp_start_user_key |
470 | | // Then meta.smallest will be set to comp_start_user_key@seqno |
471 | | // and meta.largest will be set to comp_start_user_key@kMaxSequenceNumber |
472 | | // which violates the assumption that meta.smallest should be <= meta.largest. |
473 | 0 | assert(HasRangeDel()); |
474 | 0 | FileMetaData& meta = current_output().meta; |
475 | 0 | const Comparator* ucmp = icmp.user_comparator(); |
476 | 0 | InternalKey lower_bound_buf, upper_bound_buf; |
477 | 0 | Slice lower_bound_guard, upper_bound_guard; |
478 | 0 | std::string smallest_user_key; |
479 | 0 | const Slice *lower_bound, *upper_bound; |
480 | | |
481 | | // We first determine the internal key lower_bound and upper_bound for |
482 | | // this output file. All and only range tombstones that overlap with |
483 | | // [lower_bound, upper_bound] should be added to this file. File |
484 | | // boundaries (meta.smallest/largest) should be updated accordingly when |
485 | | // extended by range tombstones. |
486 | 0 | size_t output_size = outputs_.size(); |
487 | 0 | if (output_size == 1) { |
488 | | // This is the first file in the subcompaction. |
489 | | // |
490 | | // When outputting a range tombstone that spans a subcompaction boundary, |
491 | | // the files on either side of that boundary need to include that |
492 | | // boundary's user key. Otherwise, the spanning range tombstone would lose |
493 | | // coverage. |
494 | | // |
495 | | // To achieve this while preventing files from overlapping in internal key |
496 | | // (an LSM invariant violation), we allow the earlier file to include the |
497 | | // boundary user key up to `kMaxSequenceNumber,kTypeRangeDeletion`. The |
498 | | // later file can begin at the boundary user key at the newest key version |
499 | | // it contains. At this point that version number is unknown since we have |
500 | | // not processed the range tombstones yet, so permit any version. Same story |
501 | | // applies to timestamp, and a non-nullptr `comp_start_user_key` should have |
502 | | // `kMaxTs` here, which similarly permits any timestamp. |
503 | 0 | if (comp_start_user_key) { |
504 | 0 | lower_bound_buf.Set(*comp_start_user_key, kMaxSequenceNumber, |
505 | 0 | kTypeRangeDeletion); |
506 | 0 | lower_bound_guard = lower_bound_buf.Encode(); |
507 | 0 | lower_bound = &lower_bound_guard; |
508 | 0 | } else { |
509 | 0 | lower_bound = nullptr; |
510 | 0 | } |
511 | 0 | } else { |
512 | | // For subsequent output tables, only include range tombstones from min |
513 | | // key onwards since the previous file was extended to contain range |
514 | | // tombstones falling before min key. |
515 | 0 | if (range_tombstone_lower_bound_.size() > 0) { |
516 | 0 | assert(meta.smallest.size() == 0 || |
517 | 0 | icmp.Compare(range_tombstone_lower_bound_, meta.smallest) < 0); |
518 | 0 | lower_bound_guard = range_tombstone_lower_bound_.Encode(); |
519 | 0 | } else { |
520 | 0 | assert(meta.smallest.size() > 0); |
521 | 0 | lower_bound_guard = meta.smallest.Encode(); |
522 | 0 | } |
523 | 0 | lower_bound = &lower_bound_guard; |
524 | 0 | } |
525 | |
526 | 0 | const size_t ts_sz = ucmp->timestamp_size(); |
527 | 0 | if (next_table_min_key.empty()) { |
528 | | // Last file of the subcompaction. |
529 | 0 | if (comp_end_user_key) { |
530 | 0 | upper_bound_buf.Set(*comp_end_user_key, kMaxSequenceNumber, |
531 | 0 | kTypeRangeDeletion); |
532 | 0 | upper_bound_guard = upper_bound_buf.Encode(); |
533 | 0 | upper_bound = &upper_bound_guard; |
534 | 0 | } else { |
535 | 0 | upper_bound = nullptr; |
536 | 0 | } |
537 | 0 | } else { |
538 | | // There is another file coming whose coverage will begin at |
539 | | // `next_table_min_key`. The current file needs to extend range tombstone |
540 | | // coverage through its own keys (through `meta.largest`) and through user |
541 | | // keys preceding `next_table_min_key`'s user key. |
542 | 0 | ParsedInternalKey next_table_min_key_parsed; |
543 | 0 | ParseInternalKey(next_table_min_key, &next_table_min_key_parsed, |
544 | 0 | false /* log_err_key */) |
545 | 0 | .PermitUncheckedError(); |
546 | 0 | assert(next_table_min_key_parsed.sequence < kMaxSequenceNumber); |
547 | 0 | assert(meta.largest.size() == 0 || |
548 | 0 | icmp.Compare(meta.largest.Encode(), next_table_min_key) < 0); |
549 | 0 | assert(!lower_bound || icmp.Compare(*lower_bound, next_table_min_key) <= 0); |
550 | 0 | if (meta.largest.size() > 0 && |
551 | 0 | ucmp->EqualWithoutTimestamp(meta.largest.user_key(), |
552 | 0 | next_table_min_key_parsed.user_key)) { |
553 | | // Caution: this assumes meta.largest.Encode() lives longer than |
554 | | // upper_bound, which is only true if meta.largest is never updated. |
555 | | // This just happens to be the case here since meta.largest serves |
556 | | // as the upper_bound. |
557 | 0 | upper_bound_guard = meta.largest.Encode(); |
558 | 0 | } else { |
559 | 0 | SetMaxSeqAndTs(upper_bound_buf, next_table_min_key_parsed.user_key, |
560 | 0 | ts_sz); |
561 | 0 | upper_bound_guard = upper_bound_buf.Encode(); |
562 | 0 | } |
563 | 0 | upper_bound = &upper_bound_guard; |
564 | 0 | } |
565 | 0 | if (lower_bound && upper_bound && |
566 | 0 | icmp.Compare(*lower_bound, *upper_bound) > 0) { |
567 | 0 | assert(meta.smallest.size() == 0 && |
568 | 0 | ucmp->EqualWithoutTimestamp(ExtractUserKey(*lower_bound), |
569 | 0 | ExtractUserKey(*upper_bound))); |
570 | | // This can only happen when lower_bound has the same user key as
571 | | // next_table_min_key and there is no point key in the current
572 | | // compaction output file. |
573 | 0 | return Status::OK(); |
574 | 0 | } |
575 | | // The end key of the subcompaction must be greater than or equal to the
576 | | // upper bound. If the end of the subcompaction is null or the upper bound
577 | | // is null, it means that this file is the last file in the compaction, so
578 | | // there will be no overlap between this file and others.
579 | 0 | assert(comp_end_user_key == nullptr || upper_bound == nullptr || |
580 | 0 | ucmp->CompareWithoutTimestamp(ExtractUserKey(*upper_bound), |
581 | 0 | *comp_end_user_key) <= 0); |
582 | 0 | auto it = range_del_agg_->NewIterator(lower_bound, upper_bound); |
583 | 0 | Slice last_tombstone_start_user_key{}; |
584 | 0 | bool reached_lower_bound = false; |
585 | 0 | const ReadOptions read_options(Env::IOActivity::kCompaction); |
586 | 0 | for (it->SeekToFirst(); it->Valid(); it->Next()) { |
587 | 0 | auto tombstone = it->Tombstone(); |
588 | 0 | auto kv = tombstone.Serialize(); |
589 | 0 | InternalKey tombstone_end = tombstone.SerializeEndKey(); |
590 | | // TODO: the underlying iterator should support clamping the bounds. |
591 | | // tombstone_end.Encode() is of the form user_key@kMaxSeqno;
592 | | // if it is equal to lower_bound, there is no need to include
593 | | // such a range tombstone.
594 | 0 | if (!reached_lower_bound && lower_bound && |
595 | 0 | icmp.Compare(tombstone_end.Encode(), *lower_bound) <= 0) { |
596 | 0 | continue; |
597 | 0 | } |
598 | 0 | assert(!lower_bound || |
599 | 0 | icmp.Compare(*lower_bound, tombstone_end.Encode()) <= 0); |
600 | 0 | reached_lower_bound = true; |
601 | | |
602 | | // Garbage collection for range tombstones. |
603 | | // If user-defined timestamp is enabled, range tombstones are dropped if |
604 | | // they are at bottommost_level, below full_history_ts_low and not visible |
605 | | // in any snapshot. trim_ts_ is passed to the constructor for |
606 | | // range_del_agg_, and range_del_agg_ internally drops tombstones above |
607 | | // trim_ts_. |
608 | 0 | bool consider_drop = |
609 | 0 | tombstone.seq_ <= earliest_snapshot && |
610 | 0 | (ts_sz == 0 || |
611 | 0 | (!full_history_ts_low.empty() && |
612 | 0 | ucmp->CompareTimestamp(tombstone.ts_, full_history_ts_low) < 0)); |
613 | 0 | if (consider_drop && bottommost_level) { |
614 | | // TODO(andrewkr): tombstones that span multiple output files are |
615 | | // counted for each compaction output file, so lots of double |
616 | | // counting. |
617 | 0 | range_del_out_stats.num_range_del_drop_obsolete++; |
618 | 0 | range_del_out_stats.num_record_drop_obsolete++; |
619 | 0 | continue; |
620 | 0 | } |
621 | | |
622 | 0 | assert(lower_bound == nullptr || |
623 | 0 | ucmp->CompareWithoutTimestamp(ExtractUserKey(*lower_bound), |
624 | 0 | kv.second) < 0); |
625 | 0 | InternalKey tombstone_start = kv.first; |
626 | 0 | if (lower_bound && |
627 | 0 | ucmp->CompareWithoutTimestamp(tombstone_start.user_key(), |
628 | 0 | ExtractUserKey(*lower_bound)) < 0) { |
629 | | // This just updates the non-timestamp portion of `tombstone_start`'s user
630 | | // key. Ideally there would be a simpler API for this.
631 | 0 | ParsedInternalKey tombstone_start_parsed; |
632 | 0 | ParseInternalKey(tombstone_start.Encode(), &tombstone_start_parsed, |
633 | 0 | false /* log_err_key */) |
634 | 0 | .PermitUncheckedError(); |
635 | | // The timestamp should come from where the sequence number comes from,
636 | | // which is the tombstone in this case.
637 | 0 | std::string ts = |
638 | 0 | tombstone_start_parsed.GetTimestamp(ucmp->timestamp_size()) |
639 | 0 | .ToString(); |
640 | 0 | tombstone_start_parsed.user_key = ExtractUserKey(*lower_bound); |
641 | 0 | tombstone_start.SetFrom(tombstone_start_parsed, ts); |
642 | 0 | } |
643 | 0 | if (upper_bound != nullptr && |
644 | 0 | icmp.Compare(*upper_bound, tombstone_start.Encode()) < 0) { |
645 | 0 | break; |
646 | 0 | } |
647 | 0 | if (lower_bound && |
648 | 0 | icmp.Compare(tombstone_start.Encode(), *lower_bound) < 0) { |
649 | 0 | tombstone_start.DecodeFrom(*lower_bound); |
650 | 0 | } |
651 | 0 | if (upper_bound && icmp.Compare(*upper_bound, tombstone_end.Encode()) < 0) { |
652 | 0 | tombstone_end.DecodeFrom(*upper_bound); |
653 | 0 | } |
654 | 0 | if (consider_drop && compaction_->KeyRangeNotExistsBeyondOutputLevel( |
655 | 0 | tombstone_start.user_key(), |
656 | 0 | tombstone_end.user_key(), &level_ptrs_)) { |
657 | 0 | range_del_out_stats.num_range_del_drop_obsolete++; |
658 | 0 | range_del_out_stats.num_record_drop_obsolete++; |
659 | 0 | continue; |
660 | 0 | } |
661 | | // Here we show that *only* range tombstones that overlap with |
662 | | // [lower_bound, upper_bound] are added to the current file, and |
663 | | // sanity-check invariants that should hold:
664 | | // - [tombstone_start, tombstone_end] overlaps with [lower_bound, |
665 | | // upper_bound] |
666 | | // - meta.smallest <= meta.largest |
667 | | // Corresponding assertions are made; the proof is broken if any of them
668 | | // fails. |
669 | | // TODO: show that *all* range tombstones that overlap with |
670 | | // [lower_bound, upper_bound] are added. |
671 | | // TODO: show that invariants about boundaries are correctly updated.
672 | | // |
673 | | // Note that `tombstone_start` is updated in the if condition above, we use |
674 | | // tombstone_start to refer to its initial value, i.e., |
675 | | // it->Tombstone().first, and use tombstone_start* to refer to its value |
676 | | // after the update. |
677 | | // |
678 | | // To show [lower_bound, upper_bound] overlaps with [tombstone_start, |
679 | | // tombstone_end]: |
680 | | // lower_bound <= upper_bound from the if condition right after all |
681 | | // bounds are initialized. We assume each tombstone fragment has |
682 | | // start_key.user_key < end_key.user_key, so |
683 | | // tombstone_start < tombstone_end by |
684 | | // FragmentedTombstoneIterator::Tombstone(). So these two ranges are both |
685 | | // non-empty. The flag `reached_lower_bound` and the if logic before it
686 | | // ensures lower_bound <= tombstone_end. tombstone_start is only updated |
687 | | // if it has a smaller user_key than lower_bound user_key, so |
688 | | // tombstone_start <= tombstone_start*. The above if condition implies |
689 | | // tombstone_start* <= upper_bound. So we have |
690 | | // tombstone_start <= upper_bound and lower_bound <= tombstone_end |
691 | | // and the two ranges overlap. |
692 | | // |
693 | | // To show meta.smallest <= meta.largest: |
694 | | // From the implementation of UpdateBoundariesForRange(), it suffices to |
695 | | // prove that when it is first called in this function, its parameters |
696 | | // satisfy `start <= end`, where start = max(tombstone_start*, lower_bound) |
697 | | // and end = min(tombstone_end, upper_bound). From the above proof we have |
698 | | // lower_bound <= tombstone_end and lower_bound <= upper_bound. We only need |
699 | | // to show that tombstone_start* <= min(tombstone_end, upper_bound). |
700 | | // Note that tombstone_start*.user_key = max(tombstone_start.user_key, |
701 | | // lower_bound.user_key). Assuming tombstone_end always has |
702 | | // kMaxSequenceNumber and lower_bound.seqno < kMaxSequenceNumber. |
703 | | // Since lower_bound <= tombstone_end and lower_bound.seqno < |
704 | | // tombstone_end.seqno (in absolute number order, not internal key order), |
705 | | // lower_bound.user_key < tombstone_end.user_key. |
706 | | // Since lower_bound.user_key < tombstone_end.user_key and |
707 | | // tombstone_start.user_key < tombstone_end.user_key, tombstone_start* < |
708 | | // tombstone_end. Since tombstone_start* <= upper_bound from the above proof |
709 | | // and tombstone_start* < tombstone_end, tombstone_start* <= |
710 | | // min(tombstone_end, upper_bound), i.e., start <= end as required.
711 | | |
712 | | // Range tombstone is not supported by output validator yet. |
713 | 0 | builder_->Add(kv.first.Encode(), kv.second); |
714 | 0 | assert(icmp.Compare(tombstone_start, tombstone_end) <= 0); |
715 | 0 | meta.UpdateBoundariesForRange(tombstone_start, tombstone_end, |
716 | 0 | tombstone.seq_, icmp); |
717 | 0 | if (!bottommost_level) { |
718 | 0 | bool start_user_key_changed = |
719 | 0 | last_tombstone_start_user_key.empty() || |
720 | 0 | ucmp->CompareWithoutTimestamp(last_tombstone_start_user_key, |
721 | 0 | it->start_key()) < 0; |
722 | 0 | last_tombstone_start_user_key = it->start_key(); |
723 | 0 | if (start_user_key_changed) { |
724 | | // If tombstone_start >= tombstone_end, then either no key range is |
725 | | // covered, or they have the same user key. If they have the same
726 | | // user key, then the internal key range should only be within this |
727 | | // level, and no keys from older levels are covered.
728 | 0 | if (ucmp->CompareWithoutTimestamp(tombstone_start.user_key(), |
729 | 0 | tombstone_end.user_key()) < 0) { |
730 | 0 | SizeApproximationOptions approx_opts; |
731 | 0 | approx_opts.files_size_error_margin = 0.1; |
732 | 0 | auto approximate_covered_size = |
733 | 0 | compaction_->input_version()->version_set()->ApproximateSize( |
734 | 0 | approx_opts, read_options, compaction_->input_version(), |
735 | 0 | tombstone_start.Encode(), tombstone_end.Encode(), |
736 | 0 | compaction_->output_level() + 1 /* start_level */, |
737 | 0 | -1 /* end_level */, kCompaction); |
738 | 0 | meta.compensated_range_deletion_size += approximate_covered_size; |
739 | 0 | } |
740 | 0 | } |
741 | 0 | } |
742 | 0 | } |
743 | 0 | return Status::OK(); |
744 | 0 | } |
745 | | |
746 | 3.02k | void CompactionOutputs::FillFilesToCutForTtl() { |
747 | 3.02k | if (compaction_->immutable_options()->compaction_style != |
748 | 3.02k | kCompactionStyleLevel || |
749 | 3.02k | compaction_->immutable_options()->compaction_pri != |
750 | 3.02k | kMinOverlappingRatio || |
751 | 3.02k | compaction_->mutable_cf_options()->ttl == 0 || |
752 | 3.02k | compaction_->num_input_levels() < 2 || compaction_->bottommost_level()) { |
753 | 3.02k | return; |
754 | 3.02k | } |
755 | | |
756 | | // We define a new file as one whose oldest ancestor time is younger than
757 | | // 1/4 of the TTL, and an old one as older than 1/2 of the TTL.
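 | | // E.g. (hypothetical): with ttl = 4 days, old_age_thres is now - 2
 | | // days, so a last-input-level file becomes a cut point only if its
 | | // oldest ancestor time is more than 2 days old and it is bigger than
 | | // target_file_size_base / 2.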
758 | 0 | int64_t temp_current_time; |
759 | 0 | auto get_time_status = |
760 | 0 | compaction_->immutable_options()->clock->GetCurrentTime( |
761 | 0 | &temp_current_time); |
762 | 0 | if (!get_time_status.ok()) { |
763 | 0 | return; |
764 | 0 | } |
765 | | |
766 | 0 | auto current_time = static_cast<uint64_t>(temp_current_time); |
767 | 0 | if (current_time < compaction_->mutable_cf_options()->ttl) { |
768 | 0 | return; |
769 | 0 | } |
770 | | |
771 | 0 | uint64_t old_age_thres = |
772 | 0 | current_time - compaction_->mutable_cf_options()->ttl / 2; |
773 | 0 | const std::vector<FileMetaData*>& olevel = |
774 | 0 | *(compaction_->inputs(compaction_->num_input_levels() - 1)); |
775 | 0 | for (FileMetaData* file : olevel) { |
776 | | // Worth filtering out by start and end? |
777 | 0 | uint64_t oldest_ancester_time = file->TryGetOldestAncesterTime(); |
778 | | // We only pick old files that are not too small, to prevent a flood
779 | | // of small files.
780 | 0 | if (oldest_ancester_time < old_age_thres && |
781 | 0 | file->fd.GetFileSize() > |
782 | 0 | compaction_->mutable_cf_options()->target_file_size_base / 2) { |
783 | 0 | files_to_cut_for_ttl_.push_back(file); |
784 | 0 | } |
785 | 0 | } |
786 | 0 | } |
787 | | |
788 | | CompactionOutputs::CompactionOutputs(const Compaction* compaction, |
789 | | const bool is_penultimate_level) |
790 | 4.33k | : compaction_(compaction), is_penultimate_level_(is_penultimate_level) { |
791 | 4.33k | partitioner_ = compaction->output_level() == 0 |
792 | 4.33k | ? nullptr |
793 | 4.33k | : compaction->CreateSstPartitioner(); |
794 | | |
795 | 4.33k | if (compaction->output_level() != 0) { |
796 | 3.02k | FillFilesToCutForTtl(); |
797 | 3.02k | } |
798 | | |
799 | 4.33k | level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0); |
800 | 4.33k | } |
801 | | |
802 | | } // namespace ROCKSDB_NAMESPACE |