/src/rocksdb/db/compaction/compaction_outputs.cc
Line | Count | Source |
1 | | // Copyright (c) Meta Platforms, Inc. and affiliates. |
2 | | // |
3 | | // This source code is licensed under both the GPLv2 (found in the |
4 | | // COPYING file in the root directory) and Apache 2.0 License |
5 | | // (found in the LICENSE.Apache file in the root directory). |
6 | | // |
7 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
8 | | // Use of this source code is governed by a BSD-style license that can be |
9 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
10 | | |
11 | | #include "db/compaction/compaction_outputs.h" |
12 | | |
13 | | #include "db/builder.h" |
14 | | |
15 | | namespace ROCKSDB_NAMESPACE { |
16 | | |
17 | 1.84k | void CompactionOutputs::NewBuilder(const TableBuilderOptions& tboptions) { |
18 | 1.84k | builder_.reset(NewTableBuilder(tboptions, file_writer_.get())); |
19 | 1.84k | } |
20 | | |
21 | | Status CompactionOutputs::Finish( |
22 | | const Status& intput_status, |
23 | 1.84k | const SeqnoToTimeMapping& seqno_to_time_mapping) { |
24 | 1.84k | FileMetaData* meta = GetMetaData(); |
25 | 1.84k | assert(meta != nullptr); |
26 | 1.84k | Status s = intput_status; |
27 | 1.84k | if (s.ok()) { |
28 | 1.27k | SeqnoToTimeMapping relevant_mapping; |
29 | 1.27k | relevant_mapping.CopyFromSeqnoRange( |
30 | 1.27k | seqno_to_time_mapping, |
31 | 1.27k | std::min(smallest_preferred_seqno_, meta->fd.smallest_seqno), |
32 | 1.27k | meta->fd.largest_seqno); |
33 | 1.27k | relevant_mapping.SetCapacity(kMaxSeqnoTimePairsPerSST); |
34 | 1.27k | builder_->SetSeqnoTimeTableProperties(relevant_mapping, |
35 | 1.27k | meta->oldest_ancester_time); |
36 | 1.27k | s = builder_->Finish(); |
37 | | |
38 | 1.27k | } else { |
39 | 573 | builder_->Abandon(); |
40 | 573 | } |
41 | 1.84k | Status io_s = builder_->io_status(); |
42 | 1.84k | if (s.ok()) { |
43 | 1.27k | s = io_s; |
44 | 1.27k | } else { |
45 | 573 | io_s.PermitUncheckedError(); |
46 | 573 | } |
47 | 1.84k | const uint64_t current_bytes = builder_->FileSize(); |
48 | 1.84k | if (s.ok()) { |
49 | 1.27k | meta->fd.file_size = current_bytes; |
50 | 1.27k | meta->tail_size = builder_->GetTailSize(); |
51 | 1.27k | meta->marked_for_compaction = builder_->NeedCompact(); |
52 | 1.27k | const TableProperties& tp = builder_->GetTableProperties(); |
53 | 1.27k | meta->user_defined_timestamps_persisted = |
54 | 1.27k | static_cast<bool>(tp.user_defined_timestamps_persisted); |
55 | 1.27k | ExtractTimestampFromTableProperties(tp, meta); |
56 | 1.27k | } |
57 | 1.84k | current_output().finished = true; |
58 | 1.84k | stats_.bytes_written += current_bytes; |
59 | 1.84k | stats_.bytes_written_pre_comp += builder_->PreCompressionSize(); |
60 | 1.84k | stats_.num_output_files = static_cast<int>(outputs_.size()); |
61 | 1.84k | worker_cpu_micros_ += builder_->GetWorkerCPUMicros(); |
62 | | |
63 | 1.84k | return s; |
64 | 1.84k | } |
65 | | |
66 | | IOStatus CompactionOutputs::WriterSyncClose(const Status& input_status, |
67 | | SystemClock* clock, |
68 | | Statistics* statistics, |
69 | 1.84k | bool use_fsync) { |
70 | 1.84k | IOStatus io_s; |
71 | 1.84k | IOOptions opts; |
72 | 1.84k | io_s = WritableFileWriter::PrepareIOOptions( |
73 | 1.84k | WriteOptions(Env::IOActivity::kCompaction), opts); |
74 | 1.84k | if (input_status.ok() && io_s.ok()) { |
75 | 1.27k | StopWatch sw(clock, statistics, COMPACTION_OUTFILE_SYNC_MICROS); |
76 | 1.27k | io_s = file_writer_->Sync(opts, use_fsync); |
77 | 1.27k | } |
78 | 1.84k | if (input_status.ok() && io_s.ok()) { |
79 | 1.27k | io_s = file_writer_->Close(opts); |
80 | 1.27k | } |
81 | | |
82 | 1.84k | if (input_status.ok() && io_s.ok()) { |
83 | 1.27k | FileMetaData* meta = GetMetaData(); |
84 | 1.27k | meta->file_checksum = file_writer_->GetFileChecksum(); |
85 | 1.27k | meta->file_checksum_func_name = file_writer_->GetFileChecksumFuncName(); |
86 | 1.27k | } |
87 | | |
88 | 1.84k | file_writer_.reset(); |
89 | | |
90 | 1.84k | return io_s; |
91 | 1.84k | } |
92 | | |
// Returns true if the output should be cut before `internal_key` so that
// output file boundaries align with `files_to_cut_for_ttl_`.
// State tracked across calls:
//  - cur_files_to_cut_for_ttl_: index of the file the previous key fell
//    inside, or -1 if the previous key was in a gap between files.
//  - next_files_to_cut_for_ttl_: next candidate file index to examine when
//    searching from a gap.
bool CompactionOutputs::UpdateFilesToCutForTTLStates(
    const Slice& internal_key) {
  if (!files_to_cut_for_ttl_.empty()) {
    const InternalKeyComparator* icmp =
        &compaction_->column_family_data()->internal_comparator();
    if (cur_files_to_cut_for_ttl_ != -1) {
      // Previous key is inside the range of a file
      if (icmp->Compare(internal_key,
                        files_to_cut_for_ttl_[cur_files_to_cut_for_ttl_]
                            ->largest.Encode()) > 0) {
        // Moved past the end of that file: cut here and resume scanning from
        // the following file on the next call.
        next_files_to_cut_for_ttl_ = cur_files_to_cut_for_ttl_ + 1;
        cur_files_to_cut_for_ttl_ = -1;
        return true;
      }
    } else {
      // Look for the key position
      while (next_files_to_cut_for_ttl_ <
             static_cast<int>(files_to_cut_for_ttl_.size())) {
        if (icmp->Compare(internal_key,
                          files_to_cut_for_ttl_[next_files_to_cut_for_ttl_]
                              ->smallest.Encode()) >= 0) {
          if (icmp->Compare(internal_key,
                            files_to_cut_for_ttl_[next_files_to_cut_for_ttl_]
                                ->largest.Encode()) <= 0) {
            // Within the current file: remember it and cut before this key.
            cur_files_to_cut_for_ttl_ = next_files_to_cut_for_ttl_;
            return true;
          }
          // Beyond the current file
          next_files_to_cut_for_ttl_++;
        } else {
          // Still fall into the gap
          break;
        }
      }
    }
  }
  return false;
}
132 | | |
// Advances the grandparent-tracking state (grandparent_index_,
// being_grandparent_gap_, grandparent_overlapped_bytes_,
// grandparent_boundary_switched_num_) to the grandparent file or gap that
// contains `internal_key`, and returns the number of grandparent file
// boundaries crossed since the previous key.
size_t CompactionOutputs::UpdateGrandparentBoundaryInfo(
    const Slice& internal_key) {
  size_t curr_key_boundary_switched_num = 0;
  const std::vector<FileMetaData*>& grandparents = compaction_->grandparents();

  if (grandparents.empty()) {
    return curr_key_boundary_switched_num;
  }
  const Comparator* ucmp = compaction_->column_family_data()->user_comparator();

  // Move the grandparent_index_ to the file containing the current user_key.
  // If there are multiple files containing the same user_key, make sure the
  // index points to the last file containing the key.
  while (grandparent_index_ < grandparents.size()) {
    if (being_grandparent_gap_) {
      if (sstableKeyCompare(ucmp, internal_key,
                            grandparents[grandparent_index_]->smallest) < 0) {
        break;
      }
      // Crossing from a gap into a file; count the whole file's size as
      // overlapped. Boundary crossings are counted only after the first key
      // (seen_key_) has been observed.
      if (seen_key_) {
        curr_key_boundary_switched_num++;
        grandparent_overlapped_bytes_ +=
            grandparents[grandparent_index_]->fd.GetFileSize();
        grandparent_boundary_switched_num_++;
      }
      being_grandparent_gap_ = false;
    } else {
      int cmp_result = sstableKeyCompare(
          ucmp, internal_key, grandparents[grandparent_index_]->largest);
      // If it's same key, make sure grandparent_index_ is pointing to the last
      // one.
      if (cmp_result < 0 ||
          (cmp_result == 0 &&
           (grandparent_index_ == grandparents.size() - 1 ||
            sstableKeyCompare(ucmp, internal_key,
                              grandparents[grandparent_index_ + 1]->smallest) <
                0))) {
        break;
      }
      // Crossing from a file into the following gap.
      if (seen_key_) {
        curr_key_boundary_switched_num++;
        grandparent_boundary_switched_num_++;
      }
      being_grandparent_gap_ = true;
      grandparent_index_++;
    }
  }

  // If the first key is in the middle of a grandparent file, adding it to the
  // overlap
  if (!seen_key_ && !being_grandparent_gap_) {
    assert(grandparent_overlapped_bytes_ == 0);
    grandparent_overlapped_bytes_ =
        GetCurrentKeyGrandparentOverlappedBytes(internal_key);
  }

  seen_key_ = true;
  return curr_key_boundary_switched_num;
}
192 | | |
// Returns the total size of the grandparent files that `internal_key`
// overlaps with. Requires that grandparent_index_ already points at the LAST
// grandparent file containing this key (established by
// UpdateGrandparentBoundaryInfo); files with the same user key earlier in the
// list are accumulated by scanning backwards.
uint64_t CompactionOutputs::GetCurrentKeyGrandparentOverlappedBytes(
    const Slice& internal_key) const {
  // no overlap with any grandparent file
  if (being_grandparent_gap_) {
    return 0;
  }
  uint64_t overlapped_bytes = 0;

  const std::vector<FileMetaData*>& grandparents = compaction_->grandparents();
  const Comparator* ucmp = compaction_->column_family_data()->user_comparator();
  InternalKey ikey;
  ikey.DecodeFrom(internal_key);
#ifndef NDEBUG
  // make sure the grandparent_index_ is pointing to the last files containing
  // the current key.
  int cmp_result =
      sstableKeyCompare(ucmp, ikey, grandparents[grandparent_index_]->largest);
  assert(
      cmp_result < 0 ||
      (cmp_result == 0 &&
       (grandparent_index_ == grandparents.size() - 1 ||
        sstableKeyCompare(
            ucmp, ikey, grandparents[grandparent_index_ + 1]->smallest) < 0)));
  assert(sstableKeyCompare(ucmp, ikey,
                           grandparents[grandparent_index_]->smallest) >= 0);
#endif
  overlapped_bytes += grandparents[grandparent_index_]->fd.GetFileSize();

  // go backwards to find all overlapped files, one key can overlap multiple
  // files. In the following example, if the current output key is `c`, and one
  // compaction file was cut before `c`, current `c` can overlap with 3 files:
  //   [a b]               [c...
  // [b, b] [c, c] [c, c] [c, d]
  for (int64_t i = static_cast<int64_t>(grandparent_index_) - 1;
       i >= 0 && sstableKeyCompare(ucmp, ikey, grandparents[i]->largest) == 0;
       i--) {
    overlapped_bytes += grandparents[i]->fd.GetFileSize();
  }

  return overlapped_bytes;
}
234 | | |
// Decides whether the current output file should be cut BEFORE the key that
// `c_iter` is positioned on. Cut triggers, in order of evaluation:
//  - TTL-aligned file boundaries (UpdateFilesToCutForTTLStates),
//  - a user-defined SstPartitioner,
//  - reaching max_output_file_size (never for L0 outputs),
//  - the round-robin compaction split key,
//  - several grandparent-overlap heuristics that bound future compaction
//    size (see inline comments and PR #1963).
// Grandparent/TTL state is always updated for output levels > 0, even when
// no builder is open yet.
bool CompactionOutputs::ShouldStopBefore(const CompactionIterator& c_iter) {
  assert(c_iter.Valid());
  const Slice& internal_key = c_iter.key();
#ifndef NDEBUG
  // Test hook: a sync-point callback may force a cut decision.
  bool should_stop = false;
  std::pair<bool*, const Slice> p{&should_stop, internal_key};
  TEST_SYNC_POINT_CALLBACK(
      "CompactionOutputs::ShouldStopBefore::manual_decision", (void*)&p);
  if (should_stop) {
    return true;
  }
#endif  // NDEBUG
  const uint64_t previous_overlapped_bytes = grandparent_overlapped_bytes_;
  const InternalKeyComparator* icmp =
      &compaction_->column_family_data()->internal_comparator();
  size_t num_grandparent_boundaries_crossed = 0;
  bool should_stop_for_ttl = false;
  // Always update grandparent information like overlapped file number, size
  // etc., and TTL states.
  // If compaction_->output_level() == 0, there is no need to update grandparent
  // info, and that `grandparent` should be empty.
  if (compaction_->output_level() > 0) {
    num_grandparent_boundaries_crossed =
        UpdateGrandparentBoundaryInfo(internal_key);
    should_stop_for_ttl = UpdateFilesToCutForTTLStates(internal_key);
  }

  // No open output file yet -> nothing to cut.
  if (!HasBuilder()) {
    return false;
  }

  if (should_stop_for_ttl) {
    return true;
  }

  // If there's user defined partitioner, check that first
  if (partitioner_ && partitioner_->ShouldPartition(PartitionerRequest(
                          last_key_for_partitioner_, c_iter.user_key(),
                          current_output_file_size_)) == kRequired) {
    return true;
  }

  // files output to Level 0 won't be split
  if (compaction_->output_level() == 0) {
    return false;
  }

  // reach the max file size
  uint64_t estimated_file_size = current_output_file_size_;
  if (compaction_->mutable_cf_options().target_file_size_is_upper_bound) {
    // Account for the not-yet-written tail so the final file stays under the
    // target size.
    estimated_file_size += builder_->EstimatedTailSize();
  }
  if (estimated_file_size >= compaction_->max_output_file_size()) {
    return true;
  }

  // Check if it needs to split for RoundRobin
  // Invalid local_output_split_key indicates that we do not need to split
  if (local_output_split_key_ != nullptr && !is_split_) {
    // Split occurs when the next key is larger than/equal to the cursor
    if (icmp->Compare(internal_key, local_output_split_key_->Encode()) >= 0) {
      is_split_ = true;
      return true;
    }
  }

  // only check if the current key is going to cross the grandparents file
  // boundary (either the file beginning or ending).
  if (num_grandparent_boundaries_crossed > 0) {
    // Cut the file before the current key if the size of the current output
    // file + its overlapped grandparent files is bigger than
    // max_compaction_bytes. Which is to prevent future bigger than
    // max_compaction_bytes compaction from the current output level.
    if (grandparent_overlapped_bytes_ + current_output_file_size_ >
        compaction_->max_compaction_bytes()) {
      return true;
    }

    // Cut the file if including the key is going to add a skippable file on
    // the grandparent level AND its size is reasonably big (1/8 of target file
    // size). For example, if it's compacting the files L0 + L1:
    //  L0:  [1,   21]
    //  L1:    [3,   23]
    //  L2: [2, 4] [11, 15] [22, 24]
    // Without this break, it will output as:
    //  L1: [1,3, 21,23]
    // With this break, it will output as (assuming [11, 15] at L2 is bigger
    // than 1/8 of target size):
    //  L1: [1,3] [21,23]
    // Then for the future compactions, [11,15] won't be included.
    // For random datasets (either evenly distributed or skewed), it rarely
    // triggers this condition, but if the user is adding 2 different datasets
    // without any overlap, it may likely happen.
    // More details, check PR #1963
    const size_t num_skippable_boundaries_crossed =
        being_grandparent_gap_ ? 2 : 3;
    if (compaction_->immutable_options().compaction_style ==
            kCompactionStyleLevel &&
        num_grandparent_boundaries_crossed >=
            num_skippable_boundaries_crossed &&
        grandparent_overlapped_bytes_ - previous_overlapped_bytes >
            compaction_->target_output_file_size() / 8) {
      return true;
    }

    // Pre-cut the output file if it's reaching a certain size AND it's at the
    // boundary of a grandparent file. It can reduce the future compaction size,
    // the cost is having smaller files.
    // The pre-cut size threshold is based on how many grandparent boundaries
    // it has seen before. Basically, if it has seen no boundary at all, then it
    // will pre-cut at 50% target file size. Every boundary it has seen
    // increases the threshold by 5%, max at 90%, which it will always cut.
    // The idea is based on if it has seen more boundaries before, it will more
    // likely to see another boundary (file cutting opportunity) before the
    // target file size. The test shows it can generate larger files than a
    // static threshold like 75% and has a similar write amplification
    // improvement.
    if (compaction_->immutable_options().compaction_style ==
            kCompactionStyleLevel &&
        current_output_file_size_ >=
            ((compaction_->target_output_file_size() + 99) / 100) *
                (50 + std::min(grandparent_boundary_switched_num_ * 5,
                               size_t{40}))) {
      return true;
    }
  }

  return false;
}
364 | | |
// Adds the key/value that `c_iter` is positioned on to the current output,
// cutting the current file first if ShouldStopBefore() says so (via
// `close_file_func`) and opening a new file if needed (via `open_file_func`).
// Range-deletion sentinel keys only influence file cutting / bounds; they are
// not added to the table builder here.
Status CompactionOutputs::AddToOutput(
    const CompactionIterator& c_iter,
    const CompactionFileOpenFunc& open_file_func,
    const CompactionFileCloseFunc& close_file_func,
    const ParsedInternalKey& prev_iter_output_internal_key) {
  Status s;
  bool is_range_del = c_iter.IsDeleteRangeSentinelKey();
  if (is_range_del && compaction_->bottommost_level()) {
    // We don't consider range tombstone for bottommost level since:
    // 1. there is no grandparent and hence no overlap to consider
    // 2. range tombstone may be dropped at bottommost level.
    return s;
  }
  const Slice& key = c_iter.key();
  if (ShouldStopBefore(c_iter) && HasBuilder()) {
    s = close_file_func(c_iter.InputStatus(), prev_iter_output_internal_key,
                        key, &c_iter, *this);
    if (!s.ok()) {
      return s;
    }
    // reset grandparent information
    grandparent_boundary_switched_num_ = 0;
    grandparent_overlapped_bytes_ =
        GetCurrentKeyGrandparentOverlappedBytes(key);
    if (UNLIKELY(is_range_del)) {
      // lower bound for this new output file, this is needed as the lower bound
      // does not come from the smallest point key in this case.
      range_tombstone_lower_bound_.DecodeFrom(key);
    } else {
      range_tombstone_lower_bound_.Clear();
    }
  }

  // Open output file if necessary
  if (!HasBuilder()) {
    s = open_file_func(*this);
    if (!s.ok()) {
      return s;
    }
  }

  // c_iter may emit range deletion keys, so update `last_key_for_partitioner_`
  // here before returning below when `is_range_del` is true
  if (partitioner_) {
    last_key_for_partitioner_.assign(c_iter.user_key().data_,
                                     c_iter.user_key().size_);
  }

  // Range-del sentinels are not written as point entries.
  if (UNLIKELY(is_range_del)) {
    return s;
  }

  assert(builder_ != nullptr);
  const Slice& value = c_iter.value();
  // Feed the key/value through the output validator before adding it to the
  // builder.
  s = current_output().validator.Add(key, value);
  if (!s.ok()) {
    return s;
  }
  builder_->Add(key, value);

  stats_.num_output_records++;
  current_output_file_size_ = builder_->EstimatedFileSize();

  if (blob_garbage_meter_) {
    s = blob_garbage_meter_->ProcessOutFlow(key, value);
  }

  if (!s.ok()) {
    return s;
  }

  const ParsedInternalKey& ikey = c_iter.ikey();
  if (ikey.type == kTypeValuePreferredSeqno) {
    // Track the smallest preferred seqno seen; Finish() widens the
    // seqno->time mapping range with it.
    SequenceNumber preferred_seqno = ParsePackedValueForSeqno(value);
    smallest_preferred_seqno_ =
        std::min(smallest_preferred_seqno_, preferred_seqno);
  }
  s = current_output().meta.UpdateBoundaries(key, value, ikey.sequence,
                                             ikey.type);

  return s;
}
447 | | |
448 | | namespace { |
449 | | void SetMaxSeqAndTs(InternalKey& internal_key, const Slice& user_key, |
450 | 0 | const size_t ts_sz) { |
451 | 0 | if (ts_sz) { |
452 | 0 | static constexpr char kTsMax[] = "\xff\xff\xff\xff\xff\xff\xff\xff\xff"; |
453 | 0 | if (ts_sz <= strlen(kTsMax)) { |
454 | 0 | internal_key = InternalKey(user_key, kMaxSequenceNumber, |
455 | 0 | kTypeRangeDeletion, Slice(kTsMax, ts_sz)); |
456 | 0 | } else { |
457 | 0 | internal_key = |
458 | 0 | InternalKey(user_key, kMaxSequenceNumber, kTypeRangeDeletion, |
459 | 0 | std::string(ts_sz, '\xff')); |
460 | 0 | } |
461 | 0 | } else { |
462 | 0 | internal_key.Set(user_key, kMaxSequenceNumber, kTypeRangeDeletion); |
463 | 0 | } |
464 | 0 | } |
465 | | } // namespace |
466 | | |
467 | | Status CompactionOutputs::AddRangeDels( |
468 | | CompactionRangeDelAggregator& range_del_agg, |
469 | | const Slice* comp_start_user_key, const Slice* comp_end_user_key, |
470 | | CompactionIterationStats& range_del_out_stats, bool bottommost_level, |
471 | | const InternalKeyComparator& icmp, SequenceNumber earliest_snapshot, |
472 | | std::pair<SequenceNumber, SequenceNumber> keep_seqno_range, |
473 | 0 | const Slice& next_table_min_key, const std::string& full_history_ts_low) { |
474 | | // The following example does not happen since |
475 | | // CompactionOutput::ShouldStopBefore() always return false for the first |
476 | | // point key. But we should consider removing this dependency. Suppose for the |
477 | | // first compaction output file, |
478 | | // - next_table_min_key.user_key == comp_start_user_key |
479 | | // - no point key is in the output file |
480 | | // - there is a range tombstone @seqno to be added that covers |
481 | | // comp_start_user_key |
482 | | // Then meta.smallest will be set to comp_start_user_key@seqno |
483 | | // and meta.largest will be set to comp_start_user_key@kMaxSequenceNumber |
484 | | // which violates the assumption that meta.smallest should be <= meta.largest. |
485 | 0 | assert(!range_del_agg.IsEmpty()); |
486 | 0 | FileMetaData& meta = current_output().meta; |
487 | 0 | const Comparator* ucmp = icmp.user_comparator(); |
488 | 0 | InternalKey lower_bound_buf, upper_bound_buf; |
489 | 0 | Slice lower_bound_guard, upper_bound_guard; |
490 | 0 | std::string smallest_user_key; |
491 | 0 | const Slice *lower_bound, *upper_bound; |
492 | | |
493 | | // We first determine the internal key lower_bound and upper_bound for |
494 | | // this output file. All and only range tombstones that overlap with |
495 | | // [lower_bound, upper_bound] should be added to this file. File |
496 | | // boundaries (meta.smallest/largest) should be updated accordingly when |
497 | | // extended by range tombstones. |
498 | 0 | size_t output_size = outputs_.size(); |
499 | 0 | if (output_size == 1) { |
500 | | // This is the first file in the subcompaction. |
501 | | // |
502 | | // When outputting a range tombstone that spans a subcompaction boundary, |
503 | | // the files on either side of that boundary need to include that |
504 | | // boundary's user key. Otherwise, the spanning range tombstone would lose |
505 | | // coverage. |
506 | | // |
507 | | // To achieve this while preventing files from overlapping in internal key |
508 | | // (an LSM invariant violation), we allow the earlier file to include the |
509 | | // boundary user key up to `kMaxSequenceNumber,kTypeRangeDeletion`. The |
510 | | // later file can begin at the boundary user key at the newest key version |
511 | | // it contains. At this point that version number is unknown since we have |
512 | | // not processed the range tombstones yet, so permit any version. Same story |
513 | | // applies to timestamp, and a non-nullptr `comp_start_user_key` should have |
514 | | // `kMaxTs` here, which similarly permits any timestamp. |
515 | 0 | if (comp_start_user_key) { |
516 | 0 | lower_bound_buf.Set(*comp_start_user_key, kMaxSequenceNumber, |
517 | 0 | kTypeRangeDeletion); |
518 | 0 | lower_bound_guard = lower_bound_buf.Encode(); |
519 | 0 | lower_bound = &lower_bound_guard; |
520 | 0 | } else { |
521 | 0 | lower_bound = nullptr; |
522 | 0 | } |
523 | 0 | } else { |
524 | | // For subsequent output tables, only include range tombstones from min |
525 | | // key onwards since the previous file was extended to contain range |
526 | | // tombstones falling before min key. |
527 | 0 | if (range_tombstone_lower_bound_.size() > 0) { |
528 | 0 | assert(meta.smallest.size() == 0 || |
529 | 0 | icmp.Compare(range_tombstone_lower_bound_, meta.smallest) < 0); |
530 | 0 | lower_bound_guard = range_tombstone_lower_bound_.Encode(); |
531 | 0 | } else { |
532 | 0 | assert(meta.smallest.size() > 0); |
533 | 0 | lower_bound_guard = meta.smallest.Encode(); |
534 | 0 | } |
535 | 0 | lower_bound = &lower_bound_guard; |
536 | 0 | } |
537 | |
|
538 | 0 | const size_t ts_sz = ucmp->timestamp_size(); |
539 | 0 | if (next_table_min_key.empty()) { |
540 | | // Last file of the subcompaction. |
541 | 0 | if (comp_end_user_key) { |
542 | 0 | upper_bound_buf.Set(*comp_end_user_key, kMaxSequenceNumber, |
543 | 0 | kTypeRangeDeletion); |
544 | 0 | upper_bound_guard = upper_bound_buf.Encode(); |
545 | 0 | upper_bound = &upper_bound_guard; |
546 | 0 | } else { |
547 | 0 | upper_bound = nullptr; |
548 | 0 | } |
549 | 0 | } else { |
550 | | // There is another file coming whose coverage will begin at |
551 | | // `next_table_min_key`. The current file needs to extend range tombstone |
552 | | // coverage through its own keys (through `meta.largest`) and through user |
553 | | // keys preceding `next_table_min_key`'s user key. |
554 | 0 | ParsedInternalKey next_table_min_key_parsed; |
555 | 0 | ParseInternalKey(next_table_min_key, &next_table_min_key_parsed, |
556 | 0 | false /* log_err_key */) |
557 | 0 | .PermitUncheckedError(); |
558 | 0 | assert(next_table_min_key_parsed.sequence < kMaxSequenceNumber); |
559 | 0 | assert(meta.largest.size() == 0 || |
560 | 0 | icmp.Compare(meta.largest.Encode(), next_table_min_key) < 0); |
561 | 0 | assert(!lower_bound || icmp.Compare(*lower_bound, next_table_min_key) <= 0); |
562 | 0 | if (meta.largest.size() > 0 && |
563 | 0 | ucmp->EqualWithoutTimestamp(meta.largest.user_key(), |
564 | 0 | next_table_min_key_parsed.user_key)) { |
565 | | // Caution: this assumes meta.largest.Encode() lives longer than |
566 | | // upper_bound, which is only true if meta.largest is never updated. |
567 | | // This just happens to be the case here since meta.largest serves |
568 | | // as the upper_bound. |
569 | 0 | upper_bound_guard = meta.largest.Encode(); |
570 | 0 | } else { |
571 | 0 | SetMaxSeqAndTs(upper_bound_buf, next_table_min_key_parsed.user_key, |
572 | 0 | ts_sz); |
573 | 0 | upper_bound_guard = upper_bound_buf.Encode(); |
574 | 0 | } |
575 | 0 | upper_bound = &upper_bound_guard; |
576 | 0 | } |
577 | 0 | if (lower_bound && upper_bound && |
578 | 0 | icmp.Compare(*lower_bound, *upper_bound) > 0) { |
579 | 0 | assert(meta.smallest.size() == 0 && |
580 | 0 | ucmp->EqualWithoutTimestamp(ExtractUserKey(*lower_bound), |
581 | 0 | ExtractUserKey(*upper_bound))); |
582 | | // This can only happen when lower_bound have the same user key as |
583 | | // next_table_min_key and that there is no point key in the current |
584 | | // compaction output file. |
585 | 0 | return Status::OK(); |
586 | 0 | } |
587 | | // The end key of the subcompaction must be bigger or equal to the upper |
588 | | // bound. If the end of subcompaction is null or the upper bound is null, |
589 | | // it means that this file is the last file in the compaction. So there |
590 | | // will be no overlapping between this file and others. |
591 | 0 | assert(comp_end_user_key == nullptr || upper_bound == nullptr || |
592 | 0 | ucmp->CompareWithoutTimestamp(ExtractUserKey(*upper_bound), |
593 | 0 | *comp_end_user_key) <= 0); |
594 | 0 | auto it = range_del_agg.NewIterator(lower_bound, upper_bound); |
595 | 0 | Slice last_tombstone_start_user_key{}; |
596 | 0 | bool reached_lower_bound = false; |
597 | 0 | const ReadOptions read_options(Env::IOActivity::kCompaction); |
598 | 0 | for (it->SeekToFirst(); it->Valid(); it->Next()) { |
599 | 0 | auto tombstone = it->Tombstone(); |
600 | 0 | auto kv = tombstone.Serialize(); |
601 | | // Filter out by seqno for per-key placement |
602 | 0 | if (tombstone.seq_ < keep_seqno_range.first || |
603 | 0 | tombstone.seq_ >= keep_seqno_range.second) { |
604 | 0 | continue; |
605 | 0 | } |
606 | | |
607 | 0 | InternalKey tombstone_end = tombstone.SerializeEndKey(); |
608 | | // TODO: the underlying iterator should support clamping the bounds. |
609 | | // tombstone_end.Encode is of form user_key@kMaxSeqno |
610 | | // if it is equal to lower_bound, there is no need to include |
611 | | // such range tombstone. |
612 | 0 | if (!reached_lower_bound && lower_bound && |
613 | 0 | icmp.Compare(tombstone_end.Encode(), *lower_bound) <= 0) { |
614 | 0 | continue; |
615 | 0 | } |
616 | 0 | assert(!lower_bound || |
617 | 0 | icmp.Compare(*lower_bound, tombstone_end.Encode()) <= 0); |
618 | 0 | reached_lower_bound = true; |
619 | | |
620 | | // Garbage collection for range tombstones. |
621 | | // If user-defined timestamp is enabled, range tombstones are dropped if |
622 | | // they are at bottommost_level, below full_history_ts_low and not visible |
623 | | // in any snapshot. trim_ts_ is passed to the constructor for |
624 | | // range_del_agg_, and range_del_agg_ internally drops tombstones above |
625 | | // trim_ts_. |
626 | 0 | bool consider_drop = |
627 | 0 | tombstone.seq_ <= earliest_snapshot && |
628 | 0 | (ts_sz == 0 || |
629 | 0 | (!full_history_ts_low.empty() && |
630 | 0 | ucmp->CompareTimestamp(tombstone.ts_, full_history_ts_low) < 0)); |
631 | 0 | if (consider_drop && bottommost_level) { |
632 | | // TODO(andrewkr): tombstones that span multiple output files are |
633 | | // counted for each compaction output file, so lots of double |
634 | | // counting. |
635 | 0 | range_del_out_stats.num_range_del_drop_obsolete++; |
636 | 0 | range_del_out_stats.num_record_drop_obsolete++; |
637 | 0 | continue; |
638 | 0 | } |
639 | | |
640 | 0 | assert(lower_bound == nullptr || |
641 | 0 | ucmp->CompareWithoutTimestamp(ExtractUserKey(*lower_bound), |
642 | 0 | kv.second) < 0); |
643 | 0 | InternalKey tombstone_start = kv.first; |
644 | 0 | if (lower_bound && |
645 | 0 | ucmp->CompareWithoutTimestamp(tombstone_start.user_key(), |
646 | 0 | ExtractUserKey(*lower_bound)) < 0) { |
647 | | // This just updates the non-timestamp portion of `tombstone_start`'s user |
648 | | // key. Ideally there would be a simpler API usage |
649 | 0 | ParsedInternalKey tombstone_start_parsed; |
650 | 0 | ParseInternalKey(tombstone_start.Encode(), &tombstone_start_parsed, |
651 | 0 | false /* log_err_key */) |
652 | 0 | .PermitUncheckedError(); |
653 | | // timestamp should be from where sequence number is from, which is from |
654 | | // tombstone in this case |
655 | 0 | std::string ts = |
656 | 0 | tombstone_start_parsed.GetTimestamp(ucmp->timestamp_size()) |
657 | 0 | .ToString(); |
658 | 0 | tombstone_start_parsed.user_key = ExtractUserKey(*lower_bound); |
659 | 0 | tombstone_start.SetFrom(tombstone_start_parsed, ts); |
660 | 0 | } |
661 | 0 | if (upper_bound != nullptr && |
662 | 0 | icmp.Compare(*upper_bound, tombstone_start.Encode()) < 0) { |
663 | 0 | break; |
664 | 0 | } |
665 | 0 | if (lower_bound && |
666 | 0 | icmp.Compare(tombstone_start.Encode(), *lower_bound) < 0) { |
667 | 0 | tombstone_start.DecodeFrom(*lower_bound); |
668 | 0 | } |
669 | 0 | if (upper_bound && icmp.Compare(*upper_bound, tombstone_end.Encode()) < 0) { |
670 | 0 | tombstone_end.DecodeFrom(*upper_bound); |
671 | 0 | } |
672 | 0 | if (consider_drop && compaction_->KeyRangeNotExistsBeyondOutputLevel( |
673 | 0 | tombstone_start.user_key(), |
674 | 0 | tombstone_end.user_key(), &level_ptrs_)) { |
675 | 0 | range_del_out_stats.num_range_del_drop_obsolete++; |
676 | 0 | range_del_out_stats.num_record_drop_obsolete++; |
677 | 0 | continue; |
678 | 0 | } |
679 | | // Here we show that *only* range tombstones that overlap with |
680 | | // [lower_bound, upper_bound] are added to the current file, and |
681 | | // sanity checking invariants that should hold: |
682 | | // - [tombstone_start, tombstone_end] overlaps with [lower_bound, |
683 | | // upper_bound] |
684 | | // - meta.smallest <= meta.largest |
686 |   |   // Corresponding assertions are made, the proof is broken if any of them
687 |   |   // fails.
687 | | // TODO: show that *all* range tombstones that overlap with |
688 | | // [lower_bound, upper_bound] are added. |
689 | | // TODO: some invariant about boundaries are correctly updated. |
690 | | // |
691 | | // Note that `tombstone_start` is updated in the if condition above, we use |
692 | | // tombstone_start to refer to its initial value, i.e., |
693 | | // it->Tombstone().first, and use tombstone_start* to refer to its value |
694 | | // after the update. |
695 | | // |
696 | | // To show [lower_bound, upper_bound] overlaps with [tombstone_start, |
697 | | // tombstone_end]: |
698 | | // lower_bound <= upper_bound from the if condition right after all |
699 | | // bounds are initialized. We assume each tombstone fragment has |
700 | | // start_key.user_key < end_key.user_key, so |
701 | | // tombstone_start < tombstone_end by |
702 | | // FragmentedTombstoneIterator::Tombstone(). So these two ranges are both |
703 |   |   // non-empty. The flag `reached_lower_bound` and the if logic before it
704 | | // ensures lower_bound <= tombstone_end. tombstone_start is only updated |
705 | | // if it has a smaller user_key than lower_bound user_key, so |
706 | | // tombstone_start <= tombstone_start*. The above if condition implies |
707 | | // tombstone_start* <= upper_bound. So we have |
708 | | // tombstone_start <= upper_bound and lower_bound <= tombstone_end |
709 | | // and the two ranges overlap. |
710 | | // |
711 | | // To show meta.smallest <= meta.largest: |
712 | | // From the implementation of UpdateBoundariesForRange(), it suffices to |
713 | | // prove that when it is first called in this function, its parameters |
714 | | // satisfy `start <= end`, where start = max(tombstone_start*, lower_bound) |
715 | | // and end = min(tombstone_end, upper_bound). From the above proof we have |
716 | | // lower_bound <= tombstone_end and lower_bound <= upper_bound. We only need |
717 | | // to show that tombstone_start* <= min(tombstone_end, upper_bound). |
718 | | // Note that tombstone_start*.user_key = max(tombstone_start.user_key, |
719 | | // lower_bound.user_key). Assuming tombstone_end always has |
720 | | // kMaxSequenceNumber and lower_bound.seqno < kMaxSequenceNumber. |
721 | | // Since lower_bound <= tombstone_end and lower_bound.seqno < |
722 | | // tombstone_end.seqno (in absolute number order, not internal key order), |
723 | | // lower_bound.user_key < tombstone_end.user_key. |
724 | | // Since lower_bound.user_key < tombstone_end.user_key and |
725 | | // tombstone_start.user_key < tombstone_end.user_key, tombstone_start* < |
726 | | // tombstone_end. Since tombstone_start* <= upper_bound from the above proof |
727 | | // and tombstone_start* < tombstone_end, tombstone_start* <= |
728 | | // min(tombstone_end, upper_bound), so the two ranges overlap. |
729 | | |
730 | | // Range tombstone is not supported by output validator yet. |
731 | 0 | builder_->Add(kv.first.Encode(), kv.second); |
732 | 0 | assert(icmp.Compare(tombstone_start, tombstone_end) <= 0); |
733 | 0 | meta.UpdateBoundariesForRange(tombstone_start, tombstone_end, |
734 | 0 | tombstone.seq_, icmp); |
735 | 0 | if (!bottommost_level) { |
736 | 0 | bool start_user_key_changed = |
737 | 0 | last_tombstone_start_user_key.empty() || |
738 | 0 | ucmp->CompareWithoutTimestamp(last_tombstone_start_user_key, |
739 | 0 | it->start_key()) < 0; |
740 | 0 | last_tombstone_start_user_key = it->start_key(); |
741 | 0 | if (start_user_key_changed) { |
742 | | // If tombstone_start >= tombstone_end, then either no key range is |
743 | | // covered, or that they have the same user key. If they have the same |
744 | | // user key, then the internal key range should only be within this |
745 |   |         // level, and no keys from older levels are covered.
746 | 0 | if (ucmp->CompareWithoutTimestamp(tombstone_start.user_key(), |
747 | 0 | tombstone_end.user_key()) < 0) { |
748 | 0 | SizeApproximationOptions approx_opts; |
749 | 0 | approx_opts.files_size_error_margin = 0.1; |
750 | 0 | auto approximate_covered_size = |
751 | 0 | compaction_->input_version()->version_set()->ApproximateSize( |
752 | 0 | approx_opts, read_options, compaction_->input_version(), |
753 | 0 | tombstone_start.Encode(), tombstone_end.Encode(), |
754 | 0 | compaction_->output_level() + 1 /* start_level */, |
755 | 0 | -1 /* end_level */, kCompaction); |
756 | 0 | meta.compensated_range_deletion_size += approximate_covered_size; |
757 | 0 | } |
758 | 0 | } |
759 | 0 | } |
760 | 0 | } |
761 | 0 | return Status::OK(); |
762 | 0 | } |
763 | | |
764 | 5.57k | void CompactionOutputs::FillFilesToCutForTtl() { |
765 | 5.57k | if (compaction_->immutable_options().compaction_style != |
766 | 5.57k | kCompactionStyleLevel || |
767 | 5.57k | compaction_->immutable_options().compaction_pri != kMinOverlappingRatio || |
768 | 5.57k | compaction_->mutable_cf_options().ttl == 0 || |
769 | 5.57k | compaction_->num_input_levels() < 2 || compaction_->bottommost_level()) { |
770 | 5.57k | return; |
771 | 5.57k | } |
772 | | |
773 | | // We define new file with the oldest ancestor time to be younger than 1/4 |
774 | | // TTL, and an old one to be older than 1/2 TTL time. |
775 | 0 | int64_t temp_current_time; |
776 | 0 | auto get_time_status = compaction_->immutable_options().clock->GetCurrentTime( |
777 | 0 | &temp_current_time); |
778 | 0 | if (!get_time_status.ok()) { |
779 | 0 | return; |
780 | 0 | } |
781 | | |
782 | 0 | auto current_time = static_cast<uint64_t>(temp_current_time); |
783 | 0 | if (current_time < compaction_->mutable_cf_options().ttl) { |
784 | 0 | return; |
785 | 0 | } |
786 | | |
787 | 0 | uint64_t old_age_thres = |
788 | 0 | current_time - compaction_->mutable_cf_options().ttl / 2; |
789 | 0 | const std::vector<FileMetaData*>& olevel = |
790 | 0 | *(compaction_->inputs(compaction_->num_input_levels() - 1)); |
791 | 0 | for (FileMetaData* file : olevel) { |
792 | | // Worth filtering out by start and end? |
793 | 0 | uint64_t oldest_ancester_time = file->TryGetOldestAncesterTime(); |
794 | | // We put old files if they are not too small to prevent a flood |
795 | | // of small files. |
796 | 0 | if (oldest_ancester_time < old_age_thres && |
797 | 0 | file->fd.GetFileSize() > |
798 | 0 | compaction_->mutable_cf_options().target_file_size_base / 2) { |
799 | 0 | files_to_cut_for_ttl_.push_back(file); |
800 | 0 | } |
801 | 0 | } |
802 | 0 | } |
803 | | |
804 | | CompactionOutputs::CompactionOutputs(const Compaction* compaction, |
805 | | const bool is_proximal_level) |
806 | 6.85k | : compaction_(compaction), is_proximal_level_(is_proximal_level) { |
807 | 6.85k | partitioner_ = compaction->output_level() == 0 |
808 | 6.85k | ? nullptr |
809 | 6.85k | : compaction->CreateSstPartitioner(); |
810 | | |
811 | 6.85k | if (compaction->output_level() != 0) { |
812 | 5.57k | FillFilesToCutForTtl(); |
813 | 5.57k | } |
814 | | |
815 | 6.85k | level_ptrs_ = std::vector<size_t>(compaction_->number_levels(), 0); |
816 | 6.85k | } |
817 | | |
818 | | } // namespace ROCKSDB_NAMESPACE |