/src/rocksdb/db/db_impl/db_impl_files.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | // |
6 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
7 | | // Use of this source code is governed by a BSD-style license that can be |
8 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
9 | | #include <array> |
10 | | #include <cinttypes> |
11 | | #include <set> |
12 | | #include <unordered_set> |
13 | | |
14 | | #include "db/blob/blob_file_partition_manager.h" |
15 | | #include "db/blob/blob_log_format.h" |
16 | | #include "db/db_impl/db_impl.h" |
17 | | #include "db/event_helpers.h" |
18 | | #include "db/memtable_list.h" |
19 | | #include "file/file_util.h" |
20 | | #include "file/filename.h" |
21 | | #include "file/sst_file_manager_impl.h" |
22 | | #include "logging/logging.h" |
23 | | #include "monitoring/thread_status_util.h" |
24 | | #include "port/port.h" |
25 | | #include "rocksdb/options.h" |
26 | | #include "util/autovector.h" |
27 | | #include "util/defer.h" |
28 | | |
29 | | namespace ROCKSDB_NAMESPACE { |
30 | | |
31 | | namespace { |
32 | | |
33 | 0 | Env::IOActivity GetCurrentThreadIOActivityForMetadataRead() { |
34 | 0 | switch (ThreadStatusUtil::GetThreadOperation()) { |
35 | 0 | case ThreadStatus::OperationType::OP_FLUSH: |
36 | 0 | return Env::IOActivity::kFlush; |
37 | 0 | case ThreadStatus::OperationType::OP_COMPACTION: |
38 | 0 | return Env::IOActivity::kCompaction; |
39 | 0 | case ThreadStatus::OperationType::OP_DBOPEN: |
40 | 0 | return Env::IOActivity::kDBOpen; |
41 | 0 | case ThreadStatus::OperationType::OP_GET: |
42 | 0 | return Env::IOActivity::kGet; |
43 | 0 | case ThreadStatus::OperationType::OP_MULTIGET: |
44 | 0 | return Env::IOActivity::kMultiGet; |
45 | 0 | case ThreadStatus::OperationType::OP_DBITERATOR: |
46 | 0 | return Env::IOActivity::kDBIterator; |
47 | 0 | case ThreadStatus::OperationType::OP_VERIFY_DB_CHECKSUM: |
48 | 0 | return Env::IOActivity::kVerifyDBChecksum; |
49 | 0 | case ThreadStatus::OperationType::OP_VERIFY_FILE_CHECKSUMS: |
50 | 0 | return Env::IOActivity::kVerifyFileChecksums; |
51 | 0 | case ThreadStatus::OperationType::OP_GETENTITY: |
52 | 0 | return Env::IOActivity::kGetEntity; |
53 | 0 | case ThreadStatus::OperationType::OP_MULTIGETENTITY: |
54 | 0 | return Env::IOActivity::kMultiGetEntity; |
55 | 0 | case ThreadStatus::OperationType:: |
56 | 0 | OP_GET_FILE_CHECKSUMS_FROM_CURRENT_MANIFEST: |
57 | 0 | return Env::IOActivity::kGetFileChecksumsFromCurrentManifest; |
58 | 0 | default: |
59 | 0 | return Env::IOActivity::kUnknown; |
60 | 0 | } |
61 | 0 | } |
62 | | |
63 | | // A full-scan obsolete-file purge can observe a newly created direct-write |
64 | | // blob file before it is added to the manager's active-file set. Those |
65 | | // in-flight files are still missing their footer, so conservatively keep any |
66 | | // footer-less blob file here rather than risk deleting a live write-path file. |
67 | | bool ShouldKeepFooterlessBlobFile(FileSystem* fs, |
68 | | const FileOptions& file_options, |
69 | 0 | const std::string& blob_file_path) { |
70 | 0 | assert(fs != nullptr); |
71 | |
|
72 | 0 | constexpr IODebugContext* dbg = nullptr; |
73 | | // This purge path can run from DB open, flush/compaction cleanup, or |
74 | | // iterator-triggered obsolete-file cleanup. Tag the probe with the current |
75 | | // thread activity so db_stress keeps validating the read against the active |
76 | | // operation instead of seeing an unexpected kUnknown metadata read. |
77 | 0 | IOOptions io_options; |
78 | 0 | io_options.io_activity = GetCurrentThreadIOActivityForMetadataRead(); |
79 | 0 | uint64_t file_size = 0; |
80 | 0 | IOStatus io_s = fs->GetFileSize(blob_file_path, io_options, &file_size, dbg); |
81 | 0 | if (!io_s.ok()) { |
82 | 0 | return !io_s.IsPathNotFound(); |
83 | 0 | } |
84 | | |
85 | 0 | if (file_size < BlobLogHeader::kSize + BlobLogFooter::kSize) { |
86 | 0 | return true; |
87 | 0 | } |
88 | | |
89 | 0 | FileOptions read_file_options = file_options; |
90 | 0 | read_file_options.use_direct_reads = false; |
91 | |
|
92 | 0 | std::unique_ptr<FSRandomAccessFile> file; |
93 | 0 | io_s = fs->NewRandomAccessFile(blob_file_path, read_file_options, &file, dbg); |
94 | 0 | if (!io_s.ok()) { |
95 | 0 | return !io_s.IsPathNotFound(); |
96 | 0 | } |
97 | | |
98 | 0 | std::array<char, BlobLogFooter::kSize> scratch{}; |
99 | 0 | Slice footer_slice; |
100 | 0 | io_s = file->Read(file_size - BlobLogFooter::kSize, BlobLogFooter::kSize, |
101 | 0 | io_options, &footer_slice, scratch.data(), dbg); |
102 | 0 | if (!io_s.ok()) { |
103 | 0 | return !io_s.IsPathNotFound(); |
104 | 0 | } |
105 | | |
106 | 0 | if (footer_slice.size() != BlobLogFooter::kSize) { |
107 | 0 | return true; |
108 | 0 | } |
109 | | |
110 | 0 | BlobLogFooter footer; |
111 | 0 | return !footer.DecodeFrom(footer_slice).ok(); |
112 | 0 | } |
113 | | |
114 | | } // namespace |
115 | | |
116 | 142k | uint64_t DBImpl::MinLogNumberToKeep() { |
117 | 142k | return versions_->min_log_number_to_keep(); |
118 | 142k | } |
119 | | |
120 | 0 | uint64_t DBImpl::MinLogNumberToRecycle() { return min_wal_number_to_recycle_; } |
121 | | |
122 | 107k | uint64_t DBImpl::MinObsoleteSstNumberToKeep() { |
123 | 107k | mutex_.AssertHeld(); |
124 | 107k | if (!pending_outputs_.empty()) { |
125 | 292 | return *pending_outputs_.begin(); |
126 | 292 | } |
127 | 107k | return std::numeric_limits<uint64_t>::max(); |
128 | 107k | } |
129 | | |
130 | 0 | uint64_t DBImpl::GetObsoleteSstFilesSize() { |
131 | 0 | mutex_.AssertHeld(); |
132 | 0 | return versions_->GetObsoleteSstFilesSize(); |
133 | 0 | } |
134 | | |
135 | 107k | uint64_t DBImpl::MinOptionsFileNumberToKeep() { |
136 | 107k | mutex_.AssertHeld(); |
137 | 107k | if (!min_options_file_numbers_.empty()) { |
138 | 0 | return *min_options_file_numbers_.begin(); |
139 | 0 | } |
140 | 107k | return std::numeric_limits<uint64_t>::max(); |
141 | 107k | } |
142 | | |
143 | 0 | Status DBImpl::DisableFileDeletions() { |
144 | 0 | Status s; |
145 | 0 | int my_disable_delete_obsolete_files; |
146 | 0 | { |
147 | 0 | InstrumentedMutexLock l(&mutex_); |
148 | 0 | s = DisableFileDeletionsWithLock(); |
149 | 0 | my_disable_delete_obsolete_files = disable_delete_obsolete_files_; |
150 | 0 | } |
151 | 0 | if (my_disable_delete_obsolete_files == 1) { |
152 | 0 | ROCKS_LOG_INFO(immutable_db_options_.info_log, "File Deletions Disabled"); |
153 | 0 | } else { |
154 | 0 | ROCKS_LOG_INFO(immutable_db_options_.info_log, |
155 | 0 | "File Deletions Disabled, but already disabled. Counter: %d", |
156 | 0 | my_disable_delete_obsolete_files); |
157 | 0 | } |
158 | 0 | return s; |
159 | 0 | } |
160 | | |
161 | | // FIXME: can be inconsistent with DisableFileDeletions in cases like |
162 | | // DBImplReadOnly |
163 | 0 | Status DBImpl::DisableFileDeletionsWithLock() { |
164 | 0 | mutex_.AssertHeld(); |
165 | 0 | ++disable_delete_obsolete_files_; |
166 | 0 | return Status::OK(); |
167 | 0 | } |
168 | | |
169 | 0 | Status DBImpl::EnableFileDeletions() { |
170 | | // Job id == 0 means that this is not our background process, but rather |
171 | | // user thread |
172 | 0 | JobContext job_context(0); |
173 | 0 | int saved_counter; // initialize on all paths |
174 | 0 | { |
175 | 0 | InstrumentedMutexLock l(&mutex_); |
176 | 0 | if (disable_delete_obsolete_files_ > 0) { |
177 | 0 | --disable_delete_obsolete_files_; |
178 | 0 | } |
179 | 0 | saved_counter = disable_delete_obsolete_files_; |
180 | 0 | if (saved_counter == 0) { |
181 | 0 | FindObsoleteFiles(&job_context, true); |
182 | 0 | bg_cv_.SignalAll(); |
183 | 0 | } |
184 | 0 | } |
185 | 0 | if (saved_counter == 0) { |
186 | 0 | ROCKS_LOG_INFO(immutable_db_options_.info_log, "File Deletions Enabled"); |
187 | 0 | if (job_context.HaveSomethingToDelete()) { |
188 | 0 | PurgeObsoleteFiles(job_context); |
189 | 0 | } |
190 | 0 | } else { |
191 | 0 | ROCKS_LOG_INFO(immutable_db_options_.info_log, |
192 | 0 | "File Deletions Enable, but not really enabled. Counter: %d", |
193 | 0 | saved_counter); |
194 | 0 | } |
195 | 0 | job_context.Clean(); |
196 | 0 | LogFlush(immutable_db_options_.info_log); |
197 | 0 | return Status::OK(); |
198 | 0 | } |
199 | | |
200 | 0 | bool DBImpl::IsFileDeletionsEnabled() const { |
201 | 0 | return 0 == disable_delete_obsolete_files_; |
202 | 0 | } |
203 | | |
// Collects files that may be deleted into *job_context.
// Must be called with mutex_ held. It may temporarily release and re-acquire
// mutex_ (and wal_write_mutex_) while closing obsolete WALs, and it returns
// with mutex_ held.
//
// * Always gathers obsolete SST/blob/manifest files from the VersionSet into
//   'sst_delete_files', 'blob_delete_files' and 'manifest_delete_files'.
// If it's doing full scan:
// * Fills the live file lists 'sst_live' and 'blob_live', and returns the
//   list of all files in the filesystem in 'full_scan_candidate_files'.
// Otherwise, removes files still referenced by any Version from the
// VersionSet-provided delete lists instead.
// no_full_scan = true -- never do the full scan using GetChildren()
// force = false -- don't force the full scan, except every
// mutable_db_options_.delete_obsolete_files_period_micros
// force = true -- force the full scan
//
// Does nothing while file deletions are disabled
// (disable_delete_obsolete_files_ > 0). Otherwise it increments
// pending_purge_obsolete_files_. The increment is undone here when nothing was
// found to delete; otherwise it is left for the subsequent purge to undo
// (presumably PurgeObsoleteFiles -- TODO confirm).
void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
                               bool no_full_scan) {
  mutex_.AssertHeld();

  // if deletion is disabled, do nothing
  if (disable_delete_obsolete_files_ > 0) {
    return;
  }

  bool doing_the_full_scan = false;

  // logic for figuring out if we're doing the full scan
  if (no_full_scan) {
    doing_the_full_scan = false;
  } else if (force ||
             mutable_db_options_.delete_obsolete_files_period_micros == 0) {
    doing_the_full_scan = true;
  } else {
    // Periodic full scan: only once the configured period has elapsed since
    // delete_obsolete_files_last_run_, which is then advanced to now.
    const uint64_t now_micros = immutable_db_options_.clock->NowMicros();
    if ((delete_obsolete_files_last_run_ +
         mutable_db_options_.delete_obsolete_files_period_micros) <
        now_micros) {
      doing_the_full_scan = true;
      delete_obsolete_files_last_run_ = now_micros;
    }
  }

  // don't delete files that might be currently written to from compaction
  // threads
  // Since job_context->min_pending_output is set, until file scan finishes,
  // mutex_ cannot be released. Otherwise, we might see no min_pending_output
  // here but later find newer generated unfinalized files while scanning.
  job_context->min_pending_output = MinObsoleteSstNumberToKeep();
  job_context->files_to_quarantine = error_handler_.GetFilesToQuarantine();
  job_context->min_options_file_number = MinOptionsFileNumberToKeep();
  // Blob files numbered at or above the current next file number are kept by
  // the purge (presumably because they may have been allocated after this
  // point -- TODO confirm).
  job_context->min_blob_file_number_to_keep =
      versions_->current_next_file_number();
  // Record blob files that each column family's direct-write partition
  // manager reports as active or protected so the purge keeps them.
  for (auto* cfd : *versions_->GetColumnFamilySet()) {
    auto* mgr = cfd->blob_partition_manager();
    if (mgr != nullptr) {
      mgr->GetActiveBlobFileNumbers(
          &job_context->active_blob_direct_write_files);
      mgr->GetProtectedBlobFileNumbers(
          &job_context->active_blob_direct_write_files);
    }
  }

  // Get obsolete files. This function will also update the list of
  // pending files in VersionSet().
  assert(versions_);
  versions_->GetObsoleteFiles(
      &job_context->sst_delete_files, &job_context->blob_delete_files,
      &job_context->manifest_delete_files, job_context->min_pending_output);

  // Mark the elements in job_context->sst_delete_files and
  // job_context->blob_delete_files as "grabbed for purge" so that other threads
  // calling FindObsoleteFiles with full_scan=true will not add these files to
  // candidate list for purge.
  for (const auto& sst_to_del : job_context->sst_delete_files) {
    MarkAsGrabbedForPurge(sst_to_del.metadata->fd.GetNumber());
  }

  for (const auto& blob_file : job_context->blob_delete_files) {
    MarkAsGrabbedForPurge(blob_file.GetBlobFileNumber());
  }

  // store the current filenum, lognum, etc
  job_context->manifest_file_number = versions_->manifest_file_number();
  job_context->pending_manifest_file_number =
      versions_->pending_manifest_file_number();
  job_context->log_number = MinLogNumberToKeep();
  job_context->prev_log_number = versions_->prev_log_number();

  if (doing_the_full_scan) {
    versions_->AddLiveFiles(&job_context->sst_live, &job_context->blob_live);
    InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(),
                                  dbname_);
    // PurgeObsoleteFiles will dedupe duplicate files.
    IOOptions io_opts;
    io_opts.do_not_recurse = true;
    for (auto& path : CollectAllDBPaths()) {
      // set of all files in the directory. We'll exclude files that are still
      // alive in the subsequent processings.
      std::vector<std::string> files;
      Status s = immutable_db_options_.fs->GetChildren(
          path, io_opts, &files, /*IODebugContext*=*/nullptr);
      s.PermitUncheckedError();  // TODO: What should we do on error?
      for (const std::string& file : files) {
        uint64_t number;
        FileType type;
        // 1. If we cannot parse the file name, we skip;
        // 2. If the file with file_number equals number has already been
        // grabbed for purge by another compaction job, or it has already been
        // scheduled for purge, we also skip it if we
        // are doing full scan in order to avoid double deletion of the same
        // file under race conditions. See
        // https://github.com/facebook/rocksdb/issues/3573
        if (!ParseFileName(file, &number, info_log_prefix.prefix, &type) ||
            !ShouldPurge(number)) {
          continue;
        }

        // TODO(icanadi) clean up this mess to avoid having one-off "/"
        // prefixes
        job_context->full_scan_candidate_files.emplace_back("/" + file, path);
      }
    }

    // Add log files in wal_dir
    if (!immutable_db_options_.IsWalDirSameAsDBPath(dbname_)) {
      std::vector<std::string> log_files;
      Status s = immutable_db_options_.fs->GetChildren(
          immutable_db_options_.wal_dir, io_opts, &log_files,
          /*IODebugContext*=*/nullptr);
      s.PermitUncheckedError();  // TODO: What should we do on error?
      for (const std::string& log_file : log_files) {
        job_context->full_scan_candidate_files.emplace_back(
            log_file, immutable_db_options_.wal_dir);
      }
    }

    // Add info log files in db_log_dir
    if (!immutable_db_options_.db_log_dir.empty() &&
        immutable_db_options_.db_log_dir != dbname_) {
      std::vector<std::string> info_log_files;
      Status s = immutable_db_options_.fs->GetChildren(
          immutable_db_options_.db_log_dir, io_opts, &info_log_files,
          /*IODebugContext*=*/nullptr);
      s.PermitUncheckedError();  // TODO: What should we do on error?
      for (std::string& log_file : info_log_files) {
        job_context->full_scan_candidate_files.emplace_back(
            log_file, immutable_db_options_.db_log_dir);
      }
    }
  } else {
    // Instead of filling job_context->sst_live and job_context->blob_live,
    // directly remove files that show up in any Version. This is because
    // candidate files tend to be a small percentage of all files, so it is
    // usually cheaper to check them against every version, compared to
    // building a map for all files.
    versions_->RemoveLiveFiles(job_context->sst_delete_files,
                               job_context->blob_delete_files);
  }

  // Before potentially releasing mutex and waiting on condvar, increment
  // pending_purge_obsolete_files_ so that another thread executing
  // `GetSortedWals` will wait until this thread finishes execution since the
  // other thread will be waiting for `pending_purge_obsolete_files_`.
  // pending_purge_obsolete_files_ MUST be decremented if there is nothing to
  // delete.
  ++pending_purge_obsolete_files_;

  // Runs on every exit path below, each of which holds mutex_ at that point.
  // Undoes the increment above only when nothing was collected for deletion.
  Defer cleanup([job_context, this]() {
    assert(job_context != nullptr);
    if (!job_context->HaveSomethingToDelete()) {
      mutex_.AssertHeld();
      --pending_purge_obsolete_files_;
      if (pending_purge_obsolete_files_ == 0) {
        bg_cv_.SignalAll();
      }
    }
  });

  // logs_ is empty when called during recovery, in which case there can't yet
  // be any tracked obsolete logs
  wal_write_mutex_.Lock();

  if (alive_wal_files_.empty() || logs_.empty()) {
    mutex_.AssertHeld();
    // We may reach here if the db is DBImplSecondary
    wal_write_mutex_.Unlock();
    return;
  }

  bool mutex_unlocked = false;
  // NOTE(review): this condition is always true here because of the early
  // return above, so mutex_unlocked always ends up true.
  if (!alive_wal_files_.empty() && !logs_.empty()) {
    uint64_t min_log_number = job_context->log_number;
    size_t num_alive_wal_files = alive_wal_files_.size();
    // find newly obsoleted log files
    while (alive_wal_files_.begin()->number < min_log_number) {
      auto& earliest = *alive_wal_files_.begin();
      // Recycle the WAL if the recycle list has room and the WAL is not older
      // than MinLogNumberToRecycle(); otherwise schedule it for deletion.
      if (immutable_db_options_.recycle_log_file_num >
              wal_recycle_files_.size() &&
          earliest.number >= MinLogNumberToRecycle()) {
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
                       "adding log %" PRIu64 " to recycle list\n",
                       earliest.number);
        wal_recycle_files_.push_back(earliest.number);
      } else {
        job_context->log_delete_files.push_back(earliest.number);
      }
      // Snapshot the pre-deletion WAL totals once, for the purge's log line.
      if (job_context->size_log_to_delete == 0) {
        job_context->prev_wals_total_size = wals_total_size_.LoadRelaxed();
        job_context->num_alive_wal_files = num_alive_wal_files;
      }
      job_context->size_log_to_delete += earliest.size;
      wals_total_size_.FetchSubRelaxed(earliest.size);
      alive_wal_files_.pop_front();

      // Current log should always stay alive since it can't have
      // number < MinLogNumber().
      assert(alive_wal_files_.size());
    }
    // Drop both locks briefly (a sync point hooks in here), then re-take only
    // wal_write_mutex_ while retiring obsolete writers from logs_.
    wal_write_mutex_.Unlock();
    mutex_.Unlock();
    mutex_unlocked = true;
    TEST_SYNC_POINT_CALLBACK("FindObsoleteFiles::PostMutexUnlock", nullptr);
    wal_write_mutex_.Lock();
    while (!logs_.empty() && logs_.front().number < min_log_number) {
      auto& log = logs_.front();
      if (log.IsSyncing()) {
        wal_sync_cv_.Wait();
        // logs_ could have changed while we were waiting.
        continue;
      }
      // This WAL file is not live, so it's OK if we never sync the rest of it.
      // If it's already closed, then it's been fully synced. If
      // !background_close_inactive_wals then we need to Close it before
      // removing from logs_ but not blocking while holding wal_write_mutex_.
      if (!immutable_db_options_.background_close_inactive_wals &&
          log.writer->file()) {
        // We are taking ownership of and pinning the front entry, so we can
        // expect it to be the same after releasing and re-acquiring the lock
        log.PrepareForSync();
        wal_write_mutex_.Unlock();
        // TODO: maybe check the return value of Close.
        // TODO: plumb Env::IOActivity, Env::IOPriority
        auto s = log.writer->file()->Close({});
        s.PermitUncheckedError();
        wal_write_mutex_.Lock();
        log.writer->PublishIfClosed();
        assert(&log == &logs_.front());
        log.FinishSync();
        wal_sync_cv_.SignalAll();
      }
      // Hand the writer over for release; it is moved into job_context below.
      wals_to_free_.push_back(log.ReleaseWriter());
      logs_.pop_front();
    }
    // Current log cannot be obsolete.
    assert(!logs_.empty());
  }

  // We're just cleaning up for DB::Write().
  assert(job_context->wals_to_free.empty());
  job_context->wals_to_free = wals_to_free_;

  wals_to_free_.clear();
  wal_write_mutex_.Unlock();
  if (mutex_unlocked) {
    mutex_.Lock();
  }
  job_context->log_recycle_files.assign(wal_recycle_files_.begin(),
                                        wal_recycle_files_.end());
}
467 | | |
468 | | // Delete obsolete files and log status and information of file deletion |
469 | | void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname, |
470 | | const std::string& path_to_sync, |
471 | 84.9k | FileType type, uint64_t number) { |
472 | 84.9k | TEST_SYNC_POINT_CALLBACK("DBImpl::DeleteObsoleteFileImpl::BeforeDeletion", |
473 | 84.9k | const_cast<std::string*>(&fname)); |
474 | 84.9k | IGNORE_STATUS_IF_ERROR(Status::IOError()); |
475 | | |
476 | 84.9k | Status file_deletion_status; |
477 | 84.9k | if (type == kTableFile || type == kBlobFile || type == kWalFile) { |
478 | | // Rate limit WAL deletion only if its in the DB dir |
479 | 42.7k | file_deletion_status = DeleteDBFile( |
480 | 42.7k | &immutable_db_options_, fname, path_to_sync, |
481 | 42.7k | /*force_bg=*/false, |
482 | 42.7k | /*force_fg=*/(type == kWalFile) ? !wal_in_db_path_ : false); |
483 | 42.7k | } else { |
484 | 42.2k | file_deletion_status = env_->DeleteFile(fname); |
485 | 42.2k | } |
486 | 84.9k | TEST_SYNC_POINT_CALLBACK("DBImpl::DeleteObsoleteFileImpl:AfterDeletion", |
487 | 84.9k | &file_deletion_status); |
488 | 84.9k | if (file_deletion_status.ok()) { |
489 | 84.9k | ROCKS_LOG_DEBUG(immutable_db_options_.info_log, |
490 | 84.9k | "[JOB %d] Delete %s type=%d #%" PRIu64 " -- %s\n", job_id, |
491 | 84.9k | fname.c_str(), type, number, |
492 | 84.9k | file_deletion_status.ToString().c_str()); |
493 | 84.9k | } else if (env_->FileExists(fname).IsNotFound()) { |
494 | 0 | ROCKS_LOG_INFO( |
495 | 0 | immutable_db_options_.info_log, |
496 | 0 | "[JOB %d] Tried to delete a non-existing file %s type=%d #%" PRIu64 |
497 | 0 | " -- %s\n", |
498 | 0 | job_id, fname.c_str(), type, number, |
499 | 0 | file_deletion_status.ToString().c_str()); |
500 | 0 | } else { |
501 | 0 | ROCKS_LOG_ERROR(immutable_db_options_.info_log, |
502 | 0 | "[JOB %d] Failed to delete %s type=%d #%" PRIu64 " -- %s\n", |
503 | 0 | job_id, fname.c_str(), type, number, |
504 | 0 | file_deletion_status.ToString().c_str()); |
505 | 0 | } |
506 | 84.9k | if (type == kTableFile) { |
507 | 6.14k | EventHelpers::LogAndNotifyTableFileDeletion( |
508 | 6.14k | &event_logger_, job_id, number, fname, file_deletion_status, GetName(), |
509 | 6.14k | immutable_db_options_.listeners); |
510 | 6.14k | } |
511 | 84.9k | if (type == kBlobFile) { |
512 | 0 | EventHelpers::LogAndNotifyBlobFileDeletion( |
513 | 0 | &event_logger_, immutable_db_options_.listeners, job_id, number, fname, |
514 | 0 | file_deletion_status, GetName()); |
515 | 0 | } |
516 | 84.9k | } |
517 | | |
518 | | // Diffs the files listed in filenames and those that do not |
519 | | // belong to live files are possibly removed. Also, removes all the |
520 | | // files in sst_delete_files and log_delete_files. |
521 | | // It is not necessary to hold the mutex when invoking this method. |
522 | 91.1k | void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { |
523 | 91.1k | TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:Begin"); |
524 | | // we'd better have sth to delete |
525 | 91.1k | assert(state.HaveSomethingToDelete()); |
526 | | |
527 | | // FindObsoleteFiles() should've populated this so nonzero |
528 | 91.1k | assert(state.manifest_file_number != 0); |
529 | | |
530 | 91.1k | IGNORE_STATUS_IF_ERROR(Status::IOError()); |
531 | | |
532 | | // Now, convert lists to unordered sets, WITHOUT mutex held; set is slow. |
533 | 91.1k | std::unordered_set<uint64_t> sst_live_set(state.sst_live.begin(), |
534 | 91.1k | state.sst_live.end()); |
535 | 91.1k | std::unordered_set<uint64_t> blob_live_set(state.blob_live.begin(), |
536 | 91.1k | state.blob_live.end()); |
537 | 91.1k | std::unordered_set<uint64_t> wal_recycle_files_set( |
538 | 91.1k | state.log_recycle_files.begin(), state.log_recycle_files.end()); |
539 | 91.1k | std::unordered_set<uint64_t> quarantine_files_set( |
540 | 91.1k | state.files_to_quarantine.begin(), state.files_to_quarantine.end()); |
541 | | |
542 | 91.1k | auto candidate_files = state.full_scan_candidate_files; |
543 | 91.1k | candidate_files.reserve( |
544 | 91.1k | candidate_files.size() + state.sst_delete_files.size() + |
545 | 91.1k | state.blob_delete_files.size() + state.log_delete_files.size() + |
546 | 91.1k | state.manifest_delete_files.size()); |
547 | | // We may ignore the dbname when generating the file names. |
548 | 91.1k | for (auto& file : state.sst_delete_files) { |
549 | 11.0k | auto* handle = file.metadata->fd.pinned_reader.GetCacheHandle(); |
550 | 11.0k | if (file.only_delete_metadata) { |
551 | 5.46k | if (handle) { |
552 | | // Simply release handle of file that is not being deleted |
553 | 5.46k | table_cache_->Release(handle); |
554 | 5.46k | } |
555 | 5.57k | } else { |
556 | | // File is being deleted (actually obsolete) |
557 | 5.57k | auto number = file.metadata->fd.GetNumber(); |
558 | 5.57k | candidate_files.emplace_back(MakeTableFileName(number), file.path); |
559 | 5.57k | TableCache::ReleaseObsolete(table_cache_.get(), number, handle, |
560 | 5.57k | file.uncache_aggressiveness); |
561 | 5.57k | } |
562 | 11.0k | file.DeleteMetadata(); |
563 | 11.0k | } |
564 | | |
565 | 91.1k | for (const auto& blob_file : state.blob_delete_files) { |
566 | 0 | candidate_files.emplace_back(BlobFileName(blob_file.GetBlobFileNumber()), |
567 | 0 | blob_file.GetPath()); |
568 | 0 | } |
569 | | |
570 | 91.1k | auto wal_dir = immutable_db_options_.GetWalDir(); |
571 | 91.1k | for (auto file_num : state.log_delete_files) { |
572 | 1.83k | if (file_num > 0) { |
573 | 1.83k | candidate_files.emplace_back(LogFileName(file_num), wal_dir); |
574 | 1.83k | } |
575 | 1.83k | } |
576 | 91.1k | for (const auto& filename : state.manifest_delete_files) { |
577 | 42.2k | candidate_files.emplace_back(filename, dbname_); |
578 | 42.2k | } |
579 | | |
580 | | // dedup state.candidate_files so we don't try to delete the same |
581 | | // file twice |
582 | 91.1k | std::sort(candidate_files.begin(), candidate_files.end(), |
583 | 91.1k | [](const JobContext::CandidateFileInfo& lhs, |
584 | 8.17M | const JobContext::CandidateFileInfo& rhs) { |
585 | 8.17M | if (lhs.file_name < rhs.file_name) { |
586 | 6.37M | return true; |
587 | 6.37M | } else if (lhs.file_name > rhs.file_name) { |
588 | 1.75M | return false; |
589 | 1.75M | } else { |
590 | 43.7k | return (lhs.file_path < rhs.file_path); |
591 | 43.7k | } |
592 | 8.17M | }); |
593 | 91.1k | candidate_files.erase( |
594 | 91.1k | std::unique(candidate_files.begin(), candidate_files.end()), |
595 | 91.1k | candidate_files.end()); |
596 | | |
597 | 91.1k | if (state.prev_wals_total_size > 0) { |
598 | 1.83k | ROCKS_LOG_INFO(immutable_db_options_.info_log, |
599 | 1.83k | "[JOB %d] Try to delete WAL files size %" PRIu64 |
600 | 1.83k | ", prev total WAL file size %" PRIu64 |
601 | 1.83k | ", number of live WAL files %" ROCKSDB_PRIszt ".\n", |
602 | 1.83k | state.job_id, state.size_log_to_delete, |
603 | 1.83k | state.prev_wals_total_size, state.num_alive_wal_files); |
604 | 1.83k | } |
605 | | |
606 | 91.1k | std::vector<std::string> old_info_log_files; |
607 | 91.1k | InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(), |
608 | 91.1k | dbname_); |
609 | | |
610 | | // File numbers of most recent two OPTIONS file in candidate_files (found in |
611 | | // previous FindObsoleteFiles(full_scan=true)) |
612 | | // At this point, there must not be any duplicate file numbers in |
613 | | // candidate_files. |
614 | 91.1k | uint64_t optsfile_num1 = std::numeric_limits<uint64_t>::min(); |
615 | 91.1k | uint64_t optsfile_num2 = std::numeric_limits<uint64_t>::min(); |
616 | 1.56M | for (const auto& candidate_file : candidate_files) { |
617 | 1.56M | const std::string& fname = candidate_file.file_name; |
618 | 1.56M | uint64_t number; |
619 | 1.56M | FileType type; |
620 | 1.56M | if (!ParseFileName(fname, &number, info_log_prefix.prefix, &type) || |
621 | 1.56M | type != kOptionsFile) { |
622 | 1.40M | continue; |
623 | 1.40M | } |
624 | 155k | if (number > optsfile_num1) { |
625 | 155k | optsfile_num2 = optsfile_num1; |
626 | 155k | optsfile_num1 = number; |
627 | 155k | } else if (number > optsfile_num2) { |
628 | 0 | optsfile_num2 = number; |
629 | 0 | } |
630 | 155k | } |
631 | | |
632 | | // For remote compactions, we need to keep OPTIONS file that may get |
633 | | // referenced by the remote worker |
634 | | |
635 | 91.1k | optsfile_num2 = std::min(optsfile_num2, state.min_options_file_number); |
636 | | |
637 | | // Close WALs before trying to delete them. |
638 | 91.1k | for (const auto w : state.wals_to_free) { |
639 | | // TODO: maybe check the return value of Close. |
640 | | // TODO: plumb Env::IOActivity, Env::IOPriority |
641 | 1.83k | auto s = w->Close({}); |
642 | 1.83k | s.PermitUncheckedError(); |
643 | 1.83k | } |
644 | | |
645 | 91.1k | bool own_files = OwnTablesAndLogs(); |
646 | 91.1k | std::unordered_set<uint64_t> files_to_del; |
647 | 1.56M | for (const auto& candidate_file : candidate_files) { |
648 | 1.56M | const std::string& to_delete = candidate_file.file_name; |
649 | 1.56M | uint64_t number; |
650 | 1.56M | FileType type; |
651 | | // Ignore file if we cannot recognize it. |
652 | 1.56M | if (!ParseFileName(to_delete, &number, info_log_prefix.prefix, &type)) { |
653 | 0 | continue; |
654 | 0 | } |
655 | | |
656 | 1.56M | if (quarantine_files_set.find(number) != quarantine_files_set.end()) { |
657 | 0 | continue; |
658 | 0 | } |
659 | | |
660 | 1.56M | bool keep = true; |
661 | 1.56M | switch (type) { |
662 | 121k | case kWalFile: |
663 | 121k | keep = ((number >= state.log_number) || |
664 | 36.6k | (number == state.prev_log_number) || |
665 | 36.6k | (wal_recycle_files_set.find(number) != |
666 | 36.6k | wal_recycle_files_set.end())); |
667 | 121k | break; |
668 | 126k | case kDescriptorFile: |
669 | | // Keep my manifest file, and any newer incarnations' |
670 | | // (can happen during manifest roll) |
671 | 126k | keep = (number >= state.manifest_file_number); |
672 | 126k | break; |
673 | 163k | case kTableFile: |
674 | | // If the second condition is not there, this makes |
675 | | // DontDeletePendingOutputs fail |
676 | | // FIXME: but should NOT keep if it came from sst_delete_files? |
677 | 163k | keep = (sst_live_set.find(number) != sst_live_set.end()) || |
678 | 6.14k | number >= state.min_pending_output; |
679 | 163k | if (!keep) { |
680 | | // NOTE: sometimes redundant (if came from sst_delete_files) |
681 | | // We don't know which column family is applicable here so we don't |
682 | | // know what uncache_aggressiveness would be used with |
683 | | // ReleaseObsolete(). Anyway, obsolete files ideally go into |
684 | | // sst_delete_files for better/quicker handling, and this is just a |
685 | | // backstop. |
686 | 6.14k | TableCache::Evict(table_cache_.get(), number); |
687 | 6.14k | files_to_del.insert(number); |
688 | 6.14k | } |
689 | 163k | break; |
690 | 0 | case kBlobFile: { |
691 | 0 | keep = number >= state.min_pending_output || |
692 | 0 | number >= state.min_blob_file_number_to_keep || |
693 | 0 | (blob_live_set.find(number) != blob_live_set.end()) || |
694 | 0 | (state.active_blob_direct_write_files.find(number) != |
695 | 0 | state.active_blob_direct_write_files.end()); |
696 | 0 | if (!keep) { |
697 | 0 | const std::string blob_file_path = |
698 | 0 | BlobFileName(candidate_file.file_path, number); |
699 | 0 | if (ShouldKeepBlobFileDuringPurge(number, blob_file_path)) { |
700 | 0 | keep = true; |
701 | 0 | break; |
702 | 0 | } |
703 | 0 | TableCache::Evict(table_cache_.get(), number); |
704 | 0 | files_to_del.insert(number); |
705 | 0 | } |
706 | 0 | break; |
707 | 0 | } |
708 | 0 | case kTempFile: |
709 | | // Any temp files that are currently being written to must |
710 | | // be recorded in pending_outputs_, which is inserted into "live". |
711 | | // Also, SetCurrentFile creates a temp file when writing out new |
712 | | // manifest, which is equal to state.pending_manifest_file_number. We |
713 | | // should not delete that file |
714 | | // |
715 | | // TODO(yhchiang): carefully modify the third condition to safely |
716 | | // remove the temp options files. |
717 | 0 | keep = (sst_live_set.find(number) != sst_live_set.end()) || |
718 | 0 | (blob_live_set.find(number) != blob_live_set.end()) || |
719 | 0 | (number == state.pending_manifest_file_number) || |
720 | 0 | (to_delete.find(kOptionsFileNamePrefix) != std::string::npos); |
721 | 0 | break; |
722 | 742k | case kInfoLogFile: |
723 | 742k | keep = true; |
724 | 742k | if (number != 0) { |
725 | 658k | old_info_log_files.push_back(to_delete); |
726 | 658k | } |
727 | 742k | break; |
728 | 155k | case kOptionsFile: |
729 | 155k | keep = (number >= optsfile_num2); |
730 | 155k | break; |
731 | 0 | case kCompactionProgressFile: |
732 | | // Keep compaction progress files - they are managed |
733 | | // separately by DBImplSecondary for now |
734 | 0 | keep = true; |
735 | 0 | break; |
736 | 84.4k | case kCurrentFile: |
737 | 168k | case kDBLockFile: |
738 | 253k | case kIdentityFile: |
739 | 253k | case kMetaDatabase: |
740 | 253k | keep = true; |
741 | 253k | break; |
742 | 1.56M | } |
743 | | |
744 | 1.56M | if (keep) { |
745 | 1.47M | continue; |
746 | 1.47M | } |
747 | | |
748 | 84.9k | std::string fname; |
749 | 84.9k | std::string dir_to_sync; |
750 | 84.9k | if (type == kTableFile) { |
751 | 6.14k | fname = MakeTableFileName(candidate_file.file_path, number); |
752 | 6.14k | dir_to_sync = candidate_file.file_path; |
753 | 78.8k | } else if (type == kBlobFile) { |
754 | 0 | fname = BlobFileName(candidate_file.file_path, number); |
755 | 0 | dir_to_sync = candidate_file.file_path; |
756 | 78.8k | } else { |
757 | 78.8k | dir_to_sync = (type == kWalFile) ? wal_dir : dbname_; |
758 | 78.8k | fname = dir_to_sync + |
759 | 78.8k | ((!dir_to_sync.empty() && dir_to_sync.back() == '/') || |
760 | 78.8k | (!to_delete.empty() && to_delete.front() == '/') |
761 | 78.8k | ? "" |
762 | 78.8k | : "/") + |
763 | 78.8k | to_delete; |
764 | 78.8k | } |
765 | | |
766 | 84.9k | if (type == kWalFile && (immutable_db_options_.WAL_ttl_seconds > 0 || |
767 | 36.6k | immutable_db_options_.WAL_size_limit_MB > 0)) { |
768 | 0 | wal_manager_.ArchiveWALFile(fname, number); |
769 | 0 | continue; |
770 | 0 | } |
771 | | |
772 | | // If I do not own these files, e.g. secondary instance with max_open_files |
773 | | // = -1, then no need to delete or schedule delete these files since they |
774 | | // will be removed by their owner, e.g. the primary instance. |
775 | 84.9k | if (!own_files) { |
776 | 0 | continue; |
777 | 0 | } |
778 | 84.9k | if (schedule_only) { |
779 | 0 | InstrumentedMutexLock guard_lock(&mutex_); |
780 | 0 | SchedulePendingPurge(fname, dir_to_sync, type, number, state.job_id); |
781 | 84.9k | } else { |
782 | 84.9k | DeleteObsoleteFileImpl(state.job_id, fname, dir_to_sync, type, number); |
783 | 84.9k | } |
784 | 84.9k | } |
785 | | |
786 | 91.1k | { |
787 | | // After purging obsolete files, remove them from files_grabbed_for_purge_. |
788 | 91.1k | InstrumentedMutexLock guard_lock(&mutex_); |
789 | 91.1k | autovector<uint64_t> to_be_removed; |
790 | 91.1k | for (auto fn : files_grabbed_for_purge_) { |
791 | 16.5k | if (files_to_del.count(fn) != 0) { |
792 | 5.57k | to_be_removed.emplace_back(fn); |
793 | 5.57k | } |
794 | 16.5k | } |
795 | 91.1k | for (auto fn : to_be_removed) { |
796 | 5.57k | files_grabbed_for_purge_.erase(fn); |
797 | 5.57k | } |
798 | 91.1k | } |
799 | | |
800 | | // Delete old info log files. |
801 | 91.1k | size_t old_info_log_file_count = old_info_log_files.size(); |
802 | 91.1k | if (old_info_log_file_count != 0 && |
803 | 69.5k | old_info_log_file_count >= immutable_db_options_.keep_log_file_num) { |
804 | 0 | std::sort(old_info_log_files.begin(), old_info_log_files.end()); |
805 | 0 | size_t end = |
806 | 0 | old_info_log_file_count - immutable_db_options_.keep_log_file_num; |
807 | 0 | for (unsigned int i = 0; i <= end; i++) { |
808 | 0 | std::string& to_delete = old_info_log_files.at(i); |
809 | 0 | std::string full_path_to_delete = |
810 | 0 | (immutable_db_options_.db_log_dir.empty() |
811 | 0 | ? dbname_ |
812 | 0 | : immutable_db_options_.db_log_dir) + |
813 | 0 | "/" + to_delete; |
814 | 0 | ROCKS_LOG_INFO(immutable_db_options_.info_log, |
815 | 0 | "[JOB %d] Delete info log file %s\n", state.job_id, |
816 | 0 | full_path_to_delete.c_str()); |
817 | 0 | Status s = env_->DeleteFile(full_path_to_delete); |
818 | 0 | if (!s.ok()) { |
819 | 0 | if (env_->FileExists(full_path_to_delete).IsNotFound()) { |
820 | 0 | ROCKS_LOG_INFO( |
821 | 0 | immutable_db_options_.info_log, |
822 | 0 | "[JOB %d] Tried to delete non-existing info log file %s FAILED " |
823 | 0 | "-- %s\n", |
824 | 0 | state.job_id, to_delete.c_str(), s.ToString().c_str()); |
825 | 0 | } else { |
826 | 0 | ROCKS_LOG_ERROR(immutable_db_options_.info_log, |
827 | 0 | "[JOB %d] Delete info log file %s FAILED -- %s\n", |
828 | 0 | state.job_id, to_delete.c_str(), |
829 | 0 | s.ToString().c_str()); |
830 | 0 | } |
831 | 0 | } |
832 | 0 | } |
833 | 0 | } |
834 | 91.1k | wal_manager_.PurgeObsoleteWALFiles(); |
835 | 91.1k | LogFlush(immutable_db_options_.info_log); |
836 | 91.1k | InstrumentedMutexLock l(&mutex_); |
837 | 91.1k | --pending_purge_obsolete_files_; |
838 | 91.1k | assert(pending_purge_obsolete_files_ >= 0); |
839 | 91.1k | if (schedule_only) { |
840 | | // Must change from pending_purge_obsolete_files_ to bg_purge_scheduled_ |
841 | | // while holding mutex (for GetSortedWalFiles() etc.) |
842 | 0 | SchedulePurge(); |
843 | 0 | } |
844 | 91.1k | if (pending_purge_obsolete_files_ == 0) { |
845 | 91.1k | bg_cv_.SignalAll(); |
846 | 91.1k | } |
847 | 91.1k | TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:End"); |
848 | 91.1k | } |
849 | | |
850 | 42.2k | void DBImpl::DeleteObsoleteFiles() { |
851 | 42.2k | mutex_.AssertHeld(); |
852 | 42.2k | JobContext job_context(next_job_id_.fetch_add(1)); |
853 | 42.2k | FindObsoleteFiles(&job_context, true); |
854 | | |
855 | 42.2k | mutex_.Unlock(); |
856 | 42.2k | if (job_context.HaveSomethingToDelete()) { |
857 | 42.2k | bool defer_purge = immutable_db_options_.avoid_unnecessary_blocking_io; |
858 | 42.2k | PurgeObsoleteFiles(job_context, defer_purge); |
859 | 42.2k | } |
860 | 42.2k | job_context.Clean(); |
861 | 42.2k | mutex_.Lock(); |
862 | 42.2k | } |
863 | | |
864 | | bool DBImpl::ShouldKeepBlobFileDuringPurge(uint64_t number, |
865 | 0 | const std::string& blob_file_path) { |
866 | 0 | { |
867 | 0 | InstrumentedMutexLock lock(&mutex_); |
868 | 0 | for (auto* cfd : *versions_->GetColumnFamilySet()) { |
869 | 0 | auto* mgr = cfd->blob_partition_manager(); |
870 | 0 | if (mgr != nullptr && mgr->IsTrackedBlobFileNumber(number)) { |
871 | 0 | return true; |
872 | 0 | } |
873 | 0 | } |
874 | 0 | } |
875 | | |
876 | 0 | return ShouldKeepFooterlessBlobFile(fs_.get(), file_options_, blob_file_path); |
877 | 0 | } |
878 | | |
879 | | VersionEdit GetDBRecoveryEditForObsoletingMemTables( |
880 | | VersionSet* vset, const ColumnFamilyData& cfd, |
881 | | const autovector<VersionEdit*>& edit_list, |
882 | | const autovector<ReadOnlyMemTable*>& memtables, |
883 | 1.83k | LogsWithPrepTracker* prep_tracker) { |
884 | 1.83k | VersionEdit wal_deletion_edit; |
885 | 1.83k | uint64_t min_wal_number_to_keep = 0; |
886 | 1.83k | assert(edit_list.size() > 0); |
887 | 1.83k | if (vset->db_options()->allow_2pc) { |
888 | | // Note that if mempurge is successful, the edit_list will |
889 | | // not be applicable (contains info of new min_log number to keep, |
890 | | // and level 0 file path of SST file created during normal flush, |
891 | | // so both pieces of information are irrelevant after a successful |
892 | | // mempurge operation). |
893 | 0 | min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC( |
894 | 0 | vset, cfd, edit_list, memtables, prep_tracker); |
895 | | |
896 | | // We piggyback the information of earliest log file to keep in the |
897 | | // manifest entry for the last file flushed. |
898 | 1.83k | } else { |
899 | 1.83k | min_wal_number_to_keep = |
900 | 1.83k | PrecomputeMinLogNumberToKeepNon2PC(vset, cfd, edit_list); |
901 | 1.83k | } |
902 | | |
903 | 1.83k | wal_deletion_edit.SetMinLogNumberToKeep(min_wal_number_to_keep); |
904 | 1.83k | if (vset->db_options()->track_and_verify_wals_in_manifest) { |
905 | 0 | if (min_wal_number_to_keep > vset->GetWalSet().GetMinWalNumberToKeep()) { |
906 | 0 | wal_deletion_edit.DeleteWalsBefore(min_wal_number_to_keep); |
907 | 0 | } |
908 | 0 | } |
909 | 1.83k | return wal_deletion_edit; |
910 | 1.83k | } |
911 | | |
912 | | uint64_t FindMinPrepLogReferencedByMemTable( |
913 | 0 | VersionSet* vset, const autovector<ReadOnlyMemTable*>& memtables_to_flush) { |
914 | 0 | uint64_t min_log = 0; |
915 | | |
916 | | // we must look through the memtables for two phase transactions |
917 | | // that have been committed but not yet flushed |
918 | 0 | std::unordered_set<ReadOnlyMemTable*> memtables_to_flush_set( |
919 | 0 | memtables_to_flush.begin(), memtables_to_flush.end()); |
920 | 0 | for (auto loop_cfd : *vset->GetColumnFamilySet()) { |
921 | 0 | if (loop_cfd->IsDropped()) { |
922 | 0 | continue; |
923 | 0 | } |
924 | | |
925 | 0 | auto log = loop_cfd->imm()->PrecomputeMinLogContainingPrepSection( |
926 | 0 | &memtables_to_flush_set); |
927 | |
|
928 | 0 | if (log > 0 && (min_log == 0 || log < min_log)) { |
929 | 0 | min_log = log; |
930 | 0 | } |
931 | |
|
932 | 0 | log = loop_cfd->mem()->GetMinLogContainingPrepSection(); |
933 | |
|
934 | 0 | if (log > 0 && (min_log == 0 || log < min_log)) { |
935 | 0 | min_log = log; |
936 | 0 | } |
937 | 0 | } |
938 | |
|
939 | 0 | return min_log; |
940 | 0 | } |
941 | | |
942 | | uint64_t FindMinPrepLogReferencedByMemTable( |
943 | | VersionSet* vset, const autovector<const autovector<ReadOnlyMemTable*>*>& |
944 | 0 | memtables_to_flush) { |
945 | 0 | uint64_t min_log = 0; |
946 | |
|
947 | 0 | std::unordered_set<ReadOnlyMemTable*> memtables_to_flush_set; |
948 | 0 | for (const autovector<ReadOnlyMemTable*>* memtables : memtables_to_flush) { |
949 | 0 | memtables_to_flush_set.insert(memtables->begin(), memtables->end()); |
950 | 0 | } |
951 | 0 | for (auto loop_cfd : *vset->GetColumnFamilySet()) { |
952 | 0 | if (loop_cfd->IsDropped()) { |
953 | 0 | continue; |
954 | 0 | } |
955 | | |
956 | 0 | auto log = loop_cfd->imm()->PrecomputeMinLogContainingPrepSection( |
957 | 0 | &memtables_to_flush_set); |
958 | 0 | if (log > 0 && (min_log == 0 || log < min_log)) { |
959 | 0 | min_log = log; |
960 | 0 | } |
961 | |
|
962 | 0 | log = loop_cfd->mem()->GetMinLogContainingPrepSection(); |
963 | 0 | if (log > 0 && (min_log == 0 || log < min_log)) { |
964 | 0 | min_log = log; |
965 | 0 | } |
966 | 0 | } |
967 | |
|
968 | 0 | return min_log; |
969 | 0 | } |
970 | | |
971 | | uint64_t PrecomputeMinLogNumberToKeepNon2PC( |
972 | | VersionSet* vset, const ColumnFamilyData& cfd_to_flush, |
973 | 1.83k | const autovector<VersionEdit*>& edit_list) { |
974 | 1.83k | assert(vset != nullptr); |
975 | | |
976 | | // Precompute the min log number containing unflushed data for the column |
977 | | // family being flushed (`cfd_to_flush`). |
978 | 1.83k | uint64_t cf_min_log_number_to_keep = 0; |
979 | 1.83k | for (auto& e : edit_list) { |
980 | 1.83k | if (e->HasLogNumber()) { |
981 | 1.83k | cf_min_log_number_to_keep = |
982 | 1.83k | std::max(cf_min_log_number_to_keep, e->GetLogNumber()); |
983 | 1.83k | } |
984 | 1.83k | } |
985 | 1.83k | if (cf_min_log_number_to_keep == 0) { |
986 | | // No version edit contains information on log number. The log number |
987 | | // for this column family should stay the same as it is. |
988 | 0 | cf_min_log_number_to_keep = cfd_to_flush.GetLogNumber(); |
989 | 0 | } |
990 | | |
991 | | // Get min log number containing unflushed data for other column families. |
992 | 1.83k | uint64_t min_log_number_to_keep = |
993 | 1.83k | vset->PreComputeMinLogNumberWithUnflushedData(&cfd_to_flush); |
994 | 1.83k | if (cf_min_log_number_to_keep != 0) { |
995 | 1.83k | min_log_number_to_keep = |
996 | 1.83k | std::min(cf_min_log_number_to_keep, min_log_number_to_keep); |
997 | 1.83k | } |
998 | 1.83k | return min_log_number_to_keep; |
999 | 1.83k | } |
1000 | | |
1001 | | uint64_t PrecomputeMinLogNumberToKeepNon2PC( |
1002 | | VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush, |
1003 | 0 | const autovector<autovector<VersionEdit*>>& edit_lists) { |
1004 | 0 | assert(vset != nullptr); |
1005 | 0 | assert(!cfds_to_flush.empty()); |
1006 | 0 | assert(cfds_to_flush.size() == edit_lists.size()); |
1007 | |
|
1008 | 0 | uint64_t min_log_number_to_keep = std::numeric_limits<uint64_t>::max(); |
1009 | 0 | for (const auto& edit_list : edit_lists) { |
1010 | 0 | uint64_t log = 0; |
1011 | 0 | for (const auto& e : edit_list) { |
1012 | 0 | if (e->HasLogNumber()) { |
1013 | 0 | log = std::max(log, e->GetLogNumber()); |
1014 | 0 | } |
1015 | 0 | } |
1016 | 0 | if (log != 0) { |
1017 | 0 | min_log_number_to_keep = std::min(min_log_number_to_keep, log); |
1018 | 0 | } |
1019 | 0 | } |
1020 | 0 | if (min_log_number_to_keep == std::numeric_limits<uint64_t>::max()) { |
1021 | 0 | min_log_number_to_keep = cfds_to_flush[0]->GetLogNumber(); |
1022 | 0 | for (size_t i = 1; i < cfds_to_flush.size(); i++) { |
1023 | 0 | min_log_number_to_keep = |
1024 | 0 | std::min(min_log_number_to_keep, cfds_to_flush[i]->GetLogNumber()); |
1025 | 0 | } |
1026 | 0 | } |
1027 | |
|
1028 | 0 | std::unordered_set<const ColumnFamilyData*> flushed_cfds( |
1029 | 0 | cfds_to_flush.begin(), cfds_to_flush.end()); |
1030 | 0 | min_log_number_to_keep = |
1031 | 0 | std::min(min_log_number_to_keep, |
1032 | 0 | vset->PreComputeMinLogNumberWithUnflushedData(flushed_cfds)); |
1033 | |
|
1034 | 0 | return min_log_number_to_keep; |
1035 | 0 | } |
1036 | | |
1037 | | uint64_t PrecomputeMinLogNumberToKeep2PC( |
1038 | | VersionSet* vset, const ColumnFamilyData& cfd_to_flush, |
1039 | | const autovector<VersionEdit*>& edit_list, |
1040 | | const autovector<ReadOnlyMemTable*>& memtables_to_flush, |
1041 | 0 | LogsWithPrepTracker* prep_tracker) { |
1042 | 0 | assert(vset != nullptr); |
1043 | 0 | assert(prep_tracker != nullptr); |
1044 | | // Calculate updated min_log_number_to_keep |
1045 | | // Since the function should only be called in 2pc mode, log number in |
1046 | | // the version edit should be sufficient. |
1047 | |
|
1048 | 0 | uint64_t min_log_number_to_keep = |
1049 | 0 | PrecomputeMinLogNumberToKeepNon2PC(vset, cfd_to_flush, edit_list); |
1050 | | |
1051 | | // if are 2pc we must consider logs containing prepared |
1052 | | // sections of outstanding transactions. |
1053 | | // |
1054 | | // We must check min logs with outstanding prep before we check |
1055 | | // logs references by memtables because a log referenced by the |
1056 | | // first data structure could transition to the second under us. |
1057 | | // |
1058 | | // TODO: iterating over all column families under db mutex. |
1059 | | // should find more optimal solution |
1060 | 0 | auto min_log_in_prep_heap = |
1061 | 0 | prep_tracker->FindMinLogContainingOutstandingPrep(); |
1062 | |
|
1063 | 0 | if (min_log_in_prep_heap != 0 && |
1064 | 0 | min_log_in_prep_heap < min_log_number_to_keep) { |
1065 | 0 | min_log_number_to_keep = min_log_in_prep_heap; |
1066 | 0 | } |
1067 | |
|
1068 | 0 | uint64_t min_log_refed_by_mem = |
1069 | 0 | FindMinPrepLogReferencedByMemTable(vset, memtables_to_flush); |
1070 | |
|
1071 | 0 | if (min_log_refed_by_mem != 0 && |
1072 | 0 | min_log_refed_by_mem < min_log_number_to_keep) { |
1073 | 0 | min_log_number_to_keep = min_log_refed_by_mem; |
1074 | 0 | } |
1075 | 0 | return min_log_number_to_keep; |
1076 | 0 | } |
1077 | | |
1078 | | uint64_t PrecomputeMinLogNumberToKeep2PC( |
1079 | | VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush, |
1080 | | const autovector<autovector<VersionEdit*>>& edit_lists, |
1081 | | const autovector<const autovector<ReadOnlyMemTable*>*>& memtables_to_flush, |
1082 | 0 | LogsWithPrepTracker* prep_tracker) { |
1083 | 0 | assert(vset != nullptr); |
1084 | 0 | assert(prep_tracker != nullptr); |
1085 | 0 | assert(cfds_to_flush.size() == edit_lists.size()); |
1086 | 0 | assert(cfds_to_flush.size() == memtables_to_flush.size()); |
1087 | |
|
1088 | 0 | uint64_t min_log_number_to_keep = |
1089 | 0 | PrecomputeMinLogNumberToKeepNon2PC(vset, cfds_to_flush, edit_lists); |
1090 | |
|
1091 | 0 | uint64_t min_log_in_prep_heap = |
1092 | 0 | prep_tracker->FindMinLogContainingOutstandingPrep(); |
1093 | |
|
1094 | 0 | if (min_log_in_prep_heap != 0 && |
1095 | 0 | min_log_in_prep_heap < min_log_number_to_keep) { |
1096 | 0 | min_log_number_to_keep = min_log_in_prep_heap; |
1097 | 0 | } |
1098 | |
|
1099 | 0 | uint64_t min_log_refed_by_mem = |
1100 | 0 | FindMinPrepLogReferencedByMemTable(vset, memtables_to_flush); |
1101 | |
|
1102 | 0 | if (min_log_refed_by_mem != 0 && |
1103 | 0 | min_log_refed_by_mem < min_log_number_to_keep) { |
1104 | 0 | min_log_number_to_keep = min_log_refed_by_mem; |
1105 | 0 | } |
1106 | |
|
1107 | 0 | return min_log_number_to_keep; |
1108 | 0 | } |
1109 | | |
1110 | | void DBImpl::SetDBId(std::string&& id, bool read_only, |
1111 | 7.43k | VersionEdit* version_edit) { |
1112 | 7.43k | assert(db_id_.empty()); |
1113 | 7.43k | assert(!id.empty()); |
1114 | 7.43k | db_id_ = std::move(id); |
1115 | 7.43k | if (!read_only && version_edit) { |
1116 | 7.43k | assert(version_edit != nullptr); |
1117 | 7.43k | assert(versions_->GetColumnFamilySet() != nullptr); |
1118 | 7.43k | version_edit->SetDBId(db_id_); |
1119 | 7.43k | versions_->db_id_ = db_id_; |
1120 | 7.43k | } |
1121 | 7.43k | } |
1122 | | |
1123 | | Status DBImpl::SetupDBId(const WriteOptions& write_options, bool read_only, |
1124 | | bool is_new_db, bool is_retry, |
1125 | 42.2k | VersionEdit* version_edit) { |
1126 | 42.2k | Status s; |
1127 | 42.2k | if (!is_new_db) { |
1128 | | // Check for the IDENTITY file and create it if not there or |
1129 | | // broken or not matching manifest |
1130 | 34.7k | std::string db_id_in_file; |
1131 | 34.7k | s = fs_->FileExists(IdentityFileName(dbname_), IOOptions(), nullptr); |
1132 | 34.7k | if (s.ok()) { |
1133 | 34.7k | IOOptions opts; |
1134 | 34.7k | if (is_retry) { |
1135 | 0 | opts.verify_and_reconstruct_read = true; |
1136 | 0 | } |
1137 | 34.7k | s = GetDbIdentityFromIdentityFile(opts, &db_id_in_file); |
1138 | 34.7k | if (s.ok() && !db_id_in_file.empty()) { |
1139 | 34.7k | if (db_id_.empty()) { |
1140 | | // Loaded from file and wasn't already known from manifest |
1141 | 0 | SetDBId(std::move(db_id_in_file), read_only, version_edit); |
1142 | 0 | return s; |
1143 | 34.7k | } else if (db_id_ == db_id_in_file) { |
1144 | | // Loaded from file and matches manifest |
1145 | 34.7k | return s; |
1146 | 34.7k | } |
1147 | 34.7k | } |
1148 | 34.7k | } |
1149 | 0 | if (s.IsNotFound()) { |
1150 | 0 | s = Status::OK(); |
1151 | 0 | } |
1152 | 0 | if (!s.ok()) { |
1153 | 0 | assert(s.IsIOError()); |
1154 | 0 | return s; |
1155 | 0 | } |
1156 | 0 | } |
1157 | | // Otherwise IDENTITY file is missing or no good. |
1158 | | // Generate new id if needed |
1159 | 7.43k | if (db_id_.empty()) { |
1160 | 7.43k | SetDBId(env_->GenerateUniqueId(), read_only, version_edit); |
1161 | 7.43k | } |
1162 | | // Persist it to IDENTITY file if allowed |
1163 | 7.43k | if (!read_only && immutable_db_options_.write_identity_file) { |
1164 | 7.43k | s = SetIdentityFile(write_options, env_, dbname_, |
1165 | 7.43k | immutable_db_options_.metadata_write_temperature, |
1166 | 7.43k | db_id_); |
1167 | 7.43k | } |
1168 | | // NOTE: an obsolete IDENTITY file with write_identity_file=false is handled |
1169 | | // elsewhere, so that it's only deleted after successful recovery |
1170 | 7.43k | return s; |
1171 | 42.2k | } |
1172 | | |
1173 | 168k | std::set<std::string> DBImpl::CollectAllDBPaths() { |
1174 | 168k | std::set<std::string> all_db_paths; |
1175 | 168k | all_db_paths.insert(NormalizePath(dbname_)); |
1176 | 168k | for (const auto& db_path : immutable_db_options_.db_paths) { |
1177 | 168k | all_db_paths.insert(NormalizePath(db_path.path)); |
1178 | 168k | } |
1179 | 229k | for (const auto* cfd : *versions_->GetColumnFamilySet()) { |
1180 | 229k | for (const auto& cf_path : cfd->ioptions().cf_paths) { |
1181 | 229k | all_db_paths.insert(NormalizePath(cf_path.path)); |
1182 | 229k | } |
1183 | 229k | } |
1184 | 168k | return all_db_paths; |
1185 | 168k | } |
1186 | | |
1187 | 42.2k | Status DBImpl::MaybeUpdateNextFileNumber(RecoveryContext* recovery_ctx) { |
1188 | 42.2k | mutex_.AssertHeld(); |
1189 | 42.2k | uint64_t next_file_number = versions_->current_next_file_number(); |
1190 | 42.2k | uint64_t largest_file_number = next_file_number; |
1191 | 42.2k | Status s; |
1192 | 42.2k | for (const auto& path : CollectAllDBPaths()) { |
1193 | 42.2k | std::vector<std::string> files; |
1194 | 42.2k | s = env_->GetChildren(path, &files); |
1195 | 42.2k | if (!s.ok()) { |
1196 | 0 | break; |
1197 | 0 | } |
1198 | 706k | for (const auto& fname : files) { |
1199 | 706k | uint64_t number = 0; |
1200 | 706k | FileType type; |
1201 | 706k | if (!ParseFileName(fname, &number, &type)) { |
1202 | 371k | continue; |
1203 | 371k | } |
1204 | 335k | const std::string normalized_fpath = path + kFilePathSeparator + fname; |
1205 | 335k | largest_file_number = std::max(largest_file_number, number); |
1206 | 335k | if ((type == kTableFile || type == kBlobFile)) { |
1207 | 67.5k | recovery_ctx->existing_data_files_.push_back(normalized_fpath); |
1208 | 67.5k | } |
1209 | 335k | } |
1210 | 42.2k | } |
1211 | 42.2k | if (!s.ok()) { |
1212 | 0 | return s; |
1213 | 0 | } |
1214 | | |
1215 | 42.2k | if (largest_file_number >= next_file_number) { |
1216 | 42.2k | versions_->next_file_number_.store(largest_file_number + 1); |
1217 | 42.2k | } |
1218 | | |
1219 | 42.2k | VersionEdit edit; |
1220 | 42.2k | edit.SetNextFile(versions_->next_file_number_.load()); |
1221 | 42.2k | assert(versions_->GetColumnFamilySet()); |
1222 | 42.2k | ColumnFamilyData* default_cfd = versions_->GetColumnFamilySet()->GetDefault(); |
1223 | | assert(default_cfd); |
1224 | 42.2k | recovery_ctx->UpdateVersionEdits(default_cfd, edit); |
1225 | 42.2k | return s; |
1226 | 42.2k | } |
1227 | | } // namespace ROCKSDB_NAMESPACE |