Coverage Report

Created: 2026-04-10 07:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/db_impl/db_impl_files.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
#include <array>
10
#include <cinttypes>
11
#include <set>
12
#include <unordered_set>
13
14
#include "db/blob/blob_file_partition_manager.h"
15
#include "db/blob/blob_log_format.h"
16
#include "db/db_impl/db_impl.h"
17
#include "db/event_helpers.h"
18
#include "db/memtable_list.h"
19
#include "file/file_util.h"
20
#include "file/filename.h"
21
#include "file/sst_file_manager_impl.h"
22
#include "logging/logging.h"
23
#include "monitoring/thread_status_util.h"
24
#include "port/port.h"
25
#include "rocksdb/options.h"
26
#include "util/autovector.h"
27
#include "util/defer.h"
28
29
namespace ROCKSDB_NAMESPACE {
30
31
namespace {
32
33
0
Env::IOActivity GetCurrentThreadIOActivityForMetadataRead() {
34
0
  switch (ThreadStatusUtil::GetThreadOperation()) {
35
0
    case ThreadStatus::OperationType::OP_FLUSH:
36
0
      return Env::IOActivity::kFlush;
37
0
    case ThreadStatus::OperationType::OP_COMPACTION:
38
0
      return Env::IOActivity::kCompaction;
39
0
    case ThreadStatus::OperationType::OP_DBOPEN:
40
0
      return Env::IOActivity::kDBOpen;
41
0
    case ThreadStatus::OperationType::OP_GET:
42
0
      return Env::IOActivity::kGet;
43
0
    case ThreadStatus::OperationType::OP_MULTIGET:
44
0
      return Env::IOActivity::kMultiGet;
45
0
    case ThreadStatus::OperationType::OP_DBITERATOR:
46
0
      return Env::IOActivity::kDBIterator;
47
0
    case ThreadStatus::OperationType::OP_VERIFY_DB_CHECKSUM:
48
0
      return Env::IOActivity::kVerifyDBChecksum;
49
0
    case ThreadStatus::OperationType::OP_VERIFY_FILE_CHECKSUMS:
50
0
      return Env::IOActivity::kVerifyFileChecksums;
51
0
    case ThreadStatus::OperationType::OP_GETENTITY:
52
0
      return Env::IOActivity::kGetEntity;
53
0
    case ThreadStatus::OperationType::OP_MULTIGETENTITY:
54
0
      return Env::IOActivity::kMultiGetEntity;
55
0
    case ThreadStatus::OperationType::
56
0
        OP_GET_FILE_CHECKSUMS_FROM_CURRENT_MANIFEST:
57
0
      return Env::IOActivity::kGetFileChecksumsFromCurrentManifest;
58
0
    default:
59
0
      return Env::IOActivity::kUnknown;
60
0
  }
61
0
}
62
63
// A full-scan obsolete-file purge can observe a newly created direct-write
64
// blob file before it is added to the manager's active-file set. Those
65
// in-flight files are still missing their footer, so conservatively keep any
66
// footer-less blob file here rather than risk deleting a live write-path file.
67
bool ShouldKeepFooterlessBlobFile(FileSystem* fs,
68
                                  const FileOptions& file_options,
69
0
                                  const std::string& blob_file_path) {
70
0
  assert(fs != nullptr);
71
72
0
  constexpr IODebugContext* dbg = nullptr;
73
  // This purge path can run from DB open, flush/compaction cleanup, or
74
  // iterator-triggered obsolete-file cleanup. Tag the probe with the current
75
  // thread activity so db_stress keeps validating the read against the active
76
  // operation instead of seeing an unexpected kUnknown metadata read.
77
0
  IOOptions io_options;
78
0
  io_options.io_activity = GetCurrentThreadIOActivityForMetadataRead();
79
0
  uint64_t file_size = 0;
80
0
  IOStatus io_s = fs->GetFileSize(blob_file_path, io_options, &file_size, dbg);
81
0
  if (!io_s.ok()) {
82
0
    return !io_s.IsPathNotFound();
83
0
  }
84
85
0
  if (file_size < BlobLogHeader::kSize + BlobLogFooter::kSize) {
86
0
    return true;
87
0
  }
88
89
0
  FileOptions read_file_options = file_options;
90
0
  read_file_options.use_direct_reads = false;
91
92
0
  std::unique_ptr<FSRandomAccessFile> file;
93
0
  io_s = fs->NewRandomAccessFile(blob_file_path, read_file_options, &file, dbg);
94
0
  if (!io_s.ok()) {
95
0
    return !io_s.IsPathNotFound();
96
0
  }
97
98
0
  std::array<char, BlobLogFooter::kSize> scratch{};
99
0
  Slice footer_slice;
100
0
  io_s = file->Read(file_size - BlobLogFooter::kSize, BlobLogFooter::kSize,
101
0
                    io_options, &footer_slice, scratch.data(), dbg);
102
0
  if (!io_s.ok()) {
103
0
    return !io_s.IsPathNotFound();
104
0
  }
105
106
0
  if (footer_slice.size() != BlobLogFooter::kSize) {
107
0
    return true;
108
0
  }
109
110
0
  BlobLogFooter footer;
111
0
  return !footer.DecodeFrom(footer_slice).ok();
112
0
}
113
114
}  // namespace
115
116
142k
uint64_t DBImpl::MinLogNumberToKeep() {
117
142k
  return versions_->min_log_number_to_keep();
118
142k
}
119
120
0
// Returns the smallest WAL number eligible for the recycle list; obsolete WALs
// numbered at or above it may be recycled instead of deleted.
uint64_t DBImpl::MinLogNumberToRecycle() {
  return min_wal_number_to_recycle_;
}
121
122
107k
uint64_t DBImpl::MinObsoleteSstNumberToKeep() {
123
107k
  mutex_.AssertHeld();
124
107k
  if (!pending_outputs_.empty()) {
125
292
    return *pending_outputs_.begin();
126
292
  }
127
107k
  return std::numeric_limits<uint64_t>::max();
128
107k
}
129
130
0
uint64_t DBImpl::GetObsoleteSstFilesSize() {
131
0
  mutex_.AssertHeld();
132
0
  return versions_->GetObsoleteSstFilesSize();
133
0
}
134
135
107k
uint64_t DBImpl::MinOptionsFileNumberToKeep() {
136
107k
  mutex_.AssertHeld();
137
107k
  if (!min_options_file_numbers_.empty()) {
138
0
    return *min_options_file_numbers_.begin();
139
0
  }
140
107k
  return std::numeric_limits<uint64_t>::max();
141
107k
}
142
143
0
// Increments the file-deletion disable counter under mutex_, then logs the
// new state outside the lock. Returns the status of
// DisableFileDeletionsWithLock().
Status DBImpl::DisableFileDeletions() {
  Status result;
  int counter_after = 0;
  {
    InstrumentedMutexLock guard(&mutex_);
    result = DisableFileDeletionsWithLock();
    counter_after = disable_delete_obsolete_files_;
  }
  if (counter_after != 1) {
    // Nested disable: deletions were already off before this call.
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "File Deletions Disabled, but already disabled. Counter: %d",
                   counter_after);
  } else {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "File Deletions Disabled");
  }
  return result;
}
160
161
// FIXME: can be inconsistent with DisableFileDeletions in cases like
162
// DBImplReadOnly
163
0
// Adds one level of file-deletion disabling; EnableFileDeletions() removes one
// level. Always returns OK.
// REQUIRES: mutex_ held.
Status DBImpl::DisableFileDeletionsWithLock() {
  mutex_.AssertHeld();
  disable_delete_obsolete_files_ += 1;
  return Status::OK();
}
168
169
0
// Removes one level of file-deletion disabling (never going below zero). When
// the counter reaches zero, it performs a forced FindObsoleteFiles() scan under
// mutex_ and wakes bg_cv_ waiters. It then purges anything found, outside the
// lock. Always returns OK.
Status DBImpl::EnableFileDeletions() {
  // Job id 0 marks this as a user-thread call, not one of our background jobs.
  JobContext job_context(0);
  int remaining_disables = 0;
  {
    InstrumentedMutexLock guard(&mutex_);
    if (disable_delete_obsolete_files_ > 0) {
      --disable_delete_obsolete_files_;
    }
    remaining_disables = disable_delete_obsolete_files_;
    if (remaining_disables == 0) {
      FindObsoleteFiles(&job_context, true);
      bg_cv_.SignalAll();
    }
  }

  const bool deletions_enabled = (remaining_disables == 0);
  if (!deletions_enabled) {
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "File Deletions Enable, but not really enabled. Counter: %d",
                   remaining_disables);
  } else {
    ROCKS_LOG_INFO(immutable_db_options_.info_log, "File Deletions Enabled");
    if (job_context.HaveSomethingToDelete()) {
      PurgeObsoleteFiles(job_context);
    }
  }
  job_context.Clean();
  LogFlush(immutable_db_options_.info_log);
  return Status::OK();
}
199
200
0
bool DBImpl::IsFileDeletionsEnabled() const {
201
0
  return 0 == disable_delete_obsolete_files_;
202
0
}
203
204
// * Returns the list of live files in 'sst_live' and 'blob_live'.
205
// If it's doing full scan:
206
// * Returns the list of all files in the filesystem in
207
// 'full_scan_candidate_files'.
208
// Otherwise, gets obsolete files from VersionSet.
209
// no_full_scan = true -- never do the full scan using GetChildren()
210
// force = false -- don't force the full scan, except every
211
//  mutable_db_options_.delete_obsolete_files_period_micros
212
// force = true -- force the full scan
213
void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
214
107k
                               bool no_full_scan) {
215
107k
  mutex_.AssertHeld();
216
217
  // if deletion is disabled, do nothing
218
107k
  if (disable_delete_obsolete_files_ > 0) {
219
0
    return;
220
0
  }
221
222
107k
  bool doing_the_full_scan = false;
223
224
  // logic for figuring out if we're doing the full scan
225
107k
  if (no_full_scan) {
226
15.2k
    doing_the_full_scan = false;
227
92.6k
  } else if (force ||
228
84.4k
             mutable_db_options_.delete_obsolete_files_period_micros == 0) {
229
84.4k
    doing_the_full_scan = true;
230
84.4k
  } else {
231
8.19k
    const uint64_t now_micros = immutable_db_options_.clock->NowMicros();
232
8.19k
    if ((delete_obsolete_files_last_run_ +
233
8.19k
         mutable_db_options_.delete_obsolete_files_period_micros) <
234
8.19k
        now_micros) {
235
0
      doing_the_full_scan = true;
236
0
      delete_obsolete_files_last_run_ = now_micros;
237
0
    }
238
8.19k
  }
239
240
  // don't delete files that might be currently written to from compaction
241
  // threads
242
  // Since job_context->min_pending_output is set, until file scan finishes,
243
  // mutex_ cannot be released. Otherwise, we might see no min_pending_output
244
  // here but later find newer generated unfinalized files while scanning.
245
107k
  job_context->min_pending_output = MinObsoleteSstNumberToKeep();
246
107k
  job_context->files_to_quarantine = error_handler_.GetFilesToQuarantine();
247
107k
  job_context->min_options_file_number = MinOptionsFileNumberToKeep();
248
107k
  job_context->min_blob_file_number_to_keep =
249
107k
      versions_->current_next_file_number();
250
139k
  for (auto* cfd : *versions_->GetColumnFamilySet()) {
251
139k
    auto* mgr = cfd->blob_partition_manager();
252
139k
    if (mgr != nullptr) {
253
0
      mgr->GetActiveBlobFileNumbers(
254
0
          &job_context->active_blob_direct_write_files);
255
0
      mgr->GetProtectedBlobFileNumbers(
256
0
          &job_context->active_blob_direct_write_files);
257
0
    }
258
139k
  }
259
260
  // Get obsolete files.  This function will also update the list of
261
  // pending files in VersionSet().
262
107k
  assert(versions_);
263
107k
  versions_->GetObsoleteFiles(
264
107k
      &job_context->sst_delete_files, &job_context->blob_delete_files,
265
107k
      &job_context->manifest_delete_files, job_context->min_pending_output);
266
267
  // Mark the elements in job_context->sst_delete_files and
268
  // job_context->blob_delete_files as "grabbed for purge" so that other threads
269
  // calling FindObsoleteFiles with full_scan=true will not add these files to
270
  // candidate list for purge.
271
107k
  for (const auto& sst_to_del : job_context->sst_delete_files) {
272
11.0k
    MarkAsGrabbedForPurge(sst_to_del.metadata->fd.GetNumber());
273
11.0k
  }
274
275
107k
  for (const auto& blob_file : job_context->blob_delete_files) {
276
0
    MarkAsGrabbedForPurge(blob_file.GetBlobFileNumber());
277
0
  }
278
279
  // store the current filenum, lognum, etc
280
107k
  job_context->manifest_file_number = versions_->manifest_file_number();
281
107k
  job_context->pending_manifest_file_number =
282
107k
      versions_->pending_manifest_file_number();
283
107k
  job_context->log_number = MinLogNumberToKeep();
284
107k
  job_context->prev_log_number = versions_->prev_log_number();
285
286
107k
  if (doing_the_full_scan) {
287
84.4k
    versions_->AddLiveFiles(&job_context->sst_live, &job_context->blob_live);
288
84.4k
    InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(),
289
84.4k
                                  dbname_);
290
    // PurgeObsoleteFiles will dedupe duplicate files.
291
84.4k
    IOOptions io_opts;
292
84.4k
    io_opts.do_not_recurse = true;
293
84.4k
    for (auto& path : CollectAllDBPaths()) {
294
      // set of all files in the directory. We'll exclude files that are still
295
      // alive in the subsequent processings.
296
84.4k
      std::vector<std::string> files;
297
84.4k
      Status s = immutable_db_options_.fs->GetChildren(
298
84.4k
          path, io_opts, &files, /*IODebugContext*=*/nullptr);
299
84.4k
      s.PermitUncheckedError();  // TODO: What should we do on error?
300
1.55M
      for (const std::string& file : files) {
301
1.55M
        uint64_t number;
302
1.55M
        FileType type;
303
        // 1. If we cannot parse the file name, we skip;
304
        // 2. If the file with file_number equals number has already been
305
        // grabbed for purge by another compaction job, or it has already been
306
        // schedule for purge, we also skip it if we
307
        // are doing full scan in order to avoid double deletion of the same
308
        // file under race conditions. See
309
        // https://github.com/facebook/rocksdb/issues/3573
310
1.55M
        if (!ParseFileName(file, &number, info_log_prefix.prefix, &type) ||
311
1.55M
            !ShouldPurge(number)) {
312
4.82k
          continue;
313
4.82k
        }
314
315
        // TODO(icanadi) clean up this mess to avoid having one-off "/"
316
        // prefixes
317
1.55M
        job_context->full_scan_candidate_files.emplace_back("/" + file, path);
318
1.55M
      }
319
84.4k
    }
320
321
    // Add log files in wal_dir
322
84.4k
    if (!immutable_db_options_.IsWalDirSameAsDBPath(dbname_)) {
323
0
      std::vector<std::string> log_files;
324
0
      Status s = immutable_db_options_.fs->GetChildren(
325
0
          immutable_db_options_.wal_dir, io_opts, &log_files,
326
0
          /*IODebugContext*=*/nullptr);
327
0
      s.PermitUncheckedError();  // TODO: What should we do on error?
328
0
      for (const std::string& log_file : log_files) {
329
0
        job_context->full_scan_candidate_files.emplace_back(
330
0
            log_file, immutable_db_options_.wal_dir);
331
0
      }
332
0
    }
333
334
    // Add info log files in db_log_dir
335
84.4k
    if (!immutable_db_options_.db_log_dir.empty() &&
336
0
        immutable_db_options_.db_log_dir != dbname_) {
337
0
      std::vector<std::string> info_log_files;
338
0
      Status s = immutable_db_options_.fs->GetChildren(
339
0
          immutable_db_options_.db_log_dir, io_opts, &info_log_files,
340
0
          /*IODebugContext*=*/nullptr);
341
0
      s.PermitUncheckedError();  // TODO: What should we do on error?
342
0
      for (std::string& log_file : info_log_files) {
343
0
        job_context->full_scan_candidate_files.emplace_back(
344
0
            log_file, immutable_db_options_.db_log_dir);
345
0
      }
346
0
    }
347
84.4k
  } else {
348
    // Instead of filling ob_context->sst_live and job_context->blob_live,
349
    // directly remove files that show up in any Version. This is because
350
    // candidate files tend to be a small percentage of all files, so it is
351
    // usually cheaper to check them against every version, compared to
352
    // building a map for all files.
353
23.4k
    versions_->RemoveLiveFiles(job_context->sst_delete_files,
354
23.4k
                               job_context->blob_delete_files);
355
23.4k
  }
