/src/rocksdb/db/compaction/compaction_outputs.h
Line | Count | Source |
1 | | // Copyright (c) Meta Platforms, Inc. and affiliates. |
2 | | // |
3 | | // This source code is licensed under both the GPLv2 (found in the |
4 | | // COPYING file in the root directory) and Apache 2.0 License |
5 | | // (found in the LICENSE.Apache file in the root directory). |
6 | | // |
7 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
8 | | // Use of this source code is governed by a BSD-style license that can be |
9 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
10 | | |
11 | | #pragma once |
12 | | |
13 | | #include "db/blob/blob_garbage_meter.h" |
14 | | #include "db/compaction/compaction.h" |
15 | | #include "db/compaction/compaction_iterator.h" |
16 | | #include "db/internal_stats.h" |
17 | | #include "db/output_validator.h" |
18 | | |
19 | | namespace ROCKSDB_NAMESPACE { |
20 | | |
// Forward declaration so that the callback aliases below can name the class
// before it is defined.
class CompactionOutputs;
// Callback invoked with the CompactionOutputs it should operate on; judging by
// its name and its use in CompactionOutputs::CloseOutput() it opens a new
// output file -- the concrete behavior is defined by the caller that supplies
// it.
using CompactionFileOpenFunc = std::function<Status(CompactionOutputs&)>;
// Callback invoked to close the current output file. It receives the status
// accumulated so far, a parsed internal key, a key slice, an optional
// compaction iterator (may be nullptr, see CompactionOutputs::CloseOutput())
// and the CompactionOutputs being closed.
// NOTE(review): the exact meaning of the two key arguments is defined by the
// implementation supplied by the caller -- confirm there.
using CompactionFileCloseFunc =
    std::function<Status(const Status&, const ParsedInternalKey&, const Slice&,
                         const CompactionIterator*, CompactionOutputs&)>;
26 | | |
27 | | // Files produced by subcompaction, most of the functions are used by |
28 | | // compaction_job Open/Close compaction file functions. |
29 | | class CompactionOutputs { |
30 | | public: |
31 | | // compaction output file |
32 | | struct Output { |
33 | | Output(FileMetaData&& _meta, const InternalKeyComparator& _icmp, |
34 | | bool _enable_hash, bool _finished, uint64_t precalculated_hash, |
35 | | bool _is_proximal_level) |
36 | 1.63k | : meta(std::move(_meta)), |
37 | 1.63k | validator(_icmp, _enable_hash, precalculated_hash), |
38 | 1.63k | finished(_finished), |
39 | 1.63k | is_proximal_level(_is_proximal_level) {} |
40 | | FileMetaData meta; |
41 | | OutputValidator validator; |
42 | | bool finished; |
43 | | bool is_proximal_level; |
44 | | std::shared_ptr<const TableProperties> table_properties; |
45 | | }; |
46 | | |
47 | | CompactionOutputs() = delete; |
48 | | |
49 | | explicit CompactionOutputs(const Compaction* compaction, |
50 | | const bool is_proximal_level); |
51 | | |
52 | 1.63k | bool IsProximalLevel() const { return is_proximal_level_; } |
53 | | |
54 | | // Add generated output to the list |
55 | | void AddOutput(FileMetaData&& meta, const InternalKeyComparator& icmp, |
56 | | bool enable_hash, bool finished = false, |
57 | 1.63k | uint64_t precalculated_hash = 0) { |
58 | 1.63k | outputs_.emplace_back(std::move(meta), icmp, enable_hash, finished, |
59 | 1.63k | precalculated_hash, is_proximal_level_); |
60 | 1.63k | } |
61 | | |
62 | 0 | const std::vector<Output>& GetOutputs() const { return outputs_; } |
63 | | |
64 | | // Set new table builder for the current output |
65 | | void NewBuilder(const TableBuilderOptions& tboptions); |
66 | | |
67 | | // Assign a new WritableFileWriter to the current output |
68 | 1.63k | void AssignFileWriter(WritableFileWriter* writer) { |
69 | 1.63k | file_writer_.reset(writer); |
70 | 1.63k | } |
71 | | |
72 | | // TODO: Move the BlobDB builder into CompactionOutputs |
73 | 2.03k | const std::vector<BlobFileAddition>& GetBlobFileAdditions() const { |
74 | 2.03k | if (is_proximal_level_) { |
75 | 0 | assert(blob_file_additions_.empty()); |
76 | 0 | } |
77 | 2.03k | return blob_file_additions_; |
78 | 2.03k | } |
79 | | |
80 | 0 | std::vector<BlobFileAddition>* GetBlobFileAdditionsPtr() { |
81 | 0 | assert(!is_proximal_level_); |
82 | 0 | return &blob_file_additions_; |
83 | 0 | } |
84 | | |
85 | 2.03k | bool HasBlobFileAdditions() const { return !blob_file_additions_.empty(); } |
86 | | |
87 | | // Get all file paths (SST and blob) created during compaction. |
88 | 0 | const std::vector<std::string>& GetOutputFilePaths() const { |
89 | 0 | return output_file_paths_; |
90 | 0 | } |
91 | | |
92 | 0 | std::vector<std::string>* GetOutputFilePathsPtr() { |
93 | 0 | return &output_file_paths_; |
94 | 0 | } |
95 | | |
96 | 1.63k | void AddOutputFilePath(const std::string& path) { |
97 | 1.63k | output_file_paths_.push_back(path); |
98 | 1.63k | } |
99 | | |
100 | 0 | BlobGarbageMeter* CreateBlobGarbageMeter() { |
101 | 0 | assert(!is_proximal_level_); |
102 | 0 | blob_garbage_meter_ = std::make_unique<BlobGarbageMeter>(); |
103 | 0 | return blob_garbage_meter_.get(); |
104 | 0 | } |
105 | | |
106 | 2.03k | BlobGarbageMeter* GetBlobGarbageMeter() const { |
107 | 2.03k | if (is_proximal_level_) { |
108 | | // blobdb doesn't support per_key_placement yet |
109 | 0 | assert(blob_garbage_meter_ == nullptr); |
110 | 0 | return nullptr; |
111 | 0 | } |
112 | 2.03k | return blob_garbage_meter_.get(); |
113 | 2.03k | } |
114 | | |
115 | 0 | void UpdateBlobStats() { |
116 | 0 | assert(!is_proximal_level_); |
117 | 0 | stats_.num_output_files_blob = |
118 | 0 | static_cast<int>(blob_file_additions_.size()); |
119 | 0 | for (const auto& blob : blob_file_additions_) { |
120 | 0 | stats_.bytes_written_blob += blob.GetTotalBlobBytes(); |
121 | 0 | } |
122 | 0 | } |
123 | | |
124 | | // Finish the current output file |
125 | | Status Finish(const Status& intput_status, |
126 | | const SeqnoToTimeMapping& seqno_to_time_mapping); |
127 | | |
128 | | // Update output table properties from already populated TableProperties. |
129 | | // Used for remote compaction |
130 | 0 | void UpdateTableProperties(const TableProperties& table_properties) { |
131 | 0 | current_output().table_properties = |
132 | 0 | std::make_shared<TableProperties>(table_properties); |
133 | 0 | } |
134 | | // Update output table properties from table builder |
135 | 1.19k | void UpdateTableProperties() { |
136 | 1.19k | current_output().table_properties = |
137 | 1.19k | std::make_shared<TableProperties>(GetTableProperties()); |
138 | 1.19k | } |
139 | | |
140 | | IOStatus WriterSyncClose(const Status& intput_status, SystemClock* clock, |
141 | | Statistics* statistics, bool use_fsync); |
142 | | |
143 | 2.39k | TableProperties GetTableProperties() { |
144 | 2.39k | return builder_->GetTableProperties(); |
145 | 2.39k | } |
146 | | |
147 | 1.19k | Slice SmallestUserKey() const { |
148 | 1.19k | if (!outputs_.empty() && outputs_[0].finished) { |
149 | 1.19k | return outputs_[0].meta.smallest.user_key(); |
150 | 1.19k | } else { |
151 | 0 | return Slice{nullptr, 0}; |
152 | 0 | } |
153 | 1.19k | } |
154 | | |
155 | 1.19k | Slice LargestUserKey() const { |
156 | 1.19k | if (!outputs_.empty() && outputs_.back().finished) { |
157 | 1.19k | return outputs_.back().meta.largest.user_key(); |
158 | 1.19k | } else { |
159 | 0 | return Slice{nullptr, 0}; |
160 | 0 | } |
161 | 1.19k | } |
162 | | |
163 | | // In case the last output file is empty, which doesn't need to keep. |
164 | 6.14k | void RemoveLastEmptyOutput() { |
165 | 6.14k | if (!outputs_.empty() && !outputs_.back().meta.fd.file_size) { |
166 | | // An error occurred, so ignore the last output. |
167 | 437 | outputs_.pop_back(); |
168 | 437 | } |
169 | 6.14k | } |
170 | | |
171 | | // Remove the last output, for example the last output doesn't have data (no |
172 | | // entry and no range-dels), but file_size might not be 0, as it has SST |
173 | | // metadata. |
174 | 0 | void RemoveLastOutput() { |
175 | 0 | assert(!outputs_.empty()); |
176 | 0 | outputs_.pop_back(); |
177 | 0 | } |
178 | | |
179 | 8.90k | bool HasBuilder() const { return builder_ != nullptr; } |
180 | | |
181 | 4.46k | FileMetaData* GetMetaData() { return ¤t_output().meta; } |
182 | | |
183 | 6.30k | bool HasOutput() const { return !outputs_.empty(); } |
184 | | |
185 | 1.63k | uint64_t NumEntries() const { return builder_->NumEntries(); } |
186 | | |
187 | 3.07k | uint64_t GetWorkerCPUMicros() const { |
188 | 3.07k | return worker_cpu_micros_ + (builder_ ? builder_->GetWorkerCPUMicros() : 0); |
189 | 3.07k | } |
190 | | |
191 | 1.63k | void ResetBuilder() { |
192 | 1.63k | builder_.reset(); |
193 | 1.63k | current_output_file_size_ = 0; |
194 | 1.63k | } |
195 | | |
196 | | // Add range deletions from the range_del_agg_ to the current output file. |
197 | | // Input parameters, `range_tombstone_lower_bound_` and current output's |
198 | | // metadata determine the bounds on range deletions to add. Updates output |
199 | | // file metadata boundary if extended by range tombstones. |
200 | | // |
201 | | // @param comp_start_user_key and comp_end_user_key include timestamp if |
202 | | // user-defined timestamp is enabled. Their timestamp should be max timestamp. |
203 | | // @param next_table_min_key internal key lower bound for the next compaction |
204 | | // output. |
205 | | // @param full_history_ts_low used for range tombstone garbage collection. |
206 | | Status AddRangeDels( |
207 | | CompactionRangeDelAggregator& range_del_agg, |
208 | | const Slice* comp_start_user_key, const Slice* comp_end_user_key, |
209 | | CompactionIterationStats& range_del_out_stats, bool bottommost_level, |
210 | | const InternalKeyComparator& icmp, SequenceNumber earliest_snapshot, |
211 | | std::pair<SequenceNumber, SequenceNumber> keep_seqno_range, |
212 | | const Slice& next_table_min_key, const std::string& full_history_ts_low); |
213 | | |
214 | 0 | void SetNumOutputRecords(uint64_t num_output_records) { |
215 | 0 | stats_.num_output_records = num_output_records; |
216 | 0 | } |
217 | | |
218 | | private: |
219 | | friend class SubcompactionState; |
220 | | |
221 | | void FillFilesToCutForTtl(); |
222 | | |
223 | | void SetOutputSlitKey(const std::optional<Slice> start, |
224 | 3.07k | const std::optional<Slice> end) { |
225 | 3.07k | const InternalKeyComparator* icmp = |
226 | 3.07k | &compaction_->column_family_data()->internal_comparator(); |
227 | | |
228 | 3.07k | const InternalKey* output_split_key = compaction_->GetOutputSplitKey(); |
229 | | // Invalid output_split_key indicates that we do not need to split |
230 | 3.07k | if (output_split_key != nullptr) { |
231 | | // We may only split the output when the cursor is in the range. Split |
232 | 0 | if ((!end.has_value() || |
233 | 0 | icmp->user_comparator()->Compare( |
234 | 0 | ExtractUserKey(output_split_key->Encode()), *end) < 0) && |
235 | 0 | (!start.has_value() || |
236 | 0 | icmp->user_comparator()->Compare( |
237 | 0 | ExtractUserKey(output_split_key->Encode()), *start) > 0)) { |
238 | 0 | local_output_split_key_ = output_split_key; |
239 | 0 | } |
240 | 0 | } |
241 | 3.07k | } |
242 | | |
243 | | // Returns true iff we should stop building the current output |
244 | | // before processing the current key in compaction iterator. |
245 | | bool ShouldStopBefore(const CompactionIterator& c_iter); |
246 | | |
247 | 6.14k | void Cleanup() { |
248 | 6.14k | if (builder_ != nullptr) { |
249 | | // May happen if we get a shutdown call in the middle of compaction |
250 | 0 | builder_->Abandon(); |
251 | 0 | builder_.reset(); |
252 | 0 | } |
253 | 6.14k | } |
254 | | |
255 | | // Updates states related to file cutting for TTL. |
256 | | // Returns a boolean value indicating whether the current |
257 | | // compaction output file should be cut before `internal_key`. |
258 | | // |
259 | | // @param internal_key the current key to be added to output. |
260 | | bool UpdateFilesToCutForTTLStates(const Slice& internal_key); |
261 | | |
262 | | // update tracked grandparents information like grandparent index, if it's |
263 | | // in the gap between 2 grandparent files, accumulated grandparent files size |
264 | | // etc. |
265 | | // It returns how many boundaries it crosses by including current key. |
266 | | size_t UpdateGrandparentBoundaryInfo(const Slice& internal_key); |
267 | | |
268 | | // helper function to get the overlapped grandparent files size, it's only |
269 | | // used for calculating the first key's overlap. |
270 | | uint64_t GetCurrentKeyGrandparentOverlappedBytes( |
271 | | const Slice& internal_key) const; |
272 | | |
273 | | // Add current key from compaction_iterator to the output file. If needed |
274 | | // close and open new compaction output with the functions provided. |
275 | | Status AddToOutput(const CompactionIterator& c_iter, |
276 | | const CompactionFileOpenFunc& open_file_func, |
277 | | const CompactionFileCloseFunc& close_file_func, |
278 | | const ParsedInternalKey& prev_iter_output_internal_key); |
279 | | |
280 | | // Close the current output. `open_file_func` is needed for creating new file |
281 | | // for range-dels only output file. |
282 | | Status CloseOutput(const Status& curr_status, |
283 | | CompactionRangeDelAggregator* range_del_agg, |
284 | | const CompactionFileOpenFunc& open_file_func, |
285 | 3.07k | const CompactionFileCloseFunc& close_file_func) { |
286 | 3.07k | Status status = curr_status; |
287 | | // Handle subcompaction containing only range deletions. They could |
288 | | // be dropped or sent to another output level, so this is only an |
289 | | // over-approximate check for whether opening is needed. |
290 | 3.07k | if (status.ok() && !HasBuilder() && !HasOutput() && range_del_agg && |
291 | 838 | !range_del_agg->IsEmpty()) { |
292 | 0 | status = open_file_func(*this); |
293 | 0 | } |
294 | | |
295 | 3.07k | if (HasBuilder()) { |
296 | 1.63k | const ParsedInternalKey empty_internal_key{}; |
297 | 1.63k | const Slice empty_key{}; |
298 | 1.63k | Status s = close_file_func(status, empty_internal_key, empty_key, |
299 | 1.63k | nullptr /* c_iter */, *this); |
300 | 1.63k | if (!s.ok() && status.ok()) { |
301 | 0 | status = s; |
302 | 0 | } |
303 | 1.63k | } |
304 | | |
305 | 3.07k | return status; |
306 | 3.07k | } |
307 | | |
308 | | // This subcompaction's output could be empty if compaction was aborted before |
309 | | // this subcompaction had a chance to generate any output files. When |
310 | | // subcompactions are executed sequentially this is more likely and will be |
311 | | // particularly likely for the later subcompactions to be empty. Once they are |
312 | | // run in parallel however it should be much rarer. |
313 | | // It's caller's responsibility to make sure it's not empty. |
314 | 11.0k | Output& current_output() { |
315 | 11.0k | assert(!outputs_.empty()); |
316 | 11.0k | return outputs_.back(); |
317 | 11.0k | } |
318 | | |
319 | | const Compaction* compaction_; |
320 | | |
321 | | // current output builder and writer |
322 | | std::unique_ptr<TableBuilder> builder_; |
323 | | std::unique_ptr<WritableFileWriter> file_writer_; |
324 | | uint64_t current_output_file_size_ = 0; |
325 | | SequenceNumber smallest_preferred_seqno_ = kMaxSequenceNumber; |
326 | | |
327 | | // Sum of all the GetWorkerCPUMicros() for all the closed builders so far. |
328 | | uint64_t worker_cpu_micros_ = 0; |
329 | | |
330 | | // all the compaction outputs so far |
331 | | std::vector<Output> outputs_; |
332 | | |
333 | | // BlobDB info |
334 | | std::vector<BlobFileAddition> blob_file_additions_; |
335 | | std::unique_ptr<BlobGarbageMeter> blob_garbage_meter_; |
336 | | |
337 | | // All file paths (SST and blob) created during compaction. |
338 | | // Used for cleanup on abort - ensures orphan files are deleted even if |
339 | | // they were removed from outputs_ or blob_file_additions_ (e.g., by |
340 | | // RemoveLastEmptyOutput when file_size is 0 because builder was abandoned). |
341 | | std::vector<std::string> output_file_paths_; |
342 | | |
343 | | // Per level's output stat |
344 | | InternalStats::CompactionStats stats_; |
345 | | |
346 | | // indicate if this CompactionOutputs obj for proximal_level, should always |
347 | | // be false if per_key_placement feature is not enabled. |
348 | | const bool is_proximal_level_; |
349 | | |
350 | | // partitioner information |
351 | | std::string last_key_for_partitioner_; |
352 | | std::unique_ptr<SstPartitioner> partitioner_; |
353 | | |
354 | | // A flag determines if this subcompaction has been split by the cursor |
355 | | // for RoundRobin compaction |
356 | | bool is_split_ = false; |
357 | | |
358 | | // We also maintain the output split key for each subcompaction to avoid |
359 | | // repetitive comparison in ShouldStopBefore() |
360 | | const InternalKey* local_output_split_key_ = nullptr; |
361 | | |
362 | | // Some identified files with old oldest ancester time and the range should be |
363 | | // isolated out so that the output file(s) in that range can be merged down |
364 | | // for TTL and clear the timestamps for the range. |
365 | | std::vector<FileMetaData*> files_to_cut_for_ttl_; |
366 | | int cur_files_to_cut_for_ttl_ = -1; |
367 | | int next_files_to_cut_for_ttl_ = 0; |
368 | | |
369 | | // An index that used to speed up ShouldStopBefore(). |
370 | | size_t grandparent_index_ = 0; |
371 | | |
372 | | // if the output key is being grandparent files gap, so: |
373 | | // key > grandparents[grandparent_index_ - 1].largest && |
374 | | // key < grandparents[grandparent_index_].smallest |
375 | | bool being_grandparent_gap_ = true; |
376 | | |
377 | | // The number of bytes overlapping between the current output and |
378 | | // grandparent files used in ShouldStopBefore(). |
379 | | uint64_t grandparent_overlapped_bytes_ = 0; |
380 | | |
381 | | // A flag determines whether the key has been seen in ShouldStopBefore() |
382 | | bool seen_key_ = false; |
383 | | |
384 | | // for the current output file, how many file boundaries has it crossed, |
385 | | // basically number of files overlapped * 2 |
386 | | size_t grandparent_boundary_switched_num_ = 0; |
387 | | |
388 | | // The smallest key of the current output file, this is set when current |
389 | | // output file's smallest key is a range tombstone start key. |
390 | | InternalKey range_tombstone_lower_bound_; |
391 | | |
392 | | // Used for calls to compaction->KeyRangeNotExistsBeyondOutputLevel() in |
393 | | // CompactionOutputs::AddRangeDels(). |
394 | | // level_ptrs_[i] holds index of the file that was checked during the last |
395 | | // call to compaction->KeyRangeNotExistsBeyondOutputLevel(). This allows |
396 | | // future calls to the function to pick up where it left off, since each |
397 | | // range tombstone added to output file within each subcompaction is in |
398 | | // increasing key range. |
399 | | std::vector<size_t> level_ptrs_; |
400 | | }; |
401 | | |
402 | | // helper struct to concatenate the last level and proximal level outputs |
403 | | // which could be replaced by std::ranges::join_view() in c++20 |
404 | | struct OutputIterator { |
405 | | public: |
406 | | explicit OutputIterator(const std::vector<CompactionOutputs::Output>& a, |
407 | | const std::vector<CompactionOutputs::Output>& b) |
408 | 7.14k | : a_(a), b_(b) { |
409 | 7.14k | within_a = !a_.empty(); |
410 | 7.14k | idx_ = 0; |
411 | 7.14k | } |
412 | | |
413 | 7.14k | OutputIterator begin() { return *this; } |
414 | | |
415 | 7.14k | OutputIterator end() { return *this; } |
416 | | |
417 | 0 | size_t size() { return a_.size() + b_.size(); } |
418 | | |
419 | 3.59k | const CompactionOutputs::Output& operator*() const { |
420 | 3.59k | return within_a ? a_[idx_] : b_[idx_]; |
421 | 3.59k | } |
422 | | |
423 | 3.59k | OutputIterator& operator++() { |
424 | 3.59k | idx_++; |
425 | 3.59k | if (within_a && idx_ >= a_.size()) { |
426 | 0 | within_a = false; |
427 | 0 | idx_ = 0; |
428 | 0 | } |
429 | 3.59k | assert(within_a || idx_ <= b_.size()); |
430 | 3.59k | return *this; |
431 | 3.59k | } |
432 | | |
433 | 10.7k | bool operator!=(const OutputIterator& /*rhs*/) const { |
434 | 10.7k | return within_a || idx_ < b_.size(); |
435 | 10.7k | } |
436 | | |
437 | | private: |
438 | | const std::vector<CompactionOutputs::Output>& a_; |
439 | | const std::vector<CompactionOutputs::Output>& b_; |
440 | | bool within_a; |
441 | | size_t idx_; |
442 | | }; |
443 | | |
444 | | } // namespace ROCKSDB_NAMESPACE |