Coverage Report

Created: 2026-04-10 07:52

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/db_impl/db_impl_open.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
#include <cinttypes>
10
11
#include "db/blob/blob_file_partition_manager.h"
12
#include "db/builder.h"
13
#include "db/db_impl/db_impl.h"
14
#include "db/error_handler.h"
15
#include "db/periodic_task_scheduler.h"
16
#include "db/version_util.h"
17
#include "env/composite_env_wrapper.h"
18
#include "file/filename.h"
19
#include "file/read_write_util.h"
20
#include "file/sst_file_manager_impl.h"
21
#include "file/writable_file_writer.h"
22
#include "logging/logging.h"
23
#include "monitoring/persistent_stats_history.h"
24
#include "monitoring/thread_status_util.h"
25
#include "options/options_helper.h"
26
#include "rocksdb/options.h"
27
#include "rocksdb/table.h"
28
#include "rocksdb/wal_filter.h"
29
#include "test_util/sync_point.h"
30
#include "util/rate_limiter_impl.h"
31
#include "util/string_util.h"
32
#include "util/udt_util.h"
33
34
namespace ROCKSDB_NAMESPACE {
35
Options SanitizeOptions(const std::string& dbname, const Options& src,
36
7.43k
                        bool read_only, Status* logger_creation_s) {
37
7.43k
  auto db_options =
38
7.43k
      SanitizeOptions(dbname, DBOptions(src), read_only, logger_creation_s);
39
7.43k
  ImmutableDBOptions immutable_db_options(db_options);
40
7.43k
  auto cf_options = SanitizeCfOptions(immutable_db_options, read_only,
41
7.43k
                                      ColumnFamilyOptions(src));
42
7.43k
  return Options(db_options, cf_options);
43
7.43k
}
44
45
DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src,
46
49.6k
                          bool read_only, Status* logger_creation_s) {
47
49.6k
  DBOptions result(src);
48
49
49.6k
  if (result.env == nullptr) {
50
0
    result.env = Env::Default();
51
0
  }
52
53
  // result.max_open_files means an "infinite" open files.
54
49.6k
  if (result.max_open_files != -1) {
55
0
    int max_max_open_files = port::GetMaxOpenFiles();
56
0
    if (max_max_open_files == -1) {
57
0
      max_max_open_files = 0x400000;
58
0
    }
59
0
    ClipToRange(&result.max_open_files, 20, max_max_open_files);
60
0
    TEST_SYNC_POINT_CALLBACK("SanitizeOptions::AfterChangeMaxOpenFiles",
61
0
                             &result.max_open_files);
62
0
  }
63
64
49.6k
  if (result.info_log == nullptr && !read_only) {
65
49.6k
    Status s = CreateLoggerFromOptions(dbname, result, &result.info_log);
66
49.6k
    if (!s.ok()) {
67
      // No place suitable for logging
68
0
      result.info_log = nullptr;
69
0
      if (logger_creation_s) {
70
0
        *logger_creation_s = s;
71
0
      }
72
0
    }
73
49.6k
  }
74
75
49.6k
  if (!result.write_buffer_manager) {
76
49.6k
    result.write_buffer_manager.reset(
77
49.6k
        new WriteBufferManager(result.db_write_buffer_size));
78
49.6k
  }
79
49.6k
  auto bg_job_limits = DBImpl::GetBGJobLimits(
80
49.6k
      result.max_background_flushes, result.max_background_compactions,
81
49.6k
      result.max_background_jobs, true /* parallelize_compactions */);
82
49.6k
  result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions,
83
49.6k
                                           Env::Priority::LOW);
84
49.6k
  result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes,
85
49.6k
                                           Env::Priority::HIGH);
86
87
49.6k
  if (result.rate_limiter.get() != nullptr) {
88
0
    if (result.bytes_per_sync == 0) {
89
0
      result.bytes_per_sync = 1024 * 1024;
90
0
    }
91
0
  }
92
93
49.6k
  if (result.delayed_write_rate == 0) {
94
49.6k
    if (result.rate_limiter.get() != nullptr) {
95
0
      result.delayed_write_rate = result.rate_limiter->GetBytesPerSecond();
96
0
    }
97
49.6k
    if (result.delayed_write_rate == 0) {
98
49.6k
      result.delayed_write_rate = 16 * 1024 * 1024;
99
49.6k
    }
100
49.6k
  }
101
102
49.6k
  if (result.WAL_ttl_seconds > 0 || result.WAL_size_limit_MB > 0) {
103
0
    result.recycle_log_file_num = false;
104
0
  }
105
106
49.6k
  if (result.recycle_log_file_num &&
107
0
      (result.wal_recovery_mode ==
108
0
           WALRecoveryMode::kTolerateCorruptedTailRecords ||
109
0
       result.wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency)) {
110
    // - kTolerateCorruptedTailRecords is inconsistent with recycle log file
111
    //   feature. WAL recycling expects recovery success upon encountering a
112
    //   corrupt record at the point where new data ends and recycled data
113
    //   remains at the tail. However, `kTolerateCorruptedTailRecords` must fail
114
    //   upon encountering any such corrupt record, as it cannot differentiate
115
    //   between this and a real corruption, which would cause committed updates
116
    //   to be truncated -- a violation of the recovery guarantee.
117
    // - kPointInTimeRecovery and kAbsoluteConsistency are incompatible with
118
    //   recycle log file feature temporarily due to a bug found introducing a
119
    //   hole in the recovered data
120
    //   (https://github.com/facebook/rocksdb/pull/7252#issuecomment-673766236).
121
    //   Besides this bug, we believe the features are fundamentally compatible.
122
0
    result.recycle_log_file_num = 0;
123
0
  }
124
125
49.6k
  if (result.db_paths.size() == 0) {
126
49.6k
    result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
127
49.6k
  } else if (result.wal_dir.empty()) {
128
    // Use dbname as default
129
0
    result.wal_dir = dbname;
130
0
  }
131
49.6k
  if (!result.wal_dir.empty()) {
132
    // If there is a wal_dir already set, check to see if the wal_dir is the
133
    // same as the dbname AND the same as the db_path[0] (which must exist from
134
    // a few lines ago). If the wal_dir matches both of these values, then clear
135
    // the wal_dir value, which will make wal_dir == dbname.  Most likely this
136
    // condition was the result of reading an old options file where we forced
137
    // wal_dir to be set (to dbname).
138
0
    auto npath = NormalizePath(dbname + "/");
139
0
    if (npath == NormalizePath(result.wal_dir + "/") &&
140
0
        npath == NormalizePath(result.db_paths[0].path + "/")) {
141
0
      result.wal_dir.clear();
142
0
    }
143
0
  }
144
145
49.6k
  if (!result.wal_dir.empty() && result.wal_dir.back() == '/') {
146
0
    result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
147
0
  }
148
149
  // Force flush on DB open if 2PC is enabled, since with 2PC we have no
150
  // guarantee that consecutive log files have consecutive sequence id, which
151
  // make recovery complicated.
152
49.6k
  if (result.allow_2pc) {
153
0
    result.avoid_flush_during_recovery = false;
154
0
  }
155
156
49.6k
  ImmutableDBOptions immutable_db_options(result);
157
49.6k
  if (!immutable_db_options.IsWalDirSameAsDBPath()) {
158
    // Either the WAL dir and db_paths[0]/db_name are not the same, or we
159
    // cannot tell for sure. In either case, assume they're different and
160
    // explicitly cleanup the trash log files (bypass DeleteScheduler)
161
    // Do this first so even if we end up calling
162
    // DeleteScheduler::CleanupDirectory on the same dir later, it will be
163
    // safe
164
0
    std::vector<std::string> filenames;
165
0
    IOOptions io_opts;
166
0
    io_opts.do_not_recurse = true;
167
0
    auto wal_dir = immutable_db_options.GetWalDir();
168
0
    Status s = immutable_db_options.fs->GetChildren(
169
0
        wal_dir, io_opts, &filenames, /*IODebugContext*=*/nullptr);
170
0
    s.PermitUncheckedError();  //**TODO: What to do on error?
171
0
    for (std::string& filename : filenames) {
172
0
      if (filename.find(".log.trash", filename.length() -
173
0
                                          std::string(".log.trash").length()) !=
174
0
          std::string::npos) {
175
0
        std::string trash_file = wal_dir + "/" + filename;
176
0
        result.env->DeleteFile(trash_file).PermitUncheckedError();
177
0
      }
178
0
    }
179
0
  }
180
181
  // Create a default SstFileManager for purposes of tracking compaction size
182
  // and facilitating recovery from out of space errors.
183
49.6k
  if (result.sst_file_manager.get() == nullptr) {
184
49.6k
    std::shared_ptr<SstFileManager> sst_file_manager(
185
49.6k
        NewSstFileManager(result.env, result.info_log));
186
49.6k
    result.sst_file_manager = sst_file_manager;
187
49.6k
  }
188
189
  // Supported wal compression types
190
49.6k
  if (!StreamingCompressionTypeSupported(result.wal_compression)) {
191
0
    result.wal_compression = kNoCompression;
192
0
    ROCKS_LOG_WARN(result.info_log,
193
0
                   "wal_compression is disabled since only zstd is supported");
194
0
  }
195
196
49.6k
  return result;
197
49.6k
}
198
199
namespace {
200
Status ValidateOptionsByTable(
201
    const DBOptions& db_opts,
202
42.2k
    const std::vector<ColumnFamilyDescriptor>& column_families) {
203
42.2k
  Status s;
204
57.4k
  for (auto& cf : column_families) {
205
57.4k
    s = ValidateOptions(db_opts, cf.options);
206
57.4k
    if (!s.ok()) {
207
0
      return s;
208
0
    }
209
57.4k
  }
210
42.2k
  return Status::OK();
211
42.2k
}
212
}  // namespace
213
214
Status DBImpl::ValidateOptions(
215
    const DBOptions& db_options,
216
42.2k
    const std::vector<ColumnFamilyDescriptor>& column_families) {
217
42.2k
  Status s;
218
57.4k
  for (auto& cfd : column_families) {
219
57.4k
    s = ColumnFamilyData::ValidateOptions(db_options, cfd.options);
220
57.4k
    if (!s.ok()) {
221
0
      return s;
222
0
    }
223
57.4k
    if (cfd.name == kDefaultColumnFamilyName) {
224
42.2k
      if (cfd.options.disallow_memtable_writes) {
225
0
        return Status::InvalidArgument(
226
0
            "Default column family cannot use disallow_memtable_writes=true");
227
0
      }
228
42.2k
    }
229
57.4k
  }
230
42.2k
  s = ValidateOptions(db_options);
231
42.2k
  return s;
232
42.2k
}
233
234
42.2k
Status DBImpl::ValidateOptions(const DBOptions& db_options) {
235
42.2k
  if (db_options.db_paths.size() > 4) {
236
0
    return Status::NotSupported(
237
0
        "More than four DB paths are not supported yet. ");
238
0
  }
239
240
42.2k
  if (db_options.allow_mmap_reads && db_options.use_direct_reads) {
241
    // Protect against assert in PosixMMapReadableFile constructor
242
0
    return Status::NotSupported(
243
0
        "If memory mapped reads (allow_mmap_reads) are enabled "
244
0
        "then direct I/O reads (use_direct_reads) must be disabled. ");
245
0
  }
246
247
42.2k
  if (db_options.allow_mmap_writes &&
248
0
      db_options.use_direct_io_for_flush_and_compaction) {
249
0
    return Status::NotSupported(
250
0
        "If memory mapped writes (allow_mmap_writes) are enabled "
251
0
        "then direct I/O writes (use_direct_io_for_flush_and_compaction) must "
252
0
        "be disabled. ");
253
0
  }
254
255
42.2k
  if (db_options.keep_log_file_num == 0) {
256
0
    return Status::InvalidArgument("keep_log_file_num must be greater than 0");
257
0
  }
258
259
42.2k
  if (db_options.unordered_write &&
260
0
      !db_options.allow_concurrent_memtable_write) {
261
0
    return Status::InvalidArgument(
262
0
        "unordered_write is incompatible with "
263
0
        "!allow_concurrent_memtable_write");
264
0
  }
265
266
42.2k
  if (db_options.unordered_write && db_options.enable_pipelined_write) {
267
0
    return Status::InvalidArgument(
268
0
        "unordered_write is incompatible with enable_pipelined_write");
269
0
  }
270
271
42.2k
  if (db_options.atomic_flush && db_options.enable_pipelined_write) {
272
0
    return Status::InvalidArgument(
273
0
        "atomic_flush is incompatible with enable_pipelined_write");
274
0
  }
275
276
42.2k
  if (db_options.use_direct_io_for_flush_and_compaction &&
277
0
      0 == db_options.writable_file_max_buffer_size) {
278
0
    return Status::InvalidArgument(
279
0
        "writes in direct IO require writable_file_max_buffer_size > 0");
280
0
  }
281
282
42.2k
  if (db_options.daily_offpeak_time_utc != "") {
283
0
    int start_time, end_time;
284
0
    if (!TryParseTimeRangeString(db_options.daily_offpeak_time_utc, start_time,
285
0
                                 end_time)) {
286
0
      return Status::InvalidArgument(
287
0
          "daily_offpeak_time_utc should be set in the format HH:mm-HH:mm "
288
0
          "(e.g. 04:30-07:30)");
289
0
    } else if (start_time == end_time) {
290
0
      return Status::InvalidArgument(
291
0
          "start_time and end_time cannot be the same");
292
0
    }
293
0
  }
294
295
42.2k
  if (!db_options.write_dbid_to_manifest && !db_options.write_identity_file) {
296
0
    return Status::InvalidArgument(
297
0
        "write_dbid_to_manifest and write_identity_file cannot both be false");
298
0
  }
299
42.2k
  return Status::OK();
300
42.2k
}
301
302
7.43k
Status DBImpl::NewDB(std::vector<std::string>* new_filenames) {
303
7.43k
  VersionEdit new_db_edit;
304
7.43k
  const WriteOptions write_options(Env::IOActivity::kDBOpen);
305
7.43k
  Status s = SetupDBId(write_options, /*read_only=*/false, /*is_new_db=*/true,
306
7.43k
                       /*is_retry=*/false, &new_db_edit);
307
7.43k
  if (!s.ok()) {
308
0
    return s;
309
0
  }
310
7.43k
  new_db_edit.SetLogNumber(0);
311
7.43k
  new_db_edit.SetNextFile(2);
312
7.43k
  new_db_edit.SetLastSequence(0);
313
314
7.43k
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
315
7.43k
  const std::string manifest = DescriptorFileName(dbname_, 1);
316
7.43k
  {
317
7.43k
    if (fs_->FileExists(manifest, IOOptions(), nullptr).ok()) {
318
0
      fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
319
0
    }
320
7.43k
    std::unique_ptr<FSWritableFile> file;
321
7.43k
    FileOptions file_options = fs_->OptimizeForManifestWrite(file_options_);
322
    // DB option takes precedence when not kUnknown
323
7.43k
    if (immutable_db_options_.metadata_write_temperature !=
324
7.43k
        Temperature::kUnknown) {
325
0
      file_options.temperature =
326
0
          immutable_db_options_.metadata_write_temperature;
327
0
    }
328
7.43k
    s = NewWritableFile(fs_.get(), manifest, &file, file_options);
329
7.43k
    if (!s.ok()) {
330
0
      return s;
331
0
    }
332
7.43k
    FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types;
333
7.43k
    file->SetPreallocationBlockSize(
334
7.43k
        mutable_db_options_.manifest_preallocation_size);
335
7.43k
    std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
336
7.43k
        std::move(file), manifest, file_options, immutable_db_options_.clock,
337
7.43k
        io_tracer_, nullptr /* stats */,
338
7.43k
        Histograms::HISTOGRAM_ENUM_MAX /* hist_type */,
339
7.43k
        immutable_db_options_.listeners, nullptr,
340
7.43k
        tmp_set.Contains(FileType::kDescriptorFile),
341
7.43k
        tmp_set.Contains(FileType::kDescriptorFile)));
342
7.43k
    log::Writer log(std::move(file_writer), 0, false);
343
7.43k
    std::string record;
344
7.43k
    new_db_edit.EncodeTo(&record);
345
7.43k
    s = log.AddRecord(write_options, record);
346
7.43k
    if (s.ok()) {
347
7.43k
      s = SyncManifest(&immutable_db_options_, write_options, log.file());
348
7.43k
    }
349
7.43k
  }
350
7.43k
  if (s.ok()) {
351
    // Make "CURRENT" file that points to the new manifest file.
352
7.43k
    s = SetCurrentFile(write_options, fs_.get(), dbname_, 1,
353
7.43k
                       immutable_db_options_.metadata_write_temperature,
354
7.43k
                       directories_.GetDbDir());
355
7.43k
    if (new_filenames) {
356
7.43k
      new_filenames->emplace_back(
357
7.43k
          manifest.substr(manifest.find_last_of("/\\") + 1));
358
7.43k
    }
359
7.43k
  } else {
360
0
    fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
361
0
  }
362
7.43k
  return s;