356
357
  // Before potentially releasing mutex and waiting on condvar, increment
358
  // pending_purge_obsolete_files_ so that another thread executing
359
  // `GetSortedWals` will wait until this thread finishes execution since the
360
  // other thread will be waiting for `pending_purge_obsolete_files_`.
361
  // pending_purge_obsolete_files_ MUST be decremented if there is nothing to
362
  // delete.
363
107k
  ++pending_purge_obsolete_files_;
364
365
107k
  Defer cleanup([job_context, this]() {
366
107k
    assert(job_context != nullptr);
367
107k
    if (!job_context->HaveSomethingToDelete()) {
368
16.7k
      mutex_.AssertHeld();
369
16.7k
      --pending_purge_obsolete_files_;
370
16.7k
      if (pending_purge_obsolete_files_ == 0) {
371
16.7k
        bg_cv_.SignalAll();
372
16.7k
      }
373
16.7k
    }
374
107k
  });
375
376
  // logs_ is empty when called during recovery, in which case there can't yet
377
  // be any tracked obsolete logs
378
107k
  wal_write_mutex_.Lock();
379
380
107k
  if (alive_wal_files_.empty() || logs_.empty()) {
381
0
    mutex_.AssertHeld();
382
    // We may reach here if the db is DBImplSecondary
383
0
    wal_write_mutex_.Unlock();
384
0
    return;
385
0
  }
