/src/rocksdb/db/compaction/compaction_job.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | // |
6 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
7 | | // Use of this source code is governed by a BSD-style license that can be |
8 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
9 | | |
10 | | #include "db/compaction/compaction_job.h" |
11 | | |
12 | | #include <algorithm> |
13 | | #include <cinttypes> |
14 | | #include <memory> |
15 | | #include <optional> |
16 | | #include <set> |
17 | | #include <utility> |
18 | | #include <vector> |
19 | | |
20 | | #include "db/blob/blob_counting_iterator.h" |
21 | | #include "db/blob/blob_file_addition.h" |
22 | | #include "db/blob/blob_file_builder.h" |
23 | | #include "db/builder.h" |
24 | | #include "db/compaction/clipping_iterator.h" |
25 | | #include "db/compaction/compaction_state.h" |
26 | | #include "db/db_impl/db_impl.h" |
27 | | #include "db/dbformat.h" |
28 | | #include "db/error_handler.h" |
29 | | #include "db/event_helpers.h" |
30 | | #include "db/history_trimming_iterator.h" |
31 | | #include "db/log_writer.h" |
32 | | #include "db/merge_helper.h" |
33 | | #include "db/range_del_aggregator.h" |
34 | | #include "db/version_edit.h" |
35 | | #include "db/version_set.h" |
36 | | #include "file/file_util.h" |
37 | | #include "file/filename.h" |
38 | | #include "file/read_write_util.h" |
39 | | #include "file/sst_file_manager_impl.h" |
40 | | #include "file/writable_file_writer.h" |
41 | | #include "logging/log_buffer.h" |
42 | | #include "logging/logging.h" |
43 | | #include "monitoring/iostats_context_imp.h" |
44 | | #include "monitoring/thread_status_util.h" |
45 | | #include "options/configurable_helper.h" |
46 | | #include "options/options_helper.h" |
47 | | #include "port/port.h" |
48 | | #include "rocksdb/db.h" |
49 | | #include "rocksdb/env.h" |
50 | | #include "rocksdb/options.h" |
51 | | #include "rocksdb/statistics.h" |
52 | | #include "rocksdb/status.h" |
53 | | #include "rocksdb/table.h" |
54 | | #include "rocksdb/utilities/options_type.h" |
55 | | #include "table/format.h" |
56 | | #include "table/merging_iterator.h" |
57 | | #include "table/meta_blocks.h" |
58 | | #include "table/table_builder.h" |
59 | | #include "table/unique_id_impl.h" |
60 | | #include "test_util/sync_point.h" |
61 | | #include "util/hash_containers.h" |
62 | | #include "util/stop_watch.h" |
63 | | |
64 | | namespace ROCKSDB_NAMESPACE { |
65 | | |
66 | 4.85k | const char* GetCompactionReasonString(CompactionReason compaction_reason) { |
67 | 4.85k | switch (compaction_reason) { |
68 | 0 | case CompactionReason::kUnknown: |
69 | 0 | return "Unknown"; |
70 | 2.16k | case CompactionReason::kLevelL0FilesNum: |
71 | 2.16k | return "LevelL0FilesNum"; |
72 | 0 | case CompactionReason::kLevelMaxLevelSize: |
73 | 0 | return "LevelMaxLevelSize"; |
74 | 0 | case CompactionReason::kUniversalSizeAmplification: |
75 | 0 | return "UniversalSizeAmplification"; |
76 | 0 | case CompactionReason::kUniversalSizeRatio: |
77 | 0 | return "UniversalSizeRatio"; |
78 | 0 | case CompactionReason::kUniversalSortedRunNum: |
79 | 0 | return "UniversalSortedRunNum"; |
80 | 0 | case CompactionReason::kFIFOMaxSize: |
81 | 0 | return "FIFOMaxSize"; |
82 | 0 | case CompactionReason::kFIFOReduceNumFiles: |
83 | 0 | return "FIFOReduceNumFiles"; |
84 | 0 | case CompactionReason::kFIFOTtl: |
85 | 0 | return "FIFOTtl"; |
86 | 1.09k | case CompactionReason::kManualCompaction: |
87 | 1.09k | return "ManualCompaction"; |
88 | 0 | case CompactionReason::kFilesMarkedForCompaction: |
89 | 0 | return "FilesMarkedForCompaction"; |
90 | 1.59k | case CompactionReason::kBottommostFiles: |
91 | 1.59k | return "BottommostFiles"; |
92 | 0 | case CompactionReason::kTtl: |
93 | 0 | return "Ttl"; |
94 | 0 | case CompactionReason::kFlush: |
95 | 0 | return "Flush"; |
96 | 0 | case CompactionReason::kExternalSstIngestion: |
97 | 0 | return "ExternalSstIngestion"; |
98 | 0 | case CompactionReason::kPeriodicCompaction: |
99 | 0 | return "PeriodicCompaction"; |
100 | 0 | case CompactionReason::kChangeTemperature: |
101 | 0 | return "ChangeTemperature"; |
102 | 0 | case CompactionReason::kForcedBlobGC: |
103 | 0 | return "ForcedBlobGC"; |
104 | 0 | case CompactionReason::kRoundRobinTtl: |
105 | 0 | return "RoundRobinTtl"; |
106 | 0 | case CompactionReason::kRefitLevel: |
107 | 0 | return "RefitLevel"; |
108 | 0 | case CompactionReason::kReadTriggered: |
109 | 0 | return "ReadTriggered"; |
110 | 0 | case CompactionReason::kNumOfReasons: |
111 | | // fall through |
112 | 0 | default: |
113 | 0 | assert(false); |
114 | 0 | return "Invalid"; |
115 | 4.85k | } |
116 | 4.85k | } |
117 | | |
118 | | const char* GetCompactionProximalOutputRangeTypeString( |
119 | 0 | Compaction::ProximalOutputRangeType range_type) { |
120 | 0 | switch (range_type) { |
121 | 0 | case Compaction::ProximalOutputRangeType::kNotSupported: |
122 | 0 | return "NotSupported"; |
123 | 0 | case Compaction::ProximalOutputRangeType::kFullRange: |
124 | 0 | return "FullRange"; |
125 | 0 | case Compaction::ProximalOutputRangeType::kNonLastRange: |
126 | 0 | return "NonLastRange"; |
127 | 0 | case Compaction::ProximalOutputRangeType::kDisabled: |
128 | 0 | return "Disabled"; |
129 | 0 | default: |
130 | 0 | assert(false); |
131 | 0 | return "Invalid"; |
132 | 0 | } |
133 | 0 | } |
134 | | |
135 | | // Static constant for compaction abort flag - always false, used for |
136 | | // compaction service jobs that don't support abort signaling |
137 | | const std::atomic<int> CompactionJob::kCompactionAbortedFalse{0}; |
138 | | |
139 | | CompactionJob::CompactionJob( |
140 | | int job_id, Compaction* compaction, const ImmutableDBOptions& db_options, |
141 | | const MutableDBOptions& mutable_db_options, const FileOptions& file_options, |
142 | | VersionSet* versions, const std::atomic<bool>* shutting_down, |
143 | | LogBuffer* log_buffer, FSDirectory* db_directory, |
144 | | FSDirectory* output_directory, FSDirectory* blob_output_directory, |
145 | | Statistics* stats, InstrumentedMutex* db_mutex, |
146 | | ErrorHandler* db_error_handler, JobContext* job_context, |
147 | | std::shared_ptr<Cache> table_cache, EventLogger* event_logger, |
148 | | bool paranoid_file_checks, bool measure_io_stats, const std::string& dbname, |
149 | | CompactionJobStats* compaction_job_stats, Env::Priority thread_pri, |
150 | | const std::shared_ptr<IOTracer>& io_tracer, |
151 | | const std::atomic<bool>& manual_compaction_canceled, |
152 | | const std::atomic<int>& compaction_aborted, const std::string& db_id, |
153 | | const std::string& db_session_id, std::string full_history_ts_low, |
154 | | std::string trim_ts, BlobFileCompletionCallback* blob_callback, |
155 | | int* bg_compaction_scheduled, int* bg_bottom_compaction_scheduled) |
156 | 4.85k | : compact_(new CompactionState(compaction)), |
157 | 4.85k | internal_stats_(compaction->compaction_reason(), 1), |
158 | 4.85k | db_options_(db_options), |
159 | 4.85k | mutable_db_options_copy_(mutable_db_options), |
160 | 4.85k | log_buffer_(log_buffer), |
161 | 4.85k | output_directory_(output_directory), |
162 | 4.85k | stats_(stats), |
163 | 4.85k | bottommost_level_(false), |
164 | 4.85k | write_hint_(Env::WLTH_NOT_SET), |
165 | 4.85k | job_stats_(compaction_job_stats), |
166 | 4.85k | job_id_(job_id), |
167 | 4.85k | dbname_(dbname), |
168 | 4.85k | db_id_(db_id), |
169 | 4.85k | db_session_id_(db_session_id), |
170 | 4.85k | file_options_(file_options), |
171 | 4.85k | env_(db_options.env), |
172 | 4.85k | io_tracer_(io_tracer), |
173 | 4.85k | fs_(db_options.fs, io_tracer), |
174 | | file_options_for_read_( |
175 | 4.85k | fs_->OptimizeForCompactionTableRead(file_options, db_options_)), |
176 | 4.85k | versions_(versions), |
177 | 4.85k | shutting_down_(shutting_down), |
178 | 4.85k | manual_compaction_canceled_(manual_compaction_canceled), |
179 | 4.85k | compaction_aborted_(compaction_aborted), |
180 | 4.85k | db_directory_(db_directory), |
181 | 4.85k | blob_output_directory_(blob_output_directory), |
182 | 4.85k | db_mutex_(db_mutex), |
183 | 4.85k | db_error_handler_(db_error_handler), |
184 | | // job_context cannot be nullptr, but we will assert later in the body of |
185 | | // the constructor. |
186 | 4.85k | earliest_snapshot_(job_context |
187 | 4.85k | ? job_context->GetEarliestSnapshotSequence() |
188 | 4.85k | : kMaxSequenceNumber), |
189 | 4.85k | job_context_(job_context), |
190 | 4.85k | table_cache_(std::move(table_cache)), |
191 | 4.85k | event_logger_(event_logger), |
192 | 4.85k | paranoid_file_checks_(paranoid_file_checks), |
193 | 4.85k | measure_io_stats_(measure_io_stats), |
194 | 4.85k | thread_pri_(thread_pri), |
195 | 4.85k | full_history_ts_low_(std::move(full_history_ts_low)), |
196 | 4.85k | trim_ts_(std::move(trim_ts)), |
197 | 4.85k | blob_callback_(blob_callback), |
198 | 4.85k | extra_num_subcompaction_threads_reserved_(0), |
199 | 4.85k | bg_compaction_scheduled_(bg_compaction_scheduled), |
200 | 4.85k | bg_bottom_compaction_scheduled_(bg_bottom_compaction_scheduled) { |
201 | 4.85k | assert(job_stats_ != nullptr); |
202 | 4.85k | assert(log_buffer_ != nullptr); |
203 | 4.85k | assert(job_context); |
204 | 4.85k | assert(job_context->snapshot_context_initialized); |
205 | | |
206 | 4.85k | const auto* cfd = compact_->compaction->column_family_data(); |
207 | 4.85k | ThreadStatusUtil::SetEnableTracking(db_options_.enable_thread_tracking); |
208 | 4.85k | ThreadStatusUtil::SetColumnFamily(cfd); |
209 | 4.85k | ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION); |
210 | 4.85k | ReportStartedCompaction(compaction); |
211 | 4.85k | } |
212 | | |
213 | 4.85k | CompactionJob::~CompactionJob() { |
214 | 4.85k | assert(compact_ == nullptr); |
215 | 4.85k | ThreadStatusUtil::ResetThreadStatus(); |
216 | 4.85k | } |
217 | | |
218 | 4.85k | void CompactionJob::ReportStartedCompaction(Compaction* compaction) { |
219 | 4.85k | ThreadStatusUtil::SetThreadOperationProperty(ThreadStatus::COMPACTION_JOB_ID, |
220 | 4.85k | job_id_); |
221 | | |
222 | 4.85k | ThreadStatusUtil::SetThreadOperationProperty( |
223 | 4.85k | ThreadStatus::COMPACTION_INPUT_OUTPUT_LEVEL, |
224 | 4.85k | (static_cast<uint64_t>(compact_->compaction->start_level()) << 32) + |
225 | 4.85k | compact_->compaction->output_level()); |
226 | | |
227 | | // In the current design, a CompactionJob is always created |
228 | | // for non-trivial compaction. |
229 | 4.85k | assert(compaction->IsTrivialMove() == false || |
230 | 4.85k | compaction->is_manual_compaction() == true); |
231 | | |
232 | 4.85k | ThreadStatusUtil::SetThreadOperationProperty( |
233 | 4.85k | ThreadStatus::COMPACTION_PROP_FLAGS, |
234 | 4.85k | compaction->is_manual_compaction() + |
235 | 4.85k | (compaction->deletion_compaction() << 1)); |
236 | 4.85k | auto total_input_bytes = compaction->CalculateTotalInputSize(); |
237 | 4.85k | ThreadStatusUtil::SetThreadOperationProperty( |
238 | 4.85k | ThreadStatus::COMPACTION_TOTAL_INPUT_BYTES, total_input_bytes); |
239 | | |
240 | 4.85k | IOSTATS_RESET(bytes_written); |
241 | 4.85k | IOSTATS_RESET(bytes_read); |
242 | 4.85k | ThreadStatusUtil::SetThreadOperationProperty( |
243 | 4.85k | ThreadStatus::COMPACTION_BYTES_WRITTEN, 0); |
244 | 4.85k | ThreadStatusUtil::SetThreadOperationProperty( |
245 | 4.85k | ThreadStatus::COMPACTION_BYTES_READ, 0); |
246 | | |
247 | | // Set the thread operation after operation properties |
248 | | // to ensure GetThreadList() can always show them all together. |
249 | 4.85k | ThreadStatusUtil::SetThreadOperation(ThreadStatus::OP_COMPACTION); |
250 | | |
251 | 4.85k | job_stats_->is_manual_compaction = compaction->is_manual_compaction(); |
252 | 4.85k | job_stats_->is_full_compaction = compaction->is_full_compaction(); |
253 | | // populate compaction stats num_input_files and total_num_of_bytes |
254 | 4.85k | size_t num_input_files = 0; |
255 | 4.85k | for (int input_level = 0; |
256 | 12.4k | input_level < static_cast<int>(compaction->num_input_levels()); |
257 | 7.57k | ++input_level) { |
258 | 7.57k | const LevelFilesBrief* flevel = compaction->input_levels(input_level); |
259 | 7.57k | num_input_files += flevel->num_files; |
260 | 7.57k | } |
261 | 4.85k | job_stats_->CompactionJobStats::num_input_files = num_input_files; |
262 | 4.85k | job_stats_->total_input_bytes = total_input_bytes; |
263 | 4.85k | } |
264 | | |
265 | | void CompactionJob::Prepare( |
266 | | std::optional<std::pair<std::optional<Slice>, std::optional<Slice>>> |
267 | | known_single_subcompact, |
268 | | const CompactionProgress& compaction_progress, |
269 | 4.85k | log::Writer* compaction_progress_writer) { |
270 | 4.85k | db_mutex_->AssertHeld(); |
271 | 4.85k | AutoThreadOperationStageUpdater stage_updater( |
272 | 4.85k | ThreadStatus::STAGE_COMPACTION_PREPARE); |
273 | | |
274 | | // Generate file_levels_ for compaction before making Iterator |
275 | 4.85k | auto* c = compact_->compaction; |
276 | 4.85k | [[maybe_unused]] ColumnFamilyData* cfd = c->column_family_data(); |
277 | 4.85k | assert(cfd != nullptr); |
278 | 4.85k | const VersionStorageInfo* storage_info = c->input_version()->storage_info(); |
279 | 4.85k | assert(storage_info); |
280 | 4.85k | assert(storage_info->NumLevelFiles(compact_->compaction->level()) > 0); |
281 | | |
282 | 4.85k | write_hint_ = storage_info->CalculateSSTWriteHint( |
283 | 4.85k | c->output_level(), db_options_.calculate_sst_write_lifetime_hint_set); |
284 | 4.85k | bottommost_level_ = c->bottommost_level(); |
285 | | |
286 | 4.85k | if (!known_single_subcompact.has_value() && c->ShouldFormSubcompactions()) { |
287 | 0 | StopWatch sw(db_options_.clock, stats_, SUBCOMPACTION_SETUP_TIME); |
288 | 0 | GenSubcompactionBoundaries(); |
289 | 0 | } |
290 | 4.85k | if (boundaries_.size() >= 1) { |
291 | 0 | assert(!known_single_subcompact.has_value()); |
292 | 0 | for (size_t i = 0; i <= boundaries_.size(); i++) { |
293 | 0 | compact_->sub_compact_states.emplace_back( |
294 | 0 | c, (i != 0) ? std::optional<Slice>(boundaries_[i - 1]) : std::nullopt, |
295 | 0 | (i != boundaries_.size()) ? std::optional<Slice>(boundaries_[i]) |
296 | 0 | : std::nullopt, |
297 | 0 | static_cast<uint32_t>(i)); |
298 | | // assert to validate that boundaries don't have same user keys (without |
299 | | // timestamp part). |
300 | 0 | assert(i == 0 || i == boundaries_.size() || |
301 | 0 | cfd->user_comparator()->CompareWithoutTimestamp( |
302 | 0 | boundaries_[i - 1], boundaries_[i]) < 0); |
303 | 0 | } |
304 | 0 | RecordInHistogram(stats_, NUM_SUBCOMPACTIONS_SCHEDULED, |
305 | 0 | compact_->sub_compact_states.size()); |
306 | 4.85k | } else { |
307 | 4.85k | std::optional<Slice> start_key; |
308 | 4.85k | std::optional<Slice> end_key; |
309 | 4.85k | if (known_single_subcompact.has_value()) { |
310 | 0 | start_key = known_single_subcompact.value().first; |
311 | 0 | end_key = known_single_subcompact.value().second; |
312 | 4.85k | } else { |
313 | 4.85k | assert(!start_key.has_value() && !end_key.has_value()); |
314 | 4.85k | } |
315 | 4.85k | compact_->sub_compact_states.emplace_back(c, start_key, end_key, |
316 | 4.85k | /*sub_job_id*/ 0); |
317 | 4.85k | } |
318 | | |
319 | 4.85k | MaybeAssignCompactionProgressAndWriter(compaction_progress, |
320 | 4.85k | compaction_progress_writer); |
321 | | |
322 | | // collect all seqno->time information from the input files which will be used |
323 | | // to encode seqno->time to the output files. |
324 | 4.85k | SequenceNumber preserve_time_min_seqno = kMaxSequenceNumber; |
325 | 4.85k | SequenceNumber preclude_last_level_min_seqno = kMaxSequenceNumber; |
326 | 4.85k | uint64_t preserve_time_duration = |
327 | 4.85k | MinAndMaxPreserveSeconds(c->mutable_cf_options()).max_preserve_seconds; |
328 | | |
329 | 4.85k | if (preserve_time_duration > 0) { |
330 | 0 | const ReadOptions read_options(Env::IOActivity::kCompaction); |
331 | | // Setup seqno_to_time_mapping_ with relevant time range. |
332 | 0 | seqno_to_time_mapping_.SetMaxTimeSpan(preserve_time_duration); |
333 | 0 | for (const auto& each_level : *c->inputs()) { |
334 | 0 | for (const auto& fmd : each_level.files) { |
335 | 0 | std::shared_ptr<const TableProperties> tp; |
336 | 0 | Status s = c->input_version()->GetTableProperties(read_options, &tp, |
337 | 0 | fmd, nullptr); |
338 | 0 | if (s.ok()) { |
339 | 0 | s = seqno_to_time_mapping_.DecodeFrom(tp->seqno_to_time_mapping); |
340 | 0 | } |
341 | 0 | if (!s.ok()) { |
342 | 0 | ROCKS_LOG_WARN( |
343 | 0 | db_options_.info_log, |
344 | 0 | "Problem reading or processing seqno-to-time mapping: %s", |
345 | 0 | s.ToString().c_str()); |
346 | 0 | } |
347 | 0 | } |
348 | 0 | } |
349 | |
|
350 | 0 | int64_t _current_time = 0; |
351 | 0 | Status s = db_options_.clock->GetCurrentTime(&_current_time); |
352 | 0 | if (!s.ok()) { |
353 | 0 | ROCKS_LOG_WARN(db_options_.info_log, |
354 | 0 | "Failed to get current time in compaction: Status: %s", |
355 | 0 | s.ToString().c_str()); |
356 | | // preserve all time information |
357 | 0 | preserve_time_min_seqno = 0; |
358 | 0 | preclude_last_level_min_seqno = 0; |
359 | 0 | seqno_to_time_mapping_.Enforce(); |
360 | 0 | } else { |
361 | 0 | seqno_to_time_mapping_.Enforce(_current_time); |
362 | 0 | seqno_to_time_mapping_.GetCurrentTieringCutoffSeqnos( |
363 | 0 | static_cast<uint64_t>(_current_time), |
364 | 0 | c->mutable_cf_options().preserve_internal_time_seconds, |
365 | 0 | c->mutable_cf_options().preclude_last_level_data_seconds, |
366 | 0 | &preserve_time_min_seqno, &preclude_last_level_min_seqno); |
367 | 0 | } |
368 | | // For accuracy of the GetProximalSeqnoBeforeTime queries above, we only |
369 | | // limit the capacity after them. |
370 | | // Here, if we set capacity to the per-SST limit, we could be throwing away |
371 | | // fidelity when a compaction output file has a narrower seqno range than |
372 | | // all the inputs. If we only limit capacity for each compaction output, we |
373 | | // could be doing a lot of unnecessary recomputation in a large compaction |
374 | | // (up to quadratic in number of files). Thus, we do something in the |
375 | | // middle: enforce a reasonably large constant size limit substantially |
376 | | // larger than kMaxSeqnoTimePairsPerSST. |
377 | 0 | seqno_to_time_mapping_.SetCapacity(kMaxSeqnoToTimeEntries); |
378 | 0 | } |
379 | | #ifndef NDEBUG |
380 | | assert(preserve_time_min_seqno <= preclude_last_level_min_seqno); |
381 | | TEST_SYNC_POINT_CALLBACK( |
382 | | "CompactionJob::PrepareTimes():preclude_last_level_min_seqno", |
383 | | static_cast<void*>(&preclude_last_level_min_seqno)); |
384 | | // Restore the invariant asserted above, in case it was broken under the |
385 | | // callback |
386 | | preserve_time_min_seqno = |
387 | | std::min(preclude_last_level_min_seqno, preserve_time_min_seqno); |
388 | | #endif |
389 | | |
390 | | // Preserve sequence numbers for preserved write times and snapshots, though |
391 | | // the specific sequence number of the earliest snapshot can be zeroed. |
392 | 4.85k | preserve_seqno_after_ = |
393 | 4.85k | std::max(preserve_time_min_seqno, SequenceNumber{1}) - 1; |
394 | 4.85k | preserve_seqno_after_ = std::min(preserve_seqno_after_, earliest_snapshot_); |
395 | | // If using preclude feature, also preclude snapshots from last level, just |
396 | | // because they are heuristically more likely to be accessed than non-snapshot |
397 | | // data. |
398 | 4.85k | if (preclude_last_level_min_seqno < kMaxSequenceNumber && |
399 | 0 | earliest_snapshot_ < preclude_last_level_min_seqno) { |
400 | 0 | preclude_last_level_min_seqno = earliest_snapshot_; |
401 | 0 | } |
402 | | // Now combine what we would like to preclude from last level with what we |
403 | | // can safely support without dangerously moving data back up the LSM tree, |
404 | | // to get the final seqno threshold for proximal vs. last. In particular, |
405 | | // when the reserved output key range for the proximal level does not |
406 | | // include the entire last level input key range, we need to keep entries |
407 | | // already in the last level there. (Even allowing within-range entries to |
408 | | // move back up could cause problems with range tombstones. Perhaps it |
409 | | // would be better in some rare cases to keep entries in the last level |
410 | | // one-by-one rather than based on sequence number, but that would add extra |
411 | | // tracking and complexity to CompactionIterator that is probably not |
412 | | // worthwhile overall. Correctness is also more clear when splitting by |
413 | | // seqno threshold.) |
414 | 4.85k | proximal_after_seqno_ = std::max(preclude_last_level_min_seqno, |
415 | 4.85k | c->GetKeepInLastLevelThroughSeqno()); |
416 | | |
417 | 4.85k | options_file_number_ = versions_->options_file_number(); |
418 | 4.85k | } |
419 | | |
420 | | void CompactionJob::MaybeAssignCompactionProgressAndWriter( |
421 | | const CompactionProgress& compaction_progress, |
422 | 4.85k | log::Writer* compaction_progress_writer) { |
423 | | // LIMITATION: Only supports resuming single subcompaction for now |
424 | 4.85k | if (compact_->sub_compact_states.size() != 1) { |
425 | 0 | return; |
426 | 0 | } |
427 | | |
428 | 4.85k | if (!compaction_progress.empty()) { |
429 | 0 | assert(compaction_progress.size() == 1); |
430 | 0 | SubcompactionState* sub_compact = &compact_->sub_compact_states[0]; |
431 | 0 | const SubcompactionProgress& subcompaction_progress = |
432 | 0 | compaction_progress[0]; |
433 | 0 | sub_compact->SetSubcompactionProgress(subcompaction_progress); |
434 | 0 | } |
435 | | |
436 | 4.85k | compaction_progress_writer_ = compaction_progress_writer; |
437 | 4.85k | } |
438 | | |
439 | 0 | uint64_t CompactionJob::GetSubcompactionsLimit() { |
440 | 0 | return extra_num_subcompaction_threads_reserved_ + |
441 | 0 | std::max( |
442 | 0 | std::uint64_t(1), |
443 | 0 | static_cast<uint64_t>(compact_->compaction->max_subcompactions())); |
444 | 0 | } |
445 | | |
446 | | void CompactionJob::AcquireSubcompactionResources( |
447 | 0 | int num_extra_required_subcompactions) { |
448 | 0 | TEST_SYNC_POINT("CompactionJob::AcquireSubcompactionResources:0"); |
449 | 0 | TEST_SYNC_POINT("CompactionJob::AcquireSubcompactionResources:1"); |
450 | 0 | int max_db_compactions = |
451 | 0 | DBImpl::GetBGJobLimits( |
452 | 0 | mutable_db_options_copy_.max_background_flushes, |
453 | 0 | mutable_db_options_copy_.max_background_compactions, |
454 | 0 | mutable_db_options_copy_.max_background_jobs, |
455 | 0 | versions_->GetColumnFamilySet() |
456 | 0 | ->write_controller() |
457 | 0 | ->NeedSpeedupCompaction()) |
458 | 0 | .max_compactions; |
459 | 0 | InstrumentedMutexLock l(db_mutex_); |
460 | | // Apply min function first since we need to compute the extra subcompaction |
461 | | // against compaction limits. And then try to reserve threads for extra |
462 | | // subcompactions. The actual number of reserved threads could be less than |
463 | | // the desired number. |
464 | 0 | int available_bg_compactions_against_db_limit = |
465 | 0 | std::max(max_db_compactions - *bg_compaction_scheduled_ - |
466 | 0 | *bg_bottom_compaction_scheduled_, |
467 | 0 | 0); |
468 | | // Reservation only supports background threads whose priority is |
469 | | // between BOTTOM and HIGH. Need to degrade the priority to HIGH if the |
470 | | // original thread_pri_ is higher than that. Similar to ReleaseThreads(). |
471 | 0 | extra_num_subcompaction_threads_reserved_ = |
472 | 0 | env_->ReserveThreads(std::min(num_extra_required_subcompactions, |
473 | 0 | available_bg_compactions_against_db_limit), |
474 | 0 | std::min(thread_pri_, Env::Priority::HIGH)); |
475 | | |
476 | | // Update bg_compaction_scheduled_ or bg_bottom_compaction_scheduled_ |
477 | | // depending on if this compaction has the bottommost priority |
478 | 0 | if (thread_pri_ == Env::Priority::BOTTOM) { |
479 | 0 | *bg_bottom_compaction_scheduled_ += |
480 | 0 | extra_num_subcompaction_threads_reserved_; |
481 | 0 | } else { |
482 | 0 | *bg_compaction_scheduled_ += extra_num_subcompaction_threads_reserved_; |
483 | 0 | } |
484 | 0 | } |
485 | | |
486 | 0 | void CompactionJob::ShrinkSubcompactionResources(uint64_t num_extra_resources) { |
487 | | // Do nothing when we have zero resources to shrink |
488 | 0 | if (num_extra_resources == 0) { |
489 | 0 | return; |
490 | 0 | } |
491 | 0 | db_mutex_->Lock(); |
492 | | // We cannot release threads more than what we reserved before |
493 | 0 | int extra_num_subcompaction_threads_released = env_->ReleaseThreads( |
494 | 0 | (int)num_extra_resources, std::min(thread_pri_, Env::Priority::HIGH)); |
495 | | // Update the number of reserved threads and the number of background |
496 | | // scheduled compactions for this compaction job |
497 | 0 | extra_num_subcompaction_threads_reserved_ -= |
498 | 0 | extra_num_subcompaction_threads_released; |
499 | | // TODO (zichen): design a test case with new subcompaction partitioning |
500 | | // when the number of actual partitions is less than the number of planned |
501 | | // partitions |
502 | 0 | assert(extra_num_subcompaction_threads_released == (int)num_extra_resources); |
503 | | // Update bg_compaction_scheduled_ or bg_bottom_compaction_scheduled_ |
504 | | // depending on if this compaction has the bottommost priority |
505 | 0 | if (thread_pri_ == Env::Priority::BOTTOM) { |
506 | 0 | *bg_bottom_compaction_scheduled_ -= |
507 | 0 | extra_num_subcompaction_threads_released; |
508 | 0 | } else { |
509 | 0 | *bg_compaction_scheduled_ -= extra_num_subcompaction_threads_released; |
510 | 0 | } |
511 | 0 | db_mutex_->Unlock(); |
512 | 0 | TEST_SYNC_POINT("CompactionJob::ShrinkSubcompactionResources:0"); |
513 | 0 | } |
514 | | |
515 | 4.85k | void CompactionJob::ReleaseSubcompactionResources() { |
516 | 4.85k | if (extra_num_subcompaction_threads_reserved_ == 0) { |
517 | 4.85k | return; |
518 | 4.85k | } |
519 | 0 | { |
520 | 0 | InstrumentedMutexLock l(db_mutex_); |
521 | | // The number of reserved threads becomes larger than 0 only if the |
522 | | // compaction priority is round robin and there are not enough |
523 | | // sub-compactions available |
524 | | |
525 | | // The scheduled compaction must be no less than 1 + extra number |
526 | | // subcompactions using acquired resources since this compaction job has not |
527 | | // finished yet |
528 | 0 | assert(*bg_bottom_compaction_scheduled_ >= |
529 | 0 | 1 + extra_num_subcompaction_threads_reserved_ || |
530 | 0 | *bg_compaction_scheduled_ >= |
531 | 0 | 1 + extra_num_subcompaction_threads_reserved_); |
532 | 0 | } |
533 | 0 | ShrinkSubcompactionResources(extra_num_subcompaction_threads_reserved_); |
534 | 0 | } |
535 | | |
536 | 0 | void CompactionJob::GenSubcompactionBoundaries() { |
537 | | // The goal is to find some boundary keys so that we can evenly partition |
538 | | // the compaction input data into max_subcompactions ranges. |
539 | | // For every input file, we ask TableReader to estimate 128 anchor points |
540 | | // that evenly partition the input file into 128 ranges and the range |
541 | | // sizes. This can be calculated by scanning index blocks of the file. |
542 | | // Once we have the anchor points for all the input files, we merge them |
543 | | // together and try to find keys dividing ranges evenly. |
544 | | // For example, if we have two input files, and each returns following |
545 | | // ranges: |
546 | | // File1: (a1, 1000), (b1, 1200), (c1, 1100) |
547 | | // File2: (a2, 1100), (b2, 1000), (c2, 1000) |
548 | | // We total sort the keys to following: |
549 | | // (a1, 1000), (a2, 1100), (b1, 1200), (b2, 1000), (c1, 1100), (c2, 1000) |
550 | | // We calculate the total size by adding up all ranges' size, which is 6400. |
551 | | // If we would like to partition into 2 subcompactions, the target of the |
552 | | // range size is 3200. Based on the size, we take "b1" as the partition key |
553 | | // since the first three ranges would hit 3200. |
554 | | // |
555 | | // Note that the ranges are actually overlapping. For example, in the example |
556 | | // above, the range ending with "b1" is overlapping with the range ending with |
557 | | // "b2". So the size 1000+1100+1200 is an underestimation of data size up to |
558 | | // "b1". In extreme cases where we only compact N L0 files, a range can |
559 | | // overlap with N-1 other ranges. Since we requested a relatively large number |
560 | | // (128) of ranges from each input files, even N range overlapping would |
561 | | // cause relatively small inaccuracy. |
562 | 0 | ReadOptions read_options(Env::IOActivity::kCompaction); |
563 | 0 | read_options.rate_limiter_priority = GetRateLimiterPriority(); |
564 | 0 | auto* c = compact_->compaction; |
565 | 0 | if (c->mutable_cf_options().table_factory->Name() == |
566 | 0 | TableFactory::kPlainTableName()) { |
567 | 0 | return; |
568 | 0 | } |
569 | | |
570 | 0 | if (c->max_subcompactions() <= 1 && |
571 | 0 | !(c->immutable_options().compaction_pri == kRoundRobin && |
572 | 0 | c->immutable_options().compaction_style == kCompactionStyleLevel)) { |
573 | 0 | return; |
574 | 0 | } |
575 | 0 | auto* cfd = c->column_family_data(); |
576 | 0 | const Comparator* cfd_comparator = cfd->user_comparator(); |
577 | 0 | const InternalKeyComparator& icomp = cfd->internal_comparator(); |
578 | |
|
579 | 0 | auto* v = compact_->compaction->input_version(); |
580 | 0 | int base_level = v->storage_info()->base_level(); |
581 | 0 | InstrumentedMutexUnlock unlock_guard(db_mutex_); |
582 | |
|
583 | 0 | uint64_t total_size = 0; |
584 | 0 | std::vector<TableReader::Anchor> all_anchors; |
585 | 0 | int start_lvl = c->start_level(); |
586 | 0 | int out_lvl = c->output_level(); |
587 | |
|
588 | 0 | for (size_t lvl_idx = 0; lvl_idx < c->num_input_levels(); lvl_idx++) { |
589 | 0 | int lvl = c->level(lvl_idx); |
590 | 0 | if (lvl >= start_lvl && lvl <= out_lvl) { |
591 | 0 | const LevelFilesBrief* flevel = c->input_levels(lvl_idx); |
592 | 0 | size_t num_files = flevel->num_files; |
593 | |
|
594 | 0 | if (num_files == 0) { |
595 | 0 | continue; |
596 | 0 | } |
597 | | |
598 | 0 | for (size_t i = 0; i < num_files; i++) { |
599 | 0 | FileMetaData* f = flevel->files[i].file_metadata; |
600 | 0 | std::vector<TableReader::Anchor> my_anchors; |
601 | 0 | Status s = cfd->table_cache()->ApproximateKeyAnchors( |
602 | 0 | read_options, icomp, *f, c->mutable_cf_options(), my_anchors); |
603 | 0 | if (!s.ok() || my_anchors.empty()) { |
604 | 0 | my_anchors.emplace_back(f->largest.user_key(), f->fd.GetFileSize()); |
605 | 0 | } |
606 | 0 | for (auto& ac : my_anchors) { |
607 | | // Can be optimized to avoid this loop. |
608 | 0 | total_size += ac.range_size; |
609 | 0 | } |
610 | |
|
611 | 0 | all_anchors.insert(all_anchors.end(), my_anchors.begin(), |
612 | 0 | my_anchors.end()); |
613 | 0 | } |
614 | 0 | } |
615 | 0 | } |
616 | | // Here we total sort all the anchor points across all files and go through |
617 | | // them in the sorted order to find partitioning boundaries. |
618 | | // Not the most efficient implementation. A much more efficient algorithm |
619 | | // probably exists. But they are more complex. If performance turns out to |
620 | | // be a problem, we can optimize. |
621 | 0 | std::sort( |
622 | 0 | all_anchors.begin(), all_anchors.end(), |
623 | 0 | [cfd_comparator](TableReader::Anchor& a, TableReader::Anchor& b) -> bool { |
624 | 0 | return cfd_comparator->CompareWithoutTimestamp(a.user_key, b.user_key) < |
625 | 0 | 0; |
626 | 0 | }); |
627 | | |
628 | | // Remove duplicated entries from boundaries. |
629 | 0 | all_anchors.erase( |
630 | 0 | std::unique(all_anchors.begin(), all_anchors.end(), |
631 | 0 | [cfd_comparator](TableReader::Anchor& a, |
632 | 0 | TableReader::Anchor& b) -> bool { |
633 | 0 | return cfd_comparator->CompareWithoutTimestamp( |
634 | 0 | a.user_key, b.user_key) == 0; |
635 | 0 | }), |
636 | 0 | all_anchors.end()); |
637 | | |
638 | | // Get the number of planned subcompactions, may update reserve threads |
639 | | // and update extra_num_subcompaction_threads_reserved_ for round-robin |
640 | 0 | uint64_t num_planned_subcompactions; |
641 | 0 | if (c->immutable_options().compaction_pri == kRoundRobin && |
642 | 0 | c->immutable_options().compaction_style == kCompactionStyleLevel) { |
643 | | // For round-robin compaction priority, we need to employ more |
644 | | // subcompactions (may exceed the max_subcompaction limit). The extra |
645 | | // subcompactions will be executed using reserved threads and taken into |
646 | | // account bg_compaction_scheduled or bg_bottom_compaction_scheduled. |
647 | | |
648 | | // Initialized by the number of input files |
649 | 0 | num_planned_subcompactions = static_cast<uint64_t>(c->num_input_files(0)); |
650 | 0 | uint64_t max_subcompactions_limit = GetSubcompactionsLimit(); |
651 | 0 | if (max_subcompactions_limit < num_planned_subcompactions) { |
652 | | // Assert two pointers are not empty so that we can use extra |
653 | | // subcompactions against db compaction limits |
654 | 0 | assert(bg_bottom_compaction_scheduled_ != nullptr); |
655 | 0 | assert(bg_compaction_scheduled_ != nullptr); |
656 | | // Reserve resources when max_subcompaction is not sufficient |
657 | 0 | AcquireSubcompactionResources( |
658 | 0 | (int)(num_planned_subcompactions - max_subcompactions_limit)); |
659 | | // Subcompactions limit changes after acquiring additional resources. |
660 | | // Need to call GetSubcompactionsLimit() again to update the number |
661 | | // of planned subcompactions |
662 | 0 | num_planned_subcompactions = |
663 | 0 | std::min(num_planned_subcompactions, GetSubcompactionsLimit()); |
664 | 0 | } else { |
665 | 0 | num_planned_subcompactions = max_subcompactions_limit; |
666 | 0 | } |
667 | 0 | } else { |
668 | 0 | num_planned_subcompactions = GetSubcompactionsLimit(); |
669 | 0 | } |
670 | |
|
671 | 0 | TEST_SYNC_POINT_CALLBACK("CompactionJob::GenSubcompactionBoundaries:0", |
672 | 0 | &num_planned_subcompactions); |
673 | 0 | if (num_planned_subcompactions == 1) { |
674 | 0 | return; |
675 | 0 | } |
676 | | |
677 | | // Group the ranges into subcompactions |
678 | 0 | uint64_t target_range_size = std::max( |
679 | 0 | total_size / num_planned_subcompactions, |
680 | 0 | MaxFileSizeForLevel( |
681 | 0 | c->mutable_cf_options(), out_lvl, |
682 | 0 | c->immutable_options().compaction_style, base_level, |
683 | 0 | c->immutable_options().level_compaction_dynamic_level_bytes)); |
684 | |
|
685 | 0 | if (target_range_size >= total_size) { |
686 | 0 | return; |
687 | 0 | } |
688 | | |
689 | 0 | uint64_t next_threshold = target_range_size; |
690 | 0 | uint64_t cumulative_size = 0; |
691 | 0 | uint64_t num_actual_subcompactions = 1U; |
692 | 0 | for (TableReader::Anchor& anchor : all_anchors) { |
693 | 0 | cumulative_size += anchor.range_size; |
694 | 0 | if (cumulative_size > next_threshold) { |
695 | 0 | next_threshold += target_range_size; |
696 | 0 | num_actual_subcompactions++; |
697 | 0 | boundaries_.push_back(anchor.user_key); |
698 | 0 | } |
699 | 0 | if (num_actual_subcompactions == num_planned_subcompactions) { |
700 | 0 | break; |
701 | 0 | } |
702 | 0 | } |
703 | 0 | TEST_SYNC_POINT_CALLBACK("CompactionJob::GenSubcompactionBoundaries:1", |
704 | 0 | &num_actual_subcompactions); |
705 | | // Shrink extra subcompaction resources when extra resources are acquired |
706 | 0 | ShrinkSubcompactionResources( |
707 | 0 | std::min((int)(num_planned_subcompactions - num_actual_subcompactions), |
708 | 0 | extra_num_subcompaction_threads_reserved_)); |
709 | 0 | } |
710 | | |
711 | 4.85k | void CompactionJob::InitializeCompactionRun() { |
712 | 4.85k | AutoThreadOperationStageUpdater stage_updater( |
713 | 4.85k | ThreadStatus::STAGE_COMPACTION_RUN); |
714 | 4.85k | TEST_SYNC_POINT("CompactionJob::Run():Start"); |
715 | 4.85k | log_buffer_->FlushBufferToLog(); |
716 | 4.85k | LogCompaction(); |
717 | 4.85k | } |
718 | | |
719 | 4.85k | void CompactionJob::RunSubcompactions() { |
720 | 4.85k | TEST_SYNC_POINT("CompactionJob::RunSubcompactions:BeforeStart"); |
721 | 4.85k | const size_t num_threads = compact_->sub_compact_states.size(); |
722 | 4.85k | assert(num_threads > 0); |
723 | 4.85k | compact_->compaction->GetOrInitInputTableProperties(); |
724 | | |
725 | | // Launch a thread for each of subcompactions 1...num_threads-1 |
726 | 4.85k | std::vector<port::Thread> thread_pool; |
727 | 4.85k | thread_pool.reserve(num_threads - 1); |
728 | 4.85k | for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) { |
729 | 0 | thread_pool.emplace_back(&CompactionJob::ProcessKeyValueCompaction, this, |
730 | 0 | &compact_->sub_compact_states[i]); |
731 | 0 | } |
732 | | |
733 | | // Always schedule the first subcompaction (whether or not there are also |
734 | | // others) in the current thread to be efficient with resources |
735 | 4.85k | ProcessKeyValueCompaction(compact_->sub_compact_states.data()); |
736 | | |
737 | | // Wait for all other threads (if there are any) to finish execution |
738 | 4.85k | for (auto& thread : thread_pool) { |
739 | 0 | thread.join(); |
740 | 0 | } |
741 | 4.85k | RemoveEmptyOutputs(); |
742 | | |
743 | 4.85k | ReleaseSubcompactionResources(); |
744 | 4.85k | TEST_SYNC_POINT("CompactionJob::ReleaseSubcompactionResources"); |
745 | 4.85k | } |
746 | | |
747 | 4.85k | void CompactionJob::UpdateTimingStats(uint64_t start_micros) { |
748 | 4.85k | internal_stats_.SetMicros(db_options_.clock->NowMicros() - start_micros); |
749 | | |
750 | 4.85k | for (auto& state : compact_->sub_compact_states) { |
751 | 4.85k | internal_stats_.AddCpuMicros(state.compaction_job_stats.cpu_micros); |
752 | 4.85k | } |
753 | | |
754 | 4.85k | RecordTimeToHistogram(stats_, COMPACTION_TIME, |
755 | 4.85k | internal_stats_.output_level_stats.micros); |
756 | 4.85k | RecordTimeToHistogram(stats_, COMPACTION_CPU_TIME, |
757 | 4.85k | internal_stats_.output_level_stats.cpu_micros); |
758 | 4.85k | } |
759 | | |
760 | 4.85k | void CompactionJob::RemoveEmptyOutputs() { |
761 | 4.85k | for (auto& state : compact_->sub_compact_states) { |
762 | 4.85k | state.RemoveLastEmptyOutput(); |
763 | 4.85k | } |
764 | 4.85k | } |
765 | | |
766 | 0 | void CompactionJob::CleanupAbortedSubcompactions() { |
767 | 0 | ColumnFamilyData* cfd = compact_->compaction->column_family_data(); |
768 | |
|
769 | 0 | uint64_t total_sst_files_deleted = 0; |
770 | 0 | uint64_t total_blob_files_deleted = 0; |
771 | | |
772 | | // Track the first file deletion error to report at the end |
773 | 0 | Status first_error; |
774 | 0 | int deletion_errors = 0; |
775 | | |
776 | | // Mark all subcompactions as aborted and delete their output files |
777 | 0 | for (auto& sub_compact : compact_->sub_compact_states) { |
778 | | // Mark this subcompaction as aborted |
779 | 0 | sub_compact.status = |
780 | 0 | Status::Incomplete(Status::SubCode::kCompactionAborted); |
781 | | |
782 | | // Delete all files (SST and blob) tracked during compaction. |
783 | | // GetOutputFilePaths() contains ALL file paths created, including |
784 | | // in-progress files that may have been removed from outputs_ or |
785 | | // blob_file_additions_. |
786 | 0 | for (const bool is_proximal_level : {false, true}) { |
787 | 0 | if (is_proximal_level && |
788 | 0 | !compact_->compaction->SupportsPerKeyPlacement()) { |
789 | 0 | continue; |
790 | 0 | } |
791 | 0 | for (const std::string& file_path : |
792 | 0 | sub_compact.Outputs(is_proximal_level)->GetOutputFilePaths()) { |
793 | 0 | Status s = env_->DeleteFile(file_path); |
794 | 0 | if (s.ok()) { |
795 | | // Count SST vs blob files by checking extension |
796 | 0 | if (file_path.find(".sst") != std::string::npos) { |
797 | 0 | total_sst_files_deleted++; |
798 | 0 | } else if (file_path.find(".blob") != std::string::npos) { |
799 | 0 | total_blob_files_deleted++; |
800 | 0 | } |
801 | 0 | } else if (!s.IsNotFound()) { |
802 | 0 | if (first_error.ok()) { |
803 | 0 | first_error = s; |
804 | 0 | } |
805 | 0 | deletion_errors++; |
806 | 0 | } |
807 | 0 | } |
808 | 0 | } |
809 | 0 | sub_compact.CleanupOutputs(); |
810 | 0 | } |
811 | |
|
812 | 0 | if (stats_) { |
813 | 0 | RecordTick(stats_, COMPACTION_ABORTED); |
814 | 0 | } |
815 | |
|
816 | 0 | ROCKS_LOG_INFO(db_options_.info_log, |
817 | 0 | "[%s] [JOB %d] Compaction aborted: deleted %" PRIu64 |
818 | 0 | " SST files and %" PRIu64 " blob files", |
819 | 0 | cfd->GetName().c_str(), job_id_, total_sst_files_deleted, |
820 | 0 | total_blob_files_deleted); |
821 | |
|
822 | 0 | if (!first_error.ok()) { |
823 | 0 | ROCKS_LOG_ERROR(db_options_.info_log, |
824 | 0 | "[%s] [JOB %d] Cleanup completed with %d file deletion " |
825 | 0 | "errors. First error: %s", |
826 | 0 | cfd->GetName().c_str(), job_id_, deletion_errors, |
827 | 0 | first_error.ToString().c_str()); |
828 | 0 | } |
829 | 0 | } |
830 | | |
831 | 2.72k | bool CompactionJob::HasNewBlobFiles() const { |
832 | 2.72k | for (const auto& state : compact_->sub_compact_states) { |
833 | 2.72k | if (state.Current().HasBlobFileAdditions()) { |
834 | 0 | return true; |
835 | 0 | } |
836 | 2.72k | } |
837 | 2.72k | return false; |
838 | 2.72k | } |
839 | | |
840 | 4.85k | Status CompactionJob::CollectSubcompactionErrors() { |
841 | 4.85k | Status status; |
842 | 4.85k | IOStatus io_s; |
843 | | |
844 | 4.85k | for (const auto& state : compact_->sub_compact_states) { |
845 | 4.85k | if (!state.status.ok()) { |
846 | 2.12k | status = state.status; |
847 | 2.12k | io_s = state.io_status; |
848 | 2.12k | break; |
849 | 2.12k | } |
850 | 4.85k | } |
851 | | |
852 | 4.85k | if (io_status_.ok()) { |
853 | 4.85k | io_status_ = io_s; |
854 | 4.85k | } |
855 | | |
856 | 4.85k | return status; |
857 | 4.85k | } |
858 | | |
859 | 2.72k | Status CompactionJob::SyncOutputDirectories() { |
860 | 2.72k | Status status; |
861 | 2.72k | IOStatus io_s; |
862 | 2.72k | constexpr IODebugContext* dbg = nullptr; |
863 | 2.72k | const bool wrote_new_blob_files = HasNewBlobFiles(); |
864 | 2.72k | if (output_directory_) { |
865 | 2.72k | io_s = output_directory_->FsyncWithDirOptions( |
866 | 2.72k | IOOptions(), dbg, |
867 | 2.72k | DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced)); |
868 | 2.72k | } |
869 | | |
870 | 2.72k | if (io_s.ok() && wrote_new_blob_files && blob_output_directory_ && |
871 | 0 | blob_output_directory_ != output_directory_) { |
872 | 0 | io_s = blob_output_directory_->FsyncWithDirOptions( |
873 | 0 | IOOptions(), dbg, |
874 | 0 | DirFsyncOptions(DirFsyncOptions::FsyncReason::kNewFileSynced)); |
875 | 0 | } |
876 | | |
877 | 2.72k | if (io_status_.ok()) { |
878 | 2.72k | io_status_ = io_s; |
879 | 2.72k | } |
880 | 2.72k | if (status.ok()) { |
881 | 2.72k | status = io_s; |
882 | 2.72k | } |
883 | | |
884 | 2.72k | return status; |
885 | 2.72k | } |
886 | | |
887 | 2.72k | Status CompactionJob::VerifyOutputFiles() { |
888 | 2.72k | Status status; |
889 | 2.72k | std::vector<port::Thread> thread_pool; |
890 | 2.72k | ColumnFamilyData* cfd = compact_->compaction->column_family_data(); |
891 | 2.72k | VerifyOutputFlags verify_output_flags = |
892 | 2.72k | compact_->compaction->mutable_cf_options().verify_output_flags; |
893 | | |
894 | | // For backward compatibility |
895 | 2.72k | if (paranoid_file_checks_) { |
896 | 0 | verify_output_flags |= VerifyOutputFlags::kVerifyIteration; |
897 | 0 | verify_output_flags |= VerifyOutputFlags::kEnableForLocalCompaction; |
898 | 0 | verify_output_flags |= VerifyOutputFlags::kEnableForRemoteCompaction; |
899 | 0 | } |
900 | | |
901 | 2.72k | auto verify_table = [&](SubcompactionState& subcompaction_state) { |
902 | | // Collect file open metadata during verification when fast_sst_open |
903 | | // is enabled, keyed by file number. |
904 | 2.72k | UnorderedMap<uint64_t, std::string> file_open_metadata_map; |
905 | | |
906 | 2.72k | for (const auto& output_file : subcompaction_state.GetOutputs()) { |
907 | | // Verify that the table is usable |
908 | | // We set for_compaction to false and don't |
909 | | // OptimizeForCompactionTableRead here because this is a special case |
910 | | // after we finish the table building No matter whether |
911 | | // use_direct_io_for_flush_and_compaction is true, we will regard this |
912 | | // verification as user reads since the goal is to cache it here for |
913 | | // further user reads |
914 | 1.57k | ReadOptions verification_read_options(Env::IOActivity::kCompaction); |
915 | 1.57k | verification_read_options.verify_checksums = true; |
916 | 1.57k | verification_read_options.readahead_size = |
917 | 1.57k | file_options_for_read_.compaction_readahead_size; |
918 | | |
919 | 1.57k | std::unique_ptr<TableReader> table_reader_guard; |
920 | 1.57k | TableReader* table_reader_ptr = table_reader_guard.get(); |
921 | 1.57k | verification_read_options.rate_limiter_priority = |
922 | 1.57k | GetRateLimiterPriority(); |
923 | 1.57k | std::string file_open_metadata; |
924 | 1.57k | std::string* file_open_metadata_ptr = |
925 | 1.57k | mutable_db_options_copy_.fast_sst_open ? &file_open_metadata |
926 | 1.57k | : nullptr; |
927 | 1.57k | InternalIterator* iter = cfd->table_cache()->NewIterator( |
928 | 1.57k | verification_read_options, file_options_, cfd->internal_comparator(), |
929 | 1.57k | output_file.meta, |
930 | 1.57k | /*range_del_agg=*/nullptr, compact_->compaction->mutable_cf_options(), |
931 | 1.57k | /*table_reader_ptr=*/&table_reader_ptr, |
932 | 1.57k | cfd->internal_stats()->GetFileReadHist( |
933 | 1.57k | compact_->compaction->output_level()), |
934 | 1.57k | TableReaderCaller::kCompactionRefill, /*arena=*/nullptr, |
935 | 1.57k | /*skip_filters=*/false, compact_->compaction->output_level(), |
936 | 1.57k | MaxFileSizeForL0MetaPin(compact_->compaction->mutable_cf_options()), |
937 | 1.57k | /*smallest_compaction_key=*/nullptr, |
938 | 1.57k | /*largest_compaction_key=*/nullptr, |
939 | 1.57k | /*allow_unprepared_value=*/false, |
940 | 1.57k | /*range_del_read_seqno=*/nullptr, |
941 | 1.57k | /*range_del_iter=*/nullptr, |
942 | 1.57k | /*maybe_pin_table_handle=*/false, file_open_metadata_ptr); |
943 | 1.57k | auto s = iter->status(); |
944 | 1.57k | if (s.ok()) { |
945 | | // Check for remote/local compaction and verify_output_flags flags |
946 | 1.57k | const bool should_verify = |
947 | 1.57k | (subcompaction_state.compaction_job_stats.is_remote_compaction && |
948 | 0 | !!(verify_output_flags & |
949 | 0 | VerifyOutputFlags::kEnableForRemoteCompaction)) || |
950 | 1.57k | (!subcompaction_state.compaction_job_stats.is_remote_compaction && |
951 | 1.57k | !!(verify_output_flags & |
952 | 1.57k | VerifyOutputFlags::kEnableForLocalCompaction)); |
953 | | |
954 | 1.57k | if (should_verify) { |
955 | 0 | const bool should_verify_block_checksum = |
956 | 0 | !!(verify_output_flags & VerifyOutputFlags::kVerifyBlockChecksum); |
957 | 0 | const bool should_verify_iteration = |
958 | 0 | !!(verify_output_flags & VerifyOutputFlags::kVerifyIteration); |
959 | 0 | const bool should_verify_file_checksum = |
960 | 0 | !!(verify_output_flags & |
961 | 0 | VerifyOutputFlags::kVerifyFileChecksum) && |
962 | 0 | db_options_.file_checksum_gen_factory != nullptr && |
963 | 0 | output_file.meta.file_checksum != kUnknownFileChecksum; |
964 | 0 | if (should_verify_block_checksum) { |
965 | 0 | assert(table_reader_ptr != nullptr); |
966 | | // If verifying iteration as well, verify meta blocks here only to |
967 | | // avoid redundant checks on data blocks |
968 | 0 | s = table_reader_ptr->VerifyChecksum( |
969 | 0 | verification_read_options, TableReaderCaller::kCompaction, |
970 | 0 | /*meta_blocks_only=*/should_verify_iteration); |
971 | 0 | } |
972 | 0 | if (s.ok() && should_verify_iteration) { |
973 | 0 | OutputValidator validator(cfd->internal_comparator(), |
974 | 0 | /*_enable_hash=*/true); |
975 | 0 | for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { |
976 | 0 | s = validator.Add(iter->key(), iter->value()); |
977 | 0 | if (!s.ok()) { |
978 | 0 | break; |
979 | 0 | } |
980 | 0 | } |
981 | 0 | if (s.ok()) { |
982 | 0 | s = iter->status(); |
983 | 0 | } |
984 | 0 | if (s.ok() && !validator.CompareValidator(output_file.validator)) { |
985 | 0 | s = Status::Corruption( |
986 | 0 | "Key-value checksum of compaction output doesn't match what " |
987 | 0 | "was computed when written"); |
988 | 0 | } |
989 | 0 | } |
990 | 0 | if (s.ok() && should_verify_file_checksum) { |
991 | 0 | std::string file_checksum; |
992 | 0 | std::string file_checksum_func_name; |
993 | 0 | std::string fname = |
994 | 0 | GetTableFileName(output_file.meta.fd.GetNumber()); |
995 | 0 | s = GenerateOneFileChecksum( |
996 | 0 | fs_.get(), fname, db_options_.file_checksum_gen_factory.get(), |
997 | 0 | output_file.meta.file_checksum_func_name, &file_checksum, |
998 | 0 | &file_checksum_func_name, |
999 | 0 | verification_read_options.readahead_size, |
1000 | 0 | db_options_.allow_mmap_reads, io_tracer_, |
1001 | 0 | db_options_.rate_limiter.get(), verification_read_options, |
1002 | 0 | stats_, db_options_.clock, file_options_for_read_); |
1003 | 0 | if (s.ok() && file_checksum != output_file.meta.file_checksum) { |
1004 | 0 | s = Status::Corruption( |
1005 | 0 | "File checksum mismatch for compaction output file " + fname); |
1006 | 0 | } |
1007 | 0 | } |
1008 | 0 | } |
1009 | 1.57k | } |
1010 | | |
1011 | 1.57k | delete iter; |
1012 | | |
1013 | 1.57k | if (!s.ok()) { |
1014 | 0 | subcompaction_state.status = s; |
1015 | 0 | break; |
1016 | 0 | } |
1017 | | |
1018 | 1.57k | if (!file_open_metadata.empty()) { |
1019 | 0 | file_open_metadata_map[output_file.meta.fd.GetNumber()] = |
1020 | 0 | std::move(file_open_metadata); |
1021 | 0 | } |
1022 | 1.57k | } |
1023 | | |
1024 | | // Apply collected file open metadata to mutable outputs |
1025 | 2.72k | if (!file_open_metadata_map.empty()) { |
1026 | 0 | auto apply_metadata = |
1027 | 0 | [&file_open_metadata_map]( |
1028 | 0 | std::vector<CompactionOutputs::Output>& outputs) { |
1029 | 0 | for (auto& output : outputs) { |
1030 | 0 | auto it = file_open_metadata_map.find(output.meta.fd.GetNumber()); |
1031 | 0 | if (it != file_open_metadata_map.end()) { |
1032 | 0 | output.meta.file_open_metadata = std::move(it->second); |
1033 | 0 | } |
1034 | 0 | } |
1035 | 0 | }; |
1036 | 0 | apply_metadata(subcompaction_state.GetMutableCompactionOutputs()); |
1037 | 0 | apply_metadata(subcompaction_state.GetMutableProximalOutputs()); |
1038 | 0 | } |
1039 | 2.72k | }; |
1040 | 2.72k | for (size_t i = 1; i < compact_->sub_compact_states.size(); i++) { |
1041 | 0 | thread_pool.emplace_back(verify_table, |
1042 | 0 | std::ref(compact_->sub_compact_states[i])); |
1043 | 0 | } |
1044 | 2.72k | verify_table(compact_->sub_compact_states[0]); |
1045 | 2.72k | for (auto& thread : thread_pool) { |
1046 | 0 | thread.join(); |
1047 | 0 | } |
1048 | | |
1049 | 2.72k | for (const auto& state : compact_->sub_compact_states) { |
1050 | 2.72k | if (!state.status.ok()) { |
1051 | 0 | status = state.status; |
1052 | 0 | break; |
1053 | 0 | } |
1054 | 2.72k | } |
1055 | | |
1056 | 2.72k | return status; |
1057 | 2.72k | } |
1058 | | |
1059 | 2.72k | void CompactionJob::SetOutputTableProperties() { |
1060 | 2.72k | for (const auto& state : compact_->sub_compact_states) { |
1061 | 2.72k | for (const auto& output : state.GetOutputs()) { |
1062 | 1.57k | auto fn = |
1063 | 1.57k | TableFileName(state.compaction->immutable_options().cf_paths, |
1064 | 1.57k | output.meta.fd.GetNumber(), output.meta.fd.GetPathId()); |
1065 | 1.57k | compact_->compaction->SetOutputTableProperties(fn, |
1066 | 1.57k | output.table_properties); |
1067 | 1.57k | } |
1068 | 2.72k | } |
1069 | 2.72k | } |
1070 | | |
1071 | 4.85k | void CompactionJob::AggregateSubcompactionOutputAndJobStats() { |
1072 | | // Before the compaction starts, is_remote_compaction was set to true if |
1073 | | // compaction_service is set. We now know whether each sub_compaction was |
1074 | | // done remotely or not. Reset is_remote_compaction back to false and allow |
1075 | | // AggregateCompactionStats() to set the right value. |
1076 | 4.85k | job_stats_->is_remote_compaction = false; |
1077 | | |
1078 | | // Finish up all bookkeeping to unify the subcompaction results. |
1079 | 4.85k | compact_->AggregateCompactionStats(internal_stats_, *job_stats_); |
1080 | 4.85k | } |
1081 | | |
1082 | | Status CompactionJob::VerifyCompactionRecordCounts( |
1083 | 2.72k | bool stats_built_from_input_table_prop, uint64_t num_input_range_del) { |
1084 | 2.72k | Status status; |
1085 | 2.72k | if (stats_built_from_input_table_prop && |
1086 | 2.72k | job_stats_->has_accurate_num_input_records) { |
1087 | 2.72k | status = VerifyInputRecordCount(num_input_range_del); |
1088 | 2.72k | if (!status.ok()) { |
1089 | 0 | return status; |
1090 | 0 | } |
1091 | 2.72k | } |
1092 | | |
1093 | 2.72k | const auto& mutable_cf_options = compact_->compaction->mutable_cf_options(); |
1094 | 2.72k | if ((mutable_cf_options.table_factory->IsInstanceOf( |
1095 | 2.72k | TableFactory::kBlockBasedTableName()) || |
1096 | 0 | mutable_cf_options.table_factory->IsInstanceOf( |
1097 | 2.72k | TableFactory::kPlainTableName()))) { |
1098 | 2.72k | status = VerifyOutputRecordCount(); |
1099 | 2.72k | if (!status.ok()) { |
1100 | 0 | return status; |
1101 | 0 | } |
1102 | 2.72k | } |
1103 | 2.72k | return status; |
1104 | 2.72k | } |
1105 | | |
1106 | | void CompactionJob::FinalizeCompactionRun( |
1107 | | const Status& input_status, bool stats_built_from_input_table_prop, |
1108 | 4.85k | uint64_t num_input_range_del) { |
1109 | 4.85k | if (stats_built_from_input_table_prop) { |
1110 | 4.85k | UpdateCompactionJobInputStatsFromInternalStats(internal_stats_, |
1111 | 4.85k | num_input_range_del); |
1112 | 4.85k | } |
1113 | 4.85k | UpdateCompactionJobOutputStatsFromInternalStats(input_status, |
1114 | 4.85k | internal_stats_); |
1115 | 4.85k | RecordCompactionIOStats(); |
1116 | | |
1117 | 4.85k | LogFlush(db_options_.info_log); |
1118 | 4.85k | TEST_SYNC_POINT("CompactionJob::Run():End"); |
1119 | 4.85k | compact_->status = input_status; |
1120 | 4.85k | TEST_SYNC_POINT_CALLBACK("CompactionJob::Run():EndStatusSet", |
1121 | 4.85k | const_cast<Status*>(&input_status)); |
1122 | 4.85k | } |
1123 | | |
1124 | 4.85k | Status CompactionJob::Run() { |
1125 | 4.85k | InitializeCompactionRun(); |
1126 | | |
1127 | 4.85k | const uint64_t start_micros = db_options_.clock->NowMicros(); |
1128 | | |
1129 | 4.85k | RunSubcompactions(); |
1130 | | |
1131 | 4.85k | UpdateTimingStats(start_micros); |
1132 | | |
1133 | 4.85k | TEST_SYNC_POINT("CompactionJob::Run:BeforeVerify"); |
1134 | | |
1135 | 4.85k | Status status = CollectSubcompactionErrors(); |
1136 | | |
1137 | | // If compaction was aborted or manually paused, clean up any output files |
1138 | | // from completed subcompactions to prevent orphaned files on disk. |
1139 | | // Skip cleanup for resumable compaction (when progress writer is set) |
1140 | | // because the output files are needed for resumption. |
1141 | 4.85k | if ((status.IsCompactionAborted() || status.IsManualCompactionPaused()) && |
1142 | 0 | compaction_progress_writer_ == nullptr) { |
1143 | 0 | CleanupAbortedSubcompactions(); |
1144 | 0 | } |
1145 | | |
1146 | 4.85k | if (status.ok()) { |
1147 | 2.72k | status = SyncOutputDirectories(); |
1148 | 2.72k | } |
1149 | | |
1150 | 4.85k | if (status.ok()) { |
1151 | 2.72k | status = VerifyOutputFiles(); |
1152 | 2.72k | } |
1153 | | |
1154 | 4.85k | TEST_SYNC_POINT_CALLBACK("CompactionJob::Run():AfterVerifyOutputFiles", |
1155 | 4.85k | &status); |
1156 | | |
1157 | 4.85k | if (status.ok()) { |
1158 | 2.72k | SetOutputTableProperties(); |
1159 | 2.72k | } |
1160 | | |
1161 | 4.85k | AggregateSubcompactionOutputAndJobStats(); |
1162 | | |
1163 | 4.85k | uint64_t num_input_range_del = 0; |
1164 | 4.85k | bool stats_built_from_input_table_prop = |
1165 | 4.85k | UpdateInternalStatsFromInputFiles(&num_input_range_del); |
1166 | | |
1167 | 4.85k | if (status.ok()) { |
1168 | 2.72k | status = VerifyCompactionRecordCounts(stats_built_from_input_table_prop, |
1169 | 2.72k | num_input_range_del); |
1170 | 2.72k | } |
1171 | | |
1172 | 4.85k | FinalizeCompactionRun(status, stats_built_from_input_table_prop, |
1173 | 4.85k | num_input_range_del); |
1174 | | |
1175 | 4.85k | return status; |
1176 | 4.85k | } |
1177 | | |
1178 | 4.85k | Status CompactionJob::Install(bool* compaction_released) { |
1179 | 4.85k | assert(compact_); |
1180 | | |
1181 | 4.85k | AutoThreadOperationStageUpdater stage_updater( |
1182 | 4.85k | ThreadStatus::STAGE_COMPACTION_INSTALL); |
1183 | 4.85k | db_mutex_->AssertHeld(); |
1184 | 4.85k | Status status = compact_->status; |
1185 | | |
1186 | 4.85k | ColumnFamilyData* cfd = compact_->compaction->column_family_data(); |
1187 | 4.85k | assert(cfd); |
1188 | | |
1189 | 4.85k | int output_level = compact_->compaction->output_level(); |
1190 | 4.85k | cfd->internal_stats()->AddCompactionStats(output_level, thread_pri_, |
1191 | 4.85k | internal_stats_); |
1192 | | |
1193 | 4.85k | if (status.ok()) { |
1194 | 2.72k | status = InstallCompactionResults(compaction_released); |
1195 | 2.72k | } |
1196 | 4.85k | if (!versions_->io_status().ok()) { |
1197 | 0 | io_status_ = versions_->io_status(); |
1198 | 0 | } |
1199 | | |
1200 | 4.85k | VersionStorageInfo::LevelSummaryStorage tmp; |
1201 | 4.85k | auto vstorage = cfd->current()->storage_info(); |
1202 | 4.85k | const auto& stats = internal_stats_.output_level_stats; |
1203 | | |
1204 | 4.85k | double read_write_amp = 0.0; |
1205 | 4.85k | double write_amp = 0.0; |
1206 | 4.85k | double bytes_read_per_sec = 0; |
1207 | 4.85k | double bytes_written_per_sec = 0; |
1208 | | |
1209 | 4.85k | const uint64_t bytes_read_non_output_and_blob = |
1210 | 4.85k | stats.bytes_read_non_output_levels + stats.bytes_read_blob; |
1211 | 4.85k | const uint64_t bytes_read_all = |
1212 | 4.85k | stats.bytes_read_output_level + bytes_read_non_output_and_blob; |
1213 | 4.85k | const uint64_t bytes_written_all = |
1214 | 4.85k | stats.bytes_written + stats.bytes_written_blob; |
1215 | | |
1216 | 4.85k | if (bytes_read_non_output_and_blob > 0) { |
1217 | 3.25k | read_write_amp = (bytes_written_all + bytes_read_all) / |
1218 | 3.25k | static_cast<double>(bytes_read_non_output_and_blob); |
1219 | 3.25k | write_amp = |
1220 | 3.25k | bytes_written_all / static_cast<double>(bytes_read_non_output_and_blob); |
1221 | 3.25k | } |
1222 | 4.85k | if (stats.micros > 0) { |
1223 | 4.85k | bytes_read_per_sec = bytes_read_all / static_cast<double>(stats.micros); |
1224 | 4.85k | bytes_written_per_sec = |
1225 | 4.85k | bytes_written_all / static_cast<double>(stats.micros); |
1226 | 4.85k | } |
1227 | | |
1228 | 4.85k | const std::string& column_family_name = cfd->GetName(); |
1229 | | |
1230 | 4.85k | constexpr double kMB = 1048576.0; |
1231 | | |
1232 | 4.85k | ROCKS_LOG_BUFFER( |
1233 | 4.85k | log_buffer_, |
1234 | 4.85k | "[%s] compacted to: %s, MB/sec: %.1f rd, %.1f wr, level %d, " |
1235 | 4.85k | "files in(%d, %d) filtered(%d, %d) out(%d +%d blob) " |
1236 | 4.85k | "MB in(%.1f, %.1f +%.1f blob) filtered(%.1f, %.1f) out(%.1f +%.1f blob), " |
1237 | 4.85k | "read-write-amplify(%.1f) write-amplify(%.1f) %s, records in: %" PRIu64 |
1238 | 4.85k | ", records dropped: %" PRIu64 " output_compression: %s\n", |
1239 | 4.85k | column_family_name.c_str(), vstorage->LevelSummary(&tmp), |
1240 | 4.85k | bytes_read_per_sec, bytes_written_per_sec, |
1241 | 4.85k | compact_->compaction->output_level(), |
1242 | 4.85k | stats.num_input_files_in_non_output_levels, |
1243 | 4.85k | stats.num_input_files_in_output_level, |
1244 | 4.85k | stats.num_filtered_input_files_in_non_output_levels, |
1245 | 4.85k | stats.num_filtered_input_files_in_output_level, stats.num_output_files, |
1246 | 4.85k | stats.num_output_files_blob, stats.bytes_read_non_output_levels / kMB, |
1247 | 4.85k | stats.bytes_read_output_level / kMB, stats.bytes_read_blob / kMB, |
1248 | 4.85k | stats.bytes_skipped_non_output_levels / kMB, |
1249 | 4.85k | stats.bytes_skipped_output_level / kMB, stats.bytes_written / kMB, |
1250 | 4.85k | stats.bytes_written_blob / kMB, read_write_amp, write_amp, |
1251 | 4.85k | status.ToString().c_str(), stats.num_input_records, |
1252 | 4.85k | stats.num_dropped_records, |
1253 | 4.85k | CompressionTypeToString(compact_->compaction->output_compression()) |
1254 | 4.85k | .c_str()); |
1255 | | |
1256 | 4.85k | const auto& blob_files = vstorage->GetBlobFiles(); |
1257 | 4.85k | if (!blob_files.empty()) { |
1258 | 0 | assert(blob_files.front()); |
1259 | 0 | assert(blob_files.back()); |
1260 | |
|
1261 | 0 | ROCKS_LOG_BUFFER( |
1262 | 0 | log_buffer_, |
1263 | 0 | "[%s] Blob file summary: head=%" PRIu64 ", tail=%" PRIu64 "\n", |
1264 | 0 | column_family_name.c_str(), blob_files.front()->GetBlobFileNumber(), |
1265 | 0 | blob_files.back()->GetBlobFileNumber()); |
1266 | 0 | } |
1267 | | |
1268 | 4.85k | if (internal_stats_.has_proximal_level_output) { |
1269 | 0 | ROCKS_LOG_BUFFER(log_buffer_, |
1270 | 0 | "[%s] has Proximal Level output: %" PRIu64 |
1271 | 0 | ", level %d, number of files: %" PRIu64 |
1272 | 0 | ", number of records: %" PRIu64, |
1273 | 0 | column_family_name.c_str(), |
1274 | 0 | internal_stats_.proximal_level_stats.bytes_written, |
1275 | 0 | compact_->compaction->GetProximalLevel(), |
1276 | 0 | internal_stats_.proximal_level_stats.num_output_files, |
1277 | 0 | internal_stats_.proximal_level_stats.num_output_records); |
1278 | 0 | } |
1279 | | |
1280 | 4.85k | TEST_SYNC_POINT_CALLBACK( |
1281 | 4.85k | "CompactionJob::Install:AfterUpdateCompactionJobStats", job_stats_); |
1282 | | |
1283 | 4.85k | auto stream = event_logger_->LogToBuffer(log_buffer_, 8192); |
1284 | 4.85k | stream << "job" << job_id_ << "event" << "compaction_finished" |
1285 | 4.85k | << "compaction_time_micros" << stats.micros |
1286 | 4.85k | << "compaction_time_cpu_micros" << stats.cpu_micros << "output_level" |
1287 | 4.85k | << compact_->compaction->output_level() << "num_output_files" |
1288 | 4.85k | << stats.num_output_files << "total_output_size" |
1289 | 4.85k | << stats.bytes_written; |
1290 | | |
1291 | 4.85k | if (stats.num_output_files_blob > 0) { |
1292 | 0 | stream << "num_blob_output_files" << stats.num_output_files_blob |
1293 | 0 | << "total_blob_output_size" << stats.bytes_written_blob; |
1294 | 0 | } |
1295 | | |
1296 | 4.85k | stream << "num_input_records" << stats.num_input_records |
1297 | 4.85k | << "num_output_records" << stats.num_output_records |
1298 | 4.85k | << "num_subcompactions" << compact_->sub_compact_states.size() |
1299 | 4.85k | << "output_compression" |
1300 | 4.85k | << CompressionTypeToString(compact_->compaction->output_compression()); |
1301 | | |
1302 | 4.85k | stream << "num_single_delete_mismatches" |
1303 | 4.85k | << job_stats_->num_single_del_mismatch; |
1304 | 4.85k | stream << "num_single_delete_fallthrough" |
1305 | 4.85k | << job_stats_->num_single_del_fallthru; |
1306 | | |
1307 | 4.85k | if (measure_io_stats_) { |
1308 | 0 | stream << "file_write_nanos" << job_stats_->file_write_nanos; |
1309 | 0 | stream << "file_range_sync_nanos" << job_stats_->file_range_sync_nanos; |
1310 | 0 | stream << "file_fsync_nanos" << job_stats_->file_fsync_nanos; |
1311 | 0 | stream << "file_prepare_write_nanos" |
1312 | 0 | << job_stats_->file_prepare_write_nanos; |
1313 | 0 | } |
1314 | | |
1315 | 4.85k | stream << "lsm_state"; |
1316 | 4.85k | stream.StartArray(); |
1317 | 38.8k | for (int level = 0; level < vstorage->num_levels(); ++level) { |
1318 | 33.9k | stream << vstorage->NumLevelFiles(level); |
1319 | 33.9k | } |
1320 | 4.85k | stream.EndArray(); |
1321 | | |
1322 | 4.85k | if (!blob_files.empty()) { |
1323 | 0 | assert(blob_files.front()); |
1324 | 0 | stream << "blob_file_head" << blob_files.front()->GetBlobFileNumber(); |
1325 | |
|
1326 | 0 | assert(blob_files.back()); |
1327 | 0 | stream << "blob_file_tail" << blob_files.back()->GetBlobFileNumber(); |
1328 | 0 | } |
1329 | | |
1330 | 4.85k | if (internal_stats_.has_proximal_level_output) { |
1331 | 0 | InternalStats::CompactionStats& pl_stats = |
1332 | 0 | internal_stats_.proximal_level_stats; |
1333 | 0 | stream << "proximal_level_num_output_files" << pl_stats.num_output_files; |
1334 | 0 | stream << "proximal_level_bytes_written" << pl_stats.bytes_written; |
1335 | 0 | stream << "proximal_level_num_output_records" |
1336 | 0 | << pl_stats.num_output_records; |
1337 | 0 | stream << "proximal_level_num_output_files_blob" |
1338 | 0 | << pl_stats.num_output_files_blob; |
1339 | 0 | stream << "proximal_level_bytes_written_blob" |
1340 | 0 | << pl_stats.bytes_written_blob; |
1341 | 0 | } |
1342 | | |
1343 | | // Propagate Install failure to compact_->status so that |
1344 | | // CleanupCompaction() -> SubcompactionState::Cleanup() sees the failure and |
1345 | | // calls ReleaseObsolete on output files' table cache entries. Without this, |
1346 | | // if Run() succeeds but InstallCompactionResults() fails, Cleanup would see |
1347 | | // overall_status = OK and skip ReleaseObsolete, leaking entries for output |
1348 | | // files that were never installed into any Version. |
1349 | 4.85k | if (!status.ok() && compact_->status.ok()) { |
1350 | 0 | compact_->status = status; |
1351 | 0 | } |
1352 | | |
1353 | 4.85k | CleanupCompaction(); |
1354 | 4.85k | return status; |
1355 | 4.85k | } |
1356 | | |
1357 | | void CompactionJob::NotifyOnSubcompactionBegin( |
1358 | 4.85k | SubcompactionState* sub_compact) { |
1359 | 4.85k | Compaction* c = compact_->compaction; |
1360 | | |
1361 | 4.85k | if (db_options_.listeners.empty()) { |
1362 | 4.85k | return; |
1363 | 4.85k | } |
1364 | 0 | if (shutting_down_->load(std::memory_order_acquire)) { |
1365 | 0 | return; |
1366 | 0 | } |
1367 | 0 | if (c->is_manual_compaction() && |
1368 | 0 | manual_compaction_canceled_.load(std::memory_order_acquire)) { |
1369 | 0 | return; |
1370 | 0 | } |
1371 | | |
1372 | 0 | sub_compact->notify_on_subcompaction_completion = true; |
1373 | |
|
1374 | 0 | SubcompactionJobInfo info{}; |
1375 | 0 | sub_compact->BuildSubcompactionJobInfo(info); |
1376 | 0 | info.job_id = static_cast<int>(job_id_); |
1377 | 0 | info.thread_id = env_->GetThreadID(); |
1378 | |
|
1379 | 0 | for (const auto& listener : db_options_.listeners) { |
1380 | 0 | listener->OnSubcompactionBegin(info); |
1381 | 0 | } |
1382 | 0 | info.status.PermitUncheckedError(); |
1383 | 0 | } |
1384 | | |
1385 | | void CompactionJob::NotifyOnSubcompactionCompleted( |
1386 | 4.85k | SubcompactionState* sub_compact) { |
1387 | 4.85k | if (db_options_.listeners.empty()) { |
1388 | 4.85k | return; |
1389 | 4.85k | } |
1390 | 0 | if (shutting_down_->load(std::memory_order_acquire)) { |
1391 | 0 | return; |
1392 | 0 | } |
1393 | | |
1394 | 0 | if (sub_compact->notify_on_subcompaction_completion == false) { |
1395 | 0 | return; |
1396 | 0 | } |
1397 | | |
1398 | 0 | SubcompactionJobInfo info{}; |
1399 | 0 | sub_compact->BuildSubcompactionJobInfo(info); |
1400 | 0 | info.job_id = static_cast<int>(job_id_); |
1401 | 0 | info.thread_id = env_->GetThreadID(); |
1402 | |
|
1403 | 0 | for (const auto& listener : db_options_.listeners) { |
1404 | 0 | listener->OnSubcompactionCompleted(info); |
1405 | 0 | } |
1406 | 0 | } |
1407 | | |
1408 | 4.85k | bool CompactionJob::ShouldUseLocalCompaction(SubcompactionState* sub_compact) { |
1409 | 4.85k | if (db_options_.compaction_service) { |
1410 | 0 | CompactionServiceJobStatus comp_status = |
1411 | 0 | ProcessKeyValueCompactionWithCompactionService(sub_compact); |
1412 | 0 | if (comp_status != CompactionServiceJobStatus::kUseLocal) { |
1413 | 0 | return false; |
1414 | 0 | } |
1415 | | // fallback to local compaction |
1416 | 0 | assert(comp_status == CompactionServiceJobStatus::kUseLocal); |
1417 | 0 | sub_compact->compaction_job_stats.is_remote_compaction = false; |
1418 | 0 | } |
1419 | 4.85k | return true; |
1420 | 4.85k | } |
1421 | | |
1422 | 4.85k | CompactionJob::CompactionIOStatsSnapshot CompactionJob::InitializeIOStats() { |
1423 | 4.85k | CompactionIOStatsSnapshot io_stats; |
1424 | | |
1425 | 4.85k | if (measure_io_stats_) { |
1426 | 0 | io_stats.prev_perf_level = GetPerfLevel(); |
1427 | 0 | SetPerfLevel(PerfLevel::kEnableTimeAndCPUTimeExceptForMutex); |
1428 | 0 | io_stats.prev_write_nanos = IOSTATS(write_nanos); |
1429 | 0 | io_stats.prev_fsync_nanos = IOSTATS(fsync_nanos); |
1430 | 0 | io_stats.prev_range_sync_nanos = IOSTATS(range_sync_nanos); |
1431 | 0 | io_stats.prev_prepare_write_nanos = IOSTATS(prepare_write_nanos); |
1432 | 0 | io_stats.prev_cpu_write_nanos = IOSTATS(cpu_write_nanos); |
1433 | 0 | io_stats.prev_cpu_read_nanos = IOSTATS(cpu_read_nanos); |
1434 | 0 | } |
1435 | | |
1436 | 4.85k | return io_stats; |
1437 | 4.85k | } |
1438 | | |
1439 | | Status CompactionJob::SetupAndValidateCompactionFilter( |
1440 | | SubcompactionState* sub_compact, |
1441 | | const CompactionFilter* configured_compaction_filter, |
1442 | | const CompactionFilter*& compaction_filter, |
1443 | 4.85k | std::unique_ptr<CompactionFilter>& compaction_filter_from_factory) { |
1444 | 4.85k | compaction_filter = configured_compaction_filter; |
1445 | | |
1446 | 4.85k | if (compaction_filter == nullptr) { |
1447 | 4.85k | compaction_filter_from_factory = |
1448 | 4.85k | sub_compact->compaction->CreateCompactionFilter(); |
1449 | 4.85k | compaction_filter = compaction_filter_from_factory.get(); |
1450 | 4.85k | } |
1451 | | |
1452 | 4.85k | if (compaction_filter != nullptr && !compaction_filter->IgnoreSnapshots()) { |
1453 | 0 | return Status::NotSupported( |
1454 | 0 | "CompactionFilter::IgnoreSnapshots() = false is not supported " |
1455 | 0 | "anymore."); |
1456 | 0 | } |
1457 | | |
1458 | 4.85k | return Status::OK(); |
1459 | 4.85k | } |
1460 | | |
1461 | | void CompactionJob::InitializeReadOptionsAndBoundaries( |
1462 | | const size_t ts_sz, ReadOptions& read_options, |
1463 | 4.85k | SubcompactionKeyBoundaries& boundaries) { |
1464 | 4.85k | read_options.verify_checksums = true; |
1465 | 4.85k | read_options.fill_cache = false; |
1466 | 4.85k | read_options.rate_limiter_priority = GetRateLimiterPriority(); |
1467 | 4.85k | read_options.io_activity = Env::IOActivity::kCompaction; |
1468 | | // Compaction iterators shouldn't be confined to a single prefix. |
1469 | | // Compactions use Seek() for |
1470 | | // (a) concurrent compactions, |
1471 | | // (b) CompactionFilter::Decision::kRemoveAndSkipUntil. |
1472 | 4.85k | read_options.total_order_seek = true; |
1473 | | |
1474 | | // Remove the timestamps from boundaries because boundaries created in |
1475 | | // GenSubcompactionBoundaries doesn't strip away the timestamp. |
1476 | 4.85k | if (boundaries.start.has_value()) { |
1477 | 0 | read_options.iterate_lower_bound = &(*boundaries.start); |
1478 | 0 | if (ts_sz > 0) { |
1479 | 0 | boundaries.start_without_ts = |
1480 | 0 | StripTimestampFromUserKey(*boundaries.start, ts_sz); |
1481 | 0 | read_options.iterate_lower_bound = &(*boundaries.start_without_ts); |
1482 | 0 | } |
1483 | 0 | } |
1484 | 4.85k | if (boundaries.end.has_value()) { |
1485 | 0 | read_options.iterate_upper_bound = &(*boundaries.end); |
1486 | 0 | if (ts_sz > 0) { |
1487 | 0 | boundaries.end_without_ts = |
1488 | 0 | StripTimestampFromUserKey(*boundaries.end, ts_sz); |
1489 | 0 | read_options.iterate_upper_bound = &(*boundaries.end_without_ts); |
1490 | 0 | } |
1491 | 0 | } |
1492 | | |
1493 | 4.85k | if (ts_sz > 0) { |
1494 | 0 | if (ts_sz <= strlen(boundaries.kMaxTs)) { |
1495 | 0 | boundaries.ts_slice = Slice(boundaries.kMaxTs, ts_sz); |
1496 | 0 | } else { |
1497 | 0 | boundaries.max_ts = std::string(ts_sz, '\xff'); |
1498 | 0 | boundaries.ts_slice = Slice(boundaries.max_ts); |
1499 | 0 | } |
1500 | 0 | } |
1501 | 4.85k | if (boundaries.start.has_value()) { |
1502 | 0 | boundaries.start_ikey.SetInternalKey(*boundaries.start, kMaxSequenceNumber, |
1503 | 0 | kValueTypeForSeek); |
1504 | 0 | if (ts_sz > 0) { |
1505 | 0 | boundaries.start_ikey.UpdateInternalKey( |
1506 | 0 | kMaxSequenceNumber, kValueTypeForSeek, &boundaries.ts_slice); |
1507 | 0 | } |
1508 | 0 | boundaries.start_internal_key = boundaries.start_ikey.GetInternalKey(); |
1509 | 0 | boundaries.start_user_key = boundaries.start_ikey.GetUserKey(); |
1510 | 0 | } |
1511 | 4.85k | if (boundaries.end.has_value()) { |
1512 | 0 | boundaries.end_ikey.SetInternalKey(*boundaries.end, kMaxSequenceNumber, |
1513 | 0 | kValueTypeForSeek); |
1514 | 0 | if (ts_sz > 0) { |
1515 | 0 | boundaries.end_ikey.UpdateInternalKey( |
1516 | 0 | kMaxSequenceNumber, kValueTypeForSeek, &boundaries.ts_slice); |
1517 | 0 | } |
1518 | 0 | boundaries.end_internal_key = boundaries.end_ikey.GetInternalKey(); |
1519 | 0 | boundaries.end_user_key = boundaries.end_ikey.GetUserKey(); |
1520 | 0 | } |
1521 | 4.85k | } |
1522 | | |
1523 | | InternalIterator* CompactionJob::CreateInputIterator( |
1524 | | SubcompactionState* sub_compact, ColumnFamilyData* cfd, |
1525 | | SubcompactionInternalIterators& iterators, |
1526 | 4.85k | SubcompactionKeyBoundaries& boundaries, ReadOptions& read_options) { |
1527 | 4.85k | const size_t ts_sz = cfd->user_comparator()->timestamp_size(); |
1528 | 4.85k | InitializeReadOptionsAndBoundaries(ts_sz, read_options, boundaries); |
1529 | | |
1530 | | // This is assigned after creation of SubcompactionState to simplify that |
1531 | | // creation across both CompactionJob and CompactionServiceCompactionJob |
1532 | 4.85k | sub_compact->AssignRangeDelAggregator( |
1533 | 4.85k | std::make_unique<CompactionRangeDelAggregator>( |
1534 | 4.85k | &cfd->internal_comparator(), job_context_->snapshot_seqs, |
1535 | 4.85k | &full_history_ts_low_, &trim_ts_)); |
1536 | | |
1537 | | // Although the v2 aggregator is what the level iterator(s) know about, |
1538 | | // the AddTombstones calls will be propagated down to the v1 aggregator. |
1539 | 4.85k | iterators.raw_input = |
1540 | 4.85k | std::unique_ptr<InternalIterator>(versions_->MakeInputIterator( |
1541 | 4.85k | read_options, sub_compact->compaction, sub_compact->RangeDelAgg(), |
1542 | 4.85k | file_options_for_read_, boundaries.start, boundaries.end)); |
1543 | 4.85k | InternalIterator* input = iterators.raw_input.get(); |
1544 | | |
1545 | 4.85k | if (boundaries.start.has_value() || boundaries.end.has_value()) { |
1546 | 0 | iterators.clip = std::make_unique<ClippingIterator>( |
1547 | 0 | iterators.raw_input.get(), |
1548 | 0 | boundaries.start.has_value() ? &boundaries.start_internal_key : nullptr, |
1549 | 0 | boundaries.end.has_value() ? &boundaries.end_internal_key : nullptr, |
1550 | 0 | &cfd->internal_comparator()); |
1551 | 0 | input = iterators.clip.get(); |
1552 | 0 | } |
1553 | | |
1554 | 4.85k | if (sub_compact->compaction->DoesInputReferenceBlobFiles()) { |
1555 | 0 | BlobGarbageMeter* meter = sub_compact->Current().CreateBlobGarbageMeter(); |
1556 | 0 | iterators.blob_counter = |
1557 | 0 | std::make_unique<BlobCountingIterator>(input, meter); |
1558 | 0 | input = iterators.blob_counter.get(); |
1559 | 0 | } |
1560 | | |
1561 | 4.85k | if (ts_sz > 0 && !trim_ts_.empty()) { |
1562 | 0 | iterators.trim_history_iter = std::make_unique<HistoryTrimmingIterator>( |
1563 | 0 | input, cfd->user_comparator(), trim_ts_); |
1564 | 0 | input = iterators.trim_history_iter.get(); |
1565 | 0 | } |
1566 | | |
1567 | 4.85k | return input; |
1568 | 4.85k | } |
1569 | | |
1570 | | void CompactionJob::CreateBlobFileBuilder( |
1571 | | SubcompactionState* sub_compact, ColumnFamilyData* cfd, |
1572 | | std::unique_ptr<BlobFileBuilder>& blob_file_builder, |
1573 | 4.85k | const WriteOptions& write_options) { |
1574 | 4.85k | const auto& mutable_cf_options = |
1575 | 4.85k | sub_compact->compaction->mutable_cf_options(); |
1576 | | |
1577 | | // TODO: BlobDB to support output_to_proximal_level compaction, which needs |
1578 | | // 2 builders, so may need to move to `CompactionOutputs` |
1579 | 4.85k | if (mutable_cf_options.enable_blob_files && |
1580 | 0 | sub_compact->compaction->output_level() >= |
1581 | 0 | mutable_cf_options.blob_file_starting_level) { |
1582 | 0 | blob_file_builder = std::make_unique<BlobFileBuilder>( |
1583 | 0 | versions_, fs_.get(), &sub_compact->compaction->immutable_options(), |
1584 | 0 | &mutable_cf_options, &file_options_, &write_options, db_id_, |
1585 | 0 | db_session_id_, job_id_, cfd->GetID(), cfd->GetName(), write_hint_, |
1586 | 0 | io_tracer_, blob_callback_, BlobFileCreationReason::kCompaction, |
1587 | 0 | sub_compact->Current().GetOutputFilePathsPtr(), |
1588 | 0 | sub_compact->Current().GetBlobFileAdditionsPtr()); |
1589 | 4.85k | } else { |
1590 | 4.85k | blob_file_builder = nullptr; |
1591 | 4.85k | } |
1592 | 4.85k | } |
1593 | | |
1594 | | std::unique_ptr<CompactionIterator> CompactionJob::CreateCompactionIterator( |
1595 | | SubcompactionState* sub_compact, ColumnFamilyData* cfd, |
1596 | | InternalIterator* input, const CompactionFilter* compaction_filter, |
1597 | | MergeHelper& merge, std::unique_ptr<BlobFileBuilder>& blob_file_builder, |
1598 | 4.85k | const WriteOptions& write_options) { |
1599 | 4.85k | CreateBlobFileBuilder(sub_compact, cfd, blob_file_builder, write_options); |
1600 | | |
1601 | 4.85k | const std::string* const full_history_ts_low = |
1602 | 4.85k | full_history_ts_low_.empty() ? nullptr : &full_history_ts_low_; |
1603 | 4.85k | assert(job_context_); |
1604 | | |
1605 | 4.85k | return std::make_unique<CompactionIterator>( |
1606 | 4.85k | input, cfd->user_comparator(), &merge, versions_->LastSequence(), |
1607 | 4.85k | &(job_context_->snapshot_seqs), earliest_snapshot_, |
1608 | 4.85k | job_context_->earliest_write_conflict_snapshot, |
1609 | 4.85k | job_context_->GetJobSnapshotSequence(), job_context_->snapshot_checker, |
1610 | 4.85k | env_, ShouldReportDetailedTime(env_, stats_), sub_compact->RangeDelAgg(), |
1611 | 4.85k | blob_file_builder.get(), db_options_.allow_data_in_errors, |
1612 | 4.85k | db_options_.enforce_single_del_contracts, manual_compaction_canceled_, |
1613 | 4.85k | sub_compact->compaction |
1614 | 4.85k | ->DoesInputReferenceBlobFiles() /* must_count_input_entries */, |
1615 | 4.85k | sub_compact->compaction, compaction_filter, shutting_down_, |
1616 | 4.85k | db_options_.info_log, full_history_ts_low, preserve_seqno_after_); |
1617 | 4.85k | } |
1618 | | |
1619 | | std::pair<CompactionFileOpenFunc, CompactionFileCloseFunc> |
1620 | | CompactionJob::CreateFileHandlers(SubcompactionState* sub_compact, |
1621 | 4.85k | SubcompactionKeyBoundaries& boundaries) { |
1622 | 4.85k | const CompactionFileOpenFunc open_file_func = |
1623 | 4.85k | [this, sub_compact](CompactionOutputs& outputs) { |
1624 | 2.28k | return this->OpenCompactionOutputFile(sub_compact, outputs); |
1625 | 2.28k | }; |
1626 | | |
1627 | 4.85k | const Slice* start_user_key = |
1628 | 4.85k | sub_compact->start.has_value() ? &boundaries.start_user_key : nullptr; |
1629 | 4.85k | const Slice* end_user_key = |
1630 | 4.85k | sub_compact->end.has_value() ? &boundaries.end_user_key : nullptr; |
1631 | | |
1632 | 4.85k | const CompactionFileCloseFunc close_file_func = |
1633 | 4.85k | [this, sub_compact, start_user_key, end_user_key]( |
1634 | 4.85k | const Status& status, |
1635 | 4.85k | const ParsedInternalKey& prev_iter_output_internal_key, |
1636 | 4.85k | const Slice& next_table_min_key, const CompactionIterator* c_iter, |
1637 | 4.85k | CompactionOutputs& outputs) { |
1638 | 2.28k | return this->FinishCompactionOutputFile( |
1639 | 2.28k | status, prev_iter_output_internal_key, next_table_min_key, |
1640 | 2.28k | start_user_key, end_user_key, c_iter, sub_compact, outputs); |
1641 | 2.28k | }; |
1642 | | |
1643 | 4.85k | return {open_file_func, close_file_func}; |
1644 | 4.85k | } |
1645 | | |
1646 | | Status CompactionJob::ProcessKeyValue( |
1647 | | SubcompactionState* sub_compact, ColumnFamilyData* cfd, |
1648 | | CompactionIterator* c_iter, const CompactionFileOpenFunc& open_file_func, |
1649 | 4.85k | const CompactionFileCloseFunc& close_file_func, uint64_t& prev_cpu_micros) { |
1650 | | // Cron interval for periodic operations: stats update, abort check, |
1651 | | // and sync points. Uses 1024 (power of 2) for efficient bitwise check. |
1652 | 4.85k | const uint64_t kCronEveryMask = (1 << 10) - 1; |
1653 | 4.85k | [[maybe_unused]] const std::optional<const Slice> end = sub_compact->end; |
1654 | | |
1655 | | // Check for abort signal before starting key processing |
1656 | 4.85k | if (compaction_aborted_.load(std::memory_order_acquire) > 0) { |
1657 | 0 | return Status::Incomplete(Status::SubCode::kCompactionAborted); |
1658 | 0 | } |
1659 | | |
1660 | 4.85k | Status status; |
1661 | 4.85k | IterKey prev_iter_output_key; |
1662 | 4.85k | ParsedInternalKey prev_iter_output_internal_key; |
1663 | | |
1664 | 4.85k | TEST_SYNC_POINT_CALLBACK( |
1665 | 4.85k | "CompactionJob::ProcessKeyValueCompaction()::Processing", |
1666 | 4.85k | static_cast<void*>(const_cast<Compaction*>(sub_compact->compaction))); |
1667 | | |
1668 | 7.48k | while (status.ok() && !cfd->IsDropped() && c_iter->Valid() && |
1669 | 2.63k | c_iter->status().ok()) { |
1670 | 2.63k | assert(!end.has_value() || |
1671 | 2.63k | cfd->user_comparator()->Compare(c_iter->user_key(), *end) < 0); |
1672 | | |
1673 | 2.63k | const uint64_t num_records = c_iter->iter_stats().num_input_records; |
1674 | | |
1675 | | // Periodic cron operations: stats update, abort check. |
1676 | 2.63k | if ((num_records & kCronEveryMask) == kCronEveryMask) { |
1677 | | // Check for abort signal periodically |
1678 | 0 | if (compaction_aborted_.load(std::memory_order_acquire) > 0) { |
1679 | 0 | status = Status::Incomplete(Status::SubCode::kCompactionAborted); |
1680 | 0 | break; |
1681 | 0 | } |
1682 | | |
1683 | 0 | UpdateSubcompactionJobStatsIncrementally( |
1684 | 0 | c_iter, &sub_compact->compaction_job_stats, |
1685 | 0 | db_options_.clock->CPUMicros(), prev_cpu_micros); |
1686 | 0 | } |
1687 | | |
1688 | 2.63k | const auto& ikey = c_iter->ikey(); |
1689 | 2.63k | bool use_proximal_output = ikey.sequence > proximal_after_seqno_; |
1690 | | |
1691 | | #ifndef NDEBUG |
1692 | | if (sub_compact->compaction->SupportsPerKeyPlacement()) { |
1693 | | PerKeyPlacementContext context(sub_compact->compaction->output_level(), |
1694 | | ikey.user_key, c_iter->value(), |
1695 | | ikey.sequence, use_proximal_output); |
1696 | | TEST_SYNC_POINT_CALLBACK("CompactionIterator::PrepareOutput.context", |
1697 | | &context); |
1698 | | if (use_proximal_output) { |
1699 | | // Verify that entries sent to the proximal level are within the |
1700 | | // allowed range (because the input key range of the last level could |
1701 | | // be larger than the allowed output key range of the proximal |
1702 | | // level). This check uses user keys (ignores sequence numbers) because |
1703 | | // compaction boundaries are a "clean cut" between user keys (see |
1704 | | // CompactionPicker::ExpandInputsToCleanCut()), which is especially |
1705 | | // important when preferred sequence numbers has been swapped in for |
1706 | | // kTypeValuePreferredSeqno / TimedPut. |
1707 | | sub_compact->compaction->TEST_AssertWithinProximalLevelOutputRange( |
1708 | | c_iter->user_key()); |
1709 | | } |
1710 | | } else { |
1711 | | assert(proximal_after_seqno_ == kMaxSequenceNumber); |
1712 | | assert(!use_proximal_output); |
1713 | | } |
1714 | | #endif // NDEBUG |
1715 | | |
1716 | | // Add current compaction_iterator key to target compaction output, if the |
1717 | | // output file needs to be close or open, it will call the `open_file_func` |
1718 | | // and `close_file_func`. |
1719 | | // TODO: it would be better to have the compaction file open/close moved |
1720 | | // into `CompactionOutputs` which has the output file information. |
1721 | 2.63k | status = sub_compact->AddToOutput(*c_iter, use_proximal_output, |
1722 | 2.63k | open_file_func, close_file_func, |
1723 | 2.63k | prev_iter_output_internal_key); |
1724 | 2.63k | if (!status.ok()) { |
1725 | 0 | break; |
1726 | 0 | } |
1727 | | |
1728 | 2.63k | TEST_SYNC_POINT_CALLBACK("CompactionJob::Run():PausingManualCompaction:2", |
1729 | 2.63k | static_cast<void*>(const_cast<std::atomic<bool>*>( |
1730 | 2.63k | &manual_compaction_canceled_))); |
1731 | | |
1732 | 2.63k | prev_iter_output_key.SetInternalKey(c_iter->key(), |
1733 | 2.63k | &prev_iter_output_internal_key); |
1734 | 2.63k | prev_iter_output_internal_key.sequence = ikey.sequence; |
1735 | 2.63k | prev_iter_output_internal_key.type = ikey.type; |
1736 | 2.63k | c_iter->Next(); |
1737 | | |
1738 | | #ifndef NDEBUG |
1739 | | bool stop = false; |
1740 | | TEST_SYNC_POINT_CALLBACK("CompactionJob::ProcessKeyValueCompaction()::stop", |
1741 | | static_cast<void*>(&stop)); |
1742 | | if (stop) { |
1743 | | break; |
1744 | | } |
1745 | | #endif // NDEBUG |
1746 | 2.63k | } |
1747 | | |
1748 | 4.85k | return status; |
1749 | 4.85k | } |
1750 | | |
1751 | | void CompactionJob::UpdateSubcompactionJobStatsIncrementally( |
1752 | | CompactionIterator* c_iter, CompactionJobStats* compaction_job_stats, |
1753 | 4.85k | uint64_t cur_cpu_micros, uint64_t& prev_cpu_micros) { |
1754 | 4.85k | RecordDroppedKeys(c_iter->iter_stats(), compaction_job_stats); |
1755 | 4.85k | c_iter->ResetRecordCounts(); |
1756 | 4.85k | RecordCompactionIOStats(); |
1757 | | |
1758 | 4.85k | assert(cur_cpu_micros >= prev_cpu_micros); |
1759 | 4.85k | RecordTick(stats_, COMPACTION_CPU_TOTAL_TIME, |
1760 | 4.85k | cur_cpu_micros - prev_cpu_micros); |
1761 | 4.85k | prev_cpu_micros = cur_cpu_micros; |
1762 | 4.85k | } |
1763 | | |
1764 | | void CompactionJob::FinalizeSubcompactionJobStats( |
1765 | | SubcompactionState* sub_compact, CompactionIterator* c_iter, |
1766 | | uint64_t start_cpu_micros, uint64_t prev_cpu_micros, |
1767 | 4.85k | const CompactionIOStatsSnapshot& io_stats) { |
1768 | 4.85k | const CompactionIterationStats& c_iter_stats = c_iter->iter_stats(); |
1769 | | |
1770 | 4.85k | assert(!sub_compact->compaction->DoesInputReferenceBlobFiles() || |
1771 | 4.85k | c_iter->HasNumInputEntryScanned()); |
1772 | 4.85k | sub_compact->compaction_job_stats.has_accurate_num_input_records &= |
1773 | 4.85k | c_iter->HasNumInputEntryScanned(); |
1774 | 4.85k | sub_compact->compaction_job_stats.num_input_records += |
1775 | 4.85k | c_iter->NumInputEntryScanned(); |
1776 | 4.85k | sub_compact->compaction_job_stats.num_blobs_read = |
1777 | 4.85k | c_iter_stats.num_blobs_read; |
1778 | 4.85k | sub_compact->compaction_job_stats.total_blob_bytes_read = |
1779 | 4.85k | c_iter_stats.total_blob_bytes_read; |
1780 | 4.85k | sub_compact->compaction_job_stats.num_input_deletion_records = |
1781 | 4.85k | c_iter_stats.num_input_deletion_records; |
1782 | 4.85k | sub_compact->compaction_job_stats.num_corrupt_keys = |
1783 | 4.85k | c_iter_stats.num_input_corrupt_records; |
1784 | 4.85k | sub_compact->compaction_job_stats.num_single_del_fallthru = |
1785 | 4.85k | c_iter_stats.num_single_del_fallthru; |
1786 | 4.85k | sub_compact->compaction_job_stats.num_single_del_mismatch = |
1787 | 4.85k | c_iter_stats.num_single_del_mismatch; |
1788 | 4.85k | sub_compact->compaction_job_stats.total_input_raw_key_bytes += |
1789 | 4.85k | c_iter_stats.total_input_raw_key_bytes; |
1790 | 4.85k | sub_compact->compaction_job_stats.total_input_raw_value_bytes += |
1791 | 4.85k | c_iter_stats.total_input_raw_value_bytes; |
1792 | | |
1793 | 4.85k | RecordTick(stats_, FILTER_OPERATION_TOTAL_TIME, |
1794 | 4.85k | c_iter_stats.total_filter_time); |
1795 | | |
1796 | 4.85k | if (c_iter_stats.num_blobs_relocated > 0) { |
1797 | 0 | RecordTick(stats_, BLOB_DB_GC_NUM_KEYS_RELOCATED, |
1798 | 0 | c_iter_stats.num_blobs_relocated); |
1799 | 0 | } |
1800 | 4.85k | if (c_iter_stats.total_blob_bytes_relocated > 0) { |
1801 | 0 | RecordTick(stats_, BLOB_DB_GC_BYTES_RELOCATED, |
1802 | 0 | c_iter_stats.total_blob_bytes_relocated); |
1803 | 0 | } |
1804 | | |
1805 | 4.85k | uint64_t cur_cpu_micros = db_options_.clock->CPUMicros(); |
1806 | | |
1807 | | // Record final compaction statistics including dropped keys, I/O stats, |
1808 | | // and CPU time delta from the last periodic measurement |
1809 | 4.85k | UpdateSubcompactionJobStatsIncrementally(c_iter, |
1810 | 4.85k | &sub_compact->compaction_job_stats, |
1811 | 4.85k | cur_cpu_micros, prev_cpu_micros); |
1812 | | |
1813 | | // Finalize timing and I/O statistics |
1814 | 4.85k | sub_compact->compaction_job_stats.cpu_micros = |
1815 | 4.85k | cur_cpu_micros - start_cpu_micros + sub_compact->GetWorkerCPUMicros(); |
1816 | | |
1817 | 4.85k | if (measure_io_stats_) { |
1818 | 0 | sub_compact->compaction_job_stats.file_write_nanos += |
1819 | 0 | IOSTATS(write_nanos) - io_stats.prev_write_nanos; |
1820 | 0 | sub_compact->compaction_job_stats.file_fsync_nanos += |
1821 | 0 | IOSTATS(fsync_nanos) - io_stats.prev_fsync_nanos; |
1822 | 0 | sub_compact->compaction_job_stats.file_range_sync_nanos += |
1823 | 0 | IOSTATS(range_sync_nanos) - io_stats.prev_range_sync_nanos; |
1824 | 0 | sub_compact->compaction_job_stats.file_prepare_write_nanos += |
1825 | 0 | IOSTATS(prepare_write_nanos) - io_stats.prev_prepare_write_nanos; |
1826 | 0 | sub_compact->compaction_job_stats.cpu_micros -= |
1827 | 0 | (IOSTATS(cpu_write_nanos) - io_stats.prev_cpu_write_nanos + |
1828 | 0 | IOSTATS(cpu_read_nanos) - io_stats.prev_cpu_read_nanos) / |
1829 | 0 | 1000; |
1830 | 0 | if (io_stats.prev_perf_level != |
1831 | 0 | PerfLevel::kEnableTimeAndCPUTimeExceptForMutex) { |
1832 | 0 | SetPerfLevel(io_stats.prev_perf_level); |
1833 | 0 | } |
1834 | 0 | } |
1835 | 4.85k | } |
1836 | | |
1837 | | Status CompactionJob::FinalizeProcessKeyValueStatus( |
1838 | | ColumnFamilyData* cfd, InternalIterator* input_iter, |
1839 | 4.85k | CompactionIterator* c_iter, Status status) { |
1840 | 4.85k | if (status.ok() && cfd->IsDropped()) { |
1841 | 0 | status = |
1842 | 0 | Status::ColumnFamilyDropped("Column family dropped during compaction"); |
1843 | 0 | } |
1844 | 4.85k | if (status.ok() && shutting_down_->load(std::memory_order_relaxed)) { |
1845 | 2.12k | status = Status::ShutdownInProgress("Database shutdown"); |
1846 | 2.12k | } |
1847 | 4.85k | if (status.ok() && |
1848 | 2.72k | (manual_compaction_canceled_.load(std::memory_order_relaxed))) { |
1849 | 0 | status = Status::Incomplete(Status::SubCode::kManualCompactionPaused); |
1850 | 0 | } |
1851 | 4.85k | if (status.ok()) { |
1852 | 2.72k | status = input_iter->status(); |
1853 | 2.72k | } |
1854 | 4.85k | if (status.ok()) { |
1855 | 2.72k | status = c_iter->status(); |
1856 | 2.72k | } |
1857 | | |
1858 | 4.85k | return status; |
1859 | 4.85k | } |
1860 | | |
1861 | | Status CompactionJob::CleanupCompactionFiles( |
1862 | | SubcompactionState* sub_compact, Status status, |
1863 | | const CompactionFileOpenFunc& open_file_func, |
1864 | 4.85k | const CompactionFileCloseFunc& close_file_func) { |
1865 | | // Call FinishCompactionOutputFile() even if status is not ok: it needs to |
1866 | | // close the output files. Open file function is also passed, in case there's |
1867 | | // only range-dels, no file was opened, to save the range-dels, it need to |
1868 | | // create a new output file. |
1869 | 4.85k | return sub_compact->CloseCompactionFiles(status, open_file_func, |
1870 | 4.85k | close_file_func); |
1871 | 4.85k | } |
1872 | | |
1873 | | Status CompactionJob::FinalizeBlobFiles(SubcompactionState* sub_compact, |
1874 | | BlobFileBuilder* blob_file_builder, |
1875 | 4.85k | Status status) { |
1876 | 4.85k | if (blob_file_builder) { |
1877 | 0 | if (status.ok()) { |
1878 | 0 | status = blob_file_builder->Finish(); |
1879 | 0 | } else { |
1880 | 0 | blob_file_builder->Abandon(status); |
1881 | 0 | } |
1882 | 0 | sub_compact->Current().UpdateBlobStats(); |
1883 | 0 | } |
1884 | | |
1885 | 4.85k | return status; |
1886 | 4.85k | } |
1887 | | |
1888 | 4.85k | void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) { |
1889 | 4.85k | TEST_SYNC_POINT("CompactionJob::ProcessKeyValueCompaction:Start"); |
1890 | 4.85k | assert(sub_compact); |
1891 | 4.85k | assert(sub_compact->compaction); |
1892 | | |
1893 | 4.85k | if (!ShouldUseLocalCompaction(sub_compact)) { |
1894 | 0 | return; |
1895 | 0 | } |
1896 | | |
1897 | 4.85k | AutoThreadOperationStageUpdater stage_updater( |
1898 | 4.85k | ThreadStatus::STAGE_COMPACTION_PROCESS_KV); |
1899 | | |
1900 | 4.85k | const uint64_t start_cpu_micros = db_options_.clock->CPUMicros(); |
1901 | 4.85k | uint64_t prev_cpu_micros = start_cpu_micros; |
1902 | 4.85k | const CompactionIOStatsSnapshot io_stats = InitializeIOStats(); |
1903 | 4.85k | ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); |
1904 | 4.85k | const CompactionFilter* compaction_filter; |
1905 | 4.85k | std::unique_ptr<CompactionFilter> compaction_filter_from_factory = nullptr; |
1906 | 4.85k | Status filter_status = SetupAndValidateCompactionFilter( |
1907 | 4.85k | sub_compact, cfd->ioptions().compaction_filter, compaction_filter, |
1908 | 4.85k | compaction_filter_from_factory); |
1909 | 4.85k | if (!filter_status.ok()) { |
1910 | 0 | sub_compact->status = filter_status; |
1911 | 0 | return; |
1912 | 0 | } |
1913 | | |
1914 | 4.85k | NotifyOnSubcompactionBegin(sub_compact); |
1915 | | |
1916 | 4.85k | SubcompactionKeyBoundaries boundaries(sub_compact->start, sub_compact->end); |
1917 | 4.85k | SubcompactionInternalIterators iterators; |
1918 | 4.85k | ReadOptions read_options; |
1919 | 4.85k | const WriteOptions write_options(Env::IOPriority::IO_LOW, |
1920 | 4.85k | Env::IOActivity::kCompaction); |
1921 | | |
1922 | 4.85k | InternalIterator* input_iter = CreateInputIterator( |
1923 | 4.85k | sub_compact, cfd, iterators, boundaries, read_options); |
1924 | | |
1925 | 4.85k | assert(input_iter); |
1926 | | |
1927 | 4.85k | Status status = |
1928 | 4.85k | MaybeResumeSubcompactionProgressOnInputIterator(sub_compact, input_iter); |
1929 | | |
1930 | 4.85k | if (status.IsNotFound()) { |
1931 | 4.85k | input_iter->SeekToFirst(); |
1932 | 4.85k | } else if (!status.ok()) { |
1933 | 0 | sub_compact->status = status; |
1934 | 0 | return; |
1935 | 0 | } |
1936 | | |
1937 | 4.85k | MergeHelper merge( |
1938 | 4.85k | env_, cfd->user_comparator(), cfd->ioptions().merge_operator.get(), |
1939 | 4.85k | compaction_filter, db_options_.info_log.get(), |
1940 | 4.85k | false /* internal key corruption is expected */, |
1941 | 4.85k | job_context_->GetLatestSnapshotSequence(), job_context_->snapshot_checker, |
1942 | 4.85k | compact_->compaction->level(), db_options_.stats); |
1943 | 4.85k | std::unique_ptr<BlobFileBuilder> blob_file_builder; |
1944 | | |
1945 | 4.85k | auto c_iter = |
1946 | 4.85k | CreateCompactionIterator(sub_compact, cfd, input_iter, compaction_filter, |
1947 | 4.85k | merge, blob_file_builder, write_options); |
1948 | 4.85k | assert(c_iter); |
1949 | 4.85k | c_iter->SeekToFirst(); |
1950 | | |
1951 | 4.85k | TEST_SYNC_POINT("CompactionJob::Run():Inprogress"); |
1952 | 4.85k | TEST_SYNC_POINT_CALLBACK("CompactionJob::Run():PausingManualCompaction:1", |
1953 | 4.85k | static_cast<void*>(const_cast<std::atomic<bool>*>( |
1954 | 4.85k | &manual_compaction_canceled_))); |
1955 | | |
1956 | 4.85k | auto [open_file_func, close_file_func] = |
1957 | 4.85k | CreateFileHandlers(sub_compact, boundaries); |
1958 | | |
1959 | 4.85k | status = ProcessKeyValue(sub_compact, cfd, c_iter.get(), open_file_func, |
1960 | 4.85k | close_file_func, prev_cpu_micros); |
1961 | | |
1962 | 4.85k | status = FinalizeProcessKeyValueStatus(cfd, input_iter, c_iter.get(), status); |
1963 | | |
1964 | 4.85k | FinalizeSubcompaction(sub_compact, status, open_file_func, close_file_func, |
1965 | 4.85k | blob_file_builder.get(), c_iter.get(), input_iter, |
1966 | 4.85k | start_cpu_micros, prev_cpu_micros, io_stats); |
1967 | | |
1968 | 4.85k | NotifyOnSubcompactionCompleted(sub_compact); |
1969 | 4.85k | } |
1970 | | |
1971 | | void CompactionJob::FinalizeSubcompaction( |
1972 | | SubcompactionState* sub_compact, Status status, |
1973 | | const CompactionFileOpenFunc& open_file_func, |
1974 | | const CompactionFileCloseFunc& close_file_func, |
1975 | | BlobFileBuilder* blob_file_builder, CompactionIterator* c_iter, |
1976 | | [[maybe_unused]] InternalIterator* input_iter, uint64_t start_cpu_micros, |
1977 | 4.85k | uint64_t prev_cpu_micros, const CompactionIOStatsSnapshot& io_stats) { |
1978 | 4.85k | status = CleanupCompactionFiles(sub_compact, status, open_file_func, |
1979 | 4.85k | close_file_func); |
1980 | 4.85k | status = FinalizeBlobFiles(sub_compact, blob_file_builder, status); |
1981 | | |
1982 | 4.85k | FinalizeSubcompactionJobStats(sub_compact, c_iter, start_cpu_micros, |
1983 | 4.85k | prev_cpu_micros, io_stats); |
1984 | | |
1985 | | #ifdef ROCKSDB_ASSERT_STATUS_CHECKED |
1986 | | if (!status.ok()) { |
1987 | | if (c_iter) { |
1988 | | c_iter->status().PermitUncheckedError(); |
1989 | | } |
1990 | | if (input_iter) { |
1991 | | input_iter->status().PermitUncheckedError(); |
1992 | | } |
1993 | | } |
1994 | | #endif // ROCKSDB_ASSERT_STATUS_CHECKED |
1995 | | |
1996 | 4.85k | sub_compact->status = status; |
1997 | 4.85k | } |
1998 | | |
1999 | 0 | uint64_t CompactionJob::GetCompactionId(SubcompactionState* sub_compact) const { |
2000 | 0 | return (uint64_t)job_id_ << 32 | sub_compact->sub_job_id; |
2001 | 0 | } |
2002 | | |
2003 | | void CompactionJob::RecordDroppedKeys( |
2004 | | const CompactionIterationStats& c_iter_stats, |
2005 | 6.42k | CompactionJobStats* compaction_job_stats) { |
2006 | 6.42k | if (c_iter_stats.num_record_drop_user > 0) { |
2007 | 0 | RecordTick(stats_, COMPACTION_KEY_DROP_USER, |
2008 | 0 | c_iter_stats.num_record_drop_user); |
2009 | 0 | } |
2010 | 6.42k | if (c_iter_stats.num_record_drop_hidden > 0) { |
2011 | 2.13k | RecordTick(stats_, COMPACTION_KEY_DROP_NEWER_ENTRY, |
2012 | 2.13k | c_iter_stats.num_record_drop_hidden); |
2013 | 2.13k | if (compaction_job_stats) { |
2014 | 2.13k | compaction_job_stats->num_records_replaced += |
2015 | 2.13k | c_iter_stats.num_record_drop_hidden; |
2016 | 2.13k | } |
2017 | 2.13k | } |
2018 | 6.42k | if (c_iter_stats.num_record_drop_obsolete > 0) { |
2019 | 1.75k | RecordTick(stats_, COMPACTION_KEY_DROP_OBSOLETE, |
2020 | 1.75k | c_iter_stats.num_record_drop_obsolete); |
2021 | 1.75k | if (compaction_job_stats) { |
2022 | 1.75k | compaction_job_stats->num_expired_deletion_records += |
2023 | 1.75k | c_iter_stats.num_record_drop_obsolete; |
2024 | 1.75k | } |
2025 | 1.75k | } |
2026 | 6.42k | if (c_iter_stats.num_record_drop_range_del > 0) { |
2027 | 0 | RecordTick(stats_, COMPACTION_KEY_DROP_RANGE_DEL, |
2028 | 0 | c_iter_stats.num_record_drop_range_del); |
2029 | 0 | } |
2030 | 6.42k | if (c_iter_stats.num_range_del_drop_obsolete > 0) { |
2031 | 0 | RecordTick(stats_, COMPACTION_RANGE_DEL_DROP_OBSOLETE, |
2032 | 0 | c_iter_stats.num_range_del_drop_obsolete); |
2033 | 0 | } |
2034 | 6.42k | if (c_iter_stats.num_optimized_del_drop_obsolete > 0) { |
2035 | 0 | RecordTick(stats_, COMPACTION_OPTIMIZED_DEL_DROP_OBSOLETE, |
2036 | 0 | c_iter_stats.num_optimized_del_drop_obsolete); |
2037 | 0 | } |
2038 | 6.42k | } |
2039 | | |
// Finishes the output file currently being built in `outputs`.
//
// Steps, in order:
//   1. If `input_status` is OK, add range tombstones (when the subcompaction
//      has any) and record the resulting drop statistics.
//   2. Call outputs.Finish() with the status so far.
//   3. On success, try to refine meta->oldest_ancester_time using the file's
//      final smallest/largest keys.
//   4. Sync and close the writer; copy the IOStatus into
//      sub_compact->io_status if that was still OK.
//   5. If the file ended up with no entries and no range deletions, delete it
//      and remove it from `outputs` (meta becomes nullptr).
//   6. Log / notify listeners, and report the file to the SstFileManager.
//   7. If eligible, update and persist subcompaction progress.
//   8. Reset the builder and return the final status.
//
// `comp_start_user_key` / `comp_end_user_key` and `next_table_min_key` are
// forwarded to outputs.AddRangeDels(); `prev_iter_output_internal_key`,
// `next_table_min_key` and `c_iter` feed the progress-persistence decision.
//
// Returns the accumulated status `s`. Note that reaching the SstFileManager
// space limit replaces whatever `s` held with Status::SpaceLimit.
2040 | | Status CompactionJob::FinishCompactionOutputFile( |
2041 | | const Status& input_status, |
2042 | | const ParsedInternalKey& prev_iter_output_internal_key, |
2043 | | const Slice& next_table_min_key, const Slice* comp_start_user_key, |
2044 | | const Slice* comp_end_user_key, const CompactionIterator* c_iter, |
2045 | 2.28k | SubcompactionState* sub_compact, CompactionOutputs& outputs) { |
2046 | 2.28k | AutoThreadOperationStageUpdater stage_updater( |
2047 | 2.28k | ThreadStatus::STAGE_COMPACTION_SYNC_FILE); |
2048 | 2.28k | assert(sub_compact != nullptr); |
2049 | 2.28k | assert(outputs.HasBuilder()); |
2050 | |
2051 | 2.28k | FileMetaData* meta = outputs.GetMetaData(); |
2052 | 2.28k | uint64_t output_number = meta->fd.GetNumber(); |
2053 | 2.28k | assert(output_number != 0); |
2054 | |
2055 | 2.28k | ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); |
// The checksum fields keep these "unknown" defaults unless both the table
// finish and the sync/close below succeed.
2056 | 2.28k | std::string file_checksum = kUnknownFileChecksum; |
2057 | 2.28k | std::string file_checksum_func_name = kUnknownFileChecksumFuncName; |
2058 | |
2059 | | // Check for iterator errors |
2060 | 2.28k | Status s = input_status; |
2061 | |
2062 | | // Add range tombstones |
2063 | 2.28k | if (s.ok()) { |
2064 | | // Inclusive lower bound, exclusive upper bound |
2065 | 1.57k | std::pair<SequenceNumber, SequenceNumber> keep_seqno_range{ |
2066 | 1.57k | 0, kMaxSequenceNumber}; |
2067 | 1.57k | if (sub_compact->compaction->SupportsPerKeyPlacement()) { |
2068 | | // Point entries are routed to proximal output only when their seqno is |
2069 | | // strictly greater than `proximal_after_seqno_`. Range tombstones use a |
2070 | | // [lower, upper) filter, so split them at the next seqno to preserve the |
2071 | | // same boundary. |
2072 | 0 | SequenceNumber range_del_split_seqno = proximal_after_seqno_; |
// Guard the increment so the split point never goes past kMaxSequenceNumber.
2073 | 0 | if (range_del_split_seqno < kMaxSequenceNumber) { |
2074 | 0 | range_del_split_seqno++; |
2075 | 0 | } |
2076 | 0 | if (outputs.IsProximalLevel()) { |
2077 | 0 | keep_seqno_range.first = range_del_split_seqno; |
2078 | 0 | } else { |
2079 | 0 | keep_seqno_range.second = range_del_split_seqno; |
2080 | 0 | } |
2081 | 0 | } |
2082 | 1.57k | CompactionIterationStats range_del_out_stats; |
2083 | | // NOTE1: Use `bottommost_level_ = true` for both bottommost and |
2084 | | // output_to_proximal_level compaction here, as it's only used to decide |
2085 | | // if range dels could be dropped. (Logically, we are taking a single sorted |
2086 | | // run returned from CompactionIterator and physically splitting it between |
2087 | | // two output levels.) |
2088 | | // NOTE2: with per-key placement, range tombstones will be filtered on |
2089 | | // each output level based on sequence number (traversed twice). This is |
2090 | | // CPU-inefficient for a large number of range tombstones, but that would |
2091 | | // be an unusual work load. |
2092 | 1.57k | if (sub_compact->HasRangeDel()) { |
2093 | 0 | s = outputs.AddRangeDels(*sub_compact->RangeDelAgg(), comp_start_user_key, |
2094 | 0 | comp_end_user_key, range_del_out_stats, |
2095 | 0 | bottommost_level_, cfd->internal_comparator(), |
2096 | 0 | earliest_snapshot_, keep_seqno_range, |
2097 | 0 | next_table_min_key, full_history_ts_low_); |
2098 | 0 | } |
2099 | 1.57k | RecordDroppedKeys(range_del_out_stats, &sub_compact->compaction_job_stats); |
2100 | 1.57k | TEST_SYNC_POINT("CompactionJob::FinishCompactionOutputFile1"); |
2101 | 1.57k | } |
2102 | |
// Captured before Finish(); used further down to decide whether the output
// is empty and should be dropped.
2103 | 2.28k | const uint64_t current_entries = outputs.NumEntries(); |
2104 | |
2105 | 2.28k | s = outputs.Finish(s, seqno_to_time_mapping_); |
2106 | 2.28k | TEST_SYNC_POINT_CALLBACK( |
2107 | 2.28k | "CompactionJob::FinishCompactionOutputFile()::AfterFinish", &s); |
2108 | |
2109 | 2.28k | if (s.ok()) { |
2110 | | // With accurate smallest and largest key, we can get a slightly more |
2111 | | // accurate oldest ancester time. |
2112 | | // This makes oldest ancester time in manifest more accurate than in |
2113 | | // table properties. Not sure how to resolve it. |
2114 | 1.57k | if (meta->smallest.size() > 0 && meta->largest.size() > 0) { |
2115 | 1.57k | uint64_t refined_oldest_ancester_time; |
2116 | 1.57k | Slice new_smallest = meta->smallest.user_key(); |
2117 | 1.57k | Slice new_largest = meta->largest.user_key(); |
2118 | 1.57k | if (!new_largest.empty() && !new_smallest.empty()) { |
2119 | 439 | refined_oldest_ancester_time = |
2120 | 439 | sub_compact->compaction->MinInputFileOldestAncesterTime( |
2121 | 439 | &(meta->smallest), &(meta->largest)); |
// uint64_t max is treated as "no refined value": keep the existing time.
2122 | 439 | if (refined_oldest_ancester_time != |
2123 | 439 | std::numeric_limits<uint64_t>::max()) { |
2124 | 127 | meta->oldest_ancester_time = refined_oldest_ancester_time; |
2125 | 127 | } |
2126 | 439 | } |
2127 | 1.57k | } |
2128 | 1.57k | } |
2129 | |
2130 | | // Finish and check for file errors |
2131 | 2.28k | IOStatus io_s = outputs.WriterSyncClose(s, db_options_.clock, stats_, |
2132 | 2.28k | db_options_.use_fsync); |
2133 | |
2134 | 2.28k | if (s.ok() && io_s.ok()) { |
2135 | 1.57k | file_checksum = meta->file_checksum; |
2136 | 1.57k | file_checksum_func_name = meta->file_checksum_func_name; |
2137 | 1.57k | } |
2138 | |
// An earlier (non-IO) error in `s` takes precedence over the IO status.
2139 | 2.28k | if (s.ok()) { |
2140 | 1.57k | s = io_s; |
2141 | 1.57k | } |
2142 | 2.28k | if (sub_compact->io_status.ok()) { |
2143 | 2.28k | sub_compact->io_status = io_s; |
2144 | | // Since this error is really a copy of the |
2145 | | // "normal" status, it does not also need to be checked |
2146 | 2.28k | sub_compact->io_status.PermitUncheckedError(); |
2147 | 2.28k | } |
2148 | |
// `tp` stays default-constructed on failure, so num_range_deletions is only
// meaningful below when s.ok().
2149 | 2.28k | TableProperties tp; |
2150 | 2.28k | if (s.ok()) { |
2151 | 1.57k | tp = outputs.GetTableProperties(); |
2152 | 1.57k | } |
2153 | 2.28k | if (s.ok() && current_entries == 0 && tp.num_range_deletions == 0) { |
2154 | | // If there is nothing to output, no necessary to generate a sst file. |
2155 | | // This happens when the output level is bottom level, at the same time |
2156 | | // the sub_compact output nothing. |
2157 | 0 | std::string fname = GetTableFileName(meta->fd.GetNumber()); |
2158 | |
2159 | | // TODO(AR) it is not clear if there are any larger implications if |
2160 | | // DeleteFile fails here |
2161 | 0 | Status ds = env_->DeleteFile(fname); |
2162 | 0 | if (!ds.ok()) { |
2163 | 0 | ROCKS_LOG_WARN( |
2164 | 0 | db_options_.info_log, |
2165 | 0 | "[%s] [JOB %d] Unable to remove SST file for table #%" PRIu64 |
2166 | 0 | " at bottom level%s", |
2167 | 0 | cfd->GetName().c_str(), job_id_, output_number, |
2168 | 0 | meta->marked_for_compaction ? " (need compaction)" : ""); |
2169 | 0 | } |
2170 | |
2171 | | // Also need to remove the file from outputs, or it will be added to the |
2172 | | // VersionEdit. |
2173 | 0 | outputs.RemoveLastOutput(); |
// From here on, meta == nullptr marks "the output file was dropped".
2174 | 0 | meta = nullptr; |
2175 | 0 | } |
2176 | |
2177 | 2.28k | if (s.ok() && (current_entries > 0 || tp.num_range_deletions > 0)) { |
2178 | | // Output to event logger and fire events. |
2179 | 1.57k | outputs.UpdateTableProperties(); |
2180 | 1.57k | ROCKS_LOG_INFO(db_options_.info_log, |
2181 | 1.57k | "[%s] [JOB %d] Generated table #%" PRIu64 ": %" PRIu64 |
2182 | 1.57k | " keys, %" PRIu64 " bytes%s, temperature: %s", |
2183 | 1.57k | cfd->GetName().c_str(), job_id_, output_number, |
2184 | 1.57k | current_entries, meta->fd.file_size, |
2185 | 1.57k | meta->marked_for_compaction ? " (need compaction)" : "", |
2186 | 1.57k | temperature_to_string[meta->temperature].c_str()); |
2187 | 1.57k | } |
2188 | 2.28k | std::string fname; |
2189 | 2.28k | FileDescriptor output_fd; |
2190 | 2.28k | uint64_t oldest_blob_file_number = kInvalidBlobFileNumber; |
// Listeners get their own status so that a dropped empty file can be reported
// as Aborted without turning the function's return status into an error.
2191 | 2.28k | Status status_for_listener = s; |
2192 | 2.28k | if (meta != nullptr) { |
2193 | 2.28k | fname = GetTableFileName(meta->fd.GetNumber()); |
2194 | 2.28k | output_fd = meta->fd; |
2195 | 2.28k | oldest_blob_file_number = meta->oldest_blob_file_number; |
2196 | 2.28k | } else { |
2197 | 0 | fname = "(nil)"; |
2198 | 0 | if (s.ok()) { |
2199 | 0 | status_for_listener = Status::Aborted("Empty SST file not kept"); |
2200 | 0 | } |
2201 | 0 | } |
2202 | 2.28k | EventHelpers::LogAndNotifyTableFileCreationFinished( |
2203 | 2.28k | event_logger_, cfd->ioptions().listeners, dbname_, cfd->GetName(), fname, |
2204 | 2.28k | job_id_, output_fd, oldest_blob_file_number, tp, |
2205 | 2.28k | TableFileCreationReason::kCompaction, status_for_listener, file_checksum, |
2206 | 2.28k | file_checksum_func_name); |
2207 | |
2208 | | // Report new file to SstFileManagerImpl |
2209 | 2.28k | auto sfm = |
2210 | 2.28k | static_cast<SstFileManagerImpl*>(db_options_.sst_file_manager.get()); |
2211 | 2.28k | if (sfm && meta != nullptr && meta->fd.GetPathId() == 0) { |
2212 | 2.28k | Status add_s = sfm->OnAddFile(fname); |
2213 | 2.28k | if (!add_s.ok() && s.ok()) { |
2214 | 0 | s = add_s; |
2215 | 0 | } |
// NOTE: this branch overwrites `s` unconditionally, even if it already held
// a different error, and raises a background error under the DB mutex.
2216 | 2.28k | if (sfm->IsMaxAllowedSpaceReached()) { |
2217 | | // TODO(ajkr): should we return OK() if max space was reached by the final |
2218 | | // compaction output file (similarly to how flush works when full)? |
2219 | 0 | s = Status::SpaceLimit("Max allowed space was reached"); |
2220 | 0 | TEST_SYNC_POINT( |
2221 | 0 | "CompactionJob::FinishCompactionOutputFile:MaxAllowedSpaceReached"); |
2222 | 0 | InstrumentedMutexLock l(db_mutex_); |
2223 | 0 | db_error_handler_->SetBGError(s, BackgroundErrorReason::kCompaction); |
2224 | 0 | } |
2225 | 2.28k | } |
2226 | |
2227 | 2.28k | if (s.ok() && ShouldUpdateSubcompactionProgress(sub_compact, c_iter, |
2228 | 1.57k | prev_iter_output_internal_key, |
2229 | 1.57k | next_table_min_key, meta)) { |
2230 | 0 | UpdateSubcompactionProgress(c_iter, next_table_min_key, sub_compact); |
2231 | 0 | s = PersistSubcompactionProgress(sub_compact); |
2232 | 0 | } |
// The builder is reset on every path, whether or not `s` is OK.
2233 | 2.28k | outputs.ResetBuilder(); |
2234 | 2.28k | return s; |
2235 | 2.28k | } |
2236 | | |
2237 | | bool CompactionJob::ShouldUpdateSubcompactionProgress( |
2238 | | const SubcompactionState* sub_compact, const CompactionIterator* c_iter, |
2239 | | const ParsedInternalKey& prev_iter_output_internal_key, |
2240 | 1.57k | const Slice& next_table_min_internal_key, const FileMetaData* meta) const { |
2241 | 1.57k | const auto* cfd = sub_compact->compaction->column_family_data(); |
2242 | | // No need to update when the progress will not get persisted |
2243 | 1.57k | if (compaction_progress_writer_ == nullptr) { |
2244 | 1.57k | return false; |
2245 | 1.57k | } |
2246 | | |
2247 | | // No need to update for a new empty output |
2248 | 0 | if (meta == nullptr) { |
2249 | 0 | return false; |
2250 | 0 | } |
2251 | | |
2252 | | // TODO(hx235): save progress even on the last output file |
2253 | 0 | if (next_table_min_internal_key.empty()) { |
2254 | 0 | return false; |
2255 | 0 | } |
2256 | | |
2257 | | // LIMITATION: Persisting compaction progress with timestamp |
2258 | | // is not supported since the feature of persisting timestamp of the key in |
2259 | | // SST files itself is still experimental |
2260 | 0 | size_t ts_sz = cfd->user_comparator()->timestamp_size(); |
2261 | 0 | if (ts_sz > 0) { |
2262 | 0 | return false; |
2263 | 0 | } |
2264 | | |
2265 | | // LIMITATION: Compaction progress persistence disabled for file boundaries |
2266 | | // containing range deletions. Range deletions can span file boundaries, |
2267 | | // making it difficult to ensure adjacent output tables have different user |
2268 | | // keys. See the last check for why different users keys of adjacent output |
2269 | | // tables are needed |
2270 | 0 | const ValueType next_table_min_internal_key_type = |
2271 | 0 | ExtractValueType(next_table_min_internal_key); |
2272 | 0 | const ValueType prev_iter_output_internal_key_type = |
2273 | 0 | prev_iter_output_internal_key.user_key.empty() |
2274 | 0 | ? ValueType::kTypeValue |
2275 | 0 | : prev_iter_output_internal_key.type; |
2276 | | |
2277 | | // Range deletes truncated to align with file boundaries may be output by the |
2278 | | // compaction iterator with `ValueType::kTypeMaxValid` instead of the original |
2279 | | // type. |
2280 | 0 | if ((next_table_min_internal_key_type == ValueType::kTypeRangeDeletion || |
2281 | 0 | next_table_min_internal_key_type == ValueType::kTypeMaxValid) || |
2282 | 0 | (prev_iter_output_internal_key_type == ValueType::kTypeRangeDeletion || |
2283 | 0 | prev_iter_output_internal_key_type == ValueType::kTypeMaxValid)) { |
2284 | 0 | return false; |
2285 | 0 | } |
2286 | | |
2287 | | // LIMITATION: Compaction progress persistence disabled when adjacent output |
2288 | | // tables share the same user key at boundaries. This ensures a simple Seek() |
2289 | | // of the next key when resuming can process all versions of a user key |
2290 | 0 | const Slice next_table_min_user_key = |
2291 | 0 | ExtractUserKey(next_table_min_internal_key); |
2292 | 0 | const Slice prev_table_last_user_key = |
2293 | 0 | prev_iter_output_internal_key.user_key.empty() |
2294 | 0 | ? Slice() |
2295 | 0 | : prev_iter_output_internal_key.user_key; |
2296 | |
|
2297 | 0 | if (cfd->user_comparator()->EqualWithoutTimestamp(next_table_min_user_key, |
2298 | 0 | prev_table_last_user_key)) { |
2299 | 0 | return false; |
2300 | 0 | } |
2301 | | |
2302 | | // LIMITATION: Don't save progress if the current key has already been scanned |
2303 | | // (looked ahead) in the input but not yet output. This can happen with merge |
2304 | | // operations, single deletes, and deletes at the bottommost level where |
2305 | | // CompactionIterator needs to look ahead to process multiple entries for the |
2306 | | // same user key before outputting a result. If we saved progress and resumed |
2307 | | // at this boundary, the resumed session would see and process the same input |
2308 | | // key again through Seek(), leading to incorrect double-counting in |
2309 | | // number of processed input entries and input count verification failure |
2310 | | // |
2311 | | // TODO(hx235): Offset num_processed_input_records to avoid double counting |
2312 | | // instead of disabling progress persistence. |
2313 | 0 | if (c_iter->IsCurrentKeyAlreadyScanned()) { |
2314 | 0 | return false; |
2315 | 0 | } |
2316 | | |
2317 | 0 | return true; |
2318 | 0 | } |
2319 | | |
2320 | 2.72k | Status CompactionJob::InstallCompactionResults(bool* compaction_released) { |
2321 | 2.72k | assert(compact_); |
2322 | | |
2323 | 2.72k | db_mutex_->AssertHeld(); |
2324 | | |
2325 | 2.72k | const ReadOptions read_options(Env::IOActivity::kCompaction); |
2326 | 2.72k | const WriteOptions write_options(Env::IOActivity::kCompaction); |
2327 | | |
2328 | 2.72k | auto* compaction = compact_->compaction; |
2329 | 2.72k | assert(compaction); |
2330 | | |
2331 | 2.72k | { |
2332 | 2.72k | Compaction::InputLevelSummaryBuffer inputs_summary; |
2333 | 2.72k | if (internal_stats_.has_proximal_level_output) { |
2334 | 0 | ROCKS_LOG_BUFFER( |
2335 | 0 | log_buffer_, |
2336 | 0 | "[%s] [JOB %d] Compacted %s => output_to_proximal_level: %" PRIu64 |
2337 | 0 | " bytes + last: %" PRIu64 " bytes. Total: %" PRIu64 " bytes", |
2338 | 0 | compaction->column_family_data()->GetName().c_str(), job_id_, |
2339 | 0 | compaction->InputLevelSummary(&inputs_summary), |
2340 | 0 | internal_stats_.proximal_level_stats.bytes_written, |
2341 | 0 | internal_stats_.output_level_stats.bytes_written, |
2342 | 0 | internal_stats_.TotalBytesWritten()); |
2343 | 2.72k | } else { |
2344 | 2.72k | ROCKS_LOG_BUFFER(log_buffer_, |
2345 | 2.72k | "[%s] [JOB %d] Compacted %s => %" PRIu64 " bytes", |
2346 | 2.72k | compaction->column_family_data()->GetName().c_str(), |
2347 | 2.72k | job_id_, compaction->InputLevelSummary(&inputs_summary), |
2348 | 2.72k | internal_stats_.TotalBytesWritten()); |
2349 | 2.72k | } |
2350 | 2.72k | } |
2351 | | |
2352 | 2.72k | VersionEdit* const edit = compaction->edit(); |
2353 | 2.72k | assert(edit); |
2354 | | |
2355 | | // Add compaction inputs |
2356 | 2.72k | compaction->AddInputDeletions(edit); |
2357 | | |
2358 | 2.72k | std::unordered_map<uint64_t, BlobGarbageMeter::BlobStats> blob_total_garbage; |
2359 | | |
2360 | 2.72k | for (const auto& sub_compact : compact_->sub_compact_states) { |
2361 | 2.72k | sub_compact.AddOutputsEdit(edit); |
2362 | | |
2363 | 2.72k | for (const auto& blob : sub_compact.Current().GetBlobFileAdditions()) { |
2364 | 0 | edit->AddBlobFile(blob); |
2365 | 0 | } |
2366 | | |
2367 | 2.72k | if (sub_compact.Current().GetBlobGarbageMeter()) { |
2368 | 0 | const auto& flows = sub_compact.Current().GetBlobGarbageMeter()->flows(); |
2369 | |
|
2370 | 0 | for (const auto& pair : flows) { |
2371 | 0 | const uint64_t blob_file_number = pair.first; |
2372 | 0 | const BlobGarbageMeter::BlobInOutFlow& flow = pair.second; |
2373 | |
|
2374 | 0 | assert(flow.IsValid()); |
2375 | 0 | if (flow.HasGarbage()) { |
2376 | 0 | blob_total_garbage[blob_file_number].Add(flow.GetGarbageCount(), |
2377 | 0 | flow.GetGarbageBytes()); |
2378 | 0 | } |
2379 | 0 | } |
2380 | 0 | } |
2381 | 2.72k | } |
2382 | | |
2383 | 2.72k | for (const auto& pair : blob_total_garbage) { |
2384 | 0 | const uint64_t blob_file_number = pair.first; |
2385 | 0 | const BlobGarbageMeter::BlobStats& stats = pair.second; |
2386 | |
|
2387 | 0 | edit->AddBlobFileGarbage(blob_file_number, stats.GetCount(), |
2388 | 0 | stats.GetBytes()); |
2389 | 0 | } |
2390 | | |
2391 | 2.72k | if ((compaction->compaction_reason() == |
2392 | 2.72k | CompactionReason::kLevelMaxLevelSize || |
2393 | 2.72k | compaction->compaction_reason() == CompactionReason::kRoundRobinTtl) && |
2394 | 0 | compaction->immutable_options().compaction_pri == kRoundRobin) { |
2395 | 0 | int start_level = compaction->start_level(); |
2396 | 0 | if (start_level > 0) { |
2397 | 0 | auto vstorage = compaction->input_version()->storage_info(); |
2398 | 0 | edit->AddCompactCursor(start_level, |
2399 | 0 | vstorage->GetNextCompactCursor( |
2400 | 0 | start_level, compaction->num_input_files(0))); |
2401 | 0 | } |
2402 | 0 | } |
2403 | | |
2404 | 2.72k | auto manifest_wcb = [&compaction, &compaction_released](const Status& s) { |
2405 | 2.72k | compaction->ReleaseCompactionFiles(s); |
2406 | 2.72k | *compaction_released = true; |
2407 | 2.72k | }; |
2408 | | |
2409 | 2.72k | Status s; |
2410 | 2.72k | TEST_SYNC_POINT_CALLBACK( |
2411 | 2.72k | "CompactionJob::InstallCompactionResults:BeforeLogAndApply", &s); |
2412 | 2.72k | if (s.ok()) { |
2413 | 2.72k | s = versions_->LogAndApply(compaction->column_family_data(), read_options, |
2414 | 2.72k | write_options, edit, db_mutex_, db_directory_, |
2415 | 2.72k | /*new_descriptor_log=*/false, |
2416 | 2.72k | /*column_family_options=*/nullptr, manifest_wcb); |
2417 | 2.72k | } |
2418 | 2.72k | return s; |
2419 | 2.72k | } |
2420 | | |
2421 | 9.70k | void CompactionJob::RecordCompactionIOStats() { |
2422 | 9.70k | RecordTick(stats_, COMPACT_READ_BYTES, IOSTATS(bytes_read)); |
2423 | 9.70k | RecordTick(stats_, COMPACT_WRITE_BYTES, IOSTATS(bytes_written)); |
2424 | 9.70k | CompactionReason compaction_reason = |
2425 | 9.70k | compact_->compaction->compaction_reason(); |
2426 | 9.70k | if (compaction_reason == CompactionReason::kFilesMarkedForCompaction) { |
2427 | 0 | RecordTick(stats_, COMPACT_READ_BYTES_MARKED, IOSTATS(bytes_read)); |
2428 | 0 | RecordTick(stats_, COMPACT_WRITE_BYTES_MARKED, IOSTATS(bytes_written)); |
2429 | 9.70k | } else if (compaction_reason == CompactionReason::kPeriodicCompaction) { |
2430 | 0 | RecordTick(stats_, COMPACT_READ_BYTES_PERIODIC, IOSTATS(bytes_read)); |
2431 | 0 | RecordTick(stats_, COMPACT_WRITE_BYTES_PERIODIC, IOSTATS(bytes_written)); |
2432 | 9.70k | } else if (compaction_reason == CompactionReason::kTtl) { |
2433 | 0 | RecordTick(stats_, COMPACT_READ_BYTES_TTL, IOSTATS(bytes_read)); |
2434 | 0 | RecordTick(stats_, COMPACT_WRITE_BYTES_TTL, IOSTATS(bytes_written)); |
2435 | 0 | } |
2436 | 9.70k | ThreadStatusUtil::IncreaseThreadOperationProperty( |
2437 | 9.70k | ThreadStatus::COMPACTION_BYTES_READ, IOSTATS(bytes_read)); |
2438 | 9.70k | IOSTATS_RESET(bytes_read); |
2439 | 9.70k | ThreadStatusUtil::IncreaseThreadOperationProperty( |
2440 | 9.70k | ThreadStatus::COMPACTION_BYTES_WRITTEN, IOSTATS(bytes_written)); |
2441 | 9.70k | IOSTATS_RESET(bytes_written); |
2442 | 9.70k | } |
2443 | | |
// Opens a new SST output file for `sub_compact` and prepares `outputs` to
// write into it.
//
// Steps, in order:
//   1. Allocate a new file number and notify listeners that table file
//      creation has started.
//   2. Create the writable file, passing the output temperature and write
//      hint through a copy of `file_options_`.
//   3. Compute oldest-ancester time, newest-key time and epoch number from
//      the compaction inputs and register a FileMetaData in `outputs`.
//   4. Configure the writable file, wrap it in a WritableFileWriter and hand
//      it to `outputs`.
//   5. Create the table builder.
//
// Returns a non-OK status if NewWritableFile() or GetSstInternalUniqueId()
// fails; otherwise returns OK. The IOStatus of NewWritableFile() is also
// copied to sub_compact->io_status if that was still OK.
2444 | | Status CompactionJob::OpenCompactionOutputFile(SubcompactionState* sub_compact, |
2445 | 2.28k | CompactionOutputs& outputs) { |
2446 | 2.28k | assert(sub_compact != nullptr); |
2447 | |
2448 | | // no need to lock because VersionSet::next_file_number_ is atomic |
2449 | 2.28k | uint64_t file_number = versions_->NewFileNumber(); |
2450 | | #ifndef NDEBUG |
2451 | | TEST_SYNC_POINT_CALLBACK( |
2452 | | "CompactionJob::OpenCompactionOutputFile::NewFileNumber", &file_number); |
2453 | | #endif |
2454 | 2.28k | std::string fname = GetTableFileName(file_number); |
2455 | | // Fire events. |
2456 | 2.28k | ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); |
2457 | 2.28k | EventHelpers::NotifyTableFileCreationStarted( |
2458 | 2.28k | cfd->ioptions().listeners, dbname_, cfd->GetName(), fname, job_id_, |
2459 | 2.28k | TableFileCreationReason::kCompaction); |
2460 | | // Make the output file |
2461 | 2.28k | std::unique_ptr<FSWritableFile> writable_file; |
2462 | | #ifndef NDEBUG |
2463 | | bool syncpoint_arg = file_options_.use_direct_writes; |
2464 | | TEST_SYNC_POINT_CALLBACK("CompactionJob::OpenCompactionOutputFile", |
2465 | | &syncpoint_arg); |
2466 | | #endif |
2467 | |
2468 | | // Pass temperature of the last level files to FileSystem. |
// Work on a copy so the job-wide `file_options_` is left untouched.
2469 | 2.28k | FileOptions fo_copy = file_options_; |
2470 | 2.28k | auto temperature = |
2471 | 2.28k | sub_compact->compaction->GetOutputTemperature(outputs.IsProximalLevel()); |
2472 | 2.28k | fo_copy.temperature = temperature; |
2473 | 2.28k | fo_copy.write_hint = write_hint_; |
2474 | |
2475 | 2.28k | Status s; |
2476 | 2.28k | IOStatus io_s = NewWritableFile(fs_.get(), fname, &writable_file, fo_copy); |
2477 | 2.28k | s = io_s; |
2478 | 2.28k | if (io_s.ok()) { |
2479 | | // Track the SST file path for cleanup on abort. |
2480 | 2.28k | outputs.AddOutputFilePath(fname); |
2481 | 2.28k | } |
2482 | 2.28k | if (sub_compact->io_status.ok()) { |
2483 | 2.28k | sub_compact->io_status = io_s; |
2484 | | // Since this error is really a copy of the io_s that is checked below as s, |
2485 | | // it does not also need to be checked. |
2486 | 2.28k | sub_compact->io_status.PermitUncheckedError(); |
2487 | 2.28k | } |
// File creation failed: log it and send listeners the matching "finished"
// event (with placeholder descriptor/properties) before bailing out.
2488 | 2.28k | if (!s.ok()) { |
2489 | 0 | ROCKS_LOG_ERROR( |
2490 | 0 | db_options_.info_log, |
2491 | 0 | "[%s] [JOB %d] OpenCompactionOutputFiles for table #%" PRIu64 |
2492 | 0 | " fails at NewWritableFile with status %s", |
2493 | 0 | sub_compact->compaction->column_family_data()->GetName().c_str(), |
2494 | 0 | job_id_, file_number, s.ToString().c_str()); |
2495 | 0 | LogFlush(db_options_.info_log); |
2496 | 0 | EventHelpers::LogAndNotifyTableFileCreationFinished( |
2497 | 0 | event_logger_, cfd->ioptions().listeners, dbname_, cfd->GetName(), |
2498 | 0 | fname, job_id_, FileDescriptor(), kInvalidBlobFileNumber, |
2499 | 0 | TableProperties(), TableFileCreationReason::kCompaction, s, |
2500 | 0 | kUnknownFileChecksum, kUnknownFileChecksumFuncName); |
2501 | 0 | return s; |
2502 | 0 | } |
2503 | |
2504 | | // Try to figure out the output file's oldest ancester time. |
2505 | 2.28k | int64_t temp_current_time = 0; |
2506 | 2.28k | auto get_time_status = db_options_.clock->GetCurrentTime(&temp_current_time); |
2507 | | // Safe to proceed even if GetCurrentTime fails. So, log and proceed. |
2508 | 2.28k | if (!get_time_status.ok()) { |
2509 | 0 | ROCKS_LOG_WARN(db_options_.info_log, |
2510 | 0 | "Failed to get current time. Status: %s", |
2511 | 0 | get_time_status.ToString().c_str()); |
2512 | 0 | } |
// NOTE(review): if GetCurrentTime() failed without writing its out-param,
// current_time is 0 here (the initializer above) - confirm that is intended.
2513 | 2.28k | uint64_t current_time = static_cast<uint64_t>(temp_current_time); |
// tmp_start / tmp_end are only populated (and only passed on below) when the
// subcompaction has the corresponding bound.
2514 | 2.28k | InternalKey tmp_start, tmp_end; |
2515 | 2.28k | if (sub_compact->start.has_value()) { |
2516 | 0 | tmp_start.SetMinPossibleForUserKey(*(sub_compact->start)); |
2517 | 0 | } |
2518 | 2.28k | if (sub_compact->end.has_value()) { |
2519 | 0 | tmp_end.SetMinPossibleForUserKey(*(sub_compact->end)); |
2520 | 0 | } |
2521 | 2.28k | uint64_t oldest_ancester_time = |
2522 | 2.28k | sub_compact->compaction->MinInputFileOldestAncesterTime( |
2523 | 2.28k | sub_compact->start.has_value() ? &tmp_start : nullptr, |
2524 | 2.28k | sub_compact->end.has_value() ? &tmp_end : nullptr); |
2525 | 2.28k | if (oldest_ancester_time == std::numeric_limits<uint64_t>::max()) { |
2526 | | // TODO: fix DBSSTTest.GetTotalSstFilesSize and use |
2527 | | // kUnknownOldestAncesterTime |
2528 | 0 | oldest_ancester_time = current_time; |
2529 | 0 | } |
2530 | |
2531 | 2.28k | uint64_t newest_key_time = sub_compact->compaction->MaxInputFileNewestKeyTime( |
2532 | 2.28k | sub_compact->start.has_value() ? &tmp_start : nullptr, |
2533 | 2.28k | sub_compact->end.has_value() ? &tmp_end : nullptr); |
2534 | |
2535 | | // Initialize a SubcompactionState::Output and add it to sub_compact->outputs |
2536 | 2.28k | uint64_t epoch_number = sub_compact->compaction->MinInputFileEpochNumber(); |
2537 | 2.28k | { |
2538 | 2.28k | FileMetaData meta; |
2539 | 2.28k | meta.fd = FileDescriptor(file_number, |
2540 | 2.28k | sub_compact->compaction->output_path_id(), 0); |
2541 | 2.28k | meta.oldest_ancester_time = oldest_ancester_time; |
2542 | 2.28k | meta.file_creation_time = current_time; |
2543 | 2.28k | meta.epoch_number = epoch_number; |
2544 | 2.28k | meta.temperature = temperature; |
2545 | 2.28k | assert(!db_id_.empty()); |
2546 | 2.28k | assert(!db_session_id_.empty()); |
2547 | 2.28k | s = GetSstInternalUniqueId(db_id_, db_session_id_, meta.fd.GetNumber(), |
2548 | 2.28k | &meta.unique_id); |
// NOTE(review): unlike the NewWritableFile failure path above, this early
// return does not call LogAndNotifyTableFileCreationFinished - confirm
// whether listeners expect a matching "finished" event here.
2549 | 2.28k | if (!s.ok()) { |
2550 | 0 | ROCKS_LOG_ERROR(db_options_.info_log, |
2551 | 0 | "[%s] [JOB %d] file #%" PRIu64 |
2552 | 0 | " failed to generate unique id: %s.", |
2553 | 0 | cfd->GetName().c_str(), job_id_, meta.fd.GetNumber(), |
2554 | 0 | s.ToString().c_str()); |
2555 | 0 | return s; |
2556 | 0 | } |
2557 | |
2558 | | // Enable hash computation if paranoid_file_checks is on or if |
2559 | | // verify_output_flags includes kVerifyIteration, so that |
2560 | | // VerifyOutputFiles() can compare the hash of the written data |
2561 | | // against a re-read of the output file. |
2562 | 2.28k | bool enable_output_hash = |
2563 | 2.28k | paranoid_file_checks_ || |
2564 | 2.28k | !!(sub_compact->compaction->mutable_cf_options().verify_output_flags & |
2565 | 2.28k | VerifyOutputFlags::kVerifyIteration); |
2566 | 2.28k | outputs.AddOutput(std::move(meta), cfd->internal_comparator(), |
2567 | 2.28k | enable_output_hash); |
2568 | 2.28k | } |
2569 | |
2570 | 0 | writable_file->SetIOPriority(GetRateLimiterPriority()); |
2571 | | // Subsequent attempts to override the hint via SetWriteLifeTimeHint |
2572 | | // with the very same value will be ignored by the fs. |
2573 | 2.28k | writable_file->SetWriteLifeTimeHint(fo_copy.write_hint); |
2574 | 2.28k | FileTypeSet tmp_set = db_options_.checksum_handoff_file_types; |
2575 | 2.28k | writable_file->SetPreallocationBlockSize(static_cast<size_t>( |
2576 | 2.28k | sub_compact->compaction->OutputFilePreallocationSize())); |
2577 | 2.28k | const auto& listeners = |
2578 | 2.28k | sub_compact->compaction->immutable_options().listeners; |
// Ownership of `writable_file` moves into the new WritableFileWriter, which is
// handed to `outputs`; `writable_file` must not be used after this point.
2579 | 2.28k | outputs.AssignFileWriter(new WritableFileWriter( |
2580 | 2.28k | std::move(writable_file), fname, fo_copy, db_options_.clock, io_tracer_, |
2581 | 2.28k | db_options_.stats, Histograms::SST_WRITE_MICROS, listeners, |
2582 | 2.28k | db_options_.file_checksum_gen_factory.get(), |
2583 | 2.28k | tmp_set.Contains(FileType::kTableFile), false)); |
2584 | |
2585 | | // TODO(hx235): pass in the correct `oldest_key_time` instead of `0` |
2586 | 2.28k | const ReadOptions read_options(Env::IOActivity::kCompaction); |
2587 | 2.28k | const WriteOptions write_options(Env::IOActivity::kCompaction); |
2588 | 2.28k | TableBuilderOptions tboptions( |
2589 | 2.28k | cfd->ioptions(), sub_compact->compaction->mutable_cf_options(), |
2590 | 2.28k | read_options, write_options, cfd->internal_comparator(), |
2591 | 2.28k | cfd->internal_tbl_prop_coll_factories(), |
2592 | 2.28k | sub_compact->compaction->output_compression(), |
2593 | 2.28k | sub_compact->compaction->output_compression_opts(), cfd->GetID(), |
2594 | 2.28k | cfd->GetName(), sub_compact->compaction->output_level(), newest_key_time, |
2595 | 2.28k | bottommost_level_, TableFileCreationReason::kCompaction, |
2596 | 2.28k | 0 /* oldest_key_time */, current_time, db_id_, db_session_id_, |
2597 | 2.28k | sub_compact->compaction->max_output_file_size(), file_number, |
2598 | 2.28k | proximal_after_seqno_ /*last_level_inclusive_max_seqno_threshold*/); |
2599 | |
2600 | 2.28k | outputs.NewBuilder(tboptions); |
2601 | |
2602 | 2.28k | LogFlush(db_options_.info_log); |
2603 | 2.28k | return s; |
2604 | 2.28k | } |
2605 | | |
2606 | 4.85k | void CompactionJob::CleanupCompaction() { |
2607 | 4.85k | for (SubcompactionState& sub_compact : compact_->sub_compact_states) { |
2608 | 4.85k | sub_compact.Cleanup(table_cache_.get(), compact_->status); |
2609 | 4.85k | } |
2610 | 4.85k | delete compact_; |
2611 | 4.85k | compact_ = nullptr; |
2612 | 4.85k | } |
2613 | | |
2614 | | namespace { |
2615 | 3.15k | void CopyPrefix(const Slice& src, size_t prefix_length, std::string* dst) { |
2616 | 3.15k | assert(prefix_length > 0); |
2617 | 3.15k | size_t length = src.size() > prefix_length ? prefix_length : src.size(); |
2618 | 3.15k | dst->assign(src.data(), length); |
2619 | 3.15k | } |
2620 | | } // namespace |
2621 | | |
2622 | | bool CompactionJob::UpdateInternalStatsFromInputFiles( |
2623 | 4.85k | uint64_t* num_input_range_del) { |
2624 | 4.85k | assert(compact_); |
2625 | | |
2626 | 4.85k | Compaction* compaction = compact_->compaction; |
2627 | 4.85k | internal_stats_.output_level_stats.num_input_files_in_non_output_levels = 0; |
2628 | 4.85k | internal_stats_.output_level_stats.num_input_files_in_output_level = 0; |
2629 | | |
2630 | 4.85k | bool has_error = false; |
2631 | 4.85k | const ReadOptions read_options(Env::IOActivity::kCompaction); |
2632 | 4.85k | const auto& input_table_properties = compaction->GetInputTableProperties(); |
2633 | | |
2634 | | // Check all input files for old block-based SST format_version. Why? Old |
2635 | | // block-based SST files from roughly version 5.0 to 5.18 could produce |
2636 | | // inaccurate num_entries counts due to the evolution of its handling along |
2637 | | // with num_range_deletions. We have to disable some paranoid checks when |
2638 | | // compacting files from such an old release. However, we don't have great |
2639 | | // information to identify those files, so we heuristically over-approximate |
2640 | | // that set of files using |
2641 | | // (a) format_version < 5, which will be true for any files from RocksDB < |
2642 | | // 6.6.0 and should not be true for any recent production files |
2643 | | // (b) to avoid including non-block-based SST files (which still use older |
2644 | | // format_version markers, and do not support DeleteRange), we also require |
2645 | | // the presence of the user property "rocksdb.block.based.table.index.type", |
2646 | | // which was added in RocksDB 2.8 and is always present in block-based tables. |
2647 | 22.4k | for (const auto& tp_pair : input_table_properties) { |
2648 | 22.4k | if (tp_pair.second && tp_pair.second->format_version < 5) { |
2649 | | // Check for block-based table by looking for its index type property |
2650 | 0 | const auto& user_props = tp_pair.second->user_collected_properties; |
2651 | 0 | if (user_props.find(BlockBasedTablePropertyNames::kIndexType) != |
2652 | 0 | user_props.end()) { |
2653 | 0 | job_stats_->has_accurate_num_input_records = false; |
2654 | 0 | break; |
2655 | 0 | } |
2656 | 0 | } |
2657 | 22.4k | } |
2658 | | |
2659 | 4.85k | for (int input_level = 0; |
2660 | 12.4k | input_level < static_cast<int>(compaction->num_input_levels()); |
2661 | 7.57k | ++input_level) { |
2662 | 7.57k | const LevelFilesBrief* flevel = compaction->input_levels(input_level); |
2663 | 7.57k | size_t num_input_files = flevel->num_files; |
2664 | 7.57k | uint64_t* bytes_read; |
2665 | 7.57k | if (compaction->level(input_level) != compaction->output_level()) { |
2666 | 3.25k | internal_stats_.output_level_stats.num_input_files_in_non_output_levels += |
2667 | 3.25k | static_cast<int>(num_input_files); |
2668 | 3.25k | bytes_read = |
2669 | 3.25k | &internal_stats_.output_level_stats.bytes_read_non_output_levels; |
2670 | 4.31k | } else { |
2671 | 4.31k | internal_stats_.output_level_stats.num_input_files_in_output_level += |
2672 | 4.31k | static_cast<int>(num_input_files); |
2673 | 4.31k | bytes_read = &internal_stats_.output_level_stats.bytes_read_output_level; |
2674 | 4.31k | } |
2675 | 30.0k | for (size_t i = 0; i < num_input_files; ++i) { |
2676 | 22.4k | const FileMetaData* file_meta = flevel->files[i].file_metadata; |
2677 | 22.4k | *bytes_read += file_meta->fd.GetFileSize(); |
2678 | 22.4k | uint64_t file_input_entries = file_meta->num_entries; |
2679 | 22.4k | uint64_t file_num_range_del = file_meta->num_range_deletions; |
2680 | 22.4k | if (file_input_entries == 0) { |
2681 | 0 | uint64_t file_number = file_meta->fd.GetNumber(); |
2682 | | // Try getting info from table property |
2683 | 0 | std::string fn = TableFileName(compaction->immutable_options().cf_paths, |
2684 | 0 | file_number, file_meta->fd.GetPathId()); |
2685 | 0 | const auto& tp = input_table_properties.find(fn); |
2686 | 0 | if (tp != input_table_properties.end()) { |
2687 | 0 | file_input_entries = tp->second->num_entries; |
2688 | 0 | file_num_range_del = tp->second->num_range_deletions; |
2689 | 0 | } else { |
2690 | 0 | has_error = true; |
2691 | 0 | } |
2692 | 0 | } |
2693 | 22.4k | internal_stats_.output_level_stats.num_input_records += |
2694 | 22.4k | file_input_entries; |
2695 | 22.4k | if (num_input_range_del) { |
2696 | 22.4k | *num_input_range_del += file_num_range_del; |
2697 | 22.4k | } |
2698 | 22.4k | } |
2699 | | |
2700 | 7.57k | const std::vector<FileMetaData*>& filtered_flevel = |
2701 | 7.57k | compaction->filtered_input_levels(input_level); |
2702 | 7.57k | size_t num_filtered_input_files = filtered_flevel.size(); |
2703 | 7.57k | uint64_t* bytes_skipped; |
2704 | 7.57k | if (compaction->level(input_level) != compaction->output_level()) { |
2705 | 3.25k | internal_stats_.output_level_stats |
2706 | 3.25k | .num_filtered_input_files_in_non_output_levels += |
2707 | 3.25k | static_cast<int>(num_filtered_input_files); |
2708 | 3.25k | bytes_skipped = |
2709 | 3.25k | &internal_stats_.output_level_stats.bytes_skipped_non_output_levels; |
2710 | 4.31k | } else { |
2711 | 4.31k | internal_stats_.output_level_stats |
2712 | 4.31k | .num_filtered_input_files_in_output_level += |
2713 | 4.31k | static_cast<int>(num_filtered_input_files); |
2714 | 4.31k | bytes_skipped = |
2715 | 4.31k | &internal_stats_.output_level_stats.bytes_skipped_output_level; |
2716 | 4.31k | } |
2717 | 7.57k | for (const FileMetaData* filtered_file_meta : filtered_flevel) { |
2718 | 0 | *bytes_skipped += filtered_file_meta->fd.GetFileSize(); |
2719 | 0 | } |
2720 | 7.57k | } |
2721 | | |
2722 | | // TODO - find a better place to set these two |
2723 | 4.85k | assert(job_stats_); |
2724 | 4.85k | internal_stats_.output_level_stats.bytes_read_blob = |
2725 | 4.85k | job_stats_->total_blob_bytes_read; |
2726 | 4.85k | internal_stats_.output_level_stats.num_dropped_records = |
2727 | 4.85k | internal_stats_.DroppedRecords(); |
2728 | 4.85k | return !has_error; |
2729 | 4.85k | } |
2730 | | |
2731 | | void CompactionJob::UpdateCompactionJobInputStatsFromInternalStats( |
2732 | | const InternalStats::CompactionStatsFull& internal_stats, |
2733 | 4.85k | uint64_t num_input_range_del) const { |
2734 | 4.85k | assert(job_stats_); |
2735 | | // input information |
2736 | 4.85k | job_stats_->total_input_bytes = |
2737 | 4.85k | internal_stats.output_level_stats.bytes_read_non_output_levels + |
2738 | 4.85k | internal_stats.output_level_stats.bytes_read_output_level; |
2739 | 4.85k | job_stats_->num_input_records = |
2740 | 4.85k | internal_stats.output_level_stats.num_input_records - num_input_range_del; |
2741 | 4.85k | job_stats_->num_input_files = |
2742 | 4.85k | internal_stats.output_level_stats.num_input_files_in_non_output_levels + |
2743 | 4.85k | internal_stats.output_level_stats.num_input_files_in_output_level; |
2744 | 4.85k | job_stats_->num_input_files_at_output_level = |
2745 | 4.85k | internal_stats.output_level_stats.num_input_files_in_output_level; |
2746 | 4.85k | job_stats_->num_filtered_input_files = |
2747 | 4.85k | internal_stats.output_level_stats |
2748 | 4.85k | .num_filtered_input_files_in_non_output_levels + |
2749 | 4.85k | internal_stats.output_level_stats |
2750 | 4.85k | .num_filtered_input_files_in_output_level; |
2751 | 4.85k | job_stats_->num_filtered_input_files_at_output_level = |
2752 | 4.85k | internal_stats.output_level_stats |
2753 | 4.85k | .num_filtered_input_files_in_output_level; |
2754 | 4.85k | job_stats_->total_skipped_input_bytes = |
2755 | 4.85k | internal_stats.output_level_stats.bytes_skipped_non_output_levels + |
2756 | 4.85k | internal_stats.output_level_stats.bytes_skipped_output_level; |
2757 | | |
2758 | 4.85k | if (internal_stats.has_proximal_level_output) { |
2759 | 0 | job_stats_->total_input_bytes += |
2760 | 0 | internal_stats.proximal_level_stats.bytes_read_non_output_levels + |
2761 | 0 | internal_stats.proximal_level_stats.bytes_read_output_level; |
2762 | 0 | job_stats_->num_input_records += |
2763 | 0 | internal_stats.proximal_level_stats.num_input_records; |
2764 | 0 | job_stats_->num_input_files += |
2765 | 0 | internal_stats.proximal_level_stats |
2766 | 0 | .num_input_files_in_non_output_levels + |
2767 | 0 | internal_stats.proximal_level_stats.num_input_files_in_output_level; |
2768 | 0 | job_stats_->num_input_files_at_output_level += |
2769 | 0 | internal_stats.proximal_level_stats.num_input_files_in_output_level; |
2770 | 0 | job_stats_->num_filtered_input_files += |
2771 | 0 | internal_stats.proximal_level_stats |
2772 | 0 | .num_filtered_input_files_in_non_output_levels + |
2773 | 0 | internal_stats.proximal_level_stats |
2774 | 0 | .num_filtered_input_files_in_output_level; |
2775 | 0 | job_stats_->num_filtered_input_files_at_output_level += |
2776 | 0 | internal_stats.proximal_level_stats |
2777 | 0 | .num_filtered_input_files_in_output_level; |
2778 | 0 | job_stats_->total_skipped_input_bytes += |
2779 | 0 | internal_stats.proximal_level_stats.bytes_skipped_non_output_levels + |
2780 | 0 | internal_stats.proximal_level_stats.bytes_skipped_output_level; |
2781 | 0 | } |
2782 | 4.85k | } |
2783 | | |
2784 | | void CompactionJob::UpdateCompactionJobOutputStatsFromInternalStats( |
2785 | | const Status& status, |
2786 | 4.85k | const InternalStats::CompactionStatsFull& internal_stats) const { |
2787 | 4.85k | assert(job_stats_); |
2788 | 4.85k | job_stats_->elapsed_micros = internal_stats.output_level_stats.micros; |
2789 | 4.85k | job_stats_->cpu_micros = internal_stats.output_level_stats.cpu_micros; |
2790 | | |
2791 | | // output information |
2792 | 4.85k | job_stats_->total_output_bytes = |
2793 | 4.85k | internal_stats.output_level_stats.bytes_written; |
2794 | 4.85k | job_stats_->total_output_bytes_blob = |
2795 | 4.85k | internal_stats.output_level_stats.bytes_written_blob; |
2796 | 4.85k | job_stats_->num_output_records = |
2797 | 4.85k | internal_stats.output_level_stats.num_output_records; |
2798 | 4.85k | job_stats_->num_output_files = |
2799 | 4.85k | internal_stats.output_level_stats.num_output_files; |
2800 | 4.85k | job_stats_->num_output_files_blob = |
2801 | 4.85k | internal_stats.output_level_stats.num_output_files_blob; |
2802 | | |
2803 | 4.85k | if (internal_stats.has_proximal_level_output) { |
2804 | 0 | job_stats_->total_output_bytes += |
2805 | 0 | internal_stats.proximal_level_stats.bytes_written; |
2806 | 0 | job_stats_->total_output_bytes_blob += |
2807 | 0 | internal_stats.proximal_level_stats.bytes_written_blob; |
2808 | 0 | job_stats_->num_output_records += |
2809 | 0 | internal_stats.proximal_level_stats.num_output_records; |
2810 | 0 | job_stats_->num_output_files += |
2811 | 0 | internal_stats.proximal_level_stats.num_output_files; |
2812 | 0 | job_stats_->num_output_files_blob += |
2813 | 0 | internal_stats.proximal_level_stats.num_output_files_blob; |
2814 | 0 | } |
2815 | | |
2816 | 4.85k | if (status.ok() && job_stats_->num_output_files > 0) { |
2817 | 1.57k | CopyPrefix(compact_->SmallestUserKey(), |
2818 | 1.57k | CompactionJobStats::kMaxPrefixLength, |
2819 | 1.57k | &job_stats_->smallest_output_key_prefix); |
2820 | 1.57k | CopyPrefix(compact_->LargestUserKey(), CompactionJobStats::kMaxPrefixLength, |
2821 | 1.57k | &job_stats_->largest_output_key_prefix); |
2822 | 1.57k | } |
2823 | 4.85k | } |
2824 | | |
2825 | 4.85k | void CompactionJob::LogCompaction() { |
2826 | 4.85k | Compaction* compaction = compact_->compaction; |
2827 | 4.85k | ColumnFamilyData* cfd = compaction->column_family_data(); |
2828 | | // Let's check if anything will get logged. Don't prepare all the info if |
2829 | | // we're not logging |
2830 | 4.85k | if (db_options_.info_log_level <= InfoLogLevel::INFO_LEVEL) { |
2831 | 4.85k | Compaction::InputLevelSummaryBuffer inputs_summary; |
2832 | 4.85k | ROCKS_LOG_INFO( |
2833 | 4.85k | db_options_.info_log, "[%s] [JOB %d] Compacting %s, score %.2f", |
2834 | 4.85k | cfd->GetName().c_str(), job_id_, |
2835 | 4.85k | compaction->InputLevelSummary(&inputs_summary), compaction->score()); |
2836 | 4.85k | char scratch[2345]; |
2837 | 4.85k | compaction->Summary(scratch, sizeof(scratch)); |
2838 | 4.85k | ROCKS_LOG_INFO(db_options_.info_log, "[%s]: Compaction start summary: %s\n", |
2839 | 4.85k | cfd->GetName().c_str(), scratch); |
2840 | | // build event logger report |
2841 | 4.85k | auto stream = event_logger_->Log(); |
2842 | 4.85k | stream << "job" << job_id_ << "event" << "compaction_started" << "cf_name" |
2843 | 4.85k | << cfd->GetName() << "compaction_reason" |
2844 | 4.85k | << GetCompactionReasonString(compaction->compaction_reason()); |
2845 | 12.4k | for (size_t i = 0; i < compaction->num_input_levels(); ++i) { |
2846 | 7.57k | stream << ("files_L" + std::to_string(compaction->level(i))); |
2847 | 7.57k | stream.StartArray(); |
2848 | 22.4k | for (auto f : *compaction->inputs(i)) { |
2849 | 22.4k | stream << f->fd.GetNumber(); |
2850 | 22.4k | } |
2851 | 7.57k | stream.EndArray(); |
2852 | 7.57k | } |
2853 | 4.85k | stream << "score" << compaction->score() << "input_data_size" |
2854 | 4.85k | << compaction->CalculateTotalInputSize() << "oldest_snapshot_seqno" |
2855 | 4.85k | << (job_context_->snapshot_seqs.empty() |
2856 | 4.85k | ? int64_t{-1} // Use -1 for "none" |
2857 | 4.85k | : static_cast<int64_t>( |
2858 | 7 | job_context_->GetEarliestSnapshotSequence())); |
2859 | 4.85k | if (compaction->SupportsPerKeyPlacement()) { |
2860 | 0 | stream << "proximal_after_seqno" << proximal_after_seqno_; |
2861 | 0 | stream << "preserve_seqno_after" << preserve_seqno_after_; |
2862 | 0 | stream << "proximal_output_level" << compaction->GetProximalLevel(); |
2863 | 0 | stream << "proximal_output_range" |
2864 | 0 | << GetCompactionProximalOutputRangeTypeString( |
2865 | 0 | compaction->GetProximalOutputRangeType()); |
2866 | |
|
2867 | 0 | if (compaction->GetProximalOutputRangeType() == |
2868 | 0 | Compaction::ProximalOutputRangeType::kDisabled) { |
2869 | 0 | ROCKS_LOG_WARN( |
2870 | 0 | db_options_.info_log, |
2871 | 0 | "[%s] [JOB %d] Proximal level output is disabled, likely " |
2872 | 0 | "because of the range conflict in the proximal level", |
2873 | 0 | cfd->GetName().c_str(), job_id_); |
2874 | 0 | } |
2875 | 0 | } |
2876 | 4.85k | } |
2877 | 4.85k | } |
2878 | | |
2879 | 4.57k | std::string CompactionJob::GetTableFileName(uint64_t file_number) { |
2880 | 4.57k | return TableFileName(compact_->compaction->immutable_options().cf_paths, |
2881 | 4.57k | file_number, compact_->compaction->output_path_id()); |
2882 | 4.57k | } |
2883 | | |
2884 | 8.71k | Env::IOPriority CompactionJob::GetRateLimiterPriority() { |
2885 | 8.71k | if (versions_ && versions_->GetColumnFamilySet() && |
2886 | 8.71k | versions_->GetColumnFamilySet()->write_controller()) { |
2887 | 8.71k | WriteController* write_controller = |
2888 | 8.71k | versions_->GetColumnFamilySet()->write_controller(); |
2889 | 8.71k | if (write_controller->NeedsDelay() || write_controller->IsStopped()) { |
2890 | 61 | return Env::IO_USER; |
2891 | 61 | } |
2892 | 8.71k | } |
2893 | | |
2894 | 8.65k | return Env::IO_LOW; |
2895 | 8.71k | } |
2896 | | |
2897 | | Status CompactionJob::ReadTablePropertiesDirectly( |
2898 | | const ImmutableOptions& ioptions, const MutableCFOptions& moptions, |
2899 | | const FileMetaData* file_meta, const ReadOptions& read_options, |
2900 | 0 | std::shared_ptr<const TableProperties>* tp) { |
2901 | 0 | std::unique_ptr<FSRandomAccessFile> file; |
2902 | 0 | std::string file_name = GetTableFileName(file_meta->fd.GetNumber()); |
2903 | 0 | FileOptions fopts = file_options_; |
2904 | 0 | fopts.file_checksum = file_meta->file_checksum; |
2905 | 0 | fopts.file_checksum_func_name = file_meta->file_checksum_func_name; |
2906 | 0 | Status s = ioptions.fs->NewRandomAccessFile(file_name, fopts, &file, |
2907 | 0 | nullptr /* dbg */); |
2908 | 0 | if (!s.ok()) { |
2909 | 0 | return s; |
2910 | 0 | } |
2911 | | |
2912 | 0 | std::unique_ptr<RandomAccessFileReader> file_reader( |
2913 | 0 | new RandomAccessFileReader( |
2914 | 0 | std::move(file), file_name, ioptions.clock, io_tracer_, |
2915 | 0 | ioptions.stats, Histograms::SST_READ_MICROS /* hist_type */, |
2916 | 0 | nullptr /* file_read_hist */, ioptions.rate_limiter.get(), |
2917 | 0 | ioptions.listeners)); |
2918 | |
|
2919 | 0 | std::unique_ptr<TableProperties> props; |
2920 | |
|
2921 | 0 | uint64_t magic_number = kBlockBasedTableMagicNumber; |
2922 | |
|
2923 | 0 | const auto* table_factory = moptions.table_factory.get(); |
2924 | 0 | if (table_factory == nullptr) { |
2925 | 0 | return Status::Incomplete("Table factory is not set"); |
2926 | 0 | } else { |
2927 | 0 | const auto& table_factory_name = table_factory->Name(); |
2928 | 0 | if (table_factory_name == TableFactory::kPlainTableName()) { |
2929 | 0 | magic_number = kPlainTableMagicNumber; |
2930 | 0 | } else if (table_factory_name == TableFactory::kCuckooTableName()) { |
2931 | 0 | magic_number = kCuckooTableMagicNumber; |
2932 | 0 | } |
2933 | 0 | } |
2934 | | |
2935 | 0 | s = ReadTableProperties(file_reader.get(), file_meta->fd.GetFileSize(), |
2936 | 0 | magic_number, ioptions, read_options, &props); |
2937 | 0 | if (!s.ok()) { |
2938 | 0 | return s; |
2939 | 0 | } |
2940 | | |
2941 | 0 | *tp = std::move(props); |
2942 | 0 | return s; |
2943 | 0 | } |
2944 | | |
2945 | | Status CompactionJob::ReadOutputFilesTableProperties( |
2946 | | const autovector<FileMetaData>& output_files, |
2947 | | const ReadOptions& read_options, |
2948 | | std::vector<std::shared_ptr<const TableProperties>>& |
2949 | | output_files_table_properties, |
2950 | 0 | bool is_proximal_level) { |
2951 | 0 | assert(!output_files.empty()); |
2952 | |
|
2953 | 0 | static const char* level_type = |
2954 | 0 | is_proximal_level ? "proximal output" : "output"; |
2955 | |
|
2956 | 0 | output_files_table_properties.reserve(output_files.size()); |
2957 | |
|
2958 | 0 | Status s; |
2959 | |
|
2960 | 0 | for (const FileMetaData& metadata : output_files) { |
2961 | 0 | std::shared_ptr<const TableProperties> tp; |
2962 | 0 | s = ReadTablePropertiesDirectly(compact_->compaction->immutable_options(), |
2963 | 0 | compact_->compaction->mutable_cf_options(), |
2964 | 0 | &metadata, read_options, &tp); |
2965 | 0 | if (!s.ok()) { |
2966 | 0 | ROCKS_LOG_ERROR( |
2967 | 0 | db_options_.info_log, |
2968 | 0 | "Failed to read table properties for %s level output file #%" PRIu64 |
2969 | 0 | ": %s", |
2970 | 0 | level_type, metadata.fd.GetNumber(), s.ToString().c_str()); |
2971 | 0 | return s; |
2972 | 0 | } |
2973 | | |
2974 | 0 | if (tp == nullptr) { |
2975 | 0 | ROCKS_LOG_ERROR(db_options_.info_log, |
2976 | 0 | "Empty table property for %s level output file #%" PRIu64 |
2977 | 0 | "", |
2978 | 0 | level_type, metadata.fd.GetNumber()); |
2979 | |
|
2980 | 0 | s = Status::Corruption("Empty table property for " + |
2981 | 0 | std::string(level_type) + |
2982 | 0 | " level output files during resuming"); |
2983 | 0 | return s; |
2984 | 0 | } |
2985 | 0 | output_files_table_properties.push_back(tp); |
2986 | 0 | } |
2987 | 0 | return s; |
2988 | 0 | } |
2989 | | |
2990 | | void CompactionJob::RestoreCompactionOutputs( |
2991 | | const ColumnFamilyData* cfd, |
2992 | | const std::vector<std::shared_ptr<const TableProperties>>& |
2993 | | output_files_table_properties, |
2994 | | SubcompactionProgressPerLevel& subcompaction_progress_per_level, |
2995 | 0 | CompactionOutputs* outputs_to_restore) { |
2996 | 0 | assert(outputs_to_restore->GetOutputs().size() == 0); |
2997 | |
|
2998 | 0 | const auto& output_files = subcompaction_progress_per_level.GetOutputFiles(); |
2999 | |
|
3000 | 0 | const bool enable_output_hash = |
3001 | 0 | paranoid_file_checks_ || |
3002 | 0 | !!(compact_->compaction->mutable_cf_options().verify_output_flags & |
3003 | 0 | VerifyOutputFlags::kVerifyIteration); |
3004 | |
|
3005 | 0 | for (size_t i = 0; i < output_files.size(); i++) { |
3006 | 0 | FileMetaData file_copy = output_files[i]; |
3007 | |
|
3008 | 0 | outputs_to_restore->AddOutput(std::move(file_copy), |
3009 | 0 | cfd->internal_comparator(), |
3010 | 0 | enable_output_hash, true /* finished */); |
3011 | |
|
3012 | 0 | outputs_to_restore->UpdateTableProperties( |
3013 | 0 | *output_files_table_properties[i]); |
3014 | 0 | } |
3015 | |
|
3016 | 0 | outputs_to_restore->SetNumOutputRecords( |
3017 | 0 | subcompaction_progress_per_level.GetNumProcessedOutputRecords()); |
3018 | 0 | } |
3019 | | |
3020 | | // Attempt to resume compaction from a previously persisted compaction progress. |
3021 | | // |
3022 | | // RETURNS: |
3023 | | // - Status::OK(): |
3024 | | // * Input iterator positioned at next unprocessed key |
3025 | | // * CompactionOutputs objects fully restored for both output and proximal |
3026 | | // output levels in SubcompactionState |
3027 | | // * Compaction job statistics accurately reflect input and output records |
3028 | | // processed for record count verification |
3029 | | // * File number generation advanced to prevent conflicts with existing outputs |
3030 | | // - Status::NotFound(): No valid progress to resume from |
3031 | | // - Status::Corruption(): Resume key is invalid, beyond input range, or output |
3032 | | // restoration failed |
3033 | | Status CompactionJob::MaybeResumeSubcompactionProgressOnInputIterator( |
3034 | 4.85k | SubcompactionState* sub_compact, InternalIterator* input_iter) { |
3035 | 4.85k | const ReadOptions read_options(Env::IOActivity::kCompaction); |
3036 | 4.85k | ColumnFamilyData* cfd = sub_compact->compaction->column_family_data(); |
3037 | 4.85k | SubcompactionProgress& subcompaction_progress = |
3038 | 4.85k | sub_compact->GetSubcompactionProgressRef(); |
3039 | | |
3040 | 4.85k | if (subcompaction_progress.output_level_progress |
3041 | 4.85k | .GetNumProcessedOutputRecords() == 0 && |
3042 | 4.85k | subcompaction_progress.proximal_output_level_progress |
3043 | 4.85k | .GetNumProcessedOutputRecords() == 0) { |
3044 | 4.85k | return Status::NotFound("No subcompaction progress to resume"); |
3045 | 4.85k | } |
3046 | | |
3047 | 0 | ROCKS_LOG_INFO(db_options_.info_log, "[%s] [JOB %d] Resuming compaction : %s", |
3048 | 0 | cfd->GetName().c_str(), job_id_, |
3049 | 0 | subcompaction_progress.ToString().c_str()); |
3050 | |
|
3051 | 0 | input_iter->Seek(subcompaction_progress.next_internal_key_to_compact); |
3052 | |
|
3053 | 0 | if (!input_iter->Valid()) { |
3054 | 0 | ROCKS_LOG_ERROR(db_options_.info_log, |
3055 | 0 | "[%s] [JOB %d] Iterator is invalid after " |
3056 | 0 | "seeking to the key to resume. This indicates the key is " |
3057 | 0 | "incorrectly beyond the input data range.", |
3058 | 0 | cfd->GetName().c_str(), job_id_); |
3059 | 0 | return Status::Corruption( |
3060 | 0 | "The key to resume is beyond the input data range"); |
3061 | 0 | } else if (!input_iter->status().ok()) { |
3062 | 0 | ROCKS_LOG_ERROR(db_options_.info_log, |
3063 | 0 | "[%s] [JOB %d] Iterator has error after seeking to " |
3064 | 0 | "the key to resume: %s", |
3065 | 0 | cfd->GetName().c_str(), job_id_, |
3066 | 0 | input_iter->status().ToString().c_str()); |
3067 | 0 | return Status::Corruption( |
3068 | 0 | "Iterator has error status after seeking to the key: " + |
3069 | 0 | input_iter->status().ToString()); |
3070 | 0 | } |
3071 | | |
3072 | 0 | sub_compact->compaction_job_stats.has_accurate_num_input_records = |
3073 | 0 | subcompaction_progress.num_processed_input_records != 0; |
3074 | |
|
3075 | 0 | sub_compact->compaction_job_stats.num_input_records = |
3076 | 0 | subcompaction_progress.num_processed_input_records; |
3077 | |
|
3078 | 0 | for (const bool& is_proximal_level : {false, true}) { |
3079 | 0 | if (is_proximal_level && |
3080 | 0 | !sub_compact->compaction->SupportsPerKeyPlacement()) { |
3081 | 0 | continue; |
3082 | 0 | } |
3083 | | |
3084 | 0 | Status s; |
3085 | 0 | SubcompactionProgressPerLevel& subcompaction_progress_per_level = |
3086 | 0 | is_proximal_level |
3087 | 0 | ? subcompaction_progress.proximal_output_level_progress |
3088 | 0 | : subcompaction_progress.output_level_progress; |
3089 | |
|
3090 | 0 | const auto& output_files = |
3091 | 0 | subcompaction_progress_per_level.GetOutputFiles(); |
3092 | |
|
3093 | 0 | std::vector<std::shared_ptr<const TableProperties>> |
3094 | 0 | output_files_table_properties; |
3095 | | |
3096 | | // TODO(hx235): investigate if we can skip reading properties to save read |
3097 | | // IO |
3098 | 0 | s = ReadOutputFilesTableProperties(output_files, read_options, |
3099 | 0 | output_files_table_properties); |
3100 | 0 | if (!s.ok()) { |
3101 | 0 | ROCKS_LOG_ERROR( |
3102 | 0 | db_options_.info_log, |
3103 | 0 | "[%s] [JOB %d] Failed to read table properties for %s output level" |
3104 | 0 | "files " |
3105 | 0 | "during resume: %s.", |
3106 | 0 | cfd->GetName().c_str(), job_id_, is_proximal_level ? "proximal" : "", |
3107 | 0 | s.ToString().c_str()); |
3108 | 0 | return Status::Corruption( |
3109 | 0 | "Not able to resume due to table property reading error " + |
3110 | 0 | s.ToString()); |
3111 | 0 | } |
3112 | | |
3113 | 0 | RestoreCompactionOutputs(cfd, output_files_table_properties, |
3114 | 0 | subcompaction_progress_per_level, |
3115 | 0 | sub_compact->Outputs(is_proximal_level)); |
3116 | | |
3117 | | // Skip past all the used file numbers to avoid creating new output files |
3118 | | // after resumption that conflict with the existing output files |
3119 | 0 | for (const auto& file_meta : output_files) { |
3120 | 0 | uint64_t file_number = file_meta.fd.GetNumber(); |
3121 | 0 | while (versions_->NewFileNumber() <= file_number) { |
3122 | 0 | versions_->FetchAddFileNumber(1); |
3123 | 0 | } |
3124 | 0 | } |
3125 | 0 | } |
3126 | | |
3127 | 0 | return Status::OK(); |
3128 | 0 | } |
3129 | | |
3130 | | void CompactionJob::UpdateSubcompactionProgress( |
3131 | | const CompactionIterator* c_iter, const Slice next_table_min_key, |
3132 | 0 | SubcompactionState* sub_compact) { |
3133 | 0 | assert(c_iter); |
3134 | 0 | SubcompactionProgress& subcompaction_progress = |
3135 | 0 | sub_compact->GetSubcompactionProgressRef(); |
3136 | |
|
3137 | 0 | IterKey next_ikey_to_compact; |
3138 | 0 | next_ikey_to_compact.SetInternalKey(ExtractUserKey(next_table_min_key), |
3139 | 0 | kMaxSequenceNumber, kValueTypeForSeek); |
3140 | 0 | subcompaction_progress.next_internal_key_to_compact = |
3141 | 0 | next_ikey_to_compact.GetInternalKey().ToString(); |
3142 | | |
3143 | | // Track total processed input records for progress reporting by combining: |
3144 | | // - Resumed count: records already processed before compaction was |
3145 | | // interrupted |
3146 | | // - Current count: records scanned in the current compaction session |
3147 | | // Only update when both tracking mechanisms provide accurate counts to ensure |
3148 | | // reliability. |
3149 | 0 | subcompaction_progress.num_processed_input_records = |
3150 | 0 | c_iter->HasNumInputEntryScanned() && |
3151 | 0 | sub_compact->compaction_job_stats.has_accurate_num_input_records |
3152 | 0 | ? c_iter->NumInputEntryScanned() + |
3153 | 0 | sub_compact->compaction_job_stats.num_input_records |
3154 | 0 | : 0; |
3155 | |
|
3156 | 0 | UpdateSubcompactionProgressPerLevel( |
3157 | 0 | sub_compact, false /* is_proximal_level */, subcompaction_progress); |
3158 | |
|
3159 | 0 | if (sub_compact->compaction->SupportsPerKeyPlacement()) { |
3160 | 0 | UpdateSubcompactionProgressPerLevel( |
3161 | 0 | sub_compact, true /* is_proximal_level */, subcompaction_progress); |
3162 | 0 | } |
3163 | 0 | } |
3164 | | |
3165 | | void CompactionJob::UpdateSubcompactionProgressPerLevel( |
3166 | | SubcompactionState* sub_compact, bool is_proximal_level, |
3167 | 0 | SubcompactionProgress& subcompaction_progress) { |
3168 | 0 | SubcompactionProgressPerLevel& subcompaction_progress_per_level = |
3169 | 0 | is_proximal_level ? subcompaction_progress.proximal_output_level_progress |
3170 | 0 | : subcompaction_progress.output_level_progress; |
3171 | |
|
3172 | 0 | subcompaction_progress_per_level.SetNumProcessedOutputRecords( |
3173 | 0 | sub_compact->OutputStats(is_proximal_level)->num_output_records); |
3174 | |
|
3175 | 0 | const auto& prev_output_files = |
3176 | 0 | subcompaction_progress_per_level.GetOutputFiles(); |
3177 | |
|
3178 | 0 | const auto& current_output_files = |
3179 | 0 | sub_compact->Outputs(is_proximal_level)->GetOutputs(); |
3180 | |
|
3181 | 0 | for (size_t i = prev_output_files.size(); i < current_output_files.size(); |
3182 | 0 | i++) { |
3183 | 0 | subcompaction_progress_per_level.AddToOutputFiles( |
3184 | 0 | current_output_files[i].meta); |
3185 | 0 | } |
3186 | 0 | } |
3187 | | |
3188 | | Status CompactionJob::PersistSubcompactionProgress( |
3189 | 0 | SubcompactionState* sub_compact) { |
3190 | 0 | SubcompactionProgress& subcompaction_progress = |
3191 | 0 | sub_compact->GetSubcompactionProgressRef(); |
3192 | |
|
3193 | 0 | assert(compaction_progress_writer_); |
3194 | |
|
3195 | 0 | VersionEdit edit; |
3196 | 0 | edit.SetSubcompactionProgress(subcompaction_progress); |
3197 | |
|
3198 | 0 | std::string record; |
3199 | 0 | if (!edit.EncodeTo(&record)) { |
3200 | 0 | ROCKS_LOG_ERROR( |
3201 | 0 | db_options_.info_log, |
3202 | 0 | "[%s] [JOB %d] Failed to encode subcompaction " |
3203 | 0 | "progress", |
3204 | 0 | compact_->compaction->column_family_data()->GetName().c_str(), job_id_); |
3205 | 0 | return Status::Corruption("Failed to encode subcompaction progress"); |
3206 | 0 | } |
3207 | | |
3208 | 0 | WriteOptions write_options(Env::IOActivity::kCompaction); |
3209 | 0 | Status s = compaction_progress_writer_->AddRecord(write_options, record); |
3210 | 0 | IOOptions opts; |
3211 | 0 | if (s.ok()) { |
3212 | 0 | s = WritableFileWriter::PrepareIOOptions(write_options, opts); |
3213 | 0 | } |
3214 | 0 | if (s.ok()) { |
3215 | 0 | s = compaction_progress_writer_->file()->Sync(opts, db_options_.use_fsync); |
3216 | 0 | } |
3217 | |
|
3218 | 0 | if (!s.ok()) { |
3219 | 0 | ROCKS_LOG_ERROR( |
3220 | 0 | db_options_.info_log, |
3221 | 0 | "[%s] [JOB %d] Failed to persist subcompaction " |
3222 | 0 | "progress: %s", |
3223 | 0 | compact_->compaction->column_family_data()->GetName().c_str(), job_id_, |
3224 | 0 | s.ToString().c_str()); |
3225 | 0 | return s; |
3226 | 0 | } |
3227 | | |
3228 | 0 | subcompaction_progress.output_level_progress |
3229 | 0 | .UpdateLastPersistedOutputFilesCount(); |
3230 | |
|
3231 | 0 | subcompaction_progress.proximal_output_level_progress |
3232 | 0 | .UpdateLastPersistedOutputFilesCount(); |
3233 | |
|
3234 | 0 | return Status::OK(); |
3235 | 0 | } |
3236 | | |
3237 | | Status CompactionJob::VerifyInputRecordCount( |
3238 | 2.72k | uint64_t num_input_range_del) const { |
3239 | 2.72k | size_t ts_sz = compact_->compaction->column_family_data() |
3240 | 2.72k | ->user_comparator() |
3241 | 2.72k | ->timestamp_size(); |
3242 | | // When trim_ts_ is non-empty, CompactionIterator takes |
3243 | | // HistoryTrimmingIterator as input iterator and sees a trimmed view of |
3244 | | // input keys. So the number of keys it processed is not suitable for |
3245 | | // verification here. |
3246 | | // TODO: support verification when trim_ts_ is non-empty. |
3247 | 2.72k | if (!(ts_sz > 0 && !trim_ts_.empty())) { |
3248 | 2.72k | assert(internal_stats_.output_level_stats.num_input_records > 0); |
3249 | | // TODO: verify the number of range deletion entries. |
3250 | 2.72k | uint64_t expected = internal_stats_.output_level_stats.num_input_records - |
3251 | 2.72k | num_input_range_del; |
3252 | 2.72k | uint64_t actual = job_stats_->num_input_records; |
3253 | 2.72k | if (expected != actual) { |
3254 | 0 | char scratch[2345]; |
3255 | 0 | compact_->compaction->Summary(scratch, sizeof(scratch)); |
3256 | 0 | std::string msg = |
3257 | 0 | "Compaction number of input keys does not match " |
3258 | 0 | "number of keys processed. Expected " + |
3259 | 0 | std::to_string(expected) + " but processed " + |
3260 | 0 | std::to_string(actual) + ". Compaction summary: " + scratch; |
3261 | 0 | ROCKS_LOG_WARN( |
3262 | 0 | db_options_.info_log, |
3263 | 0 | "[%s] [JOB %d] VerifyInputRecordCount() Status: %s", |
3264 | 0 | compact_->compaction->column_family_data()->GetName().c_str(), |
3265 | 0 | job_context_->job_id, msg.c_str()); |
3266 | 0 | if (db_options_.compaction_verify_record_count) { |
3267 | 0 | return Status::Corruption(msg); |
3268 | 0 | } |
3269 | 0 | } |
3270 | 2.72k | } |
3271 | 2.72k | return Status::OK(); |
3272 | 2.72k | } |
3273 | | |
3274 | 2.72k | Status CompactionJob::VerifyOutputRecordCount() const { |
3275 | 2.72k | uint64_t total_output_num = 0; |
3276 | 2.72k | for (const auto& state : compact_->sub_compact_states) { |
3277 | 2.72k | for (const auto& output : state.GetOutputs()) { |
3278 | 1.57k | total_output_num += output.table_properties->num_entries - |
3279 | 1.57k | output.table_properties->num_range_deletions; |
3280 | 1.57k | } |
3281 | 2.72k | } |
3282 | | |
3283 | 2.72k | uint64_t expected = internal_stats_.output_level_stats.num_output_records; |
3284 | 2.72k | if (internal_stats_.has_proximal_level_output) { |
3285 | 0 | expected += internal_stats_.proximal_level_stats.num_output_records; |
3286 | 0 | } |
3287 | 2.72k | if (expected != total_output_num) { |
3288 | 0 | char scratch[2345]; |
3289 | 0 | compact_->compaction->Summary(scratch, sizeof(scratch)); |
3290 | 0 | std::string msg = |
3291 | 0 | "Number of keys in compaction output SST files does not match " |
3292 | 0 | "number of keys added. Expected " + |
3293 | 0 | std::to_string(expected) + " but there are " + |
3294 | 0 | std::to_string(total_output_num) + |
3295 | 0 | " in output SST files. Compaction summary: " + scratch; |
3296 | 0 | ROCKS_LOG_WARN( |
3297 | 0 | db_options_.info_log, |
3298 | 0 | "[%s] [JOB %d] VerifyOutputRecordCount() status: %s", |
3299 | 0 | compact_->compaction->column_family_data()->GetName().c_str(), |
3300 | 0 | job_context_->job_id, msg.c_str()); |
3301 | 0 | if (db_options_.compaction_verify_record_count) { |
3302 | 0 | return Status::Corruption(msg); |
3303 | 0 | } |
3304 | 0 | } |
3305 | 2.72k | return Status::OK(); |
3306 | 2.72k | } |
3307 | | } // namespace ROCKSDB_NAMESPACE |