/src/rocksdb/db/compaction/compaction_picker_fifo.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | // |
6 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
7 | | // Use of this source code is governed by a BSD-style license that can be |
8 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
9 | | |
10 | | #include "db/compaction/compaction_picker_fifo.h" |
11 | | |
12 | | #include <algorithm> |
13 | | #include <cinttypes> |
14 | | #include <string> |
15 | | #include <vector> |
16 | | |
17 | | #include "db/column_family.h" |
18 | | #include "logging/log_buffer.h" |
19 | | #include "logging/logging.h" |
20 | | #include "options/options_helper.h" |
21 | | #include "rocksdb/listener.h" |
22 | | #include "rocksdb/statistics.h" |
23 | | #include "rocksdb/status.h" |
24 | | #include "util/string_util.h" |
25 | | |
26 | | namespace ROCKSDB_NAMESPACE { |
27 | | namespace { |
28 | 0 | uint64_t GetTotalFilesSize(const std::vector<FileMetaData*>& files) { |
29 | 0 | uint64_t total_size = 0; |
30 | 0 | for (const auto& f : files) { |
31 | 0 | total_size += f->fd.file_size; |
32 | 0 | } |
33 | 0 | return total_size; |
34 | 0 | } |
35 | | |
36 | | // Compute effective data size and capacity limit for FIFO compaction. |
37 | | // When max_data_files_size > 0 (blob-aware mode), the effective size includes |
38 | | // both SST and blob file sizes, and the limit is max_data_files_size. |
39 | | // Otherwise, only SST sizes are used with max_table_files_size as the limit. |
40 | | void GetEffectiveSizeAndLimit(const CompactionOptionsFIFO& fifo_opts, |
41 | | uint64_t total_sst_size, uint64_t total_blob_size, |
42 | | uint64_t* effective_size, |
43 | 0 | uint64_t* effective_max) { |
44 | 0 | *effective_size = total_sst_size; |
45 | 0 | *effective_max = fifo_opts.max_table_files_size; |
46 | 0 | if (fifo_opts.max_data_files_size > 0) { |
47 | 0 | *effective_size += total_blob_size; |
48 | 0 | *effective_max = fifo_opts.max_data_files_size; |
49 | 0 | } |
50 | 0 | } |
51 | | |
52 | | // Return the effective capacity limit for FIFO compaction. |
53 | | // Convenience wrapper when only the limit is needed (e.g., PickTTLCompaction). |
54 | 0 | uint64_t GetEffectiveMax(const CompactionOptionsFIFO& fifo_opts) { |
55 | 0 | return fifo_opts.max_data_files_size > 0 ? fifo_opts.max_data_files_size |
56 | 0 | : fifo_opts.max_table_files_size; |
57 | 0 | } |
58 | | } // anonymous namespace |
59 | | |
60 | | bool FIFOCompactionPicker::NeedsCompaction( |
61 | 0 | const VersionStorageInfo* vstorage) const { |
62 | 0 | const int kLevel0 = 0; |
63 | 0 | return vstorage->CompactionScore(kLevel0) >= 1; |
64 | 0 | } |
65 | | |
66 | | Compaction* FIFOCompactionPicker::PickTTLCompaction( |
67 | | const std::string& cf_name, const MutableCFOptions& mutable_cf_options, |
68 | | const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage, |
69 | 0 | LogBuffer* log_buffer) { |
70 | 0 | assert(mutable_cf_options.ttl > 0); |
71 | |
|
72 | 0 | const int kLevel0 = 0; |
73 | 0 | const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(kLevel0); |
74 | 0 | uint64_t total_size = GetTotalFilesSize(level_files); |
75 | |
|
76 | 0 | int64_t _current_time; |
77 | 0 | auto status = ioptions_.clock->GetCurrentTime(&_current_time); |
78 | 0 | if (!status.ok()) { |
79 | 0 | ROCKS_LOG_BUFFER(log_buffer, |
80 | 0 | "[%s] FIFO compaction: Couldn't get current time: %s. " |
81 | 0 | "Not doing compactions based on TTL. ", |
82 | 0 | cf_name.c_str(), status.ToString().c_str()); |
83 | 0 | return nullptr; |
84 | 0 | } |
85 | 0 | const uint64_t current_time = static_cast<uint64_t>(_current_time); |
86 | |
|
87 | 0 | if (!level0_compactions_in_progress_.empty()) { |
88 | 0 | ROCKS_LOG_BUFFER( |
89 | 0 | log_buffer, |
90 | 0 | "[%s] FIFO compaction: Already executing compaction. No need " |
91 | 0 | "to run parallel compactions since compactions are very fast", |
92 | 0 | cf_name.c_str()); |
93 | 0 | return nullptr; |
94 | 0 | } |
95 | | |
96 | 0 | std::vector<CompactionInputFiles> inputs; |
97 | 0 | inputs.emplace_back(); |
98 | 0 | inputs[0].level = 0; |
99 | | |
100 | | // avoid underflow |
101 | 0 | if (current_time > mutable_cf_options.ttl) { |
102 | 0 | for (auto ritr = level_files.rbegin(); ritr != level_files.rend(); ++ritr) { |
103 | 0 | FileMetaData* f = *ritr; |
104 | 0 | assert(f); |
105 | 0 | TableReader* reader = f->fd.pinned_reader.Get(); |
106 | 0 | if (reader != nullptr && reader->GetTableProperties() != nullptr) { |
107 | 0 | uint64_t newest_key_time = f->TryGetNewestKeyTime(); |
108 | 0 | uint64_t creation_time = reader->GetTableProperties()->creation_time; |
109 | 0 | uint64_t est_newest_key_time = newest_key_time == kUnknownNewestKeyTime |
110 | 0 | ? creation_time |
111 | 0 | : newest_key_time; |
112 | 0 | if (est_newest_key_time == kUnknownNewestKeyTime || |
113 | 0 | est_newest_key_time >= (current_time - mutable_cf_options.ttl)) { |
114 | 0 | break; |
115 | 0 | } |
116 | 0 | } |
117 | 0 | total_size -= f->fd.file_size; |
118 | 0 | inputs[0].files.push_back(f); |
119 | 0 | } |
120 | 0 | } |
121 | | |
122 | | // Return a nullptr and proceed to size-based FIFO compaction if: |
123 | | // 1. there are no files older than ttl OR |
124 | | // 2. there are a few files older than ttl, but deleting them will not bring |
125 | | // the total size to be less than the size threshold. |
126 | 0 | uint64_t effective_max = |
127 | 0 | GetEffectiveMax(mutable_cf_options.compaction_options_fifo); |
128 | | // Estimate the effective remaining data after dropping TTL-expired SSTs. |
129 | | // Each dropped SST also frees a proportional share of blob data. |
130 | | // |
131 | | // In multi-level FIFO (migration), we must use total SST across ALL levels |
132 | | // as the reference, because total_blob covers all levels. Using only L0 |
133 | | // SST would inflate the blob estimate. |
134 | 0 | uint64_t effective_remaining = total_size; |
135 | 0 | if (mutable_cf_options.compaction_options_fifo.max_data_files_size > 0) { |
136 | 0 | uint64_t total_blob = vstorage->GetBlobStats().total_file_size; |
137 | | // Compute total SST across all levels so the reference scope matches |
138 | | // total_blob's scope (all levels). |
139 | 0 | uint64_t total_sst_all_levels = GetTotalFilesSize(level_files); |
140 | 0 | for (int level = 1; level < vstorage->num_levels(); ++level) { |
141 | 0 | total_sst_all_levels += GetTotalFilesSize(vstorage->LevelFiles(level)); |
142 | 0 | } |
143 | | // remaining_sst_all = total_sst_all - dropped_l0_sst |
144 | | // total_size is the remaining L0 SST after removing expired files; |
145 | | // original L0 SST minus remaining L0 SST = dropped. |
146 | 0 | uint64_t original_l0_sst = GetTotalFilesSize(level_files); |
147 | 0 | uint64_t dropped_sst = original_l0_sst - total_size; |
148 | 0 | uint64_t remaining_sst_all = total_sst_all_levels - dropped_sst; |
149 | | // Proportional blob estimate: each SST byte "owns" a proportional |
150 | | // share of blob bytes. Both reference sizes must come from the same |
151 | | // scope (all levels) to avoid inflated estimates. |
152 | 0 | if (total_sst_all_levels > 0 && total_blob > 0) { |
153 | 0 | effective_remaining = |
154 | 0 | remaining_sst_all + |
155 | 0 | static_cast<uint64_t>(static_cast<double>(remaining_sst_all) / |
156 | 0 | total_sst_all_levels * total_blob); |
157 | 0 | } else { |
158 | 0 | effective_remaining = remaining_sst_all; |
159 | 0 | } |
160 | 0 | } |
161 | 0 | if (inputs[0].files.empty() || effective_remaining > effective_max) { |
162 | 0 | return nullptr; |
163 | 0 | } |
164 | | |
165 | 0 | for (const auto& f : inputs[0].files) { |
166 | 0 | assert(f); |
167 | 0 | uint64_t newest_key_time = f->TryGetNewestKeyTime(); |
168 | 0 | uint64_t creation_time = 0; |
169 | 0 | TableReader* reader = f->fd.pinned_reader.Get(); |
170 | 0 | if (reader != nullptr && reader->GetTableProperties() != nullptr) { |
171 | 0 | creation_time = reader->GetTableProperties()->creation_time; |
172 | 0 | } |
173 | 0 | uint64_t est_newest_key_time = newest_key_time == kUnknownNewestKeyTime |
174 | 0 | ? creation_time |
175 | 0 | : newest_key_time; |
176 | 0 | ROCKS_LOG_BUFFER(log_buffer, |
177 | 0 | "[%s] FIFO compaction: picking file %" PRIu64 |
178 | 0 | " with estimated newest key time %" PRIu64 " for deletion", |
179 | 0 | cf_name.c_str(), f->fd.GetNumber(), est_newest_key_time); |
180 | 0 | } |
181 | |
|
182 | 0 | Compaction* c = new Compaction( |
183 | 0 | vstorage, ioptions_, mutable_cf_options, mutable_db_options, |
184 | 0 | std::move(inputs), 0, 0, 0, 0, kNoCompression, |
185 | 0 | mutable_cf_options.compression_opts, Temperature::kUnknown, |
186 | 0 | /* max_subcompactions */ 0, {}, /* earliest_snapshot */ std::nullopt, |
187 | 0 | /* snapshot_checker */ nullptr, CompactionReason::kFIFOTtl, |
188 | 0 | /* trim_ts */ "", vstorage->CompactionScore(0), |
189 | 0 | /* l0_files_might_overlap */ true); |
190 | 0 | return c; |
191 | 0 | } |
192 | | |
193 | | // The size-based compaction picker for FIFO. |
194 | | // |
195 | | // When the entire column family size exceeds max_table_files_size, FIFO will |
196 | | // try to delete the oldest sst file(s) until the resulting column family size |
197 | | // is smaller than max_table_files_size. |
198 | | // |
199 | | // This function also takes care the case where a DB is migrating from level / |
200 | | // universal compaction to FIFO compaction. During the migration, the column |
201 | | // family will also have non-L0 files while FIFO can only create L0 files. |
202 | | // In this case, this function will first purge the sst files in the bottom- |
203 | | // most non-empty level first, and the DB will eventually converge to the |
204 | | // regular FIFO case where there're only L0 files. Note that during the |
205 | | // migration case, the purge order will only be an approximation of "FIFO" |
206 | | // as entries inside lower-level files might sometimes be newer than some |
207 | | // entries inside upper-level files. |
208 | | Compaction* FIFOCompactionPicker::PickSizeCompaction( |
209 | | const std::string& cf_name, const MutableCFOptions& mutable_cf_options, |
210 | | const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage, |
211 | 0 | LogBuffer* log_buffer) { |
212 | 0 | const auto& fifo_opts = mutable_cf_options.compaction_options_fifo; |
213 | | |
214 | | // compute the total SST size and identify the last non-empty level |
215 | 0 | int last_level = 0; |
216 | 0 | uint64_t total_size = 0; |
217 | 0 | for (int level = 0; level < vstorage->num_levels(); ++level) { |
218 | 0 | auto level_size = GetTotalFilesSize(vstorage->LevelFiles(level)); |
219 | 0 | total_size += level_size; |
220 | 0 | if (level_size > 0) { |
221 | 0 | last_level = level; |
222 | 0 | } |
223 | 0 | } |
224 | 0 | const std::vector<FileMetaData*>& last_level_files = |
225 | 0 | vstorage->LevelFiles(last_level); |
226 | | |
227 | | // Compute effective size and limit for comparison. |
228 | 0 | uint64_t effective_size, effective_max; |
229 | 0 | GetEffectiveSizeAndLimit(fifo_opts, total_size, |
230 | 0 | vstorage->GetBlobStats().total_file_size, |
231 | 0 | &effective_size, &effective_max); |
232 | |
|
233 | 0 | if (last_level == 0 && effective_size <= effective_max) { |
234 | 0 | return nullptr; |
235 | 0 | } |
236 | | |
237 | 0 | if (!level0_compactions_in_progress_.empty()) { |
238 | 0 | ROCKS_LOG_BUFFER( |
239 | 0 | log_buffer, |
240 | 0 | "[%s] FIFO compaction: Already executing compaction. No need " |
241 | 0 | "to run parallel compactions since compactions are very fast", |
242 | 0 | cf_name.c_str()); |
243 | 0 | return nullptr; |
244 | 0 | } |
245 | | |
246 | 0 | std::vector<CompactionInputFiles> inputs; |
247 | 0 | inputs.emplace_back(); |
248 | 0 | inputs[0].level = last_level; |
249 | |
|
250 | 0 | if (last_level == 0) { |
251 | | // When using blob-aware sizing, use proportional estimation (same |
252 | | // principle as EstimateTotalDataForSST): each SST "owns" |
253 | | // effective_size / num_files of total data. This is an approximation |
254 | | // -- individual SSTs may reference different amounts of blob data, |
255 | | // but uniform distribution is a reasonable estimate for FIFO dropping. |
256 | 0 | uint64_t remaining_size = effective_size; |
257 | 0 | const uint64_t num_files = last_level_files.size(); |
258 | | // Proportional estimate of data per file (SST + blob). |
259 | | // Use max(1) to prevent stalling when effective_size < num_files. |
260 | 0 | const uint64_t data_per_file = |
261 | 0 | (fifo_opts.max_data_files_size > 0 && num_files > 0) |
262 | 0 | ? std::max(effective_size / num_files, uint64_t{1}) |
263 | 0 | : 0; |
264 | | |
265 | | // In L0, right-most files are the oldest files. |
266 | 0 | for (auto ritr = last_level_files.rbegin(); ritr != last_level_files.rend(); |
267 | 0 | ++ritr) { |
268 | 0 | auto f = *ritr; |
269 | 0 | if (fifo_opts.max_data_files_size > 0) { |
270 | 0 | remaining_size -= std::min(remaining_size, data_per_file); |
271 | 0 | } else { |
272 | 0 | remaining_size -= std::min(remaining_size, f->fd.file_size); |
273 | 0 | } |
274 | 0 | inputs[0].files.push_back(f); |
275 | 0 | char tmp_fsize[16]; |
276 | 0 | AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize)); |
277 | 0 | ROCKS_LOG_BUFFER(log_buffer, |
278 | 0 | "[%s] FIFO compaction: picking file %" PRIu64 |
279 | 0 | " with size %s for deletion", |
280 | 0 | cf_name.c_str(), f->fd.GetNumber(), tmp_fsize); |
281 | 0 | if (remaining_size <= effective_max) { |
282 | 0 | break; |
283 | 0 | } |
284 | 0 | } |
285 | 0 | } else if (effective_size > effective_max) { |
286 | | // If the last level is non-L0, we actually don't know which file is |
287 | | // logically the oldest since the file creation time only represents |
288 | | // when this file was compacted to this level, which is independent |
289 | | // to when the entries in this file were first inserted. |
290 | | // |
291 | | // As a result, we delete files from the left instead. This means the sst |
292 | | // file with the smallest key will be deleted first. This design decision |
293 | | // better serves a major type of FIFO use cases where smaller keys are |
294 | | // associated with older data. |
295 | 0 | const uint64_t num_files = last_level_files.size(); |
296 | | // Proportional estimate of data per file (SST + blob), same as L0 path. |
297 | 0 | const uint64_t data_per_file = |
298 | 0 | (fifo_opts.max_data_files_size > 0 && num_files > 0) |
299 | 0 | ? std::max(effective_size / num_files, uint64_t{1}) |
300 | 0 | : 0; |
301 | 0 | for (const auto& f : last_level_files) { |
302 | 0 | if (f->being_compacted) { |
303 | 0 | continue; |
304 | 0 | } |
305 | 0 | if (fifo_opts.max_data_files_size > 0) { |
306 | 0 | effective_size -= std::min(effective_size, data_per_file); |
307 | 0 | } else { |
308 | 0 | effective_size -= std::min(effective_size, f->fd.file_size); |
309 | 0 | } |
310 | 0 | inputs[0].files.push_back(f); |
311 | 0 | char tmp_fsize[16]; |
312 | 0 | AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize)); |
313 | 0 | ROCKS_LOG_BUFFER(log_buffer, |
314 | 0 | "[%s] FIFO compaction: picking file %" PRIu64 |
315 | 0 | " with size %s for deletion under total size %" PRIu64 |
316 | 0 | " vs max size %" PRIu64, |
317 | 0 | cf_name.c_str(), f->fd.GetNumber(), tmp_fsize, |
318 | 0 | effective_size, effective_max); |
319 | |
|
320 | 0 | if (effective_size <= effective_max) { |
321 | 0 | break; |
322 | 0 | } |
323 | 0 | } |
324 | 0 | } else { |
325 | 0 | return nullptr; |
326 | 0 | } |
327 | | |
328 | 0 | Compaction* c = new Compaction( |
329 | 0 | vstorage, ioptions_, mutable_cf_options, mutable_db_options, |
330 | 0 | std::move(inputs), last_level, |
331 | 0 | /* target_file_size */ 0, |
332 | 0 | /* max_compaction_bytes */ 0, |
333 | 0 | /* output_path_id */ 0, kNoCompression, |
334 | 0 | mutable_cf_options.compression_opts, Temperature::kUnknown, |
335 | 0 | /* max_subcompactions */ 0, {}, /* earliest_snapshot */ std::nullopt, |
336 | 0 | /* snapshot_checker */ nullptr, CompactionReason::kFIFOMaxSize, |
337 | 0 | /* trim_ts */ "", vstorage->CompactionScore(0), |
338 | 0 | /* l0_files_might_overlap */ true); |
339 | 0 | return c; |
340 | 0 | } |
341 | | |
342 | | Compaction* FIFOCompactionPicker::PickTemperatureChangeCompaction( |
343 | | const std::string& cf_name, const MutableCFOptions& mutable_cf_options, |
344 | | const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage, |
345 | 0 | LogBuffer* log_buffer) const { |
346 | 0 | const std::vector<FileTemperatureAge>& ages = |
347 | 0 | mutable_cf_options.compaction_options_fifo |
348 | 0 | .file_temperature_age_thresholds; |
349 | 0 | if (ages.empty()) { |
350 | 0 | return nullptr; |
351 | 0 | } |
352 | | |
353 | | // Does not apply to multi-level FIFO. |
354 | 0 | if (vstorage->num_levels() > 1) { |
355 | 0 | return nullptr; |
356 | 0 | } |
357 | | |
358 | 0 | const int kLevel0 = 0; |
359 | 0 | const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(kLevel0); |
360 | 0 | if (level_files.empty()) { |
361 | 0 | return nullptr; |
362 | 0 | } |
363 | | |
364 | 0 | int64_t _current_time; |
365 | 0 | auto status = ioptions_.clock->GetCurrentTime(&_current_time); |
366 | 0 | if (!status.ok()) { |
367 | 0 | ROCKS_LOG_BUFFER( |
368 | 0 | log_buffer, |
369 | 0 | "[%s] FIFO compaction: Couldn't get current time: %s. " |
370 | 0 | "Not doing compactions based on file temperature-age threshold. ", |
371 | 0 | cf_name.c_str(), status.ToString().c_str()); |
372 | 0 | return nullptr; |
373 | 0 | } |
374 | 0 | const uint64_t current_time = static_cast<uint64_t>(_current_time); |
375 | |
|
376 | 0 | if (!level0_compactions_in_progress_.empty()) { |
377 | 0 | ROCKS_LOG_BUFFER( |
378 | 0 | log_buffer, |
379 | 0 | "[%s] FIFO compaction: Already executing compaction. Parallel " |
380 | 0 | "compactions are not supported", |
381 | 0 | cf_name.c_str()); |
382 | 0 | return nullptr; |
383 | 0 | } |
384 | | |
385 | 0 | std::vector<CompactionInputFiles> inputs; |
386 | 0 | inputs.emplace_back(); |
387 | 0 | inputs[0].level = 0; |
388 | | |
389 | | // avoid underflow |
390 | 0 | uint64_t min_age = ages[0].age; |
391 | | // kLastTemperature means target temperature is to be determined. |
392 | 0 | Temperature compaction_target_temp = Temperature::kLastTemperature; |
393 | 0 | if (current_time > min_age) { |
394 | 0 | uint64_t create_time_threshold = current_time - min_age; |
395 | 0 | assert(level_files.size() >= 1); |
396 | 0 | for (size_t index = level_files.size(); index >= 1; --index) { |
397 | | // Try to add cur_file to compaction inputs. |
398 | 0 | FileMetaData* cur_file = level_files[index - 1]; |
399 | 0 | FileMetaData* prev_file = index < 2 ? nullptr : level_files[index - 2]; |
400 | 0 | if (cur_file->being_compacted) { |
401 | | // Should not happen since we check for |
402 | | // `level0_compactions_in_progress_` above. Here we simply just don't |
403 | | // schedule anything. |
404 | 0 | return nullptr; |
405 | 0 | } |
406 | 0 | uint64_t est_newest_key_time = cur_file->TryGetNewestKeyTime(prev_file); |
407 | | // Newer file could have newest_key_time populated |
408 | 0 | if (est_newest_key_time == kUnknownNewestKeyTime) { |
409 | 0 | continue; |
410 | 0 | } |
411 | 0 | if (est_newest_key_time > create_time_threshold) { |
412 | 0 | break; |
413 | 0 | } |
414 | 0 | Temperature cur_target_temp = ages[0].temperature; |
415 | 0 | for (size_t i = 1; i < ages.size(); ++i) { |
416 | 0 | if (current_time >= ages[i].age && |
417 | 0 | est_newest_key_time <= current_time - ages[i].age) { |
418 | 0 | cur_target_temp = ages[i].temperature; |
419 | 0 | } |
420 | 0 | } |
421 | 0 | if (cur_file->temperature == cur_target_temp) { |
422 | 0 | continue; |
423 | 0 | } |
424 | | |
425 | | // cur_file needs to change temperature |
426 | 0 | assert(compaction_target_temp == Temperature::kLastTemperature); |
427 | 0 | compaction_target_temp = cur_target_temp; |
428 | 0 | inputs[0].files.push_back(cur_file); |
429 | 0 | ROCKS_LOG_BUFFER(log_buffer, |
430 | 0 | "[%s] FIFO compaction: picking file %" PRIu64 |
431 | 0 | " with estimated newest key time %" PRIu64 |
432 | 0 | " and temperature %s for temperature %s.", |
433 | 0 | cf_name.c_str(), cur_file->fd.GetNumber(), |
434 | 0 | est_newest_key_time, |
435 | 0 | temperature_to_string[cur_file->temperature].c_str(), |
436 | 0 | temperature_to_string[cur_target_temp].c_str()); |
437 | 0 | break; |
438 | 0 | } |
439 | 0 | } |
440 | | |
441 | 0 | if (inputs[0].files.empty()) { |
442 | 0 | return nullptr; |
443 | 0 | } |
444 | 0 | assert(compaction_target_temp != Temperature::kLastTemperature); |
445 | | // Only compact one file at a time. |
446 | 0 | assert(inputs.size() == 1); |
447 | 0 | assert(inputs[0].size() == 1); |
448 | 0 | Compaction* c = new Compaction( |
449 | 0 | vstorage, ioptions_, mutable_cf_options, mutable_db_options, |
450 | 0 | std::move(inputs), 0, 0 /* output file size limit */, |
451 | 0 | 0 /* max compaction bytes, not applicable */, 0 /* output path ID */, |
452 | 0 | mutable_cf_options.compression, mutable_cf_options.compression_opts, |
453 | 0 | compaction_target_temp, |
454 | 0 | /* max_subcompactions */ 0, {}, /* earliest_snapshot */ std::nullopt, |
455 | 0 | /* snapshot_checker */ nullptr, CompactionReason::kChangeTemperature, |
456 | 0 | /* trim_ts */ "", vstorage->CompactionScore(0), |
457 | 0 | /* l0_files_might_overlap */ true); |
458 | 0 | return c; |
459 | 0 | } |
460 | | |
461 | | Compaction* FIFOCompactionPicker::PickIntraL0Compaction( |
462 | | const std::string& cf_name, const MutableCFOptions& mutable_cf_options, |
463 | | const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage, |
464 | 0 | LogBuffer* log_buffer) { |
465 | 0 | const auto& fifo_opts = mutable_cf_options.compaction_options_fifo; |
466 | |
|
467 | 0 | if (!fifo_opts.allow_compaction) { |
468 | 0 | return nullptr; |
469 | 0 | } |
470 | | |
471 | 0 | const std::vector<FileMetaData*>& level0_files = vstorage->LevelFiles(0); |
472 | 0 | if (level0_files.empty()) { |
473 | 0 | return nullptr; |
474 | 0 | } |
475 | | |
476 | 0 | if (fifo_opts.use_kv_ratio_compaction) { |
477 | 0 | if (fifo_opts.max_data_files_size == 0) { |
478 | 0 | ROCKS_LOG_BUFFER( |
479 | 0 | log_buffer, |
480 | 0 | "[%s] FIFO kv-ratio compaction: skipping -- " |
481 | 0 | "max_data_files_size is 0, cannot compute target file size. ", |
482 | 0 | cf_name.c_str()); |
483 | 0 | } else if (fifo_opts.max_data_files_size < fifo_opts.max_table_files_size) { |
484 | 0 | ROCKS_LOG_BUFFER(log_buffer, |
485 | 0 | "[%s] FIFO kv-ratio compaction: skipping -- " |
486 | 0 | "max_data_files_size (%" PRIu64 |
487 | 0 | ") < max_table_files_size " |
488 | 0 | "(%" PRIu64 ").", |
489 | 0 | cf_name.c_str(), fifo_opts.max_data_files_size, |
490 | 0 | fifo_opts.max_table_files_size); |
491 | 0 | } else { |
492 | 0 | return PickRatioBasedIntraL0Compaction(cf_name, mutable_cf_options, |
493 | 0 | mutable_db_options, vstorage, |
494 | 0 | log_buffer); |
495 | 0 | } |
496 | 0 | ROCKS_LOG_BUFFER( |
497 | 0 | log_buffer, "[%s] FIFO: falling back to cost-based intra-L0 compaction", |
498 | 0 | cf_name.c_str()); |
499 | 0 | } |
500 | | |
501 | | // Old intra-L0 path: merge small files using PickCostBasedIntraL0Compaction. |
502 | | // Minimum files to compact follows level0_file_num_compaction_trigger. |
503 | | // Try to prevent same files from being compacted multiple times, which |
504 | | // could produce large files that may never TTL-expire. Achieve this by |
505 | | // disallowing compactions with files larger than memtable (inflate its |
506 | | // size by 10% to account for uncompressed L0 files that may have size |
507 | | // slightly greater than memtable size limit). |
508 | | |
509 | 0 | CompactionInputFiles comp_inputs; |
510 | 0 | size_t max_compact_bytes_per_del_file = |
511 | 0 | static_cast<size_t>(MultiplyCheckOverflow( |
512 | 0 | static_cast<uint64_t>(mutable_cf_options.write_buffer_size), 1.1)); |
513 | 0 | if (PickCostBasedIntraL0Compaction( |
514 | 0 | level0_files, |
515 | 0 | mutable_cf_options |
516 | 0 | .level0_file_num_compaction_trigger /* min_files_to_compact */, |
517 | 0 | max_compact_bytes_per_del_file, |
518 | 0 | mutable_cf_options.max_compaction_bytes, &comp_inputs)) { |
519 | 0 | Compaction* c = new Compaction( |
520 | 0 | vstorage, ioptions_, mutable_cf_options, mutable_db_options, |
521 | 0 | {comp_inputs}, 0, 16 * 1024 * 1024 /* output file size limit */, |
522 | 0 | 0 /* max compaction bytes, not applicable */, 0 /* output path ID */, |
523 | 0 | mutable_cf_options.compression, mutable_cf_options.compression_opts, |
524 | 0 | Temperature::kUnknown, 0 /* max_subcompactions */, {}, |
525 | 0 | /* earliest_snapshot */ std::nullopt, |
526 | 0 | /* snapshot_checker */ nullptr, CompactionReason::kFIFOReduceNumFiles, |
527 | 0 | /* trim_ts */ "", vstorage->CompactionScore(0), |
528 | 0 | /* l0_files_might_overlap */ true); |
529 | 0 | return c; |
530 | 0 | } |
531 | | |
532 | 0 | return nullptr; |
533 | 0 | } |
534 | | |
535 | | Compaction* FIFOCompactionPicker::PickRatioBasedIntraL0Compaction( |
536 | | const std::string& cf_name, const MutableCFOptions& mutable_cf_options, |
537 | | const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage, |
538 | 0 | LogBuffer* log_buffer) { |
539 | 0 | const auto& fifo_opts = mutable_cf_options.compaction_options_fifo; |
540 | 0 | assert(fifo_opts.use_kv_ratio_compaction); |
541 | 0 | assert(fifo_opts.max_data_files_size > 0); |
542 | | |
543 | | // During migration from level/universal compaction to FIFO, non-L0 levels |
544 | | // may still contain files. The ratio-based algorithm only operates on L0, |
545 | | // so skip it until PickSizeCompaction has drained all non-L0 levels. |
546 | | // Once levels collapse to L0-only, this algorithm will kick in. |
547 | 0 | for (int level = 1; level < vstorage->num_levels(); ++level) { |
548 | 0 | if (!vstorage->LevelFiles(level).empty()) { |
549 | 0 | ROCKS_LOG_BUFFER(log_buffer, |
550 | 0 | "[%s] FIFO kv-ratio compaction: skipping -- non-L0 " |
551 | 0 | "level %d still has %" ROCKSDB_PRIszt |
552 | 0 | " files (migration in progress)", |
553 | 0 | cf_name.c_str(), level, |
554 | 0 | vstorage->LevelFiles(level).size()); |
555 | 0 | return nullptr; |
556 | 0 | } |
557 | 0 | } |
558 | | |
559 | 0 | if (!level0_compactions_in_progress_.empty()) { |
560 | 0 | return nullptr; |
561 | 0 | } |
562 | | |
563 | 0 | const std::vector<FileMetaData*>& level0_files = vstorage->LevelFiles(0); |
564 | 0 | if (mutable_cf_options.level0_file_num_compaction_trigger <= 1) { |
565 | | // trigger <= 0 is invalid; trigger == 1 means compact after every flush, |
566 | | // which doesn't make sense for tiered merging (the tier boundary loop |
567 | | // divides by trigger, so trigger == 1 would cause an infinite loop). |
568 | 0 | return nullptr; |
569 | 0 | } |
570 | 0 | const size_t trigger = static_cast<size_t>( |
571 | 0 | mutable_cf_options.level0_file_num_compaction_trigger); |
572 | 0 | if (level0_files.size() < trigger) { |
573 | 0 | return nullptr; |
574 | 0 | } |
575 | | |
576 | | // Determine the target compacted file size. |
577 | | // |
578 | | // When max_compaction_bytes > 0 (explicitly set by user), use it directly |
579 | | // as the target. This allows users to override the auto-calculated value. |
580 | | // |
581 | | // When max_compaction_bytes == 0 (default), auto-calculate from the data |
582 | | // capacity and observed SST/blob ratio: |
583 | | // target = max_data_files_size * sst_ratio / trigger |
584 | | // |
585 | | // This is recomputed on every PickCompaction call. The computation is |
586 | | // trivial (sum file sizes + arithmetic) and PickCompaction is only called |
587 | | // once per flush or compaction completion, so no caching is needed. |
588 | 0 | uint64_t target = 0; |
589 | 0 | if (mutable_cf_options.max_compaction_bytes > 0) { |
590 | | // User explicitly set max_compaction_bytes -- use it as target |
591 | 0 | target = mutable_cf_options.max_compaction_bytes; |
592 | 0 | } else { |
593 | | // Auto-calculate from capacity and observed SST/blob ratio |
594 | 0 | uint64_t total_sst = GetTotalFilesSize(level0_files); |
595 | 0 | uint64_t total_blob = vstorage->GetBlobStats().total_file_size; |
596 | 0 | uint64_t total_data = total_sst + total_blob; |
597 | |
|
598 | 0 | if (total_data == 0 || total_sst == 0) { |
599 | 0 | return nullptr; |
600 | 0 | } |
601 | | |
602 | | // Compute sst_ratio (inverse of EstimateTotalDataForSST's proportion): |
603 | | // when no blob files exist, sst_ratio is 1.0 and the target becomes |
604 | | // max_data_files_size / trigger, which is large. The algorithm will |
605 | | // naturally not find small enough files to compact. |
606 | 0 | double sst_ratio = |
607 | 0 | (total_blob > 0) ? static_cast<double>(total_sst) / total_data : 1.0; |
608 | |
|
609 | 0 | uint64_t total_sst_at_cap = |
610 | 0 | static_cast<uint64_t>(fifo_opts.max_data_files_size * sst_ratio); |
611 | 0 | target = total_sst_at_cap / trigger; |
612 | |
|
613 | 0 | ROCKS_LOG_BUFFER(log_buffer, |
614 | 0 | "[%s] FIFO ratio-based compaction: sst_ratio=%.4f, " |
615 | 0 | "target_file_size=%" PRIu64, |
616 | 0 | cf_name.c_str(), sst_ratio, target); |
617 | 0 | } |
618 | 0 | if (target == 0) { |
619 | 0 | return nullptr; |
620 | 0 | } |
621 | | |
622 | | // Tiered size-based file selection. |
623 | | // |
624 | | // Tier boundaries form a geometric sequence descending from target: |
625 | | // ..., target/trigger^2, target/trigger, target |
626 | | // For each boundary (smallest first), find contiguous L0 files with |
627 | | // size < boundary. If their accumulated bytes >= boundary, merge them. |
628 | | // The output (~boundary bytes) advances to the next tier. Files that |
629 | | // reach target are "graduated" and never compacted again. |
630 | | // |
631 | | // Trade-off: write amplification vs L0 file count. |
632 | | // |
633 | | // Write amp: O(log(target/flush) / log(trigger)) per byte, instead of |
634 | | // O(target / (trigger * flush)) from flat merging. Each byte is |
635 | | // rewritten once per tier crossing. |
636 | | // |
637 | | // L0 file count: trigger + k * (trigger - 1) at steady state, where |
638 | | // k = ceil(log(target/flush) / log(trigger)). This is higher than |
639 | | // the original trigger target because intermediate tier files |
640 | | // accumulate while waiting for the next tier merge. The trade-off |
641 | | // is explicit: more L0 files in exchange for logarithmic (instead |
642 | | // of linear) write amplification. |
643 | | |
644 | | // Build tier boundaries from smallest to largest. |
645 | | // Stop at 10KB minimum -- SST files of most workloads are larger than |
646 | | // this, so lower boundaries would only waste CPU scanning L0 files. |
647 | | // Files smaller than the lowest boundary simply merge at that boundary. |
648 | 0 | static constexpr uint64_t kMinTierBoundary = 10 * 1024; // 10KB |
649 | 0 | std::vector<uint64_t> boundaries; |
650 | 0 | for (uint64_t b = target; b >= kMinTierBoundary; b /= trigger) { |
651 | 0 | boundaries.push_back(b); |
652 | 0 | } |
653 | 0 | if (boundaries.empty()) { |
654 | | // target itself is below kMinTierBoundary -- use target as the |
655 | | // sole boundary so we can still compact at the target size. |
656 | 0 | boundaries.push_back(target); |
657 | 0 | } |
658 | 0 | std::reverse(boundaries.begin(), boundaries.end()); |
659 | | |
660 | | // For each tier boundary (smallest first), scan L0 for mergeable batches. |
661 | | // L0 files are stored newest-first; oldest is at the end. |
662 | 0 | for (const uint64_t boundary : boundaries) { |
663 | 0 | for (size_t scan = level0_files.size(); scan > 0;) { |
664 | | // Skip files >= boundary (they belong to higher tiers) or in-progress |
665 | 0 | if (level0_files[scan - 1]->fd.file_size >= boundary || |
666 | 0 | level0_files[scan - 1]->being_compacted) { |
667 | 0 | --scan; |
668 | 0 | continue; |
669 | 0 | } |
670 | | |
671 | | // Found a file < boundary -- collect contiguous batch |
672 | 0 | std::vector<FileMetaData*> batch; |
673 | 0 | uint64_t accumulated = 0; |
674 | 0 | size_t pos = scan; |
675 | 0 | while (pos > 0 && level0_files[pos - 1]->fd.file_size < boundary && |
676 | 0 | !level0_files[pos - 1]->being_compacted) { |
677 | | // Don't let output exceed 2x boundary (prevent tier-skipping) |
678 | 0 | if (accumulated >= boundary && |
679 | 0 | accumulated + level0_files[pos - 1]->fd.file_size > boundary * 2) { |
680 | 0 | break; |
681 | 0 | } |
682 | 0 | batch.push_back(level0_files[pos - 1]); |
683 | 0 | accumulated += level0_files[pos - 1]->fd.file_size; |
684 | 0 | --pos; |
685 | 0 | } |
686 | | |
687 | | // Viable: >= 2 files and accumulated >= boundary |
688 | 0 | if (batch.size() >= 2 && accumulated >= boundary) { |
689 | 0 | CompactionInputFiles comp_inputs; |
690 | 0 | comp_inputs.level = 0; |
691 | 0 | comp_inputs.files = std::move(batch); |
692 | |
|
693 | 0 | ROCKS_LOG_BUFFER( |
694 | 0 | log_buffer, |
695 | 0 | "[%s] FIFO kv-ratio compaction: picking %" ROCKSDB_PRIszt |
696 | 0 | " files (%" PRIu64 " bytes) at tier boundary %" PRIu64 |
697 | 0 | " for intra-L0 compaction, target=%" PRIu64, |
698 | 0 | cf_name.c_str(), comp_inputs.files.size(), accumulated, boundary, |
699 | 0 | target); |
700 | |
|
701 | 0 | Compaction* c = new Compaction( |
702 | 0 | vstorage, ioptions_, mutable_cf_options, mutable_db_options, |
703 | 0 | {comp_inputs}, 0, boundary /* output file size limit */, |
704 | 0 | 0 /* max compaction bytes, not applicable */, |
705 | 0 | 0 /* output path ID */, mutable_cf_options.compression, |
706 | 0 | mutable_cf_options.compression_opts, Temperature::kUnknown, |
707 | 0 | 0 /* max_subcompactions */, {}, |
708 | 0 | /* earliest_snapshot */ std::nullopt, |
709 | 0 | /* snapshot_checker */ nullptr, |
710 | 0 | CompactionReason::kFIFOReduceNumFiles, |
711 | 0 | /* trim_ts */ "", vstorage->CompactionScore(0), |
712 | 0 | /* l0_files_might_overlap */ true); |
713 | 0 | return c; |
714 | 0 | } |
715 | | |
716 | | // This batch wasn't enough -- advance past it |
717 | 0 | scan = pos; |
718 | 0 | } |
719 | 0 | } |
720 | | |
721 | 0 | return nullptr; |
722 | 0 | } |
723 | | |
724 | | // The full_history_ts_low parameter is used to control bottommost file marking |
725 | | // for compaction when user-defined timestamps (UDT) are enabled. |
726 | | |
727 | | // TODO leverage full_history_ts_low for FIFO compaction, by trigggerring |
728 | | // compaction early for data that has already expired to achieve the goal of TTL |
729 | | // enforced compliance. |
730 | | Compaction* FIFOCompactionPicker::PickCompaction( |
731 | | const std::string& cf_name, const MutableCFOptions& mutable_cf_options, |
732 | | const MutableDBOptions& mutable_db_options, |
733 | | const std::vector<SequenceNumber>& /* existing_snapshots */, |
734 | | const SnapshotChecker* /* snapshot_checker */, VersionStorageInfo* vstorage, |
735 | | LogBuffer* log_buffer, const std::string& /* full_history_ts_low */, |
736 | 0 | bool /* require_max_output_level*/) { |
737 | 0 | Compaction* c = nullptr; |
738 | 0 | if (mutable_cf_options.ttl > 0) { |
739 | 0 | c = PickTTLCompaction(cf_name, mutable_cf_options, mutable_db_options, |
740 | 0 | vstorage, log_buffer); |
741 | 0 | } |
742 | 0 | if (c == nullptr) { |
743 | 0 | c = PickSizeCompaction(cf_name, mutable_cf_options, mutable_db_options, |
744 | 0 | vstorage, log_buffer); |
745 | 0 | } |
746 | | // Intra-L0 compaction merges small files to reduce file count. |
747 | | // It runs after size-based dropping: if PickSizeCompaction dropped files, |
748 | | // it returned non-null and we skip this. Otherwise, we try to reduce |
749 | | // L0 file count by merging small files together. |
750 | 0 | if (c == nullptr) { |
751 | 0 | c = PickIntraL0Compaction(cf_name, mutable_cf_options, mutable_db_options, |
752 | 0 | vstorage, log_buffer); |
753 | 0 | } |
754 | 0 | if (c == nullptr) { |
755 | 0 | c = PickTemperatureChangeCompaction( |
756 | 0 | cf_name, mutable_cf_options, mutable_db_options, vstorage, log_buffer); |
757 | 0 | } |
758 | 0 | if (c == nullptr) { |
759 | 0 | ROCKS_LOG_BUFFER(log_buffer, "[%s] FIFO compaction: no compaction picked", |
760 | 0 | cf_name.c_str()); |
761 | 0 | } |
762 | 0 | RegisterCompaction(c); |
763 | 0 | return c; |
764 | 0 | } |
765 | | |
766 | | Compaction* FIFOCompactionPicker::PickCompactionForCompactRange( |
767 | | const std::string& cf_name, const MutableCFOptions& mutable_cf_options, |
768 | | const MutableDBOptions& mutable_db_options, VersionStorageInfo* vstorage, |
769 | | int input_level, int output_level, |
770 | | const CompactRangeOptions& /*compact_range_options*/, |
771 | | const InternalKey* /*begin*/, const InternalKey* /*end*/, |
772 | | InternalKey** compaction_end, bool* /*manual_conflict*/, |
773 | | uint64_t /*max_file_num_to_ignore*/, const std::string& /*trim_ts*/, |
774 | 0 | const std::string& full_history_ts_low) { |
775 | 0 | #ifdef NDEBUG |
776 | 0 | (void)input_level; |
777 | 0 | (void)output_level; |
778 | 0 | #endif |
779 | 0 | assert(input_level == 0); |
780 | | assert(output_level == 0); |
781 | 0 | *compaction_end = nullptr; |
782 | 0 | LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, ioptions_.logger); |
783 | 0 | Compaction* c = PickCompaction( |
784 | 0 | cf_name, mutable_cf_options, mutable_db_options, |
785 | 0 | /*existing_snapshots*/ {}, /*snapshot_checker*/ nullptr, vstorage, |
786 | 0 | &log_buffer, full_history_ts_low, /* require_max_output_level */ false); |
787 | 0 | log_buffer.FlushBufferToLog(); |
788 | 0 | return c; |
789 | 0 | } |
790 | | |
791 | | } // namespace ROCKSDB_NAMESPACE |