386
387
107k
  bool mutex_unlocked = false;
388
107k
  if (!alive_wal_files_.empty() && !logs_.empty()) {
389
107k
    uint64_t min_log_number = job_context->log_number;
390
107k
    size_t num_alive_wal_files = alive_wal_files_.size();
391
    // find newly obsoleted log files
392
109k
    while (alive_wal_files_.begin()->number < min_log_number) {
393
1.83k
      auto& earliest = *alive_wal_files_.begin();
394
1.83k
      if (immutable_db_options_.recycle_log_file_num >
395
1.83k
              wal_recycle_files_.size() &&
396
0
          earliest.number >= MinLogNumberToRecycle()) {
397
0
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
398
0
                       "adding log %" PRIu64 " to recycle list\n",
399
0
                       earliest.number);
400
0
        wal_recycle_files_.push_back(earliest.number);
401
1.83k
      } else {
402
1.83k
        job_context->log_delete_files.push_back(earliest.number);
403
1.83k
      }
404
1.83k
      if (job_context->size_log_to_delete == 0) {
405
1.83k
        job_context->prev_wals_total_size = wals_total_size_.LoadRelaxed();
406
1.83k
        job_context->num_alive_wal_files = num_alive_wal_files;
407
1.83k
      }
408
1.83k
      job_context->size_log_to_delete += earliest.size;
409
1.83k
      wals_total_size_.FetchSubRelaxed(earliest.size);
410
1.83k
      alive_wal_files_.pop_front();
411
412
      // Current log should always stay alive since it can't have
413
      // number < MinLogNumber().
414
1.83k
      assert(alive_wal_files_.size());
415
1.83k
    }
416
107k
    wal_write_mutex_.Unlock();
417
107k
    mutex_.Unlock();
418
107k
    mutex_unlocked = true;
419
107k
    TEST_SYNC_POINT_CALLBACK("FindObsoleteFiles::PostMutexUnlock", nullptr);
420
107k
    wal_write_mutex_.Lock();
421
109k
    while (!logs_.empty() && logs_.front().number < min_log_number) {
422
1.83k
      auto& log = logs_.front();
423
1.83k
      if (log.IsSyncing()) {
424
0
        wal_sync_cv_.Wait();
425
        // logs_ could have changed while we were waiting.
426
0
        continue;
427
0
      }
428
      // This WAL file is not live, so it's OK if we never sync the rest of it.
429
      // If it's already closed, then it's been fully synced. If
430
      // !background_close_inactive_wals then we need to Close it before
431
      // removing from logs_ but not blocking while holding wal_write_mutex_.
432
1.83k
      if (!immutable_db_options_.background_close_inactive_wals &&
433
1.83k
          log.writer->file()) {
434
        // We are taking ownership of and pinning the front entry, so we can
435
        // expect it to be the same after releasing and re-acquiring the lock
436
1.83k
        log.PrepareForSync();
437
1.83k
        wal_write_mutex_.Unlock();
438
        // TODO: maybe check the return value of Close.
439
        // TODO: plumb Env::IOActivity, Env::IOPriority
440
1.83k
        auto s = log.writer->file()->Close({});
441
1.83k
        s.PermitUncheckedError();
442
1.83k
        wal_write_mutex_.Lock();
443
1.83k
        log.writer->PublishIfClosed();
444
1.83k
        assert(&log == &logs_.front());
445
1.83k
        log.FinishSync();
446
1.83k
        wal_sync_cv_.SignalAll();
447
1.83k
      }
448
1.83k
      wals_to_free_.push_back(log.ReleaseWriter());
449
1.83k
      logs_.pop_front();
450
1.83k
    }
451
    // Current log cannot be obsolete.
452
107k
    assert(!logs_.empty());
453
107k
  }
454
455
  // We're just cleaning up for DB::Write().
456
107k
  assert(job_context->wals_to_free.empty());
457
107k
  job_context->wals_to_free = wals_to_free_;
458
459
107k
  wals_to_free_.clear();
460
107k
  wal_write_mutex_.Unlock();
461
107k
  if (mutex_unlocked) {
462
107k
    mutex_.Lock();
463
107k
  }
464
107k
  job_context->log_recycle_files.assign(wal_recycle_files_.begin(),
465
107k
                                        wal_recycle_files_.end());
466
107k
}
467
468
// Delete obsolete files and log status and information of file deletion
469
void DBImpl::DeleteObsoleteFileImpl(int job_id, const std::string& fname,
470
                                    const std::string& path_to_sync,
471
84.9k
                                    FileType type, uint64_t number) {
472
84.9k
  TEST_SYNC_POINT_CALLBACK("DBImpl::DeleteObsoleteFileImpl::BeforeDeletion",
473
84.9k
                           const_cast<std::string*>(&fname));
474
84.9k
  IGNORE_STATUS_IF_ERROR(Status::IOError());
475
476
84.9k
  Status file_deletion_status;
477
84.9k
  if (type == kTableFile || type == kBlobFile || type == kWalFile) {
478
    // Rate limit WAL deletion only if its in the DB dir
479
42.7k
    file_deletion_status = DeleteDBFile(
480
42.7k
        &immutable_db_options_, fname, path_to_sync,
481
42.7k
        /*force_bg=*/false,
482
42.7k
        /*force_fg=*/(type == kWalFile) ? !wal_in_db_path_ : false);
483
42.7k
  } else {
484
42.2k
    file_deletion_status = env_->DeleteFile(fname);
485
42.2k
  }
486
84.9k
  TEST_SYNC_POINT_CALLBACK("DBImpl::DeleteObsoleteFileImpl:AfterDeletion",
487
84.9k
                           &file_deletion_status);
488
84.9k
  if (file_deletion_status.ok()) {
489
84.9k
    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
490
84.9k
                    "[JOB %d] Delete %s type=%d #%" PRIu64 " -- %s\n", job_id,
491
84.9k
                    fname.c_str(), type, number,
492
84.9k
                    file_deletion_status.ToString().c_str());
493
84.9k
  } else if (env_->FileExists(fname).IsNotFound()) {
494
0
    ROCKS_LOG_INFO(
495
0
        immutable_db_options_.info_log,
496
0
        "[JOB %d] Tried to delete a non-existing file %s type=%d #%" PRIu64
497
0
        " -- %s\n",
498
0
        job_id, fname.c_str(), type, number,
499
0
        file_deletion_status.ToString().c_str());
500
0
  } else {
501
0
    ROCKS_LOG_ERROR(immutable_db_options_.info_log,
502
0
                    "[JOB %d] Failed to delete %s type=%d #%" PRIu64 " -- %s\n",
503
0
                    job_id, fname.c_str(), type, number,
504
0
                    file_deletion_status.ToString().c_str());
505
0
  }
506
84.9k
  if (type == kTableFile) {
507
6.14k
    EventHelpers::LogAndNotifyTableFileDeletion(
508
6.14k
        &event_logger_, job_id, number, fname, file_deletion_status, GetName(),
509
6.14k
        immutable_db_options_.listeners);
510
6.14k
  }
511
84.9k
  if (type == kBlobFile) {
512
0
    EventHelpers::LogAndNotifyBlobFileDeletion(
513
0
        &event_logger_, immutable_db_options_.listeners, job_id, number, fname,
514
0
        file_deletion_status, GetName());
515
0
  }
516
84.9k
}
517
518
// Diffs the candidate files against the live files and possibly removes
519
// those that are not live. Also removes all the files in sst_delete_files,
520
// blob_delete_files, log_delete_files and manifest_delete_files.
521
// It is not necessary to hold the mutex when invoking this method.
522
91.1k
void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
523
91.1k
  TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:Begin");
524
  // we'd better have sth to delete
525
91.1k
  assert(state.HaveSomethingToDelete());
526
527
  // FindObsoleteFiles() should've populated this so nonzero
528
91.1k
  assert(state.manifest_file_number != 0);
529
530
91.1k
  IGNORE_STATUS_IF_ERROR(Status::IOError());
531
532
  // Now, convert lists to unordered sets, WITHOUT mutex held; set is slow.
533
91.1k
  std::unordered_set<uint64_t> sst_live_set(state.sst_live.begin(),
534
91.1k
                                            state.sst_live.end());
535
91.1k
  std::unordered_set<uint64_t> blob_live_set(state.blob_live.begin(),
536
91.1k
                                             state.blob_live.end());
537
91.1k
  std::unordered_set<uint64_t> wal_recycle_files_set(
538
91.1k
      state.log_recycle_files.begin(), state.log_recycle_files.end());
539
91.1k
  std::unordered_set<uint64_t> quarantine_files_set(
540
91.1k
      state.files_to_quarantine.begin(), state.files_to_quarantine.end());
541
542
91.1k
  auto candidate_files = state.full_scan_candidate_files;
543
91.1k
  candidate_files.reserve(
544
91.1k
      candidate_files.size() + state.sst_delete_files.size() +
545
91.1k
      state.blob_delete_files.size() + state.log_delete_files.size() +
546
91.1k
      state.manifest_delete_files.size());
547
  // We may ignore the dbname when generating the file names.
548
91.1k
  for (auto& file : state.sst_delete_files) {
549
11.0k
    auto* handle = file.metadata->fd.pinned_reader.GetCacheHandle();
550
11.0k
    if (file.only_delete_metadata) {
551
5.46k
      if (handle) {
552
        // Simply release handle of file that is not being deleted
553
5.46k
        table_cache_->Release(handle);
554
5.46k
      }
555
5.57k
    } else {
556
      // File is being deleted (actually obsolete)
557
5.57k
      auto number = file.metadata->fd.GetNumber();
558
5.57k
      candidate_files.emplace_back(MakeTableFileName(number), file.path);
559
5.57k
      TableCache::ReleaseObsolete(table_cache_.get(), number, handle,
560
5.57k
                                  file.uncache_aggressiveness);
561
5.57k
    }
562
11.0k
    file.DeleteMetadata();
563
11.0k
  }
564
565
91.1k
  for (const auto& blob_file : state.blob_delete_files) {
566
0
    candidate_files.emplace_back(BlobFileName(blob_file.GetBlobFileNumber()),
567
0
                                 blob_file.GetPath());
568
0
  }
569
570
91.1k
  auto wal_dir = immutable_db_options_.GetWalDir();
571
91.1k
  for (auto file_num : state.log_delete_files) {
572
1.83k
    if (file_num > 0) {
573
1.83k
      candidate_files.emplace_back(LogFileName(file_num), wal_dir);
574
1.83k
    }
575
1.83k
  }
576
91.1k
  for (const auto& filename : state.manifest_delete_files) {
577
42.2k
    candidate_files.emplace_back(filename, dbname_);
578
42.2k
  }
579
580
  // dedup state.candidate_files so we don't try to delete the same
581
  // file twice
582
91.1k
  std::sort(candidate_files.begin(), candidate_files.end(),
583
91.1k
            [](const JobContext::CandidateFileInfo& lhs,
584
8.17M
               const JobContext::CandidateFileInfo& rhs) {
585
8.17M
              if (lhs.file_name < rhs.file_name) {
586
6.37M
                return true;
587
6.37M
              } else if (lhs.file_name > rhs.file_name) {
588
1.75M
                return false;
589
1.75M
              } else {
590
43.7k
                return (lhs.file_path < rhs.file_path);
591
43.7k
              }
592
8.17M
            });
593
91.1k
  candidate_files.erase(
594
91.1k
      std::unique(candidate_files.begin(), candidate_files.end()),
595
91.1k
      candidate_files.end());
596
597
91.1k
  if (state.prev_wals_total_size > 0) {
598
1.83k
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
599
1.83k
                   "[JOB %d] Try to delete WAL files size %" PRIu64
600
1.83k
                   ", prev total WAL file size %" PRIu64
601
1.83k
                   ", number of live WAL files %" ROCKSDB_PRIszt ".\n",
602
1.83k
                   state.job_id, state.size_log_to_delete,
603
1.83k
                   state.prev_wals_total_size, state.num_alive_wal_files);
604
1.83k
  }
605
606
91.1k
  std::vector<std::string> old_info_log_files;
607
91.1k
  InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(),
608
91.1k
                                dbname_);
609
610
  // File numbers of most recent two OPTIONS file in candidate_files (found in
611
  // previous FindObsoleteFiles(full_scan=true))
612
  // At this point, there must not be any duplicate file numbers in
613
  // candidate_files.
614
91.1k
  uint64_t optsfile_num1 = std::numeric_limits<uint64_t>::min();
615
91.1k
  uint64_t optsfile_num2 = std::numeric_limits<uint64_t>::min();
616
1.56M
  for (const auto& candidate_file : candidate_files) {
617
1.56M
    const std::string& fname = candidate_file.file_name;
618
1.56M
    uint64_t number;
619
1.56M
    FileType type;
620
1.56M
    if (!ParseFileName(fname, &number, info_log_prefix.prefix, &type) ||
621
1.56M
        type != kOptionsFile) {
622
1.40M
      continue;
623
1.40M
    }
624
155k
    if (number > optsfile_num1) {
625
155k
      optsfile_num2 = optsfile_num1;
626
155k
      optsfile_num1 = number;
627
155k
    } else if (number > optsfile_num2) {
628
0
      optsfile_num2 = number;
629
0
    }
630
155k
  }
631
632
  // For remote compactions, we need to keep OPTIONS file that may get
633
  // referenced by the remote worker
634
635
91.1k
  optsfile_num2 = std::min(optsfile_num2, state.min_options_file_number);
636
637
  // Close WALs before trying to delete them.
638
91.1k
  for (const auto w : state.wals_to_free) {
639
    // TODO: maybe check the return value of Close.
640
    // TODO: plumb Env::IOActivity, Env::IOPriority
641
1.83k
    auto s = w->Close({});
642
1.83k
    s.PermitUncheckedError();
643
1.83k
  }
644
645
91.1k
  bool own_files = OwnTablesAndLogs();
646
91.1k
  std::unordered_set<uint64_t> files_to_del;
647
1.56M
  for (const auto& candidate_file : candidate_files) {
648
1.56M
    const std::string& to_delete = candidate_file.file_name;
649
1.56M
    uint64_t number;
650
1.56M
    FileType type;
651
    // Ignore file if we cannot recognize it.
652
1.56M
    if (!ParseFileName(to_delete, &number, info_log_prefix.prefix, &type)) {
653
0
      continue;
654
0
    }
655
656
1.56M
    if (quarantine_files_set.find(number) != quarantine_files_set.end()) {
657
0
      continue;
658
0
    }
659
660
1.56M
    bool keep = true;
661
1.56M
    switch (type) {
662
121k
      case kWalFile:
663
121k
        keep = ((number >= state.log_number) ||
664
36.6k
                (number == state.prev_log_number) ||
665
36.6k
                (wal_recycle_files_set.find(number) !=
666
36.6k
                 wal_recycle_files_set.end()));
667
121k
        break;
668
126k
      case kDescriptorFile:
669
        // Keep my manifest file, and any newer incarnations'
670
        // (can happen during manifest roll)
671
126k
        keep = (number >= state.manifest_file_number);
672
126k
        break;
673
163k
      case kTableFile:
674
        // If the second condition is not there, this makes
675
        // DontDeletePendingOutputs fail
676
        // FIXME: but should NOT keep if it came from sst_delete_files?
677
163k
        keep = (sst_live_set.find(number) != sst_live_set.end()) ||
678
6.14k
               number >= state.min_pending_output;
679
163k
        if (!keep) {
680
          // NOTE: sometimes redundant (if came from sst_delete_files)
681
          // We don't know which column family is applicable here so we don't
682
          // know what uncache_aggressiveness would be used with
683
          // ReleaseObsolete(). Anyway, obsolete files ideally go into
684
          // sst_delete_files for better/quicker handling, and this is just a
685
          // backstop.
686
6.14k
          TableCache::Evict(table_cache_.get(), number);
687
6.14k
          files_to_del.insert(number);
688
6.14k
        }
689
163k
        break;
690
0
      case kBlobFile: {
691
0
        keep = number >= state.min_pending_output ||
692
0
               number >= state.min_blob_file_number_to_keep ||
693
0
               (blob_live_set.find(number) != blob_live_set.end()) ||
694
0
               (state.active_blob_direct_write_files.find(number) !=
695
0
                state.active_blob_direct_write_files.end());
696
0
        if (!keep) {
697
0
          const std::string blob_file_path =
698
0
              BlobFileName(candidate_file.file_path, number);
699
0
          if (ShouldKeepBlobFileDuringPurge(number, blob_file_path)) {
700
0
            keep = true;
701
0
            break;
702
0
          }
703
0
          TableCache::Evict(table_cache_.get(), number);
704
0
          files_to_del.insert(number);
705
0
        }
706
0
        break;
707
0
      }
708
0
      case kTempFile:
709
        // Any temp files that are currently being written to must
710
        // be recorded in pending_outputs_, which is inserted into "live".
711
        // Also, SetCurrentFile creates a temp file when writing out new
712
        // manifest, which is equal to state.pending_manifest_file_number. We
713
        // should not delete that file
714
        //
715
        // TODO(yhchiang): carefully modify the third condition to safely
716
        //                 remove the temp options files.
717
0
        keep = (sst_live_set.find(number) != sst_live_set.end()) ||
718
0
               (blob_live_set.find(number) != blob_live_set.end()) ||
719
0
               (number == state.pending_manifest_file_number) ||
720
0
               (to_delete.find(kOptionsFileNamePrefix) != std::string::npos);
721
0
        break;
722
742k
      case kInfoLogFile:
723
742k
        keep = true;
724
742k
        if (number != 0) {
725
658k
          old_info_log_files.push_back(to_delete);
726
658k
        }
727
742k
        break;
728
155k
      case kOptionsFile:
729
155k
        keep = (number >= optsfile_num2);
730
155k
        break;
731
0
      case kCompactionProgressFile:
732
        // Keep compaction progress files - they are managed
733
        // separately by DBImplSecondary for now
734
0
        keep = true;
735
0
        break;
736
84.4k
      case kCurrentFile:
737
168k
      case kDBLockFile:
738
253k
      case kIdentityFile:
739
253k
      case kMetaDatabase:
740
253k
        keep = true;
741
253k
        break;
742
1.56M
    }
743
744
1.56M
    if (keep) {
745
1.47M
      continue;
746
1.47M
    }
747
748
84.9k
    std::string fname;
749
84.9k
    std::string dir_to_sync;
750
84.9k
    if (type == kTableFile) {
751
6.14k
      fname = MakeTableFileName(candidate_file.file_path, number);
752
6.14k
      dir_to_sync = candidate_file.file_path;
753
78.8k
    } else if (type == kBlobFile) {
754
0
      fname = BlobFileName(candidate_file.file_path, number);
755
0
      dir_to_sync = candidate_file.file_path;
756
78.8k
    } else {
757
78.8k
      dir_to_sync = (type == kWalFile) ? wal_dir : dbname_;
758
78.8k
      fname = dir_to_sync +
759
78.8k
              ((!dir_to_sync.empty() && dir_to_sync.back() == '/') ||
760
78.8k
                       (!to_delete.empty() && to_delete.front() == '/')
761
78.8k
                   ? ""
762
78.8k
                   : "/") +
763
78.8k
              to_delete;
764
78.8k
    }
765
766
84.9k
    if (type == kWalFile && (immutable_db_options_.WAL_ttl_seconds > 0 ||
767
36.6k
                             immutable_db_options_.WAL_size_limit_MB > 0)) {
768
0
      wal_manager_.ArchiveWALFile(fname, number);
769
0
      continue;
770
0
    }
771
772
    // If I do not own these files, e.g. secondary instance with max_open_files
773
    // = -1, then no need to delete or schedule delete these files since they
774
    // will be removed by their owner, e.g. the primary instance.
775
84.9k
    if (!own_files) {
776
0
      continue;
777
0
    }
778
84.9k
    if (schedule_only) {
779
0
      InstrumentedMutexLock guard_lock(&mutex_);
780
0
      SchedulePendingPurge(fname, dir_to_sync, type, number, state.job_id);
781
84.9k
    } else {
782
84.9k
      DeleteObsoleteFileImpl(state.job_id, fname, dir_to_sync, type, number);
783
84.9k
    }
784
84.9k
  }
785
786
91.1k
  {
787
    // After purging obsolete files, remove them from files_grabbed_for_purge_.
788
91.1k
    InstrumentedMutexLock guard_lock(&mutex_);
789
91.1k
    autovector<uint64_t> to_be_removed;
790
91.1k
    for (auto fn : files_grabbed_for_purge_) {
791
16.5k
      if (files_to_del.count(fn) != 0) {
792
5.57k
        to_be_removed.emplace_back(fn);
793
5.57k
      }
794
16.5k
    }
795
91.1k
    for (auto fn : to_be_removed) {
796
5.57k
      files_grabbed_for_purge_.erase(fn);
797
5.57k
    }
798
91.1k
  }
799
800
  // Delete old info log files.
801
91.1k
  size_t old_info_log_file_count = old_info_log_files.size();
802
91.1k
  if (old_info_log_file_count != 0 &&
803
69.5k
      old_info_log_file_count >= immutable_db_options_.keep_log_file_num) {
804
0
    std::sort(old_info_log_files.begin(), old_info_log_files.end());
805
0
    size_t end =
806
0
        old_info_log_file_count - immutable_db_options_.keep_log_file_num;
807
0
    for (unsigned int i = 0; i <= end; i++) {
808
0
      std::string& to_delete = old_info_log_files.at(i);
809
0
      std::string full_path_to_delete =
810
0
          (immutable_db_options_.db_log_dir.empty()
811
0
               ? dbname_
812
0
               : immutable_db_options_.db_log_dir) +
813
0
          "/" + to_delete;
814
0
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
815
0
                     "[JOB %d] Delete info log file %s\n", state.job_id,
816
0
                     full_path_to_delete.c_str());
817
0
      Status s = env_->DeleteFile(full_path_to_delete);
818
0
      if (!s.ok()) {
819
0
        if (env_->FileExists(full_path_to_delete).IsNotFound()) {
820
0
          ROCKS_LOG_INFO(
821
0
              immutable_db_options_.info_log,
822
0
              "[JOB %d] Tried to delete non-existing info log file %s FAILED "
823
0
              "-- %s\n",
824
0
              state.job_id, to_delete.c_str(), s.ToString().c_str());
825
0
        } else {
826
0
          ROCKS_LOG_ERROR(immutable_db_options_.info_log,
827
0
                          "[JOB %d] Delete info log file %s FAILED -- %s\n",
828
0
                          state.job_id, to_delete.c_str(),
829
0
                          s.ToString().c_str());
830
0
        }
831
0
      }
832
0
    }
833
0
  }
834
91.1k
  wal_manager_.PurgeObsoleteWALFiles();
835
91.1k
  LogFlush(immutable_db_options_.info_log);
836
91.1k
  InstrumentedMutexLock l(&mutex_);
837
91.1k
  --pending_purge_obsolete_files_;
838
91.1k
  assert(pending_purge_obsolete_files_ >= 0);
839
91.1k
  if (schedule_only) {
840
    // Must change from pending_purge_obsolete_files_ to bg_purge_scheduled_
841
    // while holding mutex (for GetSortedWalFiles() etc.)
842
0
    SchedulePurge();
843
0
  }
844
91.1k
  if (pending_purge_obsolete_files_ == 0) {
845
91.1k
    bg_cv_.SignalAll();
846
91.1k
  }
847
91.1k
  TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:End");
848
91.1k
}
849
850
42.2k
void DBImpl::DeleteObsoleteFiles() {
851
42.2k
  mutex_.AssertHeld();
852
42.2k
  JobContext job_context(next_job_id_.fetch_add(1));
853
42.2k
  FindObsoleteFiles(&job_context, true);
854
855
42.2k
  mutex_.Unlock();
856
42.2k
  if (job_context.HaveSomethingToDelete()) {
857
42.2k
    bool defer_purge = immutable_db_options_.avoid_unnecessary_blocking_io;
858
42.2k
    PurgeObsoleteFiles(job_context, defer_purge);
859
42.2k
  }
860
42.2k
  job_context.Clean();
861
42.2k
  mutex_.Lock();
862
42.2k
}
863
864
bool DBImpl::ShouldKeepBlobFileDuringPurge(uint64_t number,
865
0
                                           const std::string& blob_file_path) {
866
0
  {
867
0
    InstrumentedMutexLock lock(&mutex_);
868
0
    for (auto* cfd : *versions_->GetColumnFamilySet()) {
869
0
      auto* mgr = cfd->blob_partition_manager();
870
0
      if (mgr != nullptr && mgr->IsTrackedBlobFileNumber(number)) {
871
0
        return true;
872
0
      }
873
0
    }
874
0
  }
875
876
0
  return ShouldKeepFooterlessBlobFile(fs_.get(), file_options_, blob_file_path);
877
0
}
878
879
VersionEdit GetDBRecoveryEditForObsoletingMemTables(
880
    VersionSet* vset, const ColumnFamilyData& cfd,
881
    const autovector<VersionEdit*>& edit_list,
882
    const autovector<ReadOnlyMemTable*>& memtables,
883
1.83k
    LogsWithPrepTracker* prep_tracker) {
884
1.83k
  VersionEdit wal_deletion_edit;
885
1.83k
  uint64_t min_wal_number_to_keep = 0;
886
1.83k
  assert(edit_list.size() > 0);
887
1.83k
  if (vset->db_options()->allow_2pc) {
888
    // Note that if mempurge is successful, the edit_list will
889
    // not be applicable (contains info of new min_log number to keep,
890
    // and level 0 file path of SST file created during normal flush,
891
    // so both pieces of information are irrelevant after a successful
892
    // mempurge operation).
893
0
    min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC(
894
0
        vset, cfd, edit_list, memtables, prep_tracker);
895
896
    // We piggyback the information of earliest log file to keep in the
897
    // manifest entry for the last file flushed.
898
1.83k
  } else {
899
1.83k
    min_wal_number_to_keep =
900
1.83k
        PrecomputeMinLogNumberToKeepNon2PC(vset, cfd, edit_list);
901
1.83k
  }
902
903
1.83k
  wal_deletion_edit.SetMinLogNumberToKeep(min_wal_number_to_keep);
904
1.83k
  if (vset->db_options()->track_and_verify_wals_in_manifest) {
905
0
    if (min_wal_number_to_keep > vset->GetWalSet().GetMinWalNumberToKeep()) {
906
0
      wal_deletion_edit.DeleteWalsBefore(min_wal_number_to_keep);
907
0
    }
908
0
  }
909
1.83k
  return wal_deletion_edit;
910
1.83k
}
911
912
uint64_t FindMinPrepLogReferencedByMemTable(
913
0
    VersionSet* vset, const autovector<ReadOnlyMemTable*>& memtables_to_flush) {
914
0
  uint64_t min_log = 0;
915
916
  // we must look through the memtables for two phase transactions
917
  // that have been committed but not yet flushed
918
0
  std::unordered_set<ReadOnlyMemTable*> memtables_to_flush_set(
919
0
      memtables_to_flush.begin(), memtables_to_flush.end());
920
0
  for (auto loop_cfd : *vset->GetColumnFamilySet()) {
921
0
    if (loop_cfd->IsDropped()) {
922
0
      continue;
923
0
    }
924
925
0
    auto log = loop_cfd->imm()->PrecomputeMinLogContainingPrepSection(
926
0
        &memtables_to_flush_set);
927
928
0
    if (log > 0 && (min_log == 0 || log < min_log)) {
929
0
      min_log = log;
930
0
    }
931
932
0
    log = loop_cfd->mem()->GetMinLogContainingPrepSection();
933
934
0
    if (log > 0 && (min_log == 0 || log < min_log)) {
935
0
      min_log = log;
936
0
    }
937
0
  }
938
939
0
  return min_log;
940
0
}
941
942
uint64_t FindMinPrepLogReferencedByMemTable(
943
    VersionSet* vset, const autovector<const autovector<ReadOnlyMemTable*>*>&
944
0
                          memtables_to_flush) {
945
0
  uint64_t min_log = 0;
946
947
0
  std::unordered_set<ReadOnlyMemTable*> memtables_to_flush_set;
948
0
  for (const autovector<ReadOnlyMemTable*>* memtables : memtables_to_flush) {
949
0
    memtables_to_flush_set.insert(memtables->begin(), memtables->end());
950
0
  }
951
0
  for (auto loop_cfd : *vset->GetColumnFamilySet()) {
952
0
    if (loop_cfd->IsDropped()) {
953
0
      continue;
954
0
    }
955
956
0
    auto log = loop_cfd->imm()->PrecomputeMinLogContainingPrepSection(
957
0
        &memtables_to_flush_set);
958
0
    if (log > 0 && (min_log == 0 || log < min_log)) {
959
0
      min_log = log;
960
0
    }
961
962
0
    log = loop_cfd->mem()->GetMinLogContainingPrepSection();
963
0
    if (log > 0 && (min_log == 0 || log < min_log)) {
964
0
      min_log = log;
965
0
    }
966
0
  }
967
968
0
  return min_log;
969
0
}
970
971
uint64_t PrecomputeMinLogNumberToKeepNon2PC(
972
    VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
973
1.83k
    const autovector<VersionEdit*>& edit_list) {
974
1.83k
  assert(vset != nullptr);
975
976
  // Precompute the min log number containing unflushed data for the column
977
  // family being flushed (`cfd_to_flush`).
978
1.83k
  uint64_t cf_min_log_number_to_keep = 0;
979
1.83k
  for (auto& e : edit_list) {
980
1.83k
    if (e->HasLogNumber()) {
981
1.83k
      cf_min_log_number_to_keep =
982
1.83k
          std::max(cf_min_log_number_to_keep, e->GetLogNumber());
983
1.83k
    }
984
1.83k
  }
985
1.83k
  if (cf_min_log_number_to_keep == 0) {
986
    // No version edit contains information on log number. The log number
987
    // for this column family should stay the same as it is.
988
0
    cf_min_log_number_to_keep = cfd_to_flush.GetLogNumber();
989
0
  }
990
991
  // Get min log number containing unflushed data for other column families.
992
1.83k
  uint64_t min_log_number_to_keep =
993
1.83k
      vset->PreComputeMinLogNumberWithUnflushedData(&cfd_to_flush);
994
1.83k
  if (cf_min_log_number_to_keep != 0) {
995
1.83k
    min_log_number_to_keep =
996
1.83k
        std::min(cf_min_log_number_to_keep, min_log_number_to_keep);
997
1.83k
  }
998
1.83k
  return min_log_number_to_keep;
999
1.83k
}
1000
1001
uint64_t PrecomputeMinLogNumberToKeepNon2PC(
1002
    VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush,
1003
0
    const autovector<autovector<VersionEdit*>>& edit_lists) {
1004
0
  assert(vset != nullptr);
1005
0
  assert(!cfds_to_flush.empty());
1006
0
  assert(cfds_to_flush.size() == edit_lists.size());
1007
1008
0
  uint64_t min_log_number_to_keep = std::numeric_limits<uint64_t>::max();
1009
0
  for (const auto& edit_list : edit_lists) {
1010
0
    uint64_t log = 0;
1011
0
    for (const auto& e : edit_list) {
1012
0
      if (e->HasLogNumber()) {
1013
0
        log = std::max(log, e->GetLogNumber());
1014
0
      }
1015
0
    }
1016
0
    if (log != 0) {
1017
0
      min_log_number_to_keep = std::min(min_log_number_to_keep, log);
1018
0
    }
1019
0
  }
1020
0
  if (min_log_number_to_keep == std::numeric_limits<uint64_t>::max()) {
1021
0
    min_log_number_to_keep = cfds_to_flush[0]->GetLogNumber();
1022
0
    for (size_t i = 1; i < cfds_to_flush.size(); i++) {
1023
0
      min_log_number_to_keep =
1024
0
          std::min(min_log_number_to_keep, cfds_to_flush[i]->GetLogNumber());
1025
0
    }
1026
0
  }
1027
1028
0
  std::unordered_set<const ColumnFamilyData*> flushed_cfds(
1029
0
      cfds_to_flush.begin(), cfds_to_flush.end());
1030
0
  min_log_number_to_keep =
1031
0
      std::min(min_log_number_to_keep,
1032
0
               vset->PreComputeMinLogNumberWithUnflushedData(flushed_cfds));
1033
1034
0
  return min_log_number_to_keep;
1035
0
}
1036
1037
uint64_t PrecomputeMinLogNumberToKeep2PC(
1038
    VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
1039
    const autovector<VersionEdit*>& edit_list,
1040
    const autovector<ReadOnlyMemTable*>& memtables_to_flush,
1041
0
    LogsWithPrepTracker* prep_tracker) {
1042
0
  assert(vset != nullptr);
1043
0
  assert(prep_tracker != nullptr);
1044
  // Calculate updated min_log_number_to_keep
1045
  // Since the function should only be called in 2pc mode, log number in
1046
  // the version edit should be sufficient.
1047
1048
0
  uint64_t min_log_number_to_keep =
1049
0
      PrecomputeMinLogNumberToKeepNon2PC(vset, cfd_to_flush, edit_list);
1050
1051
  // if are 2pc we must consider logs containing prepared
1052
  // sections of outstanding transactions.
1053
  //
1054
  // We must check min logs with outstanding prep before we check
1055
  // logs references by memtables because a log referenced by the
1056
  // first data structure could transition to the second under us.
1057
  //
1058
  // TODO: iterating over all column families under db mutex.
1059
  // should find more optimal solution
1060
0
  auto min_log_in_prep_heap =
1061
0
      prep_tracker->FindMinLogContainingOutstandingPrep();
1062
1063
0
  if (min_log_in_prep_heap != 0 &&
1064
0
      min_log_in_prep_heap < min_log_number_to_keep) {
1065
0
    min_log_number_to_keep = min_log_in_prep_heap;
1066
0
  }
1067
1068
0
  uint64_t min_log_refed_by_mem =
1069
0
      FindMinPrepLogReferencedByMemTable(vset, memtables_to_flush);
1070
1071
0
  if (min_log_refed_by_mem != 0 &&
1072
0
      min_log_refed_by_mem < min_log_number_to_keep) {
1073
0
    min_log_number_to_keep = min_log_refed_by_mem;
1074
0
  }
1075
0
  return min_log_number_to_keep;
1076
0
}
1077
1078
uint64_t PrecomputeMinLogNumberToKeep2PC(
1079
    VersionSet* vset, const autovector<ColumnFamilyData*>& cfds_to_flush,
1080
    const autovector<autovector<VersionEdit*>>& edit_lists,
1081
    const autovector<const autovector<ReadOnlyMemTable*>*>& memtables_to_flush,
1082
0
    LogsWithPrepTracker* prep_tracker) {
1083
0
  assert(vset != nullptr);
1084
0
  assert(prep_tracker != nullptr);
1085
0
  assert(cfds_to_flush.size() == edit_lists.size());
1086
0
  assert(cfds_to_flush.size() == memtables_to_flush.size());
1087
1088
0
  uint64_t min_log_number_to_keep =
1089
0
      PrecomputeMinLogNumberToKeepNon2PC(vset, cfds_to_flush, edit_lists);
1090
1091
0
  uint64_t min_log_in_prep_heap =
1092
0
      prep_tracker->FindMinLogContainingOutstandingPrep();
1093
1094
0
  if (min_log_in_prep_heap != 0 &&
1095
0
      min_log_in_prep_heap < min_log_number_to_keep) {
1096
0
    min_log_number_to_keep = min_log_in_prep_heap;
1097
0
  }
1098
1099
0
  uint64_t min_log_refed_by_mem =
1100
0
      FindMinPrepLogReferencedByMemTable(vset, memtables_to_flush);
1101
1102
0
  if (min_log_refed_by_mem != 0 &&
1103
0
      min_log_refed_by_mem < min_log_number_to_keep) {
1104
0
    min_log_number_to_keep = min_log_refed_by_mem;
1105
0
  }
1106
1107
0
  return min_log_number_to_keep;
1108
0
}
1109
1110
void DBImpl::SetDBId(std::string&& id, bool read_only,
1111
7.43k
                     VersionEdit* version_edit) {
1112
7.43k
  assert(db_id_.empty());
1113
7.43k
  assert(!id.empty());
1114
7.43k
  db_id_ = std::move(id);
1115
7.43k
  if (!read_only && version_edit) {
1116
7.43k
    assert(version_edit != nullptr);
1117
7.43k
    assert(versions_->GetColumnFamilySet() != nullptr);
1118
7.43k
    version_edit->SetDBId(db_id_);
1119
7.43k
    versions_->db_id_ = db_id_;
1120
7.43k
  }
1121
7.43k
}
1122
1123
Status DBImpl::SetupDBId(const WriteOptions& write_options, bool read_only,
1124
                         bool is_new_db, bool is_retry,
1125
42.2k
                         VersionEdit* version_edit) {
1126
42.2k
  Status s;
1127
42.2k
  if (!is_new_db) {
1128
    // Check for the IDENTITY file and create it if not there or
1129
    // broken or not matching manifest
1130
34.7k
    std::string db_id_in_file;
1131
34.7k
    s = fs_->FileExists(IdentityFileName(dbname_), IOOptions(), nullptr);
1132
34.7k
    if (s.ok()) {
1133
34.7k
      IOOptions opts;
1134
34.7k
      if (is_retry) {
1135
0
        opts.verify_and_reconstruct_read = true;
1136
0
      }
1137
34.7k
      s = GetDbIdentityFromIdentityFile(opts, &db_id_in_file);
1138
34.7k
      if (s.ok() && !db_id_in_file.empty()) {
1139
34.7k
        if (db_id_.empty()) {
1140
          // Loaded from file and wasn't already known from manifest
1141
0
          SetDBId(std::move(db_id_in_file), read_only, version_edit);
1142
0
          return s;
1143
34.7k
        } else if (db_id_ == db_id_in_file) {
1144
          // Loaded from file and matches manifest
1145
34.7k
          return s;
1146
34.7k
        }
1147
34.7k
      }
1148
34.7k
    }
1149
0
    if (s.IsNotFound()) {
1150
0
      s = Status::OK();
1151
0
    }
1152
0
    if (!s.ok()) {
1153
0
      assert(s.IsIOError());
1154
0
      return s;
1155
0
    }
1156
0
  }
1157
  // Otherwise IDENTITY file is missing or no good.
1158
  // Generate new id if needed
1159
7.43k
  if (db_id_.empty()) {
1160
7.43k
    SetDBId(env_->GenerateUniqueId(), read_only, version_edit);
1161
7.43k
  }
1162
  // Persist it to IDENTITY file if allowed
1163
7.43k
  if (!read_only && immutable_db_options_.write_identity_file) {
1164
7.43k
    s = SetIdentityFile(write_options, env_, dbname_,
1165
7.43k
                        immutable_db_options_.metadata_write_temperature,
1166
7.43k
                        db_id_);
1167
7.43k
  }
1168
  // NOTE: an obsolete IDENTITY file with write_identity_file=false is handled
1169
  // elsewhere, so that it's only deleted after successful recovery
1170
7.43k
  return s;
1171
42.2k
}
1172
1173
168k
std::set<std::string> DBImpl::CollectAllDBPaths() {
1174
168k
  std::set<std::string> all_db_paths;
1175
168k
  all_db_paths.insert(NormalizePath(dbname_));
1176
168k
  for (const auto& db_path : immutable_db_options_.db_paths) {
1177
168k
    all_db_paths.insert(NormalizePath(db_path.path));
1178
168k
  }
1179
229k
  for (const auto* cfd : *versions_->GetColumnFamilySet()) {
1180
229k
    for (const auto& cf_path : cfd->ioptions().cf_paths) {
1181
229k
      all_db_paths.insert(NormalizePath(cf_path.path));
1182
229k
    }
1183
229k
  }
1184
168k
  return all_db_paths;
1185
168k
}
1186
1187
42.2k
Status DBImpl::MaybeUpdateNextFileNumber(RecoveryContext* recovery_ctx) {
1188
42.2k
  mutex_.AssertHeld();
1189
42.2k
  uint64_t next_file_number = versions_->current_next_file_number();
1190
42.2k
  uint64_t largest_file_number = next_file_number;
1191
42.2k
  Status s;
1192
42.2k
  for (const auto& path : CollectAllDBPaths()) {
1193
42.2k
    std::vector<std::string> files;
1194
42.2k
    s = env_->GetChildren(path, &files);
1195
42.2k
    if (!s.ok()) {
1196
0
      break;
1197
0
    }
1198
706k
    for (const auto& fname : files) {
1199
706k
      uint64_t number = 0;
1200
706k
      FileType type;
1201
706k
      if (!ParseFileName(fname, &number, &type)) {
1202
371k
        continue;
1203
371k
      }
1204
335k
      const std::string normalized_fpath = path + kFilePathSeparator + fname;
1205
335k
      largest_file_number = std::max(largest_file_number, number);
1206
335k
      if ((type == kTableFile || type == kBlobFile)) {
1207
67.5k
        recovery_ctx->existing_data_files_.push_back(normalized_fpath);
1208
67.5k
      }
1209
335k
    }
1210
42.2k
  }
1211
42.2k
  if (!s.ok()) {
1212
0
    return s;
1213
0
  }
1214
1215
42.2k
  if (largest_file_number >= next_file_number) {
1216
42.2k
    versions_->next_file_number_.store(largest_file_number + 1);
1217
42.2k
  }
1218
1219
42.2k
  VersionEdit edit;
1220
42.2k
  edit.SetNextFile(versions_->next_file_number_.load());
1221
42.2k
  assert(versions_->GetColumnFamilySet());
1222
42.2k
  ColumnFamilyData* default_cfd = versions_->GetColumnFamilySet()->GetDefault();
1223
  assert(default_cfd);
1224
42.2k
  recovery_ctx->UpdateVersionEdits(default_cfd, edit);
1225
42.2k
  return s;
1226
42.2k
}
1227
}  // namespace ROCKSDB_NAMESPACE