363
7.43k
}
364
365
IOStatus DBImpl::CreateAndNewDirectory(
366
    FileSystem* fs, const std::string& dirname,
367
99.6k
    std::unique_ptr<FSDirectory>* directory) {
368
  // We call CreateDirIfMissing() as the directory may already exist (if we
369
  // are reopening a DB), when this happens we don't want creating the
370
  // directory to cause an error. However, we need to check if creating the
371
  // directory fails or else we may get an obscure message about the lock
372
  // file not existing. One real-world example of this occurring is if
373
  // env->CreateDirIfMissing() doesn't create intermediate directories, e.g.
374
  // when dbname_ is "dir/db" but when "dir" doesn't exist.
375
99.6k
  IOStatus io_s = fs->CreateDirIfMissing(dirname, IOOptions(), nullptr);
376
99.6k
  if (!io_s.ok()) {
377
0
    return io_s;
378
0
  }
379
99.6k
  return fs->NewDirectory(dirname, IOOptions(), directory, nullptr);
380
99.6k
}
381
382
IOStatus Directories::SetDirectories(FileSystem* fs, const std::string& dbname,
383
                                     const std::string& wal_dir,
384
42.2k
                                     const std::vector<DbPath>& data_paths) {
385
42.2k
  IOStatus io_s = DBImpl::CreateAndNewDirectory(fs, dbname, &db_dir_);
386
42.2k
  if (!io_s.ok()) {
387
0
    return io_s;
388
0
  }
389
42.2k
  if (!wal_dir.empty() && dbname != wal_dir) {
390
0
    io_s = DBImpl::CreateAndNewDirectory(fs, wal_dir, &wal_dir_);
391
0
    if (!io_s.ok()) {
392
0
      return io_s;
393
0
    }
394
0
  }
395
396
42.2k
  data_dirs_.clear();
397
42.2k
  for (auto& p : data_paths) {
398
42.2k
    const std::string db_path = p.path;
399
42.2k
    if (db_path == dbname) {
400
42.2k
      data_dirs_.emplace_back(nullptr);
401
42.2k
    } else {
402
0
      std::unique_ptr<FSDirectory> path_directory;
403
0
      io_s = DBImpl::CreateAndNewDirectory(fs, db_path, &path_directory);
404
0
      if (!io_s.ok()) {
405
0
        return io_s;
406
0
      }
407
0
      data_dirs_.emplace_back(path_directory.release());
408
0
    }
409
42.2k
  }
410
42.2k
  assert(data_dirs_.size() == data_paths.size());
411
42.2k
  return IOStatus::OK();
412
42.2k
}
413
414
Status DBImpl::Recover(
415
    const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
416
    bool error_if_wal_file_exists, bool error_if_data_exists_in_wals,
417
    bool is_retry, uint64_t* recovered_seq, RecoveryContext* recovery_ctx,
418
42.2k
    bool* can_retry) {
419
42.2k
  mutex_.AssertHeld();
420
421
42.2k
  const WriteOptions write_options(Env::IOActivity::kDBOpen);
422
42.2k
  bool tmp_is_new_db = false;
423
42.2k
  bool& is_new_db = recovery_ctx ? recovery_ctx->is_new_db_ : tmp_is_new_db;
424
42.2k
  assert(db_lock_ == nullptr);
425
42.2k
  std::vector<std::string> files_in_dbname;
426
42.2k
  if (!read_only) {
427
42.2k
    Status s = directories_.SetDirectories(fs_.get(), dbname_,
428
42.2k
                                           immutable_db_options_.wal_dir,
429
42.2k
                                           immutable_db_options_.db_paths);
430
42.2k
    if (!s.ok()) {
431
0
      return s;
432
0
    }
433
434
42.2k
    s = env_->LockFile(LockFileName(dbname_), &db_lock_);
435
42.2k
    if (!s.ok()) {
436
0
      return s;
437
0
    }
438
439
42.2k
    std::string current_fname = CurrentFileName(dbname_);
440
    // Path to any MANIFEST file in the db dir. It does not matter which one.
441
    // Since best-efforts recovery ignores CURRENT file, existence of a
442
    // MANIFEST indicates the recovery to recover existing db. If no MANIFEST
443
    // can be found, a new db will be created.
444
42.2k
    std::string manifest_path;
445
42.2k
    if (!immutable_db_options_.best_efforts_recovery) {
446
42.2k
      s = env_->FileExists(current_fname);
447
42.2k
    } else {
448
0
      s = Status::NotFound();
449
0
      IOOptions io_opts;
450
0
      io_opts.do_not_recurse = true;
451
0
      Status io_s = immutable_db_options_.fs->GetChildren(
452
0
          dbname_, io_opts, &files_in_dbname, /*IODebugContext*=*/nullptr);
453
0
      if (!io_s.ok()) {
454
0
        s = io_s;
455
0
        files_in_dbname.clear();
456
0
      }
457
0
      for (const std::string& file : files_in_dbname) {
458
0
        uint64_t number = 0;
459
0
        FileType type = kWalFile;  // initialize
460
0
        if (ParseFileName(file, &number, &type) && type == kDescriptorFile) {
461
0
          uint64_t bytes;
462
0
          s = env_->GetFileSize(DescriptorFileName(dbname_, number), &bytes);
463
0
          if (s.ok() && bytes != 0) {
464
            // Found non-empty MANIFEST (descriptor log), thus best-efforts
465
            // recovery does not have to treat the db as empty.
466
0
            manifest_path = dbname_ + "/" + file;
467
0
            break;
468
0
          }
469
0
        }
470
0
      }
471
0
    }
472
42.2k
    if (s.IsNotFound()) {
473
7.43k
      if (immutable_db_options_.create_if_missing) {
474
7.43k
        s = NewDB(&files_in_dbname);
475
7.43k
        is_new_db = true;
476
7.43k
        if (!s.ok()) {
477
0
          return s;
478
0
        }
479
7.43k
      } else {
480
0
        return Status::InvalidArgument(
481
0
            current_fname, "does not exist (create_if_missing is false)");
482
0
      }
483
34.7k
    } else if (s.ok()) {
484
34.7k
      if (immutable_db_options_.error_if_exists) {
485
0
        return Status::InvalidArgument(dbname_,
486
0
                                       "exists (error_if_exists is true)");
487
0
      }
488
34.7k
    } else {
489
      // Unexpected error reading file
490
0
      assert(s.IsIOError());
491
0
      return s;
492
0
    }
493
    // Verify compatibility of file_options_ and filesystem
494
42.2k
    {
495
42.2k
      std::unique_ptr<FSRandomAccessFile> idfile;
496
42.2k
      FileOptions customized_fs(file_options_);
497
42.2k
      customized_fs.use_direct_reads |=
498
42.2k
          immutable_db_options_.use_direct_io_for_flush_and_compaction;
499
42.2k
      const std::string& fname =
500
42.2k
          manifest_path.empty() ? current_fname : manifest_path;
501
42.2k
      s = fs_->NewRandomAccessFile(fname, customized_fs, &idfile, nullptr);
502
42.2k
      if (!s.ok()) {
503
0
        std::string error_str = s.ToString();
504
        // Check if unsupported Direct I/O is the root cause
505
0
        customized_fs.use_direct_reads = false;
506
0
        s = fs_->NewRandomAccessFile(fname, customized_fs, &idfile, nullptr);
507
0
        if (s.ok()) {
508
0
          return Status::InvalidArgument(
509
0
              "Direct I/O is not supported by the specified DB.");
510
0
        } else {
511
0
          return Status::InvalidArgument(
512
0
              "Found options incompatible with filesystem", error_str.c_str());
513
0
        }
514
0
      }
515
42.2k
    }
516
42.2k
  } else if (immutable_db_options_.best_efforts_recovery) {
517
0
    assert(files_in_dbname.empty());
518
0
    IOOptions io_opts;
519
0
    io_opts.do_not_recurse = true;
520
0
    Status s = immutable_db_options_.fs->GetChildren(
521
0
        dbname_, io_opts, &files_in_dbname, /*IODebugContext*=*/nullptr);
522
0
    if (s.IsNotFound()) {
523
0
      return Status::InvalidArgument(dbname_,
524
0
                                     "does not exist (open for read only)");
525
0
    } else if (s.IsIOError()) {
526
0
      return s;
527
0
    }
528
0
    assert(s.ok());
529
0
  }
530
42.2k
  assert(is_new_db || db_id_.empty());
531
42.2k
  Status s;
532
42.2k
  bool missing_table_file = false;
533
42.2k
  if (!immutable_db_options_.best_efforts_recovery) {
534
    // Status of reading the descriptor file
535
42.2k
    Status desc_status;
536
42.2k
    s = versions_->Recover(column_families, read_only, &db_id_,
537
42.2k
                           /*no_error_if_files_missing=*/false, is_retry,
538
42.2k
                           &desc_status);
539
42.2k
    desc_status.PermitUncheckedError();
540
42.2k
    if (is_retry) {
541
0
      RecordTick(stats_, FILE_READ_CORRUPTION_RETRY_COUNT);
542
0
      if (desc_status.ok()) {
543
0
        RecordTick(stats_, FILE_READ_CORRUPTION_RETRY_SUCCESS_COUNT);
544
0
      }
545
0
    }
546
42.2k
    if (can_retry) {
547
      // If we're opening for the first time and the failure is likely due to
548
      // a corrupt MANIFEST file (could result in either the log::Reader
549
      // detecting a corrupt record, or SST files not found error due to
550
      // discarding badly formed tail records)
551
42.2k
      if (!is_retry &&
552
42.2k
          (desc_status.IsCorruption() || s.IsNotFound() || s.IsCorruption()) &&
553
0
          CheckFSFeatureSupport(fs_.get(),
554
0
                                FSSupportedOps::kVerifyAndReconstructRead)) {
555
0
        *can_retry = true;
556
0
        ROCKS_LOG_ERROR(
557
0
            immutable_db_options_.info_log,
558
0
            "Possible corruption detected while replaying MANIFEST %s, %s. "
559
0
            "Will be retried.",
560
0
            desc_status.ToString().c_str(), s.ToString().c_str());
561
42.2k
      } else {
562
42.2k
        *can_retry = false;
563
42.2k
      }
564
42.2k
    }
565
42.2k
  } else {
566
0
    assert(!files_in_dbname.empty());
567
0
    s = versions_->TryRecover(column_families, read_only, files_in_dbname,
568
0
                              &db_id_, &missing_table_file);
569
0
    if (s.ok()) {
570
      // TryRecover may delete previous column_family_set_.
571
0
      column_family_memtables_.reset(
572
0
          new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));
573
0
    }
574
0
  }
575
42.2k
  if (!s.ok()) {
576
0
    return s;
577
0
  }
578
42.2k
  if (s.ok() && !read_only) {
579
57.4k
    for (auto cfd : *versions_->GetColumnFamilySet()) {
580
57.4k
      const auto& moptions = cfd->GetLatestMutableCFOptions();
581
      // Try to trivially move files down the LSM tree to start from bottommost
582
      // level when level_compaction_dynamic_level_bytes is enabled. This should
583
      // only be useful when user is migrating to turning on this option.
584
      // If a user is migrating from Level Compaction with a smaller level
585
      // multiplier or from Universal Compaction, there may be too many
586
      // non-empty levels and the trivial moves here are not sufficed for
587
      // migration. Additional compactions are needed to drain unnecessary
588
      // levels.
589
      //
590
      // Note that this step moves files down LSM without consulting
591
      // SSTPartitioner. Further compactions are still needed if
592
      // the user wants to partition SST files.
593
      // Note that files moved in this step may not respect the compression
594
      // option in target level.
595
57.4k
      if (cfd->ioptions().compaction_style ==
596
57.4k
              CompactionStyle::kCompactionStyleLevel &&
597
57.4k
          cfd->ioptions().level_compaction_dynamic_level_bytes &&
598
57.4k
          !moptions.disable_auto_compactions) {
599
57.4k
        int to_level = cfd->ioptions().num_levels - 1;
600
        // last level is reserved
601
        // allow_ingest_behind does not support Level Compaction,
602
        // and per_key_placement can have infinite compaction loop for Level
603
        // Compaction. Adjust to_level here just to be safe.
604
57.4k
        if (cfd->AllowIngestBehind() ||
605
57.4k
            moptions.preclude_last_level_data_seconds > 0) {
606
0
          to_level -= 1;
607
0
        }
608
        // Whether this column family has a level trivially moved
609
57.4k
        bool moved = false;
610
        // Fill the LSM starting from to_level and going up one level at a time.
611
        // Some loop invariants (when last level is not reserved):
612
        // - levels in (from_level, to_level] are empty, and
613
        // - levels in (to_level, last_level] are non-empty.
614
459k
        for (int from_level = to_level; from_level >= 0; --from_level) {
615
402k
          const std::vector<FileMetaData*>& level_files =
616
402k
              cfd->current()->storage_info()->LevelFiles(from_level);
617
402k
          if (level_files.empty() || from_level == 0) {
618
392k
            continue;
619
392k
          }
620
402k
          assert(from_level <= to_level);
621
          // Trivial move files from `from_level` to `to_level`
622
9.68k
          if (from_level < to_level) {
623
0
            if (!moved) {
624
              // lsm_state will look like "[1,2,3,4,5,6,0]" for an LSM with
625
              // 7 levels
626
0
              std::string lsm_state = "[";
627
0
              for (int i = 0; i < cfd->ioptions().num_levels; ++i) {
628
0
                lsm_state += std::to_string(
629
0
                    cfd->current()->storage_info()->NumLevelFiles(i));
630
0
                if (i < cfd->ioptions().num_levels - 1) {
631
0
                  lsm_state += ",";
632
0
                }
633
0
              }
634
0
              lsm_state += "]";
635
0
              ROCKS_LOG_WARN(immutable_db_options_.info_log,
636
0
                             "[%s] Trivially move files down the LSM when open "
637
0
                             "with level_compaction_dynamic_level_bytes=true,"
638
0
                             " lsm_state: %s (Files are moved only if DB "
639
0
                             "Recovery is successful).",
640
0
                             cfd->GetName().c_str(), lsm_state.c_str());
641
0
              moved = true;
642
0
            }
643
0
            ROCKS_LOG_WARN(
644
0
                immutable_db_options_.info_log,
645
0
                "[%s] Moving %zu files from from_level-%d to from_level-%d",
646
0
                cfd->GetName().c_str(), level_files.size(), from_level,
647
0
                to_level);
648
0
            VersionEdit edit;
649
0
            edit.SetColumnFamily(cfd->GetID());
650
0
            for (const FileMetaData* f : level_files) {
651
0
              edit.DeleteFile(from_level, f->fd.GetNumber());
652
0
              edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
653
0
                           f->fd.GetFileSize(), f->smallest, f->largest,
654
0
                           f->fd.smallest_seqno, f->fd.largest_seqno,
655
0
                           f->marked_for_compaction,
656
0
                           f->temperature,  // this can be different from
657
                           // `last_level_temperature`
658
0
                           f->oldest_blob_file_number, f->oldest_ancester_time,
659
0
                           f->file_creation_time, f->epoch_number,
660
0
                           f->file_checksum, f->file_checksum_func_name,
661
0
                           f->unique_id, f->compensated_range_deletion_size,
662
0
                           f->tail_size, f->user_defined_timestamps_persisted,
663
0
                           f->min_timestamp, f->max_timestamp);
664
0
              ROCKS_LOG_WARN(immutable_db_options_.info_log,
665
0
                             "[%s] Moving #%" PRIu64
666
0
                             " from from_level-%d to from_level-%d %" PRIu64
667
0
                             " bytes\n",
668
0
                             cfd->GetName().c_str(), f->fd.GetNumber(),
669
0
                             from_level, to_level, f->fd.GetFileSize());
670
0
            }
671
0
            recovery_ctx->UpdateVersionEdits(cfd, edit);
672
0
          }
673
9.68k
          --to_level;
674
9.68k
        }
675
57.4k
      }
676
57.4k
    }
677
42.2k
  }
678
42.2k
  if (is_new_db) {
679
    // Already set up DB ID in NewDB
680
34.7k
  } else if (immutable_db_options_.write_dbid_to_manifest && recovery_ctx) {
681
34.7k
    VersionEdit edit;
682
34.7k
    s = SetupDBId(write_options, read_only, is_new_db, is_retry, &edit);
683
34.7k
    recovery_ctx->UpdateVersionEdits(
684
34.7k
        versions_->GetColumnFamilySet()->GetDefault(), edit);
685
34.7k
  } else {
686
0
    s = SetupDBId(write_options, read_only, is_new_db, is_retry, nullptr);
687
0
  }
688
42.2k
  assert(!s.ok() || !db_id_.empty());
689
42.2k
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "DB ID: %s\n", db_id_.c_str());
690
42.2k
  if (s.ok() && !read_only) {
691
42.2k
    s = MaybeUpdateNextFileNumber(recovery_ctx);
692
42.2k
  }
693
694
42.2k
  if (s.ok() && !read_only) {
695
    // TODO: share file descriptors (FSDirectory) with SetDirectories above
696
42.2k
    std::map<std::string, std::shared_ptr<FSDirectory>> created_dirs;
697
57.4k
    for (auto cfd : *versions_->GetColumnFamilySet()) {
698
57.4k
      s = cfd->AddDirectories(&created_dirs);
699
57.4k
      if (!s.ok()) {
700
0
        return s;
701
0
      }
702
57.4k
    }
703
42.2k
  }
704
705
42.2k
  std::vector<std::string> files_in_wal_dir;
706
42.2k
  if (s.ok()) {
707
    // Initial max_total_in_memory_state_ before recovery wals. Log recovery
708
    // may check this value to decide whether to flush.
709
42.2k
    max_total_in_memory_state_ = 0;
710
57.4k
    for (auto cfd : *versions_->GetColumnFamilySet()) {
711
57.4k
      const auto& mutable_cf_options = cfd->GetLatestMutableCFOptions();
712
57.4k
      max_total_in_memory_state_ += mutable_cf_options.write_buffer_size *
713
57.4k
                                    mutable_cf_options.max_write_buffer_number;
714
57.4k
    }
715
716
42.2k
    SequenceNumber next_sequence(kMaxSequenceNumber);
717
42.2k
    default_cf_handle_ = new ColumnFamilyHandleImpl(
718
42.2k
        versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
719
42.2k
    default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
720
721
    // Recover from all newer log files than the ones named in the
722
    // descriptor (new log files may have been added by the previous
723
    // incarnation without registering them in the descriptor).
724
    //
725
    // Note that prev_log_number() is no longer used, but we pay
726
    // attention to it in case we are recovering a database
727
    // produced by an older version of rocksdb.
728
42.2k
    auto wal_dir = immutable_db_options_.GetWalDir();
729
42.2k
    if (!immutable_db_options_.best_efforts_recovery) {
730
42.2k
      IOOptions io_opts;
731
42.2k
      io_opts.do_not_recurse = true;
732
42.2k
      s = immutable_db_options_.fs->GetChildren(
733
42.2k
          wal_dir, io_opts, &files_in_wal_dir, /*IODebugContext*=*/nullptr);
734
42.2k
    }
735
42.2k
    if (s.IsNotFound()) {
736
0
      return Status::InvalidArgument("wal_dir not found", wal_dir);
737
42.2k
    } else if (!s.ok()) {
738
0
      return s;
739
0
    }
740
741
42.2k
    std::unordered_map<uint64_t, std::string> wal_files;
742
706k
    for (const auto& file : files_in_wal_dir) {
743
706k
      uint64_t number;
744
706k
      FileType type;
745
706k
      if (ParseFileName(file, &number, &type) && type == kWalFile) {
746
34.7k
        if (is_new_db) {
747
0
          return Status::Corruption(
748
0
              "While creating a new Db, wal_dir contains "
749
0
              "existing log file: ",
750
0
              file);
751
34.7k
        } else {
752
34.7k
          wal_files[number] = LogFileName(wal_dir, number);
753
34.7k
        }
754
34.7k
      }
755
706k
    }
756
757
42.2k
    if (immutable_db_options_.track_and_verify_wals && !is_new_db &&
758
0
        !immutable_db_options_.best_efforts_recovery && wal_files.empty()) {
759
0
      return Status::Corruption("Opening an existing DB with no WAL files");
760
0
    }
761
762
42.2k
    if (immutable_db_options_.track_and_verify_wals_in_manifest) {
763
0
      if (!immutable_db_options_.best_efforts_recovery) {
764
        // Verify WALs in MANIFEST.
765
0
        s = versions_->GetWalSet().CheckWals(env_, wal_files);
766
0
      }  // else since best effort recovery does not recover from WALs, no need
767
         // to check WALs.
768
42.2k
    } else if (!versions_->GetWalSet().GetWals().empty()) {
769
      // Tracking is disabled, clear previously tracked WALs from MANIFEST,
770
      // otherwise, in the future, if WAL tracking is enabled again,
771
      // since the WALs deleted when WAL tracking is disabled are not persisted
772
      // into MANIFEST, WAL check may fail.
773
0
      VersionEdit edit;
774
0
      WalNumber max_wal_number =
775
0
          versions_->GetWalSet().GetWals().rbegin()->first;
776
0
      edit.DeleteWalsBefore(max_wal_number + 1);
777
0
      assert(recovery_ctx != nullptr);
778
0
      assert(versions_->GetColumnFamilySet() != nullptr);
779
0
      recovery_ctx->UpdateVersionEdits(
780
0
          versions_->GetColumnFamilySet()->GetDefault(), edit);
781
0
    }
782
42.2k
    if (!s.ok()) {
783
0
      return s;
784
0
    }
785
786
42.2k
    if (!wal_files.empty()) {
787
34.7k
      if (error_if_wal_file_exists) {
788
0
        return Status::Corruption(
789
0
            "The db was opened in readonly mode with error_if_wal_file_exists"
790
0
            "flag but a WAL file already exists");
791
34.7k
      } else if (error_if_data_exists_in_wals) {
792
0
        for (auto& wal_file : wal_files) {
793
0
          uint64_t bytes;
794
0
          s = env_->GetFileSize(wal_file.second, &bytes);
795
0
          if (s.ok()) {
796
0
            if (bytes > 0) {
797
0
              return Status::Corruption(
798
0
                  "error_if_data_exists_in_wals is set but there are data "
799
0
                  " in WAL files.");
800
0
            }
801
0
          }
802
0
        }
803
0
      }
804
34.7k
    }
805
806
42.2k
    if (!wal_files.empty()) {
807
      // Recover in the order in which the wals were generated
808
34.7k
      std::vector<uint64_t> wals;
809
34.7k
      wals.reserve(wal_files.size());
810
34.7k
      for (const auto& wal_file : wal_files) {
811
34.7k
        wals.push_back(wal_file.first);
812
34.7k
      }
813
34.7k
      std::sort(wals.begin(), wals.end());
814
815
34.7k
      bool corrupted_wal_found = false;
816
34.7k
      s = RecoverLogFiles(wals, &next_sequence, read_only, is_retry,
817
34.7k
                          &corrupted_wal_found, recovery_ctx);
818
34.7k
      if (corrupted_wal_found && recovered_seq != nullptr) {
819
0
        *recovered_seq = next_sequence;
820
0
      }
821
34.7k
      if (!s.ok()) {
822
        // Clear memtables if recovery failed
823
0
        for (auto cfd : *versions_->GetColumnFamilySet()) {
824
0
          cfd->CreateNewMemtable(kMaxSequenceNumber);
825
0
        }
826
0
      }
827
34.7k
    }
828
42.2k
  }
829
830
42.2k
  if (read_only) {
831
    // If we are opening as read-only, we need to update options_file_number_
832
    // to reflect the most recent OPTIONS file. It does not matter for regular
833
    // read-write db instance because options_file_number_ will later be
834
    // updated to versions_->NewFileNumber() in RenameTempFileToOptionsFile.
835
0
    std::vector<std::string> filenames;
836
0
    if (s.ok()) {
837
0
      const std::string normalized_dbname = NormalizePath(dbname_);
838
0
      const std::string normalized_wal_dir =
839
0
          NormalizePath(immutable_db_options_.GetWalDir());
840
0
      if (immutable_db_options_.best_efforts_recovery) {
841
0
        filenames = std::move(files_in_dbname);
842
0
      } else if (normalized_dbname == normalized_wal_dir) {
843
0
        filenames = std::move(files_in_wal_dir);
844
0
      } else {
845
0
        IOOptions io_opts;
846
0
        io_opts.do_not_recurse = true;
847
0
        s = immutable_db_options_.fs->GetChildren(
848
0
            GetName(), io_opts, &filenames, /*IODebugContext*=*/nullptr);
849
0
      }
850
0
    }
851
0
    if (s.ok()) {
852
0
      uint64_t number = 0;
853
0
      uint64_t options_file_number = 0;
854
0
      FileType type;
855
0
      for (const auto& fname : filenames) {
856
0
        if (ParseFileName(fname, &number, &type) && type == kOptionsFile) {
857
0
          options_file_number = std::max(number, options_file_number);
858
0
        }
859
0
      }
860
0
      versions_->options_file_number_ = options_file_number;
861
0
      uint64_t options_file_size = 0;
862
0
      if (options_file_number > 0) {
863
0
        s = env_->GetFileSize(OptionsFileName(GetName(), options_file_number),
864
0
                              &options_file_size);
865
0
      }
866
0
      versions_->options_file_size_ = options_file_size;
867
0
    }
868
0
  }
869
42.2k
  return s;
870
42.2k
}
871
872
0
Status DBImpl::PersistentStatsProcessFormatVersion() {
873
0
  mutex_.AssertHeld();
874
0
  Status s;
875
  // persist version when stats CF doesn't exist
876
0
  bool should_persist_format_version = !persistent_stats_cfd_exists_;
877
0
  mutex_.Unlock();
878
0
  if (persistent_stats_cfd_exists_) {
879
    // Check persistent stats format version compatibility. Drop and recreate
880
    // persistent stats CF if format version is incompatible
881
0
    uint64_t format_version_recovered = 0;
882
0
    Status s_format = DecodePersistentStatsVersionNumber(
883
0
        this, StatsVersionKeyType::kFormatVersion, &format_version_recovered);
884
0
    uint64_t compatible_version_recovered = 0;
885
0
    Status s_compatible = DecodePersistentStatsVersionNumber(
886
0
        this, StatsVersionKeyType::kCompatibleVersion,
887
0
        &compatible_version_recovered);
888
    // abort reading from existing stats CF if any of following is true:
889
    // 1. failed to read format version or compatible version from disk
890
    // 2. sst's format version is greater than current format version, meaning
891
    // this sst is encoded with a newer RocksDB release, and current compatible
892
    // version is below the sst's compatible version
893
0
    if (!s_format.ok() || !s_compatible.ok() ||
894
0
        (kStatsCFCurrentFormatVersion < format_version_recovered &&
895
0
         kStatsCFCompatibleFormatVersion < compatible_version_recovered)) {
896
0
      if (!s_format.ok() || !s_compatible.ok()) {
897
0
        ROCKS_LOG_WARN(
898
0
            immutable_db_options_.info_log,
899
0
            "Recreating persistent stats column family since reading "
900
0
            "persistent stats version key failed. Format key: %s, compatible "
901
0
            "key: %s",
902
0
            s_format.ToString().c_str(), s_compatible.ToString().c_str());
903
0
      } else {
904
0
        ROCKS_LOG_WARN(
905
0
            immutable_db_options_.info_log,
906
0
            "Recreating persistent stats column family due to corrupted or "
907
0
            "incompatible format version. Recovered format: %" PRIu64
908
0
            "; recovered format compatible since: %" PRIu64 "\n",
909
0
            format_version_recovered, compatible_version_recovered);
910
0
      }
911
0
      s = DropColumnFamily(persist_stats_cf_handle_);
912
0
      if (s.ok()) {
913
0
        s = DestroyColumnFamilyHandle(persist_stats_cf_handle_);
914
0
      }
915
0
      ColumnFamilyHandle* handle = nullptr;
916
0
      if (s.ok()) {
917
0
        ColumnFamilyOptions cfo;
918
0
        OptimizeForPersistentStats(&cfo);
919
0
        s = CreateColumnFamilyImpl(ReadOptions(Env::IOActivity::kDBOpen),
920
0
                                   WriteOptions(Env::IOActivity::kDBOpen), cfo,
921
0
                                   kPersistentStatsColumnFamilyName, &handle);
922
0
      }
923
0
      if (s.ok()) {
924
0
        persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
925
        // should also persist version here because old stats CF is discarded
926
0
        should_persist_format_version = true;
927
0
      }
928
0
    }
929
0
  }
930
0
  if (should_persist_format_version) {
931
    // Persistent stats CF being created for the first time, need to write
932
    // format version key
933
0
    WriteBatch batch;
934
0
    if (s.ok()) {
935
0
      s = batch.Put(persist_stats_cf_handle_, kFormatVersionKeyString,
936
0
                    std::to_string(kStatsCFCurrentFormatVersion));
937
0
    }
938
0
    if (s.ok()) {
939
0
      s = batch.Put(persist_stats_cf_handle_, kCompatibleVersionKeyString,
940
0
                    std::to_string(kStatsCFCompatibleFormatVersion));
941
0
    }
942
0
    if (s.ok()) {
943
      // TODO: plumb Env::IOActivity, Env::IOPriority
944
0
      WriteOptions wo;
945
0
      wo.low_pri = true;
946
0
      wo.no_slowdown = true;
947
0
      wo.sync = false;
948
0
      s = Write(wo, &batch);
949
0
    }
950
0
  }
951
0
  mutex_.Lock();
952
0
  return s;
953
0
}
954
955
0
Status DBImpl::InitPersistStatsColumnFamily() {
956
0
  mutex_.AssertHeld();
957
0
  assert(!persist_stats_cf_handle_);
958
0
  ColumnFamilyData* persistent_stats_cfd =
959
0
      versions_->GetColumnFamilySet()->GetColumnFamily(
960
0
          kPersistentStatsColumnFamilyName);
961
0
  persistent_stats_cfd_exists_ = persistent_stats_cfd != nullptr;
962
963
0
  Status s;
964
0
  if (persistent_stats_cfd != nullptr) {
965
    // We are recovering from a DB which already contains persistent stats CF,
966
    // the CF is already created in VersionSet::ApplyOneVersionEdit, but
967
    // column family handle was not. Need to explicitly create handle here.
968
0
    persist_stats_cf_handle_ =
969
0
        new ColumnFamilyHandleImpl(persistent_stats_cfd, this, &mutex_);
970
0
  } else {
971
0
    mutex_.Unlock();
972
0
    ColumnFamilyHandle* handle = nullptr;
973
0
    ColumnFamilyOptions cfo;
974
0
    OptimizeForPersistentStats(&cfo);
975
0
    s = CreateColumnFamilyImpl(ReadOptions(Env::IOActivity::kDBOpen),
976
0
                               WriteOptions(Env::IOActivity::kDBOpen), cfo,
977
0
                               kPersistentStatsColumnFamilyName, &handle);
978
0
    persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
979
0
    mutex_.Lock();
980
0
  }
981
0
  return s;
982
0
}
983
984
42.2k
Status DBImpl::LogAndApplyForRecovery(const RecoveryContext& recovery_ctx) {
985
42.2k
  mutex_.AssertHeld();
986
42.2k
  assert(versions_->descriptor_log_ == nullptr);
987
42.2k
  const ReadOptions read_options(Env::IOActivity::kDBOpen);
988
42.2k
  const WriteOptions write_options(Env::IOActivity::kDBOpen);
989
990
42.2k
  Status s = versions_->LogAndApply(recovery_ctx.cfds_, read_options,
991
42.2k
                                    write_options, recovery_ctx.edit_lists_,
992
42.2k
                                    &mutex_, directories_.GetDbDir());
993
42.2k
  return s;
994
42.2k
}
995
996
34.7k
void DBImpl::InvokeWalFilterIfNeededOnColumnFamilyToWalNumberMap() {
997
34.7k
  if (immutable_db_options_.wal_filter == nullptr) {
998
34.7k
    return;
999
34.7k
  }
1000
34.7k
  assert(immutable_db_options_.wal_filter != nullptr);
1001
0
  WalFilter& wal_filter = *(immutable_db_options_.wal_filter);
1002
1003
0
  std::map<std::string, uint32_t> cf_name_id_map;
1004
0
  std::map<uint32_t, uint64_t> cf_lognumber_map;
1005
0
  assert(versions_);
1006
0
  assert(versions_->GetColumnFamilySet());
1007
0
  for (auto cfd : *versions_->GetColumnFamilySet()) {
1008
0
    assert(cfd);
1009
0
    cf_name_id_map.insert(std::make_pair(cfd->GetName(), cfd->GetID()));
1010
0
    cf_lognumber_map.insert(std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
1011
0
  }
1012
1013
0
  wal_filter.ColumnFamilyLogNumberMap(cf_lognumber_map, cf_name_id_map);
1014
0
}
1015
1016
bool DBImpl::InvokeWalFilterIfNeededOnWalRecord(uint64_t wal_number,
1017
                                                const std::string& wal_fname,
1018
                                                log::Reader::Reporter& reporter,
1019
                                                Status& status,
1020
                                                bool& stop_replay,
1021
489k
                                                WriteBatch& batch) {
1022
489k
  if (immutable_db_options_.wal_filter == nullptr) {
1023
489k
    return true;
1024
489k
  }
1025
489k
  assert(immutable_db_options_.wal_filter != nullptr);
1026
0
  WalFilter& wal_filter = *(immutable_db_options_.wal_filter);
1027
1028
0
  WriteBatch new_batch;
1029
0
  bool batch_changed = false;
1030
1031
0
  bool process_current_record = true;
1032
1033
0
  WalFilter::WalProcessingOption wal_processing_option =
1034
0
      wal_filter.LogRecordFound(wal_number, wal_fname, batch, &new_batch,
1035
0
                                &batch_changed);
1036
1037
0
  switch (wal_processing_option) {
1038
0
    case WalFilter::WalProcessingOption::kContinueProcessing:
1039
      // do nothing, proceeed normally
1040
0
      break;
1041
0
    case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
1042
      // skip current record
1043
0
      process_current_record = false;
1044
0
      break;
1045
0
    case WalFilter::WalProcessingOption::kStopReplay:
1046
      // skip current record and stop replay
1047
0
      process_current_record = false;
1048
0
      stop_replay = true;
1049
0
      break;
1050
0
    case WalFilter::WalProcessingOption::kCorruptedRecord: {
1051
0
      status = Status::Corruption("Corruption reported by Wal Filter ",
1052
0
                                  wal_filter.Name());
1053
0
      MaybeIgnoreError(&status);
1054
0
      if (!status.ok()) {
1055
0
        process_current_record = false;
1056
0
        reporter.Corruption(batch.GetDataSize(), status);
1057
0
      }
1058
0
      break;
1059
0
    }
1060
0
    default: {
1061
      // logical error which should not happen. If RocksDB throws, we would
1062
      // just do `throw std::logic_error`.
1063
0
      assert(false);
1064
0
      status = Status::NotSupported(
1065
0
          "Unknown WalProcessingOption returned by Wal Filter ",
1066
0
          wal_filter.Name());
1067
0
      MaybeIgnoreError(&status);
1068
0
      if (!status.ok()) {
1069
        // Ignore the error with current record processing.
1070
0
        stop_replay = true;
1071
0
      }
1072
0
      break;
1073
0
    }
1074
0
  }
1075
1076
0
  if (!process_current_record) {
1077
0
    return false;
1078
0
  }
1079
1080
0
  if (batch_changed) {
1081
    // Make sure that the count in the new batch is
1082
    // within the orignal count.
1083
0
    int new_count = WriteBatchInternal::Count(&new_batch);
1084
0
    int original_count = WriteBatchInternal::Count(&batch);
1085
0
    if (new_count > original_count) {
1086
0
      ROCKS_LOG_FATAL(
1087
0
          immutable_db_options_.info_log,
1088
0
          "Recovering log #%" PRIu64
1089
0
          " mode %d log filter %s returned "
1090
0
          "more records (%d) than original (%d) which is not allowed. "
1091
0
          "Aborting recovery.",
1092
0
          wal_number, static_cast<int>(immutable_db_options_.wal_recovery_mode),
1093
0
          wal_filter.Name(), new_count, original_count);
1094
0
      status = Status::NotSupported(
1095
0
          "More than original # of records "
1096
0
          "returned by Wal Filter ",
1097
0
          wal_filter.Name());
1098
0
      return false;
1099
0
    }
1100
    // Set the same sequence number in the new_batch
1101
    // as the original batch.
1102
0
    WriteBatchInternal::SetSequence(&new_batch,
1103
0
                                    WriteBatchInternal::Sequence(&batch));
1104
0
    batch = new_batch;
1105
0
  }
1106
0
  return true;
1107
0
}
1108
1109
void DBOpenLogRecordReadReporter::Corruption(size_t bytes, const Status& s,
1110
0
                                             uint64_t log_number) {
1111
0
  ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
1112
0
                 (status == nullptr ? "(ignoring error) " : ""), fname,
1113
0
                 static_cast<int>(bytes), s.ToString().c_str());
1114
0
  if (status != nullptr && status->ok()) {
1115
0
    *status = s;
1116
0
    corrupted_wal_number_ = log_number;
1117
0
  }
1118
0
}
1119
1120
0
void DBOpenLogRecordReadReporter::OldLogRecord(size_t bytes) {
1121
0
  if (old_log_record != nullptr) {
1122
0
    *old_log_record = true;
1123
0
  }
1124
0
  ROCKS_LOG_WARN(info_log, "%s: dropping %d bytes; possibly recycled", fname,
1125
0
                 static_cast<int>(bytes));
1126
0
}
1127
1128
// REQUIRES: wal_numbers are sorted in ascending order
1129
Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
1130
                               SequenceNumber* next_sequence, bool read_only,
1131
                               bool is_retry, bool* corrupted_wal_found,
1132
34.7k
                               RecoveryContext* recovery_ctx) {
1133
34.7k
  mutex_.AssertHeld();
1134
1135
34.7k
  std::unordered_map<int, VersionEdit> version_edits;
1136
34.7k
  int job_id = 0;
1137
34.7k
  uint64_t min_wal_number = 0;
1138
34.7k
  SetupLogFilesRecovery(wal_numbers, &version_edits, &job_id, &min_wal_number);
1139
1140
34.7k
  Status status = ProcessLogFiles(
1141
34.7k
      wal_numbers, read_only, is_retry, min_wal_number, job_id, next_sequence,
1142
34.7k
      &version_edits, corrupted_wal_found, recovery_ctx);
1143
1144
34.7k
  FinishLogFilesRecovery(job_id, status);
1145
34.7k
  return status;
1146
34.7k
}
1147
1148
void DBImpl::SetupLogFilesRecovery(
1149
    const std::vector<uint64_t>& wal_numbers,
1150
    std::unordered_map<int, VersionEdit>* version_edits, int* job_id,
1151
34.7k
    uint64_t* min_wal_number) {
1152
34.7k
  assert(version_edits);
1153
34.7k
  assert(job_id);
1154
34.7k
  assert(min_wal_number);
1155
  // No need to refcount because iteration is under mutex
1156
50.0k
  for (auto cfd : *versions_->GetColumnFamilySet()) {
1157
50.0k
    VersionEdit edit;
1158
50.0k
    edit.SetColumnFamily(cfd->GetID());
1159
50.0k
    version_edits->insert({cfd->GetID(), edit});
1160
50.0k
  }
1161
1162
34.7k
  *job_id = next_job_id_.fetch_add(1);
1163
34.7k
  {
1164
34.7k
    auto stream = event_logger_.Log();
1165
34.7k
    stream << "job" << *job_id;
1166
34.7k
    stream << "event" << "recovery_started";
1167
34.7k
    stream << "wal_files";
1168
34.7k
    stream.StartArray();
1169
34.7k
    for (auto wal_number : wal_numbers) {
1170
34.7k
      stream << wal_number;
1171
34.7k
    }
1172
34.7k
    stream.EndArray();
1173
34.7k
  }
1174
1175
  // No-op for immutable_db_options_.wal_filter == nullptr.
1176
34.7k
  InvokeWalFilterIfNeededOnColumnFamilyToWalNumberMap();
1177
1178
34.7k
  *min_wal_number = MinLogNumberToKeep();
1179
34.7k
  if (!allow_2pc()) {
1180
    // In non-2pc mode, we skip WALs that do not back unflushed data.
1181
34.7k
    *min_wal_number =
1182
34.7k
        std::max(*min_wal_number, versions_->MinLogNumberWithUnflushedData());
1183
34.7k
  }
1184
34.7k
}
1185
1186
Status DBImpl::ProcessLogFiles(
1187
    const std::vector<uint64_t>& wal_numbers, bool read_only, bool is_retry,
1188
    uint64_t min_wal_number, int job_id, SequenceNumber* next_sequence,
1189
    std::unordered_map<int, VersionEdit>* version_edits,
1190
34.7k
    bool* corrupted_wal_found, RecoveryContext* recovery_ctx) {
1191
34.7k
  Status status;
1192
1193
34.7k
  bool stop_replay_by_wal_filter = false;
1194
34.7k
  bool stop_replay_for_corruption = false;
1195
34.7k
  bool flushed = false;
1196
34.7k
  uint64_t corrupted_wal_number = kMaxSequenceNumber;
1197
34.7k
  PredecessorWALInfo predecessor_wal_info;
1198
1199
34.7k
  for (auto wal_number : wal_numbers) {
1200
    // Detecting early break on the next iteration after `wal_number` has been
1201
    // advanced since this `wal_number` doesn't affect follow-up handling after
1202
    // breaking out of the for loop.
1203
34.7k
    if (!status.ok()) {
1204
0
      break;
1205
0
    }
1206
34.7k
    SequenceNumber prev_next_sequence = *next_sequence;
1207
34.7k
    if (status.ok()) {
1208
34.7k
      status = ProcessLogFile(
1209
34.7k
          wal_number, min_wal_number, is_retry, read_only, job_id,
1210
34.7k
          next_sequence, &stop_replay_for_corruption,
1211
34.7k
          &stop_replay_by_wal_filter, &corrupted_wal_number,
1212
34.7k
          corrupted_wal_found, version_edits, &flushed, predecessor_wal_info);
1213
34.7k
    }
1214
34.7k
    if (status.ok()) {
1215
34.7k
      status = CheckSeqnoNotSetBackDuringRecovery(prev_next_sequence,
1216
34.7k
                                                  *next_sequence);
1217
34.7k
    }
1218
34.7k
  }
1219
1220
34.7k
  if (status.ok()) {
1221
34.7k
    status = MaybeHandleStopReplayForCorruptionForInconsistency(
1222
34.7k
        stop_replay_for_corruption, corrupted_wal_number);
1223
34.7k
  }
1224
1225
34.7k
  if (status.ok()) {
1226
34.7k
    status = MaybeFlushFinalMemtableOrRestoreActiveLogFiles(
1227
34.7k
        wal_numbers, read_only, job_id, flushed, version_edits, recovery_ctx);
1228
34.7k
  }
1229
34.7k
  return status;
1230
34.7k
}
1231
1232
Status DBImpl::ProcessLogFile(
1233
    uint64_t wal_number, uint64_t min_wal_number, bool is_retry, bool read_only,
1234
    int job_id, SequenceNumber* next_sequence, bool* stop_replay_for_corruption,
1235
    bool* stop_replay_by_wal_filter, uint64_t* corrupted_wal_number,
1236
    bool* corrupted_wal_found,
1237
    std::unordered_map<int, VersionEdit>* version_edits, bool* flushed,
1238
34.7k
    PredecessorWALInfo& predecessor_wal_info) {
1239
34.7k
  assert(stop_replay_by_wal_filter);
1240
1241
  // Variable initialization starts
1242
34.7k
  Status status;
1243
34.7k
  bool old_log_record = false;
1244
1245
34.7k
  DBOpenLogRecordReadReporter reporter;
1246
34.7k
  std::unique_ptr<log::Reader> reader;
1247
1248
34.7k
  std::string fname =
1249
34.7k
      LogFileName(immutable_db_options_.GetWalDir(), wal_number);
1250
1251
34.7k
  auto logFileDropped = [this, &fname]() {
1252
0
    uint64_t bytes;
1253
0
    if (env_->GetFileSize(fname, &bytes).ok()) {
1254
0
      auto info_log = immutable_db_options_.info_log.get();
1255
0
      ROCKS_LOG_WARN(info_log, "%s: dropping %d bytes", fname.c_str(),
1256
0
                     static_cast<int>(bytes));
1257
0
    }
1258
0
  };
1259
1260
34.7k
  std::string scratch;
1261
34.7k
  Slice record;
1262
34.7k
  uint64_t record_checksum;
1263
34.7k
  const UnorderedMap<uint32_t, size_t>& running_ts_sz =
1264
34.7k
      versions_->GetRunningColumnFamiliesTimestampSize();
1265
1266
  // We need to track `last_seqno_observed` in addition to `next_sequence` since
1267
  // `last_seqno_observed != *next_sequence` when there are multiple key-value
1268
  // pairs in one WAL entry
1269
34.7k
  SequenceNumber last_seqno_observed = 0;
1270
  // Variable initialization ends
1271
1272
34.7k
  if (wal_number < min_wal_number) {
1273
0
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
1274
0
                   "Skipping log #%" PRIu64
1275
0
                   " since it is older than min log to keep #%" PRIu64,
1276
0
                   wal_number, min_wal_number);
1277
0
    assert(status.ok());
1278
0
    return status;
1279
0
  }
1280
1281
34.7k
  SetupLogFileProcessing(wal_number);
1282
1283
34.7k
  if (*stop_replay_by_wal_filter) {
1284
0
    logFileDropped();
1285
0
    assert(status.ok());
1286
0
    return status;
1287
0
  }
1288
1289
34.7k
  Status init_status = InitializeLogReader(
1290
34.7k
      wal_number, is_retry, fname, *stop_replay_for_corruption, min_wal_number,
1291
34.7k
      predecessor_wal_info, &old_log_record, &status, &reporter, reader);
1292
1293
  // FIXME(hx235): Consolidate `!init_status.ok()` and `reader == nullptr` cases
1294
34.7k
  if (!init_status.ok()) {
1295
0
    assert(status.ok());
1296
0
    status.PermitUncheckedError();
1297
0
    return init_status;
1298
34.7k
  } else if (reader == nullptr) {
1299
    // TODO(hx235): remove this case since it's confusing
1300
0
    assert(status.ok());
1301
    // Fail initializing log reader for one log file with an ok status.
1302
    // Try next one.
1303
0
    return status;
1304
0
  }
1305
1306
34.7k
  TEST_SYNC_POINT_CALLBACK("DBImpl::RecoverLogFiles:BeforeReadWal",
1307
34.7k
                           /*cb_arg=*/nullptr);
1308
524k
  while (true) {
1309
524k
    if (*stop_replay_by_wal_filter) {
1310
0
      break;
1311
0
    }
1312
1313
524k
    bool read_record = reader->ReadRecord(
1314
524k
        &record, &scratch, immutable_db_options_.wal_recovery_mode,
1315
524k
        &record_checksum);
1316
1317
    // `reader->ReadRecord` will change `status` through reporter in `reader`
1318
    // when a corruption is encountered
1319
    // FIXME(hx235): consolidate `read_record` and `status`
1320
524k
    if (!read_record || !status.ok()) {
1321
34.7k
      break;
1322
34.7k
    }
1323
1324
    // FIXME(hx235): consolidate `process_status` and `status`
1325
489k
    SequenceNumber prev_next_sequence = *next_sequence;
1326
489k
    Status process_status = ProcessLogRecord(
1327
489k
        record, reader, running_ts_sz, wal_number, fname, read_only, job_id,
1328
489k
        logFileDropped, &reporter, &record_checksum, &last_seqno_observed,
1329
489k
        next_sequence, stop_replay_for_corruption, &status,
1330
489k
        stop_replay_by_wal_filter, version_edits, flushed);
1331
1332
489k
    if (!process_status.ok()) {
1333
0
      return process_status;
1334
489k
    } else if (Status seqno_check_status = CheckSeqnoNotSetBackDuringRecovery(
1335
489k
                   prev_next_sequence, *next_sequence);
1336
489k
               !seqno_check_status.ok()) {
1337
      // Sequence number being set back indicates a serious software bug, the DB
1338
      // should not be opened in this case.
1339
0
      return seqno_check_status;
1340
489k
    } else if (*stop_replay_for_corruption) {
1341
0
      break;
1342
0
    }
1343
489k
  }
1344
1345
34.7k
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
1346
34.7k
                 "Recovered to log #%" PRIu64 " next seq #%" PRIu64, wal_number,
1347
34.7k
                 *next_sequence);
1348
1349
34.7k
  if (status.ok()) {
1350
34.7k
    status = UpdatePredecessorWALInfo(wal_number, last_seqno_observed, fname,
1351
34.7k
                                      predecessor_wal_info);
1352
34.7k
  }
1353
1354
34.7k
  if (!status.ok() || old_log_record) {
1355
0
    status = HandleNonOkStatusOrOldLogRecord(
1356
0
        wal_number, next_sequence, status, reporter, &old_log_record,
1357
0
        stop_replay_for_corruption, corrupted_wal_number, corrupted_wal_found);
1358
0
  }
1359
1360
34.7k
  FinishLogFileProcessing(status, next_sequence);
1361
1362
34.7k
  return status;
1363
34.7k
}
1364
1365
34.7k
void DBImpl::SetupLogFileProcessing(uint64_t wal_number) {
1366
  // The previous incarnation may not have written any MANIFEST
1367
  // records after allocating this log number.  So we manually
1368
  // update the file number allocation counter in VersionSet.
1369
34.7k
  versions_->MarkFileNumberUsed(wal_number);
1370
1371
34.7k
  ROCKS_LOG_INFO(immutable_db_options_.info_log,
1372
34.7k
                 "Recovering log #%" PRIu64 " mode %d", wal_number,
1373
34.7k
                 static_cast<int>(immutable_db_options_.wal_recovery_mode));
1374
34.7k
}
1375
1376
Status DBImpl::InitializeLogReader(
1377
    uint64_t wal_number, bool is_retry, std::string& fname,
1378
    bool stop_replay_for_corruption, uint64_t min_wal_number,
1379
    const PredecessorWALInfo& predecessor_wal_info, bool* const old_log_record,
1380
    Status* const reporter_status, DBOpenLogRecordReadReporter* reporter,
1381
34.7k
    std::unique_ptr<log::Reader>& reader) {
1382
34.7k
  assert(old_log_record);
1383
34.7k
  assert(reporter_status);
1384
34.7k
  assert(reporter);
1385
1386
34.7k
  Status status;
1387
1388
34.7k
  std::unique_ptr<SequentialFileReader> file_reader;
1389
34.7k
  {
1390
34.7k
    std::unique_ptr<FSSequentialFile> file;
1391
34.7k
    status = fs_->NewSequentialFile(
1392
34.7k
        fname, fs_->OptimizeForLogRead(file_options_), &file, nullptr);
1393
34.7k
    if (!status.ok()) {
1394
0
      MaybeIgnoreError(&status);
1395
0
      return status;
1396
0
    }
1397
34.7k
    file_reader.reset(new SequentialFileReader(
1398
34.7k
        std::move(file), fname, immutable_db_options_.log_readahead_size,
1399
34.7k
        io_tracer_, /*listeners=*/{}, /*rate_limiter=*/nullptr,
1400
34.7k
        /*verify_and_reconstruct_read=*/is_retry));
1401
34.7k
  }
1402
1403
  // Create the log reader.
1404
0
  reporter->env = env_;
1405
34.7k
  reporter->info_log = immutable_db_options_.info_log.get();
1406
34.7k
  reporter->fname = fname.c_str();
1407
34.7k
  reporter->old_log_record = old_log_record;
1408
34.7k
  if (!immutable_db_options_.paranoid_checks ||
1409
34.7k
      immutable_db_options_.wal_recovery_mode ==
1410
34.7k
          WALRecoveryMode::kSkipAnyCorruptedRecords) {
1411
0
    reporter->status = nullptr;
1412
34.7k
  } else {
1413
34.7k
    reporter->status = reporter_status;
1414
34.7k
  }
1415
  // We intentially make log::Reader do checksumming even if
1416
  // paranoid_checks==false so that corruptions cause entire commits
1417
  // to be skipped instead of propagating bad information (like overly
1418
  // large sequence numbers).
1419
34.7k
  reader.reset(new log::Reader(
1420
34.7k
      immutable_db_options_.info_log, std::move(file_reader), reporter,
1421
34.7k
      true /*checksum*/, wal_number,
1422
34.7k
      immutable_db_options_.track_and_verify_wals, stop_replay_for_corruption,
1423
34.7k
      min_wal_number, predecessor_wal_info));
1424
34.7k
  return status;
1425
34.7k
}
1426
1427
Status DBImpl::ProcessLogRecord(
1428
    Slice record, const std::unique_ptr<log::Reader>& reader,
1429
    const UnorderedMap<uint32_t, size_t>& running_ts_sz, uint64_t wal_number,
1430
    const std::string& fname, bool read_only, int job_id,
1431
    const std::function<void()>& logFileDropped,
1432
    DBOpenLogRecordReadReporter* reporter, uint64_t* record_checksum,
1433
    SequenceNumber* last_seqno_observed, SequenceNumber* next_sequence,
1434
    bool* stop_replay_for_corruption, Status* status,
1435
    bool* stop_replay_by_wal_filter,
1436
489k
    std::unordered_map<int, VersionEdit>* version_edits, bool* flushed) {
1437
489k
  assert(reporter);
1438
489k
  assert(last_seqno_observed);
1439
489k
  assert(stop_replay_for_corruption);
1440
489k
  assert(status);
1441
489k
  assert(stop_replay_by_wal_filter);
1442
1443
489k
  Status process_status;
1444
489k
  bool has_valid_writes = false;
1445
489k
  WriteBatch batch;
1446
489k
  std::unique_ptr<WriteBatch> new_batch;
1447
489k
  WriteBatch* batch_to_use = nullptr;
1448
1449
489k
  if (record.size() < WriteBatchInternal::kHeader) {
1450
0
    reporter->Corruption(record.size(),
1451
0
                         Status::Corruption("log record too small"));
1452
0
    assert(process_status.ok());
1453
0
    return process_status;
1454
0
  }
1455
1456
489k
  process_status = InitializeWriteBatchForLogRecord(
1457
489k
      record, reader, running_ts_sz, &batch, new_batch, batch_to_use,
1458
489k
      record_checksum);
1459
489k
  if (!process_status.ok()) {
1460
0
    return process_status;
1461
0
  }
1462
489k
  assert(batch_to_use);
1463
1464
489k
  *last_seqno_observed = WriteBatchInternal::Sequence(batch_to_use);
1465
1466
489k
  if (*last_seqno_observed > kMaxSequenceNumber) {
1467
0
    reporter->Corruption(
1468
0
        record.size(),
1469
0
        Status::Corruption("sequence " + std::to_string(*last_seqno_observed) +
1470
0
                           " is too large"));
1471
0
    assert(process_status.ok());
1472
0
    return process_status;
1473
0
  }
1474
1475
489k
  MaybeReviseStopReplayForCorruption(*last_seqno_observed, next_sequence,
1476
489k
                                     stop_replay_for_corruption);
1477
489k
  if (*stop_replay_for_corruption) {
1478
0
    logFileDropped();
1479
0
    assert(process_status.ok());
1480
0
    return process_status;
1481
0
  }
1482
1483
  // For the default case of wal_filter == nullptr, always performs no-op
1484
  // and returns true.
1485
489k
  if (!InvokeWalFilterIfNeededOnWalRecord(wal_number, fname, *reporter, *status,
1486
489k
                                          *stop_replay_by_wal_filter,
1487
489k
                                          *batch_to_use)) {
1488
0
    assert(process_status.ok());
1489
0
    return process_status;
1490
489k
  } else {
1491
    // FIXME(hx235): Handle the potential non-okay `status` when
1492
    // `InvokeWalFilterIfNeededOnWalRecord()` returns true
1493
489k
    status->PermitUncheckedError();
1494
489k
  }
1495
1496
489k
  assert(process_status.ok());
1497
489k
  process_status = InsertLogRecordToMemtable(
1498
489k
      batch_to_use, wal_number, next_sequence, &has_valid_writes, read_only);
1499
489k
  MaybeIgnoreError(&process_status);
1500
  // We are treating this as a failure while reading since we read valid
1501
  // blocks that do not form coherent data
1502
489k
  if (!process_status.ok()) {
1503
    // FIXME(hx235): `reporter->Corruption()` will override the non-ok status
1504
    // set in `InvokeWalFilterIfNeededOnWalRecord` through passing `*status`
1505
0
    reporter->Corruption(record.size(), process_status);
1506
0
    process_status = Status::OK();
1507
0
    return process_status;
1508
0
  }
1509
1510
489k
  process_status = MaybeWriteLevel0TableForRecovery(
1511
489k
      has_valid_writes, read_only, wal_number, job_id, next_sequence,
1512
489k
      version_edits, flushed);
1513
1514
489k
  return process_status;
1515
489k
}
1516
1517
// We create a new batch and initialize with a valid prot_info_ to store
1518
// the data checksum
1519
Status DBImpl::InitializeWriteBatchForLogRecord(
1520
    Slice record, const std::unique_ptr<log::Reader>& reader,
1521
    const UnorderedMap<uint32_t, size_t>& running_ts_sz, WriteBatch* batch,
1522
    std::unique_ptr<WriteBatch>& new_batch, WriteBatch*& batch_to_use,
1523
489k
    uint64_t* record_checksum) {
1524
489k
  assert(batch);
1525
489k
  assert(record_checksum);
1526
1527
489k
  Status status = WriteBatchInternal::SetContents(batch, record);
1528
489k
  if (!status.ok()) {
1529
0
    return status;
1530
0
  }
1531
1532
489k
  const UnorderedMap<uint32_t, size_t>& record_ts_sz =
1533
489k
      reader->GetRecordedTimestampSize();
1534
489k
  status = HandleWriteBatchTimestampSizeDifference(
1535
489k
      batch, running_ts_sz, record_ts_sz,
1536
489k
      TimestampSizeConsistencyMode::kReconcileInconsistency, seq_per_batch_,
1537
489k
      batch_per_txn_, &new_batch);
1538
489k
  if (!status.ok()) {
1539
0
    return status;
1540
0
  }
1541
1542
489k
  bool batch_updated = new_batch != nullptr;
1543
489k
  batch_to_use = batch_updated ? new_batch.get() : batch;
1544
489k
  TEST_SYNC_POINT_CALLBACK(
1545
489k
      "DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:batch", batch_to_use);
1546
489k
  TEST_SYNC_POINT_CALLBACK(
1547
489k
      "DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:checksum",
1548
489k
      record_checksum);
1549
489k
  status = WriteBatchInternal::UpdateProtectionInfo(
1550
489k
      batch_to_use, 8 /* bytes_per_key */,
1551
489k
      batch_updated ? nullptr : record_checksum);
1552
1553
489k
  return status;
1554
489k
}
1555
1556
void DBImpl::MaybeReviseStopReplayForCorruption(
1557
    SequenceNumber sequence, SequenceNumber const* const next_sequence,
1558
489k
    bool* stop_replay_for_corruption) {
1559
489k
  if (immutable_db_options_.wal_recovery_mode ==
1560
489k
      WALRecoveryMode::kPointInTimeRecovery) {
1561
489k
    assert(next_sequence);
1562
489k
    assert(stop_replay_for_corruption);
1563
    // In point-in-time recovery mode, if sequence id of log files are
1564
    // consecutive, we continue recovery despite corruption. This could
1565
    // happen when we open and write to a corrupted DB, where sequence id
1566
    // will start from the last sequence id we recovered.
1567
489k
    if (sequence == *next_sequence) {
1568
436k
      *stop_replay_for_corruption = false;
1569
436k
    }
1570
489k
  }
1571
489k
}
1572
1573
Status DBImpl::InsertLogRecordToMemtable(WriteBatch* batch_to_use,
1574
                                         uint64_t wal_number,
1575
                                         SequenceNumber* next_sequence,
1576
                                         bool* has_valid_writes,
1577
489k
                                         bool read_only) {
1578
  // If column family was not found, it might mean that the WAL write
1579
  // batch references to the column family that was dropped after the
1580
  // insert. We don't want to fail the whole write batch in that case --
1581
  // we just ignore the update.
1582
  // That's why we set ignore missing column families to true
1583
489k
  assert(batch_to_use);
1584
489k
  assert(has_valid_writes);
1585
489k
  Status status = WriteBatchInternal::InsertInto(
1586
489k
      batch_to_use, column_family_memtables_.get(), &flush_scheduler_,
1587
489k
      &trim_history_scheduler_, true, wal_number, this,
1588
489k
      false /* concurrent_memtable_writes */, next_sequence, has_valid_writes,
1589
489k
      seq_per_batch_, batch_per_txn_);
1590
1591
  // Check WriteBufferManager global limit during recovery.
1592
  // When multiple RocksDB instances share a WriteBufferManager, a recovering
1593
  // instance could exceed the global memory limit. Schedule flushes when needed
1594
  // to prevent OOM during WAL recovery.
1595
  //
1596
  // Skip scheduling in read-only mode since flushes cannot be performed and
1597
  // the scheduler would never be drained, causing assertion failures on
1598
  // duplicate ScheduleWork() calls.
1599
  //
1600
  // TODO: Currently we schedule all CFs with non-empty memtables for flush
1601
  // (similar to the atomic_flush=false path in the normal write flow). This
1602
  // may produce more, smaller L0 files in some CFs. A future improvement
1603
  // could flush only the oldest CF or pick CFs more selectively to reduce
1604
  // unnecessary L0 file creation.
1605
489k
  if (status.ok() && *has_valid_writes && !read_only &&
1606
475k
      immutable_db_options_.enforce_write_buffer_manager_during_recovery &&
1607
475k
      write_buffer_manager_ != nullptr &&
1608
475k
      write_buffer_manager_->ShouldFlush()) {
1609
0
    for (auto cfd : *versions_->GetColumnFamilySet()) {
1610
0
      if (cfd->mem() != nullptr && cfd->mem()->GetFirstSequenceNumber() != 0 &&
1611
0
          !cfd->mem()->HasFlushScheduled()) {
1612
0
        cfd->mem()->MarkFlushScheduled();
1613
0
        flush_scheduler_.ScheduleWork(cfd);
1614
0
      }
1615
0
    }
1616
0
  }
1617
1618
489k
  return status;
1619
489k
}
1620
1621
Status DBImpl::MaybeWriteLevel0TableForRecovery(
1622
    bool has_valid_writes, bool read_only, uint64_t wal_number, int job_id,
1623
    SequenceNumber const* const next_sequence,
1624
489k
    std::unordered_map<int, VersionEdit>* version_edits, bool* flushed) {
1625
489k
  assert(next_sequence);
1626
489k
  assert(version_edits);
1627
489k
  assert(flushed);
1628
1629
489k
  Status status;
1630
489k
  if (has_valid_writes && !read_only) {
1631
    // we can do this because this is called before client has access to the
1632
    // DB and there is only a single thread operating on DB
1633
475k
    ColumnFamilyData* cfd;
1634
1635
475k
    while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
1636
0
      cfd->UnrefAndTryDelete();
1637
      // If this asserts, it means that InsertInto failed in
1638
      // filtering updates to already-flushed column families
1639
0
      assert(cfd->GetLogNumber() <= wal_number);
1640
0
      (void)wal_number;
1641
0
      auto iter = version_edits->find(cfd->GetID());
1642
0
      assert(iter != version_edits->end());
1643
0
      VersionEdit* edit = &iter->second;
1644
0
      status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
1645
0
      if (!status.ok()) {
1646
        // Reflect errors immediately so that conditions like full
1647
        // file-systems cause the DB::Open() to fail.
1648
0
        return status;
1649
0
      }
1650
0
      *flushed = true;
1651
1652
0
      cfd->CreateNewMemtable(*next_sequence - 1);
1653
0
    }
1654
475k
  }
1655
489k
  return status;
1656
489k
}
1657
1658
Status DBImpl::HandleNonOkStatusOrOldLogRecord(
1659
    uint64_t wal_number, SequenceNumber const* const next_sequence,
1660
    Status status, const DBOpenLogRecordReadReporter& reporter,
1661
    bool* old_log_record, bool* stop_replay_for_corruption,
1662
0
    uint64_t* corrupted_wal_number, bool* corrupted_wal_found) {
1663
0
  assert(!status.ok() || *old_log_record);
1664
1665
0
  assert(next_sequence);
1666
0
  assert(old_log_record);
1667
0
  assert(stop_replay_for_corruption);
1668
0
  assert(corrupted_wal_number);
1669
1670
0
  if (status.IsNotSupported()) {
1671
    // We should not treat NotSupported as corruption. It is rather a clear
1672
    // sign that we are processing a WAL that is produced by an incompatible
1673
    // version of the code.
1674
0
    return status;
1675
0
  }
1676
1677
0
  if (immutable_db_options_.wal_recovery_mode ==
1678
0
      WALRecoveryMode::kSkipAnyCorruptedRecords) {
1679
    // We should ignore all errors unconditionally
1680
0
    return Status::OK();
1681
0
  } else if (immutable_db_options_.wal_recovery_mode ==
1682
0
             WALRecoveryMode::kPointInTimeRecovery) {
1683
0
    if (status.IsIOError()) {
1684
0
      ROCKS_LOG_ERROR(immutable_db_options_.info_log,
1685
0
                      "IOError during point-in-time reading log #%" PRIu64
1686
0
                      " seq #%" PRIu64
1687
0
                      ". %s. This likely mean loss of synced WAL, "
1688
0
                      "thus recovery fails.",
1689
0
                      wal_number, *next_sequence, status.ToString().c_str());
1690
0
      return status;
1691
0
    }
1692
    // We should ignore the error but not continue replaying
1693
0
    *old_log_record = false;
1694
0
    *stop_replay_for_corruption = true;
1695
    // TODO(hx235): have a single source of corrupted WAL number once we
1696
    // consolidate the statuses
1697
0
    uint64_t reporter_corrupted_wal_number = reporter.GetCorruptedLogNumber();
1698
0
    *corrupted_wal_number = reporter_corrupted_wal_number != kMaxSequenceNumber
1699
0
                                ? reporter_corrupted_wal_number
1700
0
                                : wal_number;
1701
0
    if (corrupted_wal_found != nullptr) {
1702
0
      *corrupted_wal_found = true;
1703
0
    }
1704
0
    return Status::OK();
1705
0
  } else {
1706
0
    assert(immutable_db_options_.wal_recovery_mode ==
1707
0
               WALRecoveryMode::kTolerateCorruptedTailRecords ||
1708
0
           immutable_db_options_.wal_recovery_mode ==
1709
0
               WALRecoveryMode::kAbsoluteConsistency);
1710
0
    return status;
1711
0
  }
1712
0
}
1713
1714
Status DBImpl::UpdatePredecessorWALInfo(
1715
    uint64_t wal_number, const SequenceNumber last_seqno_observed,
1716
34.7k
    const std::string& fname, PredecessorWALInfo& predecessor_wal_info) {
1717
34.7k
  uint64_t bytes;
1718
1719
34.7k
  Status s = env_->GetFileSize(fname, &bytes);
1720
34.7k
  if (!s.ok()) {
1721
0
    return s;
1722
0
  }
1723
1724
34.7k
  SequenceNumber mock_seqno = kMaxSequenceNumber;
1725
34.7k
  [[maybe_unused]] std::pair<uint64_t, SequenceNumber*> pair =
1726
34.7k
      std::make_pair(wal_number, &mock_seqno);
1727
34.7k
  TEST_SYNC_POINT_CALLBACK("DBImpl::UpdatePredecessorWALInfo", &pair);
1728
34.7k
  predecessor_wal_info = PredecessorWALInfo(
1729
34.7k
      wal_number, bytes,
1730
34.7k
      mock_seqno != kMaxSequenceNumber ? mock_seqno : last_seqno_observed);
1731
1732
34.7k
  return s;
1733
34.7k
}
1734
1735
void DBImpl::FinishLogFileProcessing(const Status& status,
1736
34.7k
                                     const SequenceNumber* next_sequence) {
1737
34.7k
  if (status.ok()) {
1738
34.7k
    assert(next_sequence);
1739
34.7k
    flush_scheduler_.Clear();
1740
34.7k
    trim_history_scheduler_.Clear();
1741
34.7k
    auto last_sequence = *next_sequence - 1;
1742
34.7k
    if ((*next_sequence != kMaxSequenceNumber) &&
1743
25.7k
        (versions_->LastSequence() <= last_sequence)) {
1744
25.7k
      versions_->SetLastAllocatedSequence(last_sequence);
1745
25.7k
      versions_->SetLastPublishedSequence(last_sequence);
1746
25.7k
      versions_->SetLastSequence(last_sequence);
1747
25.7k
    }
1748
34.7k
  }
1749
34.7k
}
1750
1751
Status DBImpl::MaybeHandleStopReplayForCorruptionForInconsistency(
1752
34.7k
    bool stop_replay_for_corruption, uint64_t corrupted_wal_number) {
1753
34.7k
  Status status;
1754
1755
  // Compare the corrupted log number to all columnfamily's current log number.
1756
  // Abort Open() if any column family's log number is greater than
1757
  // the corrupted log number, which means CF contains data beyond the point of
1758
  // corruption. This could during PIT recovery when the WAL is corrupted and
1759
  // some (but not all) CFs are flushed
1760
  // Exclude the PIT case where no log is dropped after the corruption point.
1761
  // This is to cover the case for empty wals after corrupted log, in which we
1762
  // don't reset stop_replay_for_corruption.
1763
34.7k
  if (stop_replay_for_corruption == true &&
1764
0
      (immutable_db_options_.wal_recovery_mode ==
1765
0
           WALRecoveryMode::kPointInTimeRecovery ||
1766
0
       immutable_db_options_.wal_recovery_mode ==
1767
0
           WALRecoveryMode::kTolerateCorruptedTailRecords)) {
1768
0
    for (auto cfd : *versions_->GetColumnFamilySet()) {
1769
      // One special case cause cfd->GetLogNumber() > corrupted_wal_number but
1770
      // the CF is still consistent: If a new column family is created during
1771
      // the flush and the WAL sync fails at the same time, the new CF points to
1772
      // the new WAL but the old WAL is curropted. Since the new CF is empty, it
1773
      // is still consistent. We add the check of CF sst file size to avoid the
1774
      // false positive alert.
1775
1776
      // Note that, the check of (cfd->GetLiveSstFilesSize() > 0) may leads to
1777
      // the ignorance of a very rare inconsistency case caused in data
1778
      // canclation. One CF is empty due to KV deletion. But those operations
1779
      // are in the WAL. If the WAL is corrupted, the status of this CF might
1780
      // not be consistent with others. However, the consistency check will be
1781
      // bypassed due to empty CF.
1782
      // TODO: a better and complete implementation is needed to ensure strict
1783
      // consistency check in WAL recovery including hanlding the tailing
1784
      // issues.
1785
0
      if (cfd->GetLogNumber() > corrupted_wal_number &&
1786
0
          cfd->GetLiveSstFilesSize() > 0) {
1787
0
        ROCKS_LOG_ERROR(immutable_db_options_.info_log,
1788
0
                        "Column family inconsistency: SST file contains data"
1789
0
                        " beyond the point of corruption.");
1790
0
        status = Status::Corruption(
1791
0
            "Column family inconsistency: SST file contains data"
1792
0
            " beyond the point of corruption in CF " +
1793
0
            cfd->GetName() +
1794
0
            ". WAL recovery stopped at corruption point, but SST files"
1795
0
            " contain newer data.");
1796
0
        return status;
1797
0
      }
1798
0
    }
1799
0
  }
1800
34.7k
  return status;
1801
34.7k
}
1802
1803
Status DBImpl::MaybeFlushFinalMemtableOrRestoreActiveLogFiles(
1804
    const std::vector<uint64_t>& wal_numbers, bool read_only, int job_id,
1805
    bool flushed, std::unordered_map<int, VersionEdit>* version_edits,
1806
34.7k
    RecoveryContext* recovery_ctx) {
1807
34.7k
  assert(version_edits);
1808
1809
34.7k
  Status status;
1810
  // True if there's any data in the WALs; if not, we can skip re-processing
1811
  // them later
1812
34.7k
  bool data_seen = false;
1813
34.7k
  if (!read_only) {
1814
    // no need to refcount since client still doesn't have access
1815
    // to the DB and can not drop column families while we iterate
1816
34.7k
    const WalNumber max_wal_number = wal_numbers.back();
1817
50.0k
    for (auto cfd : *versions_->GetColumnFamilySet()) {
1818
50.0k
      auto iter = version_edits->find(cfd->GetID());
1819
50.0k
      assert(iter != version_edits->end());
1820
50.0k
      VersionEdit* edit = &iter->second;
1821
1822
50.0k
      if (cfd->GetLogNumber() > max_wal_number) {
1823
        // Column family cfd has already flushed the data
1824
        // from all wals. Memtable has to be empty because
1825
        // we filter the updates based on wal_number
1826
        // (in WriteBatch::InsertInto)
1827
0
        assert(cfd->mem()->GetFirstSequenceNumber() == 0);
1828
0
        assert(edit->NumEntries() == 0);
1829
0
        continue;
1830
0
      }
1831
1832
50.0k
      TEST_SYNC_POINT_CALLBACK(
1833
50.0k
          "DBImpl::RecoverLogFiles:BeforeFlushFinalMemtable", /*arg=*/nullptr);
1834
1835
      // flush the final memtable (if non-empty)
1836
50.0k
      if (cfd->mem()->GetFirstSequenceNumber() != 0) {
1837
        // If flush happened in the middle of recovery (e.g. due to memtable
1838
        // being full), we flush at the end. Otherwise we'll need to record
1839
        // where we were on last flush, which make the logic complicated.
1840
14.6k
        if (flushed || !immutable_db_options_.avoid_flush_during_recovery) {
1841
14.6k
          status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
1842
14.6k
          if (!status.ok()) {
1843
            // Recovery failed
1844
0
            break;
1845
0
          }
1846
14.6k
          flushed = true;
1847
1848
14.6k
          cfd->CreateNewMemtable(versions_->LastSequence());
1849
14.6k
        }
1850
14.6k
        data_seen = true;
1851
14.6k
      }
1852
1853
      // Update the log number info in the version edit corresponding to this
1854
      // column family. Note that the version edits will be written to MANIFEST
1855
      // together later.
1856
      // writing wal_number in the manifest means that any log file
1857
      // with number strongly less than (wal_number + 1) is already
1858
      // recovered and should be ignored on next reincarnation.
1859
      // Since we already recovered max_wal_number, we want all wals
1860
      // with numbers `<= max_wal_number` (includes this one) to be ignored
1861
50.0k
      if (flushed || cfd->mem()->GetFirstSequenceNumber() == 0) {
1862
50.0k
        edit->SetLogNumber(max_wal_number + 1);
1863
50.0k
      }
1864
50.0k
    }
1865
34.7k
    if (status.ok()) {
1866
      // we must mark the next log number as used, even though it's
1867
      // not actually used. that is because VersionSet assumes
1868
      // VersionSet::next_file_number_ always to be strictly greater than any
1869
      // log number
1870
34.7k
      versions_->MarkFileNumberUsed(max_wal_number + 1);
1871
34.7k
      assert(recovery_ctx != nullptr);
1872
1873
50.0k
      for (auto* cfd : *versions_->GetColumnFamilySet()) {
1874
50.0k
        auto iter = version_edits->find(cfd->GetID());
1875
50.0k
        assert(iter != version_edits->end());
1876
50.0k
        recovery_ctx->UpdateVersionEdits(cfd, iter->second);
1877
50.0k
      }
1878
1879
34.7k
      if (flushed || !data_seen) {
1880
34.7k
        VersionEdit wal_deletion;
1881
34.7k
        if (immutable_db_options_.track_and_verify_wals_in_manifest) {
1882
0
          wal_deletion.DeleteWalsBefore(max_wal_number + 1);
1883
0
        }
1884
34.7k
        if (!allow_2pc()) {
1885
          // In non-2pc mode, flushing the memtables of the column families
1886
          // means we can advance min_log_number_to_keep.
1887
34.7k
          wal_deletion.SetMinLogNumberToKeep(max_wal_number + 1);
1888
34.7k
        }
1889
34.7k
        assert(versions_->GetColumnFamilySet() != nullptr);
1890
34.7k
        recovery_ctx->UpdateVersionEdits(
1891
34.7k
            versions_->GetColumnFamilySet()->GetDefault(), wal_deletion);
1892
34.7k
      }
1893
34.7k
    }
1894
34.7k
  }
1895
1896
34.7k
  if (status.ok()) {
1897
34.7k
    if (data_seen && !flushed) {
1898
0
      status = RestoreAliveLogFiles(wal_numbers);
1899
34.7k
    } else if (!wal_numbers.empty()) {  // If there's no data in the WAL, or we
1900
                                        // flushed all the data, still
1901
      // truncate the log file. If the process goes into a crash loop before
1902
      // the file is deleted, the preallocated space will never get freed.
1903
34.7k
      const bool truncate = !read_only;
1904
34.7k
      GetLogSizeAndMaybeTruncate(wal_numbers.back(), truncate, nullptr)
1905
34.7k
          .PermitUncheckedError();
1906
34.7k
    }
1907
34.7k
  }
1908
34.7k
  return status;
1909
34.7k
}
1910
1911
Status DBImpl::CheckSeqnoNotSetBackDuringRecovery(
1912
524k
    SequenceNumber prev_next_seqno, SequenceNumber current_next_seqno) {
1913
524k
  if (prev_next_seqno == kMaxSequenceNumber ||
1914
524k
      prev_next_seqno <= current_next_seqno) {
1915
524k
    return Status::OK();
1916
524k
  }
1917
0
  std::string msg =
1918
0
      "Sequence number is being set backwards during recovery, this is likely "
1919
0
      "a software bug or a data corruption. Prev next seqno: " +
1920
0
      std::to_string(prev_next_seqno) +
1921
0
      " , current next seqno: " + std::to_string(current_next_seqno);
1922
0
  return Status::Corruption(msg);
1923
524k
}
1924
1925
34.7k
void DBImpl::FinishLogFilesRecovery(int job_id, const Status& status) {
1926
34.7k
  event_logger_.Log() << "job" << job_id << "event"
1927
34.7k
                      << (status.ok() ? "recovery_finished" : "recovery_failed")
1928
34.7k
                      << "status" << status.ToString();
1929
34.7k
}
1930
1931
Status DBImpl::GetLogSizeAndMaybeTruncate(uint64_t wal_number, bool truncate,
1932
34.7k
                                          WalFileNumberSize* log_ptr) {
1933
34.7k
  WalFileNumberSize log(wal_number);
1934
34.7k
  std::string fname =
1935
34.7k
      LogFileName(immutable_db_options_.GetWalDir(), wal_number);
1936
34.7k
  Status s;
1937
  // This gets the appear size of the wals, not including preallocated space.
1938
34.7k
  s = env_->GetFileSize(fname, &log.size);
1939
34.7k
  TEST_SYNC_POINT_CALLBACK("DBImpl::GetLogSizeAndMaybeTruncate:0", /*arg=*/&s);
1940
34.7k
  if (s.ok() && truncate) {
1941
34.7k
    std::unique_ptr<FSWritableFile> last_log;
1942
34.7k
    Status truncate_status = fs_->ReopenWritableFile(
1943
34.7k
        fname,
1944
34.7k
        fs_->OptimizeForLogWrite(
1945
34.7k
            file_options_,
1946
34.7k
            BuildDBOptions(immutable_db_options_, mutable_db_options_)),
1947
34.7k
        &last_log, nullptr);
1948
34.7k
    if (truncate_status.ok()) {
1949
34.7k
      truncate_status = last_log->Truncate(log.size, IOOptions(), nullptr);
1950
34.7k
    }
1951
34.7k
    if (truncate_status.ok()) {
1952
34.7k
      truncate_status = last_log->Close(IOOptions(), nullptr);
1953
34.7k
    }
1954
    // Not a critical error if fail to truncate.
1955
34.7k
    if (!truncate_status.ok() && !truncate_status.IsNotSupported()) {
1956
0
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
1957
0
                     "Failed to truncate log #%" PRIu64 ": %s", wal_number,
1958
0
                     truncate_status.ToString().c_str());
1959
0
    }
1960
34.7k
  }
1961
34.7k
  if (log_ptr) {
1962
0
    *log_ptr = log;
1963
0
  }
1964
34.7k
  return s;
1965
34.7k
}
1966
1967
0
Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
1968
0
  if (wal_numbers.empty()) {
1969
0
    return Status::OK();
1970
0
  }
1971
0
  Status s;
1972
0
  mutex_.AssertHeld();
1973
0
  assert(immutable_db_options_.avoid_flush_during_recovery);
1974
  // Mark these as alive so they'll be considered for deletion later by
1975
  // FindObsoleteFiles()
1976
0
  wals_total_size_.StoreRelaxed(0);
1977
0
  wal_empty_ = false;
1978
0
  uint64_t min_wal_with_unflushed_data =
1979
0
      versions_->MinLogNumberWithUnflushedData();
1980
0
  for (auto wal_number : wal_numbers) {
1981
0
    if (!allow_2pc() && wal_number < min_wal_with_unflushed_data) {
1982
      // In non-2pc mode, the WAL files not backing unflushed data are not
1983
      // alive, thus should not be added to the alive_wal_files_.
1984
0
      continue;
1985
0
    }
1986
    // We preallocate space for wals, but then after a crash and restart, those
1987
    // preallocated space are not needed anymore. It is likely only the last
1988
    // log has such preallocated space, so we only truncate for the last log.
1989
0
    WalFileNumberSize log;
1990
0
    s = GetLogSizeAndMaybeTruncate(
1991
0
        wal_number, /*truncate=*/(wal_number == wal_numbers.back()), &log);
1992
0
    if (!s.ok()) {
1993
0
      break;
1994
0
    }
1995
0
    wals_total_size_.FetchAddRelaxed(log.size);
1996
0
    alive_wal_files_.push_back(log);
1997
0
  }
1998
0
  return s;
1999
0
}
2000
2001
Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
2002
14.6k
                                           MemTable* mem, VersionEdit* edit) {
2003
14.6k
  mutex_.AssertHeld();
2004
14.6k
  assert(cfd);
2005
14.6k
  assert(cfd->imm());
2006
  // The immutable memtable list must be empty.
2007
14.6k
  assert(std::numeric_limits<uint64_t>::max() ==
2008
14.6k
         cfd->imm()->GetEarliestMemTableID());
2009
2010
14.6k
  const uint64_t start_micros = immutable_db_options_.clock->NowMicros();
2011
2012
14.6k
  FileMetaData meta;
2013
14.6k
  std::vector<BlobFileAddition> blob_file_additions;
2014
2015
14.6k
  std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
2016
14.6k
      new std::list<uint64_t>::iterator(
2017
14.6k
          CaptureCurrentFileNumberInPendingOutputs()));
2018
14.6k
  meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
2019
14.6k
  ReadOptions ro;
2020
14.6k
  ro.total_order_seek = true;
2021
14.6k
  ro.io_activity = Env::IOActivity::kDBOpen;
2022
14.6k
  Arena arena;
2023
14.6k
  Status s;
2024
14.6k
  TableProperties table_properties;
2025
14.6k
  const auto* ucmp = cfd->internal_comparator().user_comparator();
2026
14.6k
  assert(ucmp);
2027
14.6k
  const size_t ts_sz = ucmp->timestamp_size();
2028
14.6k
  const bool logical_strip_timestamp =
2029
14.6k
      ts_sz > 0 && !cfd->ioptions().persist_user_defined_timestamps;
2030
  // Note that here we treat flush as level 0 compaction in internal stats
2031
14.6k
  InternalStats::CompactionStats flush_stats(CompactionReason::kFlush,
2032
14.6k
                                             1 /* count */);
2033
14.6k
  {
2034
14.6k
    ScopedArenaPtr<InternalIterator> iter(
2035
14.6k
        logical_strip_timestamp
2036
14.6k
            ? mem->NewTimestampStrippingIterator(
2037
0
                  ro, /*seqno_to_time_mapping=*/nullptr, &arena,
2038
0
                  /*prefix_extractor=*/nullptr, ts_sz)
2039
14.6k
            : mem->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr, &arena,
2040
14.6k
                               /*prefix_extractor=*/nullptr,
2041
14.6k
                               /*for_flush=*/true));
2042
14.6k
    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
2043
14.6k
                    "[%s] [WriteLevel0TableForRecovery]"
2044
14.6k
                    " Level-0 table #%" PRIu64 ": started",
2045
14.6k
                    cfd->GetName().c_str(), meta.fd.GetNumber());
2046
2047
    // Get the latest mutable cf options while the mutex is still locked
2048
14.6k
    const MutableCFOptions mutable_cf_options_copy =
2049
14.6k
        cfd->GetLatestMutableCFOptions();
2050
14.6k
    bool paranoid_file_checks =
2051
14.6k
        cfd->GetLatestMutableCFOptions().paranoid_file_checks;
2052
2053
14.6k
    int64_t _current_time = 0;
2054
14.6k
    immutable_db_options_.clock->GetCurrentTime(&_current_time)
2055
14.6k
        .PermitUncheckedError();  // ignore error
2056
14.6k
    const uint64_t current_time = static_cast<uint64_t>(_current_time);
2057
14.6k
    meta.oldest_ancester_time = current_time;
2058
14.6k
    meta.epoch_number = cfd->NewEpochNumber();
2059
14.6k
    {
2060
14.6k
      auto write_hint = cfd->current()->storage_info()->CalculateSSTWriteHint(
2061
14.6k
          /*level=*/0,
2062
14.6k
          immutable_db_options_.calculate_sst_write_lifetime_hint_set);
2063
14.6k
      mutex_.Unlock();
2064
2065
14.6k
      SequenceNumber earliest_write_conflict_snapshot;
2066
14.6k
      std::vector<SequenceNumber> snapshot_seqs =
2067
14.6k
          snapshots_.GetAll(&earliest_write_conflict_snapshot);
2068
14.6k
      SequenceNumber earliest_snapshot =
2069
14.6k
          (snapshot_seqs.empty() ? kMaxSequenceNumber : snapshot_seqs.at(0));
2070
14.6k
      auto snapshot_checker = snapshot_checker_.get();
2071
14.6k
      if (use_custom_gc_ && snapshot_checker == nullptr) {
2072
0
        snapshot_checker = DisableGCSnapshotChecker::Instance();
2073
0
      }
2074
14.6k
      std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
2075
14.6k
          range_del_iters;
2076
14.6k
      auto range_del_iter =
2077
14.6k
          logical_strip_timestamp
2078
14.6k
              ? mem->NewTimestampStrippingRangeTombstoneIterator(
2079
0
                    ro, kMaxSequenceNumber, ts_sz)
2080
              // This is called during recovery, where a live memtable is
2081
              // flushed directly. In this case, no fragmented tombstone list is
2082
              // cached in this memtable yet.
2083
14.6k
              : mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber,
2084
14.6k
                                               false /* immutable_memtable */);
2085
14.6k
      if (range_del_iter != nullptr) {
2086
2.55k
        range_del_iters.emplace_back(range_del_iter);
2087
2.55k
      }
2088
2089
14.6k
      IOStatus io_s;
2090
14.6k
      const ReadOptions read_option(Env::IOActivity::kDBOpen);
2091
14.6k
      const WriteOptions write_option(Env::IO_HIGH, Env::IOActivity::kDBOpen);
2092
2093
14.6k
      TableBuilderOptions tboptions(
2094
14.6k
          cfd->ioptions(), mutable_cf_options_copy, read_option, write_option,
2095
14.6k
          cfd->internal_comparator(), cfd->internal_tbl_prop_coll_factories(),
2096
14.6k
          GetCompressionFlush(cfd->ioptions(), mutable_cf_options_copy),
2097
14.6k
          mutable_cf_options_copy.compression_opts, cfd->GetID(),
2098
14.6k
          cfd->GetName(), 0 /* level */, current_time /* newest_key_time */,
2099
14.6k
          false /* is_bottommost */, TableFileCreationReason::kRecovery,
2100
14.6k
          0 /* oldest_key_time */, 0 /* file_creation_time */, db_id_,
2101
14.6k
          db_session_id_, 0 /* target_file_size */, meta.fd.GetNumber(),
2102
14.6k
          kMaxSequenceNumber);
2103
14.6k
      Version* version = cfd->current();
2104
14.6k
      version->Ref();
2105
14.6k
      TableProperties temp_table_proerties;
2106
14.6k
      s = BuildTable(
2107
14.6k
          dbname_, versions_.get(), immutable_db_options_, tboptions,
2108
14.6k
          file_options_for_compaction_, cfd->table_cache(), iter.get(),
2109
14.6k
          std::move(range_del_iters), &meta, &blob_file_additions,
2110
14.6k
          snapshot_seqs, earliest_snapshot, earliest_write_conflict_snapshot,
2111
14.6k
          kMaxSequenceNumber, snapshot_checker, paranoid_file_checks,
2112
14.6k
          cfd->internal_stats(), &io_s, io_tracer_,
2113
14.6k
          BlobFileCreationReason::kRecovery,
2114
14.6k
          nullptr /* seqno_to_time_mapping */, &event_logger_, job_id,
2115
14.6k
          &temp_table_proerties /* table_properties */, write_hint,
2116
14.6k
          nullptr /*full_history_ts_low*/, &blob_callback_, version,
2117
14.6k
          nullptr /* memtable_payload_bytes */,
2118
14.6k
          nullptr /* memtable_garbage_bytes */, &flush_stats);
2119
14.6k
      version->Unref();
2120
14.6k
      LogFlush(immutable_db_options_.info_log);
2121
14.6k
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
2122
14.6k
                      "[%s] [WriteLevel0TableForRecovery]"
2123
14.6k
                      " Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s",
2124
14.6k
                      cfd->GetName().c_str(), meta.fd.GetNumber(),
2125
14.6k
                      meta.fd.GetFileSize(), s.ToString().c_str());
2126
14.6k
      mutex_.Lock();
2127
2128
      // TODO(AR) is this ok?
2129
14.6k
      if (!io_s.ok() && s.ok()) {
2130
0
        s = io_s;
2131
0
      }
2132
2133
14.6k
      uint64_t total_num_entries = mem->NumEntries();
2134
14.6k
      if (s.ok() && total_num_entries != flush_stats.num_input_records) {
2135
0
        std::string msg = "Expected " + std::to_string(total_num_entries) +
2136
0
                          " entries in memtable, but read " +
2137
0
                          std::to_string(flush_stats.num_input_records);
2138
0
        ROCKS_LOG_WARN(immutable_db_options_.info_log,
2139
0
                       "[%s] [JOB %d] Level-0 flush during recover: %s",
2140
0
                       cfd->GetName().c_str(), job_id, msg.c_str());
2141
0
        if (immutable_db_options_.flush_verify_memtable_count) {
2142
0
          s = Status::Corruption(msg);
2143
0
        }
2144
0
      }
2145
      // Only verify on table with format collects table properties
2146
14.6k
      const auto& mutable_cf_options = cfd->GetLatestMutableCFOptions();
2147
14.6k
      if (s.ok() &&
2148
14.6k
          (mutable_cf_options.table_factory->IsInstanceOf(
2149
14.6k
               TableFactory::kBlockBasedTableName()) ||
2150
0
           mutable_cf_options.table_factory->IsInstanceOf(
2151
0
               TableFactory::kPlainTableName())) &&
2152
14.6k
          flush_stats.num_output_records != temp_table_proerties.num_entries) {
2153
0
        std::string msg =
2154
0
            "Number of keys in flush output SST files does not match "
2155
0
            "number of keys added to the table. Expected " +
2156
0
            std::to_string(flush_stats.num_output_records) + " but there are " +
2157
0
            std::to_string(temp_table_proerties.num_entries) +
2158
0
            " in output SST files";
2159
0
        ROCKS_LOG_WARN(immutable_db_options_.info_log,
2160
0
                       "[%s] [JOB %d] Level-0 flush during recover: %s",
2161
0
                       cfd->GetName().c_str(), job_id, msg.c_str());
2162
0
        if (immutable_db_options_.flush_verify_memtable_count) {
2163
0
          s = Status::Corruption(msg);
2164
0
        }
2165
0
      }
2166
14.6k
    }
2167
14.6k
  }
2168
14.6k
  ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);
2169
2170
  // Note that if file_size is zero, the file has been deleted and
2171
  // should not be added to the manifest.
2172
14.6k
  const bool has_output = meta.fd.GetFileSize() > 0;
2173
2174
14.6k
  constexpr int level = 0;
2175
2176
14.6k
  if (s.ok() && has_output) {
2177
14.6k
    edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
2178
14.6k
                  meta.fd.GetFileSize(), meta.smallest, meta.largest,
2179
14.6k
                  meta.fd.smallest_seqno, meta.fd.largest_seqno,
2180
14.6k
                  meta.marked_for_compaction, meta.temperature,
2181
14.6k
                  meta.oldest_blob_file_number, meta.oldest_ancester_time,
2182
14.6k
                  meta.file_creation_time, meta.epoch_number,
2183
14.6k
                  meta.file_checksum, meta.file_checksum_func_name,
2184
14.6k
                  meta.unique_id, meta.compensated_range_deletion_size,
2185
14.6k
                  meta.tail_size, meta.user_defined_timestamps_persisted);
2186
2187
14.6k
    for (const auto& blob : blob_file_additions) {
2188
0
      edit->AddBlobFile(blob);
2189
0
    }
2190
2191
    // For UDT in memtable only feature, move up the cutoff timestamp whenever
2192
    // a flush happens.
2193
14.6k
    if (logical_strip_timestamp) {
2194
0
      Slice mem_newest_udt = mem->GetNewestUDT();
2195
0
      std::string full_history_ts_low = cfd->GetFullHistoryTsLow();
2196
0
      if (full_history_ts_low.empty() ||
2197
0
          ucmp->CompareTimestamp(mem_newest_udt, full_history_ts_low) >= 0) {
2198
0
        std::string new_full_history_ts_low;
2199
0
        GetFullHistoryTsLowFromU64CutoffTs(&mem_newest_udt,
2200
0
                                           &new_full_history_ts_low);
2201
0
        edit->SetFullHistoryTsLow(new_full_history_ts_low);
2202
0
      }
2203
0
    }
2204
14.6k
  }
2205
2206
14.6k
  flush_stats.micros = immutable_db_options_.clock->NowMicros() - start_micros;
2207
2208
14.6k
  if (has_output) {
2209
14.6k
    flush_stats.bytes_written = meta.fd.GetFileSize();
2210
14.6k
    flush_stats.num_output_files = 1;
2211
14.6k
  }
2212
2213
14.6k
  const auto& blobs = edit->GetBlobFileAdditions();
2214
14.6k
  for (const auto& blob : blobs) {
2215
0
    flush_stats.bytes_written_blob += blob.GetTotalBlobBytes();
2216
0
  }
2217
2218
14.6k
  flush_stats.num_output_files_blob = static_cast<int>(blobs.size());
2219
2220
14.6k
  cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER,
2221
14.6k
                                            flush_stats);
2222
14.6k
  cfd->internal_stats()->AddCFStats(
2223
14.6k
      InternalStats::BYTES_FLUSHED,
2224
14.6k
      flush_stats.bytes_written + flush_stats.bytes_written_blob);
2225
14.6k
  RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
2226
14.6k
  return s;
2227
14.6k
}
2228
2229
Status DB::Open(const Options& options, const std::string& dbname,
2230
27.0k
                std::unique_ptr<DB>* dbptr) {
2231
27.0k
  DBOptions db_options(options);
2232
27.0k
  ColumnFamilyOptions cf_options(options);
2233
27.0k
  std::vector<ColumnFamilyDescriptor> column_families;
2234
27.0k
  column_families.emplace_back(kDefaultColumnFamilyName, cf_options);
2235
27.0k
  if (db_options.persist_stats_to_disk) {
2236
0
    column_families.emplace_back(kPersistentStatsColumnFamilyName, cf_options);
2237
0
  }
2238
27.0k
  std::vector<ColumnFamilyHandle*> handles;
2239
27.0k
  Status s = DB::Open(db_options, dbname, column_families, &handles, dbptr);
2240
27.0k
  if (s.ok()) {
2241
27.0k
    if (db_options.persist_stats_to_disk) {
2242
0
      assert(handles.size() == 2);
2243
27.0k
    } else {
2244
27.0k
      assert(handles.size() == 1);
2245
27.0k
    }
2246
    // i can delete the handle since DBImpl is always holding a reference to
2247
    // default column family
2248
27.0k
    if (db_options.persist_stats_to_disk && handles[1] != nullptr) {
2249
0
      delete handles[1];
2250
0
    }
2251
27.0k
    delete handles[0];
2252
27.0k
  }
2253
27.0k
  return s;
2254
27.0k
}
2255
2256
Status DB::Open(const DBOptions& db_options, const std::string& dbname,
2257
                const std::vector<ColumnFamilyDescriptor>& column_families,
2258
                std::vector<ColumnFamilyHandle*>* handles,
2259
42.2k
                std::unique_ptr<DB>* dbptr) {
2260
42.2k
  const bool kSeqPerBatch = true;
2261
42.2k
  const bool kBatchPerTxn = true;
2262
42.2k
  ThreadStatusUtil::SetEnableTracking(db_options.enable_thread_tracking);
2263
42.2k
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType::OP_DBOPEN);
2264
42.2k
  bool can_retry = false;
2265
42.2k
  Status s;
2266
42.2k
  do {
2267
42.2k
    s = DBImpl::Open(db_options, dbname, column_families, handles, dbptr,
2268
42.2k
                     !kSeqPerBatch, kBatchPerTxn, can_retry, &can_retry);
2269
42.2k
  } while (!s.ok() && can_retry);
2270
42.2k
  ThreadStatusUtil::ResetThreadStatus();
2271
42.2k
  return s;
2272
42.2k
}
2273
2274
// TODO: Implement the trimming in flush code path.
2275
// TODO: Perform trimming before inserting into memtable during recovery.
2276
// TODO: Pick files with max_timestamp > trim_ts by each file's timestamp meta
2277
// info, and handle only these files to reduce io.
2278
Status DB::OpenAndTrimHistory(
2279
    const DBOptions& db_options, const std::string& dbname,
2280
    const std::vector<ColumnFamilyDescriptor>& column_families,
2281
    std::vector<ColumnFamilyHandle*>* handles, std::unique_ptr<DB>* dbptr,
2282
0
    std::string trim_ts) {
2283
0
  assert(dbptr != nullptr);
2284
0
  assert(handles != nullptr);
2285
0
  auto validate_options = [&db_options] {
2286
0
    if (db_options.avoid_flush_during_recovery) {
2287
0
      return Status::InvalidArgument(
2288
0
          "avoid_flush_during_recovery incompatible with "
2289
0
          "OpenAndTrimHistory");
2290
0
    }
2291
0
    return Status::OK();
2292
0
  };
2293
0
  auto s = validate_options();
2294
0
  if (!s.ok()) {
2295
0
    return s;
2296
0
  }
2297
2298
0
  std::unique_ptr<DB> db;
2299
0
  s = DB::Open(db_options, dbname, column_families, handles, &db);
2300
0
  if (!s.ok()) {
2301
0
    return s;
2302
0
  }
2303
0
  assert(db);
2304
0
  CompactRangeOptions options;
2305
0
  options.bottommost_level_compaction =
2306
0
      BottommostLevelCompaction::kForceOptimized;
2307
0
  auto db_impl = static_cast_with_check<DBImpl>(db.get());
2308
0
  for (auto handle : *handles) {
2309
0
    assert(handle != nullptr);
2310
0
    auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(handle);
2311
0
    auto cfd = cfh->cfd();
2312
0
    assert(cfd != nullptr);
2313
    // Only compact column families with timestamp enabled
2314
0
    if (cfd->user_comparator() != nullptr &&
2315
0
        cfd->user_comparator()->timestamp_size() > 0) {
2316
0
      s = db_impl->CompactRangeInternal(options, handle, nullptr, nullptr,
2317
0
                                        trim_ts);
2318
0
      if (!s.ok()) {
2319
0
        break;
2320
0
      }
2321
0
    }
2322
0
  }
2323
0
  auto clean_op = [&handles, &db] {
2324
0
    for (auto handle : *handles) {
2325
0
      auto temp_s = db->DestroyColumnFamilyHandle(handle);
2326
0
      assert(temp_s.ok());
2327
0
    }
2328
0
    handles->clear();
2329
0
    db.reset();
2330
0
  };
2331
0
  if (!s.ok()) {
2332
0
    clean_op();
2333
0
    return s;
2334
0
  }
2335
2336
0
  *dbptr = std::move(db);
2337
0
  return s;
2338
0
}
2339
2340
IOStatus DBImpl::CreateWAL(const WriteOptions& write_options,
2341
                           uint64_t log_file_num, uint64_t recycle_log_number,
2342
                           size_t preallocate_block_size,
2343
                           const PredecessorWALInfo& predecessor_wal_info,
2344
44.0k
                           log::Writer** new_log) {
2345
44.0k
  IOStatus io_s;
2346
44.0k
  std::unique_ptr<FSWritableFile> lfile;
2347
2348
44.0k
  DBOptions db_options =
2349
44.0k
      BuildDBOptions(immutable_db_options_, mutable_db_options_);
2350
44.0k
  FileOptions opt_file_options =
2351
44.0k
      fs_->OptimizeForLogWrite(file_options_, db_options);
2352
44.0k
  opt_file_options.write_hint = CalculateWALWriteHint();
2353
  // DB option takes precedence when not kUnknown
2354
44.0k
  if (immutable_db_options_.wal_write_temperature != Temperature::kUnknown) {
2355
0
    opt_file_options.temperature = immutable_db_options_.wal_write_temperature;
2356
0
  }
2357
44.0k
  std::string wal_dir = immutable_db_options_.GetWalDir();
2358
44.0k
  std::string log_fname = LogFileName(wal_dir, log_file_num);
2359
2360
44.0k
  if (recycle_log_number) {
2361
0
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
2362
0
                   "reusing log %" PRIu64 " from recycle list\n",
2363
0
                   recycle_log_number);
2364
0
    std::string old_log_fname = LogFileName(wal_dir, recycle_log_number);
2365
0
    TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile1");
2366
0
    TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile2");
2367
0
    io_s = fs_->ReuseWritableFile(log_fname, old_log_fname, opt_file_options,
2368
0
                                  &lfile, /*dbg=*/nullptr);
2369
44.0k
  } else {
2370
44.0k
    io_s = NewWritableFile(fs_.get(), log_fname, &lfile, opt_file_options);
2371
44.0k
  }
2372
2373
44.0k
  if (io_s.ok()) {
2374
    // Subsequent attempts to override the hint via SetWriteLifeTimeHint
2375
    // with the very same value will be ignored by the fs.
2376
44.0k
    lfile->SetWriteLifeTimeHint(opt_file_options.write_hint);
2377
44.0k
    lfile->SetPreallocationBlockSize(preallocate_block_size);
2378
2379
44.0k
    const auto& listeners = immutable_db_options_.listeners;
2380
44.0k
    FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types;
2381
44.0k
    std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
2382
44.0k
        std::move(lfile), log_fname, opt_file_options,
2383
44.0k
        immutable_db_options_.clock, io_tracer_, nullptr /* stats */,
2384
44.0k
        Histograms::HISTOGRAM_ENUM_MAX /* hist_type */, listeners, nullptr,
2385
44.0k
        tmp_set.Contains(FileType::kWalFile),
2386
44.0k
        tmp_set.Contains(FileType::kWalFile)));
2387
44.0k
    *new_log = new log::Writer(std::move(file_writer), log_file_num,
2388
44.0k
                               immutable_db_options_.recycle_log_file_num > 0,
2389
44.0k
                               immutable_db_options_.manual_wal_flush,
2390
44.0k
                               immutable_db_options_.wal_compression,
2391
44.0k
                               immutable_db_options_.track_and_verify_wals);
2392
44.0k
    io_s = (*new_log)->AddCompressionTypeRecord(write_options);
2393
44.0k
    if (io_s.ok()) {
2394
44.0k
      io_s = (*new_log)->MaybeAddPredecessorWALInfo(write_options,
2395
44.0k
                                                    predecessor_wal_info);
2396
44.0k
    }
2397
44.0k
  }
2398
2399
44.0k
  return io_s;
2400
44.0k
}
2401
2402
void DBImpl::TrackExistingDataFiles(
2403
42.2k
    const std::vector<std::string>& existing_data_files) {
2404
42.2k
  TrackOrUntrackFiles(existing_data_files, /*track=*/true);
2405
42.2k
}
2406
2407
Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
2408
                    const std::vector<ColumnFamilyDescriptor>& column_families,
2409
                    std::vector<ColumnFamilyHandle*>* handles,
2410
                    std::unique_ptr<DB>* dbptr, const bool seq_per_batch,
2411
                    const bool batch_per_txn, const bool is_retry,
2412
42.2k
                    bool* can_retry) {
2413
42.2k
  const WriteOptions write_options(Env::IOActivity::kDBOpen);
2414
42.2k
  const ReadOptions read_options(Env::IOActivity::kDBOpen);
2415
2416
42.2k
  Status s = ValidateOptionsByTable(db_options, column_families);
2417
42.2k
  if (!s.ok()) {
2418
0
    return s;
2419
0
  }
2420
2421
42.2k
  s = ValidateOptions(db_options, column_families);
2422
42.2k
  if (!s.ok()) {
2423
0
    return s;
2424
0
  }
2425
2426
42.2k
  *dbptr = nullptr;
2427
42.2k
  assert(handles);
2428
42.2k
  handles->clear();
2429
2430
42.2k
  size_t max_write_buffer_size = 0;
2431
42.2k
  MinAndMaxPreserveSeconds preserve_info;
2432
57.4k
  for (const auto& cf : column_families) {
2433
57.4k
    max_write_buffer_size =
2434
57.4k
        std::max(max_write_buffer_size, cf.options.write_buffer_size);
2435
57.4k
    preserve_info.Combine(cf.options);
2436
57.4k
  }
2437
2438
42.2k
  auto impl = std::make_unique<DBImpl>(db_options, dbname, seq_per_batch,
2439
42.2k
                                       batch_per_txn);
2440
42.2k
  if (!impl->immutable_db_options_.info_log) {
2441
0
    s = impl->init_logger_creation_s_;
2442
0
    return s;
2443
42.2k
  } else {
2444
42.2k
    assert(impl->init_logger_creation_s_.ok());
2445
42.2k
  }
2446
42.2k
  s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.GetWalDir());
2447
42.2k
  if (s.ok()) {
2448
42.2k
    std::vector<std::string> paths;
2449
42.2k
    for (auto& db_path : impl->immutable_db_options_.db_paths) {
2450
42.2k
      paths.emplace_back(db_path.path);
2451
42.2k
    }
2452
57.4k
    for (auto& cf : column_families) {
2453
57.4k
      for (auto& cf_path : cf.options.cf_paths) {
2454
0
        paths.emplace_back(cf_path.path);
2455
0
      }
2456
57.4k
    }
2457
42.2k
    for (const auto& path : paths) {
2458
42.2k
      s = impl->env_->CreateDirIfMissing(path);
2459
42.2k
      if (!s.ok()) {
2460
0
        break;
2461
0
      }
2462
42.2k
    }
2463
2464
    // For recovery from NoSpace() error, we can only handle
2465
    // the case where the database is stored in a single path
2466
42.2k
    if (paths.size() <= 1) {
2467
42.2k
      impl->error_handler_.EnableAutoRecovery();
2468
42.2k
    }
2469
42.2k
  }
2470
42.2k
  if (s.ok()) {
2471
42.2k
    s = impl->CreateArchivalDirectory();
2472
42.2k
  }
2473
42.2k
  if (!s.ok()) {
2474
0
    return s;
2475
0
  }
2476
2477
42.2k
  impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath();
2478
42.2k
  RecoveryContext recovery_ctx;
2479
42.2k
  impl->options_mutex_.Lock();
2480
42.2k
  impl->mutex_.Lock();
2481
2482
  // Handles create_if_missing, error_if_exists
2483
42.2k
  uint64_t recovered_seq(kMaxSequenceNumber);
2484
42.2k
  s = impl->Recover(column_families, false /* read_only */,
2485
42.2k
                    false /* error_if_wal_file_exists */,
2486
42.2k
                    false /* error_if_data_exists_in_wals */, is_retry,
2487
42.2k
                    &recovered_seq, &recovery_ctx, can_retry);
2488
42.2k
  if (s.ok()) {
2489
42.2k
    uint64_t new_log_number = impl->versions_->NewFileNumber();
2490
42.2k
    log::Writer* new_log = nullptr;
2491
42.2k
    const size_t preallocate_block_size =
2492
42.2k
        impl->GetWalPreallocateBlockSize(max_write_buffer_size);
2493
    // TODO(hx235): Pass in the correct `predecessor_wal_info` for the first WAL
2494
    // created during DB open with predecessor WALs from previous DB session due
2495
    // to `avoid_flush_during_recovery == true`. This can protect the last WAL
2496
    // recovered.
2497
42.2k
    s = impl->CreateWAL(write_options, new_log_number, 0 /*recycle_log_number*/,
2498
42.2k
                        preallocate_block_size,
2499
42.2k
                        PredecessorWALInfo() /* predecessor_wal_info */,
2500
42.2k
                        &new_log);
2501
42.2k
    if (s.ok()) {
2502
      // Prevent log files created by previous instance from being recycled.
2503
      // They might be in alive_log_file_, and might get recycled otherwise.
2504
42.2k
      impl->min_wal_number_to_recycle_ = new_log_number;
2505
42.2k
    }
2506
42.2k
    if (s.ok()) {
2507
42.2k
      InstrumentedMutexLock wl(&impl->wal_write_mutex_);
2508
42.2k
      impl->cur_wal_number_ = new_log_number;
2509
42.2k
      assert(new_log != nullptr);
2510
42.2k
      assert(impl->logs_.empty());
2511
42.2k
      impl->logs_.emplace_back(new_log_number, new_log);
2512
42.2k
    }
2513
2514
42.2k
    if (s.ok()) {
2515
42.2k
      impl->alive_wal_files_.emplace_back(impl->cur_wal_number_);
2516
      // In WritePrepared there could be gap in sequence numbers. This breaks
2517
      // the trick we use in kPointInTimeRecovery which assumes the first seq in
2518
      // the log right after the corrupted log is one larger than the last seq
2519
      // we read from the wals. To let this trick keep working, we add a dummy
2520
      // entry with the expected sequence to the first log right after recovery.
2521
      // In non-WritePrepared case also the new log after recovery could be
2522
      // empty, and thus missing the consecutive seq hint to distinguish
2523
      // middle-log corruption to corrupted-log-remained-after-recovery. This
2524
      // case also will be addressed by a dummy write.
2525
42.2k
      if (recovered_seq != kMaxSequenceNumber) {
2526
0
        WriteBatch empty_batch;
2527
0
        WriteBatchInternal::SetSequence(&empty_batch, recovered_seq);
2528
0
        uint64_t wal_used, log_size;
2529
0
        log::Writer* log_writer = impl->logs_.back().writer;
2530
0
        WalFileNumberSize& wal_file_number_size = impl->alive_wal_files_.back();
2531
2532
0
        assert(log_writer->get_log_number() == wal_file_number_size.number);
2533
0
        impl->mutex_.AssertHeld();
2534
0
        s = impl->WriteToWAL(empty_batch, write_options, log_writer, &wal_used,
2535
0
                             &log_size, wal_file_number_size, recovered_seq);
2536
0
        if (s.ok()) {
2537
          // Need to fsync, otherwise it might get lost after a power reset.
2538
0
          s = impl->FlushWAL(write_options, false);
2539
0
          TEST_SYNC_POINT_CALLBACK("DBImpl::Open::BeforeSyncWAL", /*arg=*/&s);
2540
0
          IOOptions opts;
2541
0
          if (s.ok()) {
2542
0
            s = WritableFileWriter::PrepareIOOptions(write_options, opts);
2543
0
          }
2544
0
          if (s.ok()) {
2545
0
            s = log_writer->file()->Sync(opts,
2546
0
                                         impl->immutable_db_options_.use_fsync);
2547
0
          }
2548
0
        }
2549
0
      }
2550
42.2k
    }
2551
42.2k
  }
2552
42.2k
  if (s.ok()) {
2553
42.2k
    s = impl->LogAndApplyForRecovery(recovery_ctx);
2554
42.2k
  }
2555
2556
42.2k
  if (s.ok() && !impl->immutable_db_options_.write_identity_file) {
2557
    // On successful recovery, delete an obsolete IDENTITY file to avoid DB ID
2558
    // inconsistency
2559
0
    impl->env_->DeleteFile(IdentityFileName(impl->dbname_))
2560
0
        .PermitUncheckedError();
2561
0
  }
2562
2563
42.2k
  if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) {
2564
0
    impl->mutex_.AssertHeld();
2565
0
    s = impl->InitPersistStatsColumnFamily();
2566
0
  }
2567
2568
  // After reaching the post-recovery seqno but before creating SuperVersions
2569
  // ensure seqno to time mapping is pre-populated as needed.
2570
42.2k
  if (s.ok() && recovery_ctx.is_new_db_ && preserve_info.IsEnabled()) {
2571
0
    impl->PrepopulateSeqnoToTimeMapping(preserve_info);
2572
0
  }
2573
2574
42.2k
  if (s.ok()) {
2575
    // set column family handles
2576
57.4k
    for (const auto& cf : column_families) {
2577
57.4k
      auto cfd =
2578
57.4k
          impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
2579
57.4k
      if (cfd != nullptr) {
2580
57.4k
        handles->push_back(
2581
57.4k
            new ColumnFamilyHandleImpl(cfd, impl.get(), &impl->mutex_));
2582
57.4k
        impl->NewThreadStatusCfInfo(cfd);
2583
57.4k
        SuperVersionContext sv_context(/* create_superversion */ true);
2584
57.4k
        impl->InstallSuperVersionForConfigChange(cfd, &sv_context);
2585
57.4k
        sv_context.Clean();
2586
57.4k
      } else {
2587
0
        if (db_options.create_missing_column_families) {
2588
          // missing column family, create it
2589
0
          ColumnFamilyHandle* handle = nullptr;
2590
0
          impl->mutex_.Unlock();
2591
          // NOTE: the work normally done in WrapUpCreateColumnFamilies will
2592
          // be done separately below.
2593
          // This includes InstallSuperVersionForConfigChange.
2594
0
          s = impl->CreateColumnFamilyImpl(read_options, write_options,
2595
0
                                           cf.options, cf.name, &handle);
2596
0
          impl->mutex_.Lock();
2597
0
          if (s.ok()) {
2598
0
            handles->push_back(handle);
2599
0
          } else {
2600
0
            break;
2601
0
          }
2602
0
        } else {
2603
0
          s = Status::InvalidArgument("Column family not found", cf.name);
2604
0
          break;
2605
0
        }
2606
0
      }
2607
57.4k
    }
2608
42.2k
  }
2609
2610
42.2k
  if (s.ok()) {
2611
99.6k
    for (size_t i = 0; i < column_families.size(); ++i) {
2612
57.4k
      const auto& cf = column_families[i];
2613
57.4k
      auto* cfd = static_cast<ColumnFamilyHandleImpl*>((*handles)[i])->cfd();
2614
57.4k
      impl->MaybeInitBlobDirectWriteColumnFamily(cfd, cf.options, cf.name);
2615
57.4k
    }
2616
42.2k
  }
2617
2618
42.2k
  if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) {
2619
    // Install SuperVersion for hidden column family
2620
0
    assert(impl->persist_stats_cf_handle_);
2621
0
    assert(impl->persist_stats_cf_handle_->cfd());
2622
0
    SuperVersionContext sv_context(/* create_superversion */ true);
2623
0
    impl->InstallSuperVersionForConfigChange(
2624
0
        impl->persist_stats_cf_handle_->cfd(), &sv_context);
2625
0
    sv_context.Clean();
2626
    // try to read format version
2627
0
    s = impl->PersistentStatsProcessFormatVersion();
2628
0
  }
2629
2630
42.2k
  if (s.ok()) {
2631
57.4k
    for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
2632
57.4k
      if (!cfd->mem()->IsSnapshotSupported()) {
2633
0
        impl->is_snapshot_supported_ = false;
2634
0
      }
2635
57.4k
      if (cfd->ioptions().merge_operator != nullptr &&
2636
0
          !cfd->mem()->IsMergeOperatorSupported()) {
2637
0
        s = Status::InvalidArgument(
2638
0
            "The memtable of column family %s does not support merge operator "
2639
0
            "its options.merge_operator is non-null",
2640
0
            cfd->GetName().c_str());
2641
0
      }
2642
57.4k
      if (!s.ok()) {
2643
0
        break;
2644
0
      }
2645
57.4k
    }
2646
42.2k
  }
2647
42.2k
  TEST_SYNC_POINT("DBImpl::Open:Opened");
2648
42.2k
  Status persist_options_status;
2649
42.2k
  if (s.ok()) {
2650
    // Persist RocksDB Options before scheduling the compaction.
2651
    // The WriteOptionsFile() will release and lock the mutex internally.
2652
42.2k
    persist_options_status =
2653
42.2k
        impl->WriteOptionsFile(write_options, true /*db_mutex_already_held*/);
2654
42.2k
    impl->opened_successfully_ = true;
2655
42.2k
  } else {
2656
0
    persist_options_status.PermitUncheckedError();
2657
0
  }
2658
42.2k
  impl->mutex_.Unlock();
2659
2660
42.2k
  auto sfm = static_cast<SstFileManagerImpl*>(
2661
42.2k
      impl->immutable_db_options_.sst_file_manager.get());
2662
42.2k
  if (s.ok() && sfm) {
2663
    // Set Statistics ptr for SstFileManager to dump the stats of
2664
    // DeleteScheduler.
2665
42.2k
    sfm->SetStatisticsPtr(impl->immutable_db_options_.statistics);
2666
42.2k
    ROCKS_LOG_INFO(impl->immutable_db_options_.info_log,
2667
42.2k
                   "SstFileManager instance %p", sfm);
2668
2669
42.2k
    impl->TrackExistingDataFiles(recovery_ctx.existing_data_files_);
2670
2671
    // Reserve some disk buffer space. This is a heuristic - when we run out
2672
    // of disk space, this ensures that there is at least write_buffer_size
2673
    // amount of free space before we resume DB writes. In low disk space
2674
    // conditions, we want to avoid a lot of small L0 files due to frequent
2675
    // WAL write failures and resultant forced flushes
2676
42.2k
    sfm->ReserveDiskBuffer(max_write_buffer_size,
2677
42.2k
                           impl->immutable_db_options_.db_paths[0].path);
2678
42.2k
  }
2679
2680
42.2k
  if (s.ok()) {
2681
    // When the DB is stopped, it's possible that there are some .trash files
2682
    // that were not deleted yet, when we open the DB we will find these .trash
2683
    // files and schedule them to be deleted (or delete immediately if
2684
    // SstFileManager was not used).
2685
    // Note that we only start doing this and below delete obsolete file after
2686
    // `TrackExistingDataFiles` are called, the `max_trash_db_ratio` is
2687
    // ineffective otherwise and these files' deletion won't be rate limited
2688
    // which can cause discard stall.
2689
42.2k
    for (const auto& path : impl->CollectAllDBPaths()) {
2690
42.2k
      DeleteScheduler::CleanupDirectory(impl->immutable_db_options_.env, sfm,
2691
42.2k
                                        path)
2692
42.2k
          .PermitUncheckedError();
2693
42.2k
    }
2694
42.2k
    impl->mutex_.Lock();
2695
    // This will do a full scan.
2696
42.2k
    impl->DeleteObsoleteFiles();
2697
42.2k
    TEST_SYNC_POINT("DBImpl::Open:AfterDeleteFiles");
2698
42.2k
    impl->MaybeScheduleFlushOrCompaction();
2699
2700
42.2k
    if (impl->immutable_db_options_.open_files_async) {
2701
0
      impl->ScheduleAsyncFileOpening();
2702
0
    }
2703
42.2k
    impl->mutex_.Unlock();
2704
42.2k
  }
2705
2706
42.2k
  if (s.ok()) {
2707
42.2k
    ROCKS_LOG_HEADER(impl->immutable_db_options_.info_log, "DB pointer %p",
2708
42.2k
                     impl.get());
2709
42.2k
    LogFlush(impl->immutable_db_options_.info_log);
2710
42.2k
    if (!impl->WALBufferIsEmpty()) {
2711
0
      s = impl->FlushWAL(write_options, false);
2712
0
      if (s.ok()) {
2713
        // Sync is needed otherwise WAL buffered data might get lost after a
2714
        // power reset.
2715
0
        log::Writer* log_writer = impl->logs_.back().writer;
2716
0
        IOOptions opts;
2717
0
        s = WritableFileWriter::PrepareIOOptions(write_options, opts);
2718
0
        if (s.ok()) {
2719
0
          s = log_writer->file()->Sync(opts,
2720
0
                                       impl->immutable_db_options_.use_fsync);
2721
0
        }
2722
0
      }
2723
0
    }
2724
42.2k
    if (s.ok() && !persist_options_status.ok()) {
2725
0
      s = Status::IOError(
2726
0
          "DB::Open() failed --- Unable to persist Options file",
2727
0
          persist_options_status.ToString());
2728
0
    }
2729
42.2k
  }
2730
42.2k
  if (!s.ok()) {
2731
0
    ROCKS_LOG_WARN(impl->immutable_db_options_.info_log,
2732
0
                   "DB::Open() failed: %s", s.ToString().c_str());
2733
0
  }
2734
42.2k
  if (s.ok()) {
2735
42.2k
    s = impl->StartPeriodicTaskScheduler();
2736
42.2k
  }
2737
42.2k
  if (s.ok()) {
2738
42.2k
    s = impl->RegisterRecordSeqnoTimeWorker();
2739
42.2k
  }
2740
42.2k
  impl->options_mutex_.Unlock();
2741
42.2k
  if (s.ok()) {
2742
42.2k
    *dbptr = std::move(impl);
2743
42.2k
  } else {
2744
0
    for (auto* h : *handles) {
2745
0
      delete h;
2746
0
    }
2747
0
    handles->clear();
2748
0
  }
2749
42.2k
  return s;
2750
42.2k
}
2751
2752
struct AsyncFileOpenContext {
2753
  DBImpl* db = nullptr;
2754
  FileOptions file_options;
2755
  std::vector<Version*> versions;
2756
2757
0
  AsyncFileOpenContext() = default;
2758
  AsyncFileOpenContext(const AsyncFileOpenContext&) = delete;
2759
  AsyncFileOpenContext& operator=(const AsyncFileOpenContext&) = delete;
2760
  AsyncFileOpenContext(AsyncFileOpenContext&&) = delete;
2761
  AsyncFileOpenContext& operator=(AsyncFileOpenContext&&) = delete;
2762
2763
0
  ~AsyncFileOpenContext() {
2764
0
    db->mutex()->AssertHeld();
2765
0
    for (auto* v : versions) {
2766
      // must unref version before cfd
2767
0
      ColumnFamilyData* cfd = v->cfd();
2768
0
      v->Unref();
2769
0
      cfd->UnrefAndTryDelete();
2770
0
    }
2771
0
  }
2772
};
2773
2774
0
void DBImpl::ScheduleAsyncFileOpening() {
2775
0
  mutex_.AssertHeld();
2776
2777
0
  auto* ctx = new AsyncFileOpenContext();
2778
0
  ctx->db = this;
2779
0
  ctx->file_options = versions_->file_options();
2780
2781
0
  for (auto cfd : *versions_->GetColumnFamilySet()) {
2782
0
    assert(!cfd->IsDropped());
2783
0
    Version* current = cfd->current();
2784
0
    VersionStorageInfo* vstorage = current->storage_info();
2785
0
    bool has_files = false;
2786
0
    for (int level = 0; level < vstorage->num_levels() && !has_files; level++) {
2787
0
      has_files = !vstorage->LevelFiles(level).empty();
2788
0
    }
2789
0
    if (has_files) {
2790
0
      cfd->Ref();
2791
0
      current->Ref();
2792
0
      ctx->versions.push_back(current);
2793
0
    }
2794
0
  }
2795
2796
0
  bg_async_file_open_state_ = AsyncFileOpenState::kScheduled;
2797
2798
  // since this is a one time job, best to schedule it with high priority
2799
0
  env_->Schedule(&DBImpl::BGWorkAsyncFileOpen, ctx, Env::Priority::HIGH,
2800
0
                 nullptr);
2801
0
}
2802
2803
0
void DBImpl::MarkAsyncFileOpenNotNeeded() {
2804
0
  mutex_.AssertHeld();
2805
0
  assert(bg_async_file_open_state_ == AsyncFileOpenState::kNotScheduled);
2806
0
  bg_async_file_open_state_ = AsyncFileOpenState::kComplete;
2807
0
}
2808
2809
0
void DBImpl::BGWorkAsyncFileOpen(void* arg) {
2810
0
  TEST_SYNC_POINT("DBImpl::BGWorkAsyncFileOpen::Start");
2811
2812
0
  AsyncFileOpenContext* raw_ctx = static_cast<AsyncFileOpenContext*>(arg);
2813
0
  DBImpl* db = raw_ctx->db;
2814
2815
0
  auto deleter = [](AsyncFileOpenContext* p) {
2816
0
    auto* dbPtr = p->db;
2817
0
    InstrumentedMutexLock l(&dbPtr->mutex_);
2818
0
    delete p;
2819
0
    dbPtr->bg_async_file_open_state_ = AsyncFileOpenState::kComplete;
2820
0
    dbPtr->bg_cv_.SignalAll();
2821
0
  };
2822
0
  std::unique_ptr<AsyncFileOpenContext, decltype(deleter)> ctx(raw_ctx,
2823
0
                                                               deleter);
2824
2825
0
  ReadOptions ro;
2826
0
  for (size_t i = 0; i < ctx->versions.size(); i++) {
2827
0
    auto* version = ctx->versions[i];
2828
0
    ColumnFamilyData* cfd = version->cfd();
2829
2830
    // Skip column families that were dropped after scheduling
2831
0
    if (cfd->IsDropped()) {
2832
0
      continue;
2833
0
    }
2834
2835
0
    VersionStorageInfo* vstorage = version->storage_info();
2836
2837
0
    MutableCFOptions mutable_cf_options;
2838
0
    {
2839
0
      InstrumentedMutexLock l(&db->mutex_);
2840
0
      mutable_cf_options = cfd->GetLatestMutableCFOptions();
2841
0
    }
2842
0
    size_t max_file_size_for_l0_meta_pin =
2843
0
        MaxFileSizeForL0MetaPin(mutable_cf_options);
2844
2845
0
    std::vector<std::pair<FileMetaData*, int>> files_meta;
2846
0
    for (int level = 0; level < vstorage->num_levels(); level++) {
2847
0
      for (FileMetaData* file_meta : vstorage->LevelFiles(level)) {
2848
0
        files_meta.emplace_back(file_meta, level);
2849
0
      }
2850
0
    }
2851
2852
0
    Status s = LoadTableHandlersHelper(
2853
0
        files_meta, cfd->table_cache(), ctx->file_options,
2854
0
        *vstorage->InternalComparator(), cfd->internal_stats(),
2855
0
        db->immutable_db_options_.max_file_opening_threads,
2856
0
        false /* prefetch_index_and_filter_in_cache */, mutable_cf_options,
2857
0
        max_file_size_for_l0_meta_pin, ro, &db->shutting_down_);
2858
0
    if (!s.ok()) {
2859
0
      ROCKS_LOG_ERROR(
2860
0
          db->immutable_db_options_.info_log,
2861
0
          "BGWorkAsyncFileOpen: LoadTableHandlers failed for CF %s: "
2862
0
          "%s",
2863
0
          cfd->GetName().c_str(), s.ToString().c_str());
2864
0
      InstrumentedMutexLock l(&db->mutex_);
2865
0
      db->error_handler_.SetBGError(s, BackgroundErrorReason::kAsyncFileOpen);
2866
0
      break;
2867
0
    }
2868
0
  }
2869
0
  TEST_SYNC_POINT("DBImpl::BGWorkAsyncFileOpen:Done");
2870
0
}
2871
2872
}  // namespace ROCKSDB_NAMESPACE