Coverage Report

Created: 2024-07-17 06:46

/src/rocksdb/db/db_impl/db_impl_open.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
#include <cinttypes>
10
11
#include "db/builder.h"
12
#include "db/db_impl/db_impl.h"
13
#include "db/error_handler.h"
14
#include "db/periodic_task_scheduler.h"
15
#include "env/composite_env_wrapper.h"
16
#include "file/filename.h"
17
#include "file/read_write_util.h"
18
#include "file/sst_file_manager_impl.h"
19
#include "file/writable_file_writer.h"
20
#include "logging/logging.h"
21
#include "monitoring/persistent_stats_history.h"
22
#include "monitoring/thread_status_util.h"
23
#include "options/options_helper.h"
24
#include "rocksdb/options.h"
25
#include "rocksdb/table.h"
26
#include "rocksdb/wal_filter.h"
27
#include "test_util/sync_point.h"
28
#include "util/rate_limiter_impl.h"
29
#include "util/string_util.h"
30
#include "util/udt_util.h"
31
32
namespace ROCKSDB_NAMESPACE {
33
Options SanitizeOptions(const std::string& dbname, const Options& src,
34
1.50k
                        bool read_only, Status* logger_creation_s) {
35
1.50k
  auto db_options =
36
1.50k
      SanitizeOptions(dbname, DBOptions(src), read_only, logger_creation_s);
37
1.50k
  ImmutableDBOptions immutable_db_options(db_options);
38
1.50k
  auto cf_options =
39
1.50k
      SanitizeOptions(immutable_db_options, ColumnFamilyOptions(src));
40
1.50k
  return Options(db_options, cf_options);
41
1.50k
}
42
43
DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src,
44
13.0k
                          bool read_only, Status* logger_creation_s) {
45
13.0k
  DBOptions result(src);
46
47
13.0k
  if (result.env == nullptr) {
48
0
    result.env = Env::Default();
49
0
  }
50
51
  // result.max_open_files means an "infinite" open files.
52
13.0k
  if (result.max_open_files != -1) {
53
0
    int max_max_open_files = port::GetMaxOpenFiles();
54
0
    if (max_max_open_files == -1) {
55
0
      max_max_open_files = 0x400000;
56
0
    }
57
0
    ClipToRange(&result.max_open_files, 20, max_max_open_files);
58
0
    TEST_SYNC_POINT_CALLBACK("SanitizeOptions::AfterChangeMaxOpenFiles",
59
0
                             &result.max_open_files);
60
0
  }
61
62
13.0k
  if (result.info_log == nullptr && !read_only) {
63
13.0k
    Status s = CreateLoggerFromOptions(dbname, result, &result.info_log);
64
13.0k
    if (!s.ok()) {
65
      // No place suitable for logging
66
0
      result.info_log = nullptr;
67
0
      if (logger_creation_s) {
68
0
        *logger_creation_s = s;
69
0
      }
70
0
    }
71
13.0k
  }
72
73
13.0k
  if (!result.write_buffer_manager) {
74
13.0k
    result.write_buffer_manager.reset(
75
13.0k
        new WriteBufferManager(result.db_write_buffer_size));
76
13.0k
  }
77
13.0k
  auto bg_job_limits = DBImpl::GetBGJobLimits(
78
13.0k
      result.max_background_flushes, result.max_background_compactions,
79
13.0k
      result.max_background_jobs, true /* parallelize_compactions */);
80
13.0k
  result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_compactions,
81
13.0k
                                           Env::Priority::LOW);
82
13.0k
  result.env->IncBackgroundThreadsIfNeeded(bg_job_limits.max_flushes,
83
13.0k
                                           Env::Priority::HIGH);
84
85
13.0k
  if (result.rate_limiter.get() != nullptr) {
86
0
    if (result.bytes_per_sync == 0) {
87
0
      result.bytes_per_sync = 1024 * 1024;
88
0
    }
89
0
  }
90
91
13.0k
  if (result.delayed_write_rate == 0) {
92
13.0k
    if (result.rate_limiter.get() != nullptr) {
93
0
      result.delayed_write_rate = result.rate_limiter->GetBytesPerSecond();
94
0
    }
95
13.0k
    if (result.delayed_write_rate == 0) {
96
13.0k
      result.delayed_write_rate = 16 * 1024 * 1024;
97
13.0k
    }
98
13.0k
  }
99
100
13.0k
  if (result.WAL_ttl_seconds > 0 || result.WAL_size_limit_MB > 0) {
101
0
    result.recycle_log_file_num = false;
102
0
  }
103
104
13.0k
  if (result.recycle_log_file_num &&
105
13.0k
      (result.wal_recovery_mode ==
106
0
           WALRecoveryMode::kTolerateCorruptedTailRecords ||
107
0
       result.wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency)) {
108
    // - kTolerateCorruptedTailRecords is inconsistent with recycle log file
109
    //   feature. WAL recycling expects recovery success upon encountering a
110
    //   corrupt record at the point where new data ends and recycled data
111
    //   remains at the tail. However, `kTolerateCorruptedTailRecords` must fail
112
    //   upon encountering any such corrupt record, as it cannot differentiate
113
    //   between this and a real corruption, which would cause committed updates
114
    //   to be truncated -- a violation of the recovery guarantee.
115
    // - kPointInTimeRecovery and kAbsoluteConsistency are incompatible with
116
    //   recycle log file feature temporarily due to a bug found introducing a
117
    //   hole in the recovered data
118
    //   (https://github.com/facebook/rocksdb/pull/7252#issuecomment-673766236).
119
    //   Besides this bug, we believe the features are fundamentally compatible.
120
0
    result.recycle_log_file_num = 0;
121
0
  }
122
123
13.0k
  if (result.db_paths.size() == 0) {
124
13.0k
    result.db_paths.emplace_back(dbname, std::numeric_limits<uint64_t>::max());
125
13.0k
  } else if (result.wal_dir.empty()) {
126
    // Use dbname as default
127
0
    result.wal_dir = dbname;
128
0
  }
129
13.0k
  if (!result.wal_dir.empty()) {
130
    // If there is a wal_dir already set, check to see if the wal_dir is the
131
    // same as the dbname AND the same as the db_path[0] (which must exist from
132
    // a few lines ago). If the wal_dir matches both of these values, then clear
133
    // the wal_dir value, which will make wal_dir == dbname.  Most likely this
134
    // condition was the result of reading an old options file where we forced
135
    // wal_dir to be set (to dbname).
136
0
    auto npath = NormalizePath(dbname + "/");
137
0
    if (npath == NormalizePath(result.wal_dir + "/") &&
138
0
        npath == NormalizePath(result.db_paths[0].path + "/")) {
139
0
      result.wal_dir.clear();
140
0
    }
141
0
  }
142
143
13.0k
  if (!result.wal_dir.empty() && result.wal_dir.back() == '/') {
144
0
    result.wal_dir = result.wal_dir.substr(0, result.wal_dir.size() - 1);
145
0
  }
146
147
  // Force flush on DB open if 2PC is enabled, since with 2PC we have no
148
  // guarantee that consecutive log files have consecutive sequence id, which
149
  // make recovery complicated.
150
13.0k
  if (result.allow_2pc) {
151
0
    result.avoid_flush_during_recovery = false;
152
0
  }
153
154
13.0k
  ImmutableDBOptions immutable_db_options(result);
155
13.0k
  if (!immutable_db_options.IsWalDirSameAsDBPath()) {
156
    // Either the WAL dir and db_paths[0]/db_name are not the same, or we
157
    // cannot tell for sure. In either case, assume they're different and
158
    // explicitly cleanup the trash log files (bypass DeleteScheduler)
159
    // Do this first so even if we end up calling
160
    // DeleteScheduler::CleanupDirectory on the same dir later, it will be
161
    // safe
162
0
    std::vector<std::string> filenames;
163
0
    IOOptions io_opts;
164
0
    io_opts.do_not_recurse = true;
165
0
    auto wal_dir = immutable_db_options.GetWalDir();
166
0
    Status s = immutable_db_options.fs->GetChildren(
167
0
        wal_dir, io_opts, &filenames, /*IODebugContext*=*/nullptr);
168
0
    s.PermitUncheckedError();  //**TODO: What to do on error?
169
0
    for (std::string& filename : filenames) {
170
0
      if (filename.find(".log.trash", filename.length() -
171
0
                                          std::string(".log.trash").length()) !=
172
0
          std::string::npos) {
173
0
        std::string trash_file = wal_dir + "/" + filename;
174
0
        result.env->DeleteFile(trash_file).PermitUncheckedError();
175
0
      }
176
0
    }
177
0
  }
178
179
  // Create a default SstFileManager for purposes of tracking compaction size
180
  // and facilitating recovery from out of space errors.
181
13.0k
  if (result.sst_file_manager.get() == nullptr) {
182
13.0k
    std::shared_ptr<SstFileManager> sst_file_manager(
183
13.0k
        NewSstFileManager(result.env, result.info_log));
184
13.0k
    result.sst_file_manager = sst_file_manager;
185
13.0k
  }
186
187
  // Supported wal compression types
188
13.0k
  if (!StreamingCompressionTypeSupported(result.wal_compression)) {
189
0
    result.wal_compression = kNoCompression;
190
0
    ROCKS_LOG_WARN(result.info_log,
191
0
                   "wal_compression is disabled since only zstd is supported");
192
0
  }
193
194
13.0k
  if (!result.paranoid_checks) {
195
0
    result.skip_checking_sst_file_sizes_on_db_open = true;
196
0
    ROCKS_LOG_INFO(result.info_log,
197
0
                   "file size check will be skipped during open.");
198
0
  }
199
200
13.0k
  return result;
201
13.0k
}
202
203
namespace {
204
Status ValidateOptionsByTable(
205
    const DBOptions& db_opts,
206
11.5k
    const std::vector<ColumnFamilyDescriptor>& column_families) {
207
11.5k
  Status s;
208
17.5k
  for (auto& cf : column_families) {
209
17.5k
    s = ValidateOptions(db_opts, cf.options);
210
17.5k
    if (!s.ok()) {
211
0
      return s;
212
0
    }
213
17.5k
  }
214
11.5k
  return Status::OK();
215
11.5k
}
216
}  // namespace
217
218
Status DBImpl::ValidateOptions(
219
    const DBOptions& db_options,
220
11.5k
    const std::vector<ColumnFamilyDescriptor>& column_families) {
221
11.5k
  Status s;
222
17.5k
  for (auto& cfd : column_families) {
223
17.5k
    s = ColumnFamilyData::ValidateOptions(db_options, cfd.options);
224
17.5k
    if (!s.ok()) {
225
0
      return s;
226
0
    }
227
17.5k
  }
228
11.5k
  s = ValidateOptions(db_options);
229
11.5k
  return s;
230
11.5k
}
231
232
11.5k
Status DBImpl::ValidateOptions(const DBOptions& db_options) {
233
11.5k
  if (db_options.db_paths.size() > 4) {
234
0
    return Status::NotSupported(
235
0
        "More than four DB paths are not supported yet. ");
236
0
  }
237
238
11.5k
  if (db_options.allow_mmap_reads && db_options.use_direct_reads) {
239
    // Protect against assert in PosixMMapReadableFile constructor
240
0
    return Status::NotSupported(
241
0
        "If memory mapped reads (allow_mmap_reads) are enabled "
242
0
        "then direct I/O reads (use_direct_reads) must be disabled. ");
243
0
  }
244
245
11.5k
  if (db_options.allow_mmap_writes &&
246
11.5k
      db_options.use_direct_io_for_flush_and_compaction) {
247
0
    return Status::NotSupported(
248
0
        "If memory mapped writes (allow_mmap_writes) are enabled "
249
0
        "then direct I/O writes (use_direct_io_for_flush_and_compaction) must "
250
0
        "be disabled. ");
251
0
  }
252
253
11.5k
  if (db_options.keep_log_file_num == 0) {
254
0
    return Status::InvalidArgument("keep_log_file_num must be greater than 0");
255
0
  }
256
257
11.5k
  if (db_options.unordered_write &&
258
11.5k
      !db_options.allow_concurrent_memtable_write) {
259
0
    return Status::InvalidArgument(
260
0
        "unordered_write is incompatible with "
261
0
        "!allow_concurrent_memtable_write");
262
0
  }
263
264
11.5k
  if (db_options.unordered_write && db_options.enable_pipelined_write) {
265
0
    return Status::InvalidArgument(
266
0
        "unordered_write is incompatible with enable_pipelined_write");
267
0
  }
268
269
11.5k
  if (db_options.atomic_flush && db_options.enable_pipelined_write) {
270
0
    return Status::InvalidArgument(
271
0
        "atomic_flush is incompatible with enable_pipelined_write");
272
0
  }
273
274
11.5k
  if (db_options.use_direct_io_for_flush_and_compaction &&
275
11.5k
      0 == db_options.writable_file_max_buffer_size) {
276
0
    return Status::InvalidArgument(
277
0
        "writes in direct IO require writable_file_max_buffer_size > 0");
278
0
  }
279
280
11.5k
  if (db_options.daily_offpeak_time_utc != "") {
281
0
    int start_time, end_time;
282
0
    if (!TryParseTimeRangeString(db_options.daily_offpeak_time_utc, start_time,
283
0
                                 end_time)) {
284
0
      return Status::InvalidArgument(
285
0
          "daily_offpeak_time_utc should be set in the format HH:mm-HH:mm "
286
0
          "(e.g. 04:30-07:30)");
287
0
    } else if (start_time == end_time) {
288
0
      return Status::InvalidArgument(
289
0
          "start_time and end_time cannot be the same");
290
0
    }
291
0
  }
292
11.5k
  return Status::OK();
293
11.5k
}
294
295
1.50k
Status DBImpl::NewDB(std::vector<std::string>* new_filenames) {
296
1.50k
  VersionEdit new_db;
297
1.50k
  const WriteOptions write_options(Env::IOActivity::kDBOpen);
298
1.50k
  Status s = SetIdentityFile(write_options, env_, dbname_);
299
1.50k
  if (!s.ok()) {
300
0
    return s;
301
0
  }
302
1.50k
  if (immutable_db_options_.write_dbid_to_manifest) {
303
0
    std::string temp_db_id;
304
0
    s = GetDbIdentityFromIdentityFile(&temp_db_id);
305
0
    if (!s.ok()) {
306
0
      return s;
307
0
    }
308
0
    new_db.SetDBId(temp_db_id);
309
0
  }
310
1.50k
  new_db.SetLogNumber(0);
311
1.50k
  new_db.SetNextFile(2);
312
1.50k
  new_db.SetLastSequence(0);
313
314
1.50k
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "Creating manifest 1 \n");
315
1.50k
  const std::string manifest = DescriptorFileName(dbname_, 1);
316
1.50k
  {
317
1.50k
    if (fs_->FileExists(manifest, IOOptions(), nullptr).ok()) {
318
0
      fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
319
0
    }
320
1.50k
    std::unique_ptr<FSWritableFile> file;
321
1.50k
    FileOptions file_options = fs_->OptimizeForManifestWrite(file_options_);
322
1.50k
    s = NewWritableFile(fs_.get(), manifest, &file, file_options);
323
1.50k
    if (!s.ok()) {
324
0
      return s;
325
0
    }
326
1.50k
    FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types;
327
1.50k
    file->SetPreallocationBlockSize(
328
1.50k
        immutable_db_options_.manifest_preallocation_size);
329
1.50k
    std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
330
1.50k
        std::move(file), manifest, file_options, immutable_db_options_.clock,
331
1.50k
        io_tracer_, nullptr /* stats */,
332
1.50k
        Histograms::HISTOGRAM_ENUM_MAX /* hist_type */,
333
1.50k
        immutable_db_options_.listeners, nullptr,
334
1.50k
        tmp_set.Contains(FileType::kDescriptorFile),
335
1.50k
        tmp_set.Contains(FileType::kDescriptorFile)));
336
1.50k
    log::Writer log(std::move(file_writer), 0, false);
337
1.50k
    std::string record;
338
1.50k
    new_db.EncodeTo(&record);
339
1.50k
    s = log.AddRecord(write_options, record);
340
1.50k
    if (s.ok()) {
341
1.50k
      s = SyncManifest(&immutable_db_options_, write_options, log.file());
342
1.50k
    }
343
1.50k
  }
344
1.50k
  if (s.ok()) {
345
    // Make "CURRENT" file that points to the new manifest file.
346
1.50k
    s = SetCurrentFile(write_options, fs_.get(), dbname_, 1,
347
1.50k
                       directories_.GetDbDir());
348
1.50k
    if (new_filenames) {
349
1.50k
      new_filenames->emplace_back(
350
1.50k
          manifest.substr(manifest.find_last_of("/\\") + 1));
351
1.50k
    }
352
1.50k
  } else {
353
0
    fs_->DeleteFile(manifest, IOOptions(), nullptr).PermitUncheckedError();
354
0
  }
355
1.50k
  return s;
356
1.50k
}
357
358
IOStatus DBImpl::CreateAndNewDirectory(
359
    FileSystem* fs, const std::string& dirname,
360
29.0k
    std::unique_ptr<FSDirectory>* directory) {
361
  // We call CreateDirIfMissing() as the directory may already exist (if we
362
  // are reopening a DB), when this happens we don't want creating the
363
  // directory to cause an error. However, we need to check if creating the
364
  // directory fails or else we may get an obscure message about the lock
365
  // file not existing. One real-world example of this occurring is if
366
  // env->CreateDirIfMissing() doesn't create intermediate directories, e.g.
367
  // when dbname_ is "dir/db" but when "dir" doesn't exist.
368
29.0k
  IOStatus io_s = fs->CreateDirIfMissing(dirname, IOOptions(), nullptr);
369
29.0k
  if (!io_s.ok()) {
370
0
    return io_s;
371
0
  }
372
29.0k
  return fs->NewDirectory(dirname, IOOptions(), directory, nullptr);
373
29.0k
}
374
375
IOStatus Directories::SetDirectories(FileSystem* fs, const std::string& dbname,
376
                                     const std::string& wal_dir,
377
11.5k
                                     const std::vector<DbPath>& data_paths) {
378
11.5k
  IOStatus io_s = DBImpl::CreateAndNewDirectory(fs, dbname, &db_dir_);
379
11.5k
  if (!io_s.ok()) {
380
0
    return io_s;
381
0
  }
382
11.5k
  if (!wal_dir.empty() && dbname != wal_dir) {
383
0
    io_s = DBImpl::CreateAndNewDirectory(fs, wal_dir, &wal_dir_);
384
0
    if (!io_s.ok()) {
385
0
      return io_s;
386
0
    }
387
0
  }
388
389
11.5k
  data_dirs_.clear();
390
11.5k
  for (auto& p : data_paths) {
391
11.5k
    const std::string db_path = p.path;
392
11.5k
    if (db_path == dbname) {
393
11.5k
      data_dirs_.emplace_back(nullptr);
394
11.5k
    } else {
395
0
      std::unique_ptr<FSDirectory> path_directory;
396
0
      io_s = DBImpl::CreateAndNewDirectory(fs, db_path, &path_directory);
397
0
      if (!io_s.ok()) {
398
0
        return io_s;
399
0
      }
400
0
      data_dirs_.emplace_back(path_directory.release());
401
0
    }
402
11.5k
  }
403
11.5k
  assert(data_dirs_.size() == data_paths.size());
404
11.5k
  return IOStatus::OK();
405
11.5k
}
406
407
Status DBImpl::Recover(
408
    const std::vector<ColumnFamilyDescriptor>& column_families, bool read_only,
409
    bool error_if_wal_file_exists, bool error_if_data_exists_in_wals,
410
    bool is_retry, uint64_t* recovered_seq, RecoveryContext* recovery_ctx,
411
11.5k
    bool* can_retry) {
412
11.5k
  mutex_.AssertHeld();
413
414
11.5k
  const WriteOptions write_options(Env::IOActivity::kDBOpen);
415
11.5k
  bool tmp_is_new_db = false;
416
11.5k
  bool& is_new_db = recovery_ctx ? recovery_ctx->is_new_db_ : tmp_is_new_db;
417
11.5k
  assert(db_lock_ == nullptr);
418
11.5k
  std::vector<std::string> files_in_dbname;
419
11.5k
  if (!read_only) {
420
11.5k
    Status s = directories_.SetDirectories(fs_.get(), dbname_,
421
11.5k
                                           immutable_db_options_.wal_dir,
422
11.5k
                                           immutable_db_options_.db_paths);
423
11.5k
    if (!s.ok()) {
424
0
      return s;
425
0
    }
426
427
11.5k
    s = env_->LockFile(LockFileName(dbname_), &db_lock_);
428
11.5k
    if (!s.ok()) {
429
0
      return s;
430
0
    }
431
432
11.5k
    std::string current_fname = CurrentFileName(dbname_);
433
    // Path to any MANIFEST file in the db dir. It does not matter which one.
434
    // Since best-efforts recovery ignores CURRENT file, existence of a
435
    // MANIFEST indicates the recovery to recover existing db. If no MANIFEST
436
    // can be found, a new db will be created.
437
11.5k
    std::string manifest_path;
438
11.5k
    if (!immutable_db_options_.best_efforts_recovery) {
439
11.5k
      s = env_->FileExists(current_fname);
440
11.5k
    } else {
441
0
      s = Status::NotFound();
442
0
      IOOptions io_opts;
443
0
      io_opts.do_not_recurse = true;
444
0
      Status io_s = immutable_db_options_.fs->GetChildren(
445
0
          dbname_, io_opts, &files_in_dbname, /*IODebugContext*=*/nullptr);
446
0
      if (!io_s.ok()) {
447
0
        s = io_s;
448
0
        files_in_dbname.clear();
449
0
      }
450
0
      for (const std::string& file : files_in_dbname) {
451
0
        uint64_t number = 0;
452
0
        FileType type = kWalFile;  // initialize
453
0
        if (ParseFileName(file, &number, &type) && type == kDescriptorFile) {
454
0
          uint64_t bytes;
455
0
          s = env_->GetFileSize(DescriptorFileName(dbname_, number), &bytes);
456
0
          if (s.ok() && bytes != 0) {
457
            // Found non-empty MANIFEST (descriptor log), thus best-efforts
458
            // recovery does not have to treat the db as empty.
459
0
            manifest_path = dbname_ + "/" + file;
460
0
            break;
461
0
          }
462
0
        }
463
0
      }
464
0
    }
465
11.5k
    if (s.IsNotFound()) {
466
1.50k
      if (immutable_db_options_.create_if_missing) {
467
1.50k
        s = NewDB(&files_in_dbname);
468
1.50k
        is_new_db = true;
469
1.50k
        if (!s.ok()) {
470
0
          return s;
471
0
        }
472
1.50k
      } else {
473
0
        return Status::InvalidArgument(
474
0
            current_fname, "does not exist (create_if_missing is false)");
475
0
      }
476
10.0k
    } else if (s.ok()) {
477
10.0k
      if (immutable_db_options_.error_if_exists) {
478
0
        return Status::InvalidArgument(dbname_,
479
0
                                       "exists (error_if_exists is true)");
480
0
      }
481
10.0k
    } else {
482
      // Unexpected error reading file
483
0
      assert(s.IsIOError());
484
0
      return s;
485
0
    }
486
    // Verify compatibility of file_options_ and filesystem
487
11.5k
    {
488
11.5k
      std::unique_ptr<FSRandomAccessFile> idfile;
489
11.5k
      FileOptions customized_fs(file_options_);
490
11.5k
      customized_fs.use_direct_reads |=
491
11.5k
          immutable_db_options_.use_direct_io_for_flush_and_compaction;
492
11.5k
      const std::string& fname =
493
11.5k
          manifest_path.empty() ? current_fname : manifest_path;
494
11.5k
      s = fs_->NewRandomAccessFile(fname, customized_fs, &idfile, nullptr);
495
11.5k
      if (!s.ok()) {
496
0
        std::string error_str = s.ToString();
497
        // Check if unsupported Direct I/O is the root cause
498
0
        customized_fs.use_direct_reads = false;
499
0
        s = fs_->NewRandomAccessFile(fname, customized_fs, &idfile, nullptr);
500
0
        if (s.ok()) {
501
0
          return Status::InvalidArgument(
502
0
              "Direct I/O is not supported by the specified DB.");
503
0
        } else {
504
0
          return Status::InvalidArgument(
505
0
              "Found options incompatible with filesystem", error_str.c_str());
506
0
        }
507
0
      }
508
11.5k
    }
509
11.5k
  } else if (immutable_db_options_.best_efforts_recovery) {
510
0
    assert(files_in_dbname.empty());
511
0
    IOOptions io_opts;
512
0
    io_opts.do_not_recurse = true;
513
0
    Status s = immutable_db_options_.fs->GetChildren(
514
0
        dbname_, io_opts, &files_in_dbname, /*IODebugContext*=*/nullptr);
515
0
    if (s.IsNotFound()) {
516
0
      return Status::InvalidArgument(dbname_,
517
0
                                     "does not exist (open for read only)");
518
0
    } else if (s.IsIOError()) {
519
0
      return s;
520
0
    }
521
0
    assert(s.ok());
522
0
  }
523
11.5k
  assert(db_id_.empty());
524
11.5k
  Status s;
525
11.5k
  bool missing_table_file = false;
526
11.5k
  if (!immutable_db_options_.best_efforts_recovery) {
527
    // Status of reading the descriptor file
528
11.5k
    Status desc_status;
529
11.5k
    s = versions_->Recover(column_families, read_only, &db_id_,
530
11.5k
                           /*no_error_if_files_missing=*/false, is_retry,
531
11.5k
                           &desc_status);
532
11.5k
    desc_status.PermitUncheckedError();
533
11.5k
    if (can_retry) {
534
      // If we're opening for the first time and the failure is likely due to
535
      // a corrupt MANIFEST file (could result in either the log::Reader
536
      // detecting a corrupt record, or SST files not found error due to
537
      // discarding badly formed tail records)
538
11.5k
      if (!is_retry &&
539
11.5k
          (desc_status.IsCorruption() || s.IsNotFound() || s.IsCorruption()) &&
540
11.5k
          CheckFSFeatureSupport(fs_.get(),
541
0
                                FSSupportedOps::kVerifyAndReconstructRead)) {
542
0
        *can_retry = true;
543
0
        ROCKS_LOG_ERROR(
544
0
            immutable_db_options_.info_log,
545
0
            "Possible corruption detected while replaying MANIFEST %s, %s. "
546
0
            "Will be retried.",
547
0
            desc_status.ToString().c_str(), s.ToString().c_str());
548
11.5k
      } else {
549
11.5k
        *can_retry = false;
550
11.5k
      }
551
11.5k
    }
552
11.5k
  } else {
553
0
    assert(!files_in_dbname.empty());
554
0
    s = versions_->TryRecover(column_families, read_only, files_in_dbname,
555
0
                              &db_id_, &missing_table_file);
556
0
    if (s.ok()) {
557
      // TryRecover may delete previous column_family_set_.
558
0
      column_family_memtables_.reset(
559
0
          new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet()));
560
0
    }
561
0
  }
562
11.5k
  if (!s.ok()) {
563
0
    return s;
564
0
  }
565
11.5k
  if (s.ok() && !read_only) {
566
17.5k
    for (auto cfd : *versions_->GetColumnFamilySet()) {
567
      // Try to trivially move files down the LSM tree to start from bottommost
568
      // level when level_compaction_dynamic_level_bytes is enabled. This should
569
      // only be useful when user is migrating to turning on this option.
570
      // If a user is migrating from Level Compaction with a smaller level
571
      // multiplier or from Universal Compaction, there may be too many
572
      // non-empty levels and the trivial moves here are not sufficed for
573
      // migration. Additional compactions are needed to drain unnecessary
574
      // levels.
575
      //
576
      // Note that this step moves files down LSM without consulting
577
      // SSTPartitioner. Further compactions are still needed if
578
      // the user wants to partition SST files.
579
      // Note that files moved in this step may not respect the compression
580
      // option in target level.
581
17.5k
      if (cfd->ioptions()->compaction_style ==
582
17.5k
              CompactionStyle::kCompactionStyleLevel &&
583
17.5k
          cfd->ioptions()->level_compaction_dynamic_level_bytes &&
584
17.5k
          !cfd->GetLatestMutableCFOptions()->disable_auto_compactions) {
585
17.5k
        int to_level = cfd->ioptions()->num_levels - 1;
586
        // last level is reserved
587
        // allow_ingest_behind does not support Level Compaction,
588
        // and per_key_placement can have infinite compaction loop for Level
589
        // Compaction. Adjust to_level here just to be safe.
590
17.5k
        if (cfd->ioptions()->allow_ingest_behind ||
591
17.5k
            cfd->ioptions()->preclude_last_level_data_seconds > 0) {
592
0
          to_level -= 1;
593
0
        }
594
        // Whether this column family has a level trivially moved
595
17.5k
        bool moved = false;
596
        // Fill the LSM starting from to_level and going up one level at a time.
597
        // Some loop invariants (when last level is not reserved):
598
        // - levels in (from_level, to_level] are empty, and
599
        // - levels in (to_level, last_level] are non-empty.
600
140k
        for (int from_level = to_level; from_level >= 0; --from_level) {
601
122k
          const std::vector<FileMetaData*>& level_files =
602
122k
              cfd->current()->storage_info()->LevelFiles(from_level);
603
122k
          if (level_files.empty() || from_level == 0) {
604
117k
            continue;
605
117k
          }
606
4.69k
          assert(from_level <= to_level);
607
          // Trivial move files from `from_level` to `to_level`
608
4.69k
          if (from_level < to_level) {
609
0
            if (!moved) {
610
              // lsm_state will look like "[1,2,3,4,5,6,0]" for an LSM with
611
              // 7 levels
612
0
              std::string lsm_state = "[";
613
0
              for (int i = 0; i < cfd->ioptions()->num_levels; ++i) {
614
0
                lsm_state += std::to_string(
615
0
                    cfd->current()->storage_info()->NumLevelFiles(i));
616
0
                if (i < cfd->ioptions()->num_levels - 1) {
617
0
                  lsm_state += ",";
618
0
                }
619
0
              }
620
0
              lsm_state += "]";
621
0
              ROCKS_LOG_WARN(immutable_db_options_.info_log,
622
0
                             "[%s] Trivially move files down the LSM when open "
623
0
                             "with level_compaction_dynamic_level_bytes=true,"
624
0
                             " lsm_state: %s (Files are moved only if DB "
625
0
                             "Recovery is successful).",
626
0
                             cfd->GetName().c_str(), lsm_state.c_str());
627
0
              moved = true;
628
0
            }
629
0
            ROCKS_LOG_WARN(
630
0
                immutable_db_options_.info_log,
631
0
                "[%s] Moving %zu files from from_level-%d to from_level-%d",
632
0
                cfd->GetName().c_str(), level_files.size(), from_level,
633
0
                to_level);
634
0
            VersionEdit edit;
635
0
            edit.SetColumnFamily(cfd->GetID());
636
0
            for (const FileMetaData* f : level_files) {
637
0
              edit.DeleteFile(from_level, f->fd.GetNumber());
638
0
              edit.AddFile(to_level, f->fd.GetNumber(), f->fd.GetPathId(),
639
0
                           f->fd.GetFileSize(), f->smallest, f->largest,
640
0
                           f->fd.smallest_seqno, f->fd.largest_seqno,
641
0
                           f->marked_for_compaction,
642
0
                           f->temperature,  // this can be different from
643
                                            // `last_level_temperature`
644
0
                           f->oldest_blob_file_number, f->oldest_ancester_time,
645
0
                           f->file_creation_time, f->epoch_number,
646
0
                           f->file_checksum, f->file_checksum_func_name,
647
0
                           f->unique_id, f->compensated_range_deletion_size,
648
0
                           f->tail_size, f->user_defined_timestamps_persisted);
649
0
              ROCKS_LOG_WARN(immutable_db_options_.info_log,
650
0
                             "[%s] Moving #%" PRIu64
651
0
                             " from from_level-%d to from_level-%d %" PRIu64
652
0
                             " bytes\n",
653
0
                             cfd->GetName().c_str(), f->fd.GetNumber(),
654
0
                             from_level, to_level, f->fd.GetFileSize());
655
0
            }
656
0
            recovery_ctx->UpdateVersionEdits(cfd, edit);
657
0
          }
658
4.69k
          --to_level;
659
4.69k
        }
660
17.5k
      }
661
17.5k
    }
662
11.5k
  }
663
11.5k
  s = SetupDBId(write_options, read_only, recovery_ctx);
664
11.5k
  ROCKS_LOG_INFO(immutable_db_options_.info_log, "DB ID: %s\n", db_id_.c_str());
665
11.5k
  if (s.ok() && !read_only) {
666
11.5k
    s = MaybeUpdateNextFileNumber(recovery_ctx);
667
11.5k
  }
668
669
11.5k
  if (immutable_db_options_.paranoid_checks && s.ok()) {
670
11.5k
    s = CheckConsistency();
671
11.5k
  }
672
11.5k
  if (s.ok() && !read_only) {
673
    // TODO: share file descriptors (FSDirectory) with SetDirectories above
674
11.5k
    std::map<std::string, std::shared_ptr<FSDirectory>> created_dirs;
675
17.5k
    for (auto cfd : *versions_->GetColumnFamilySet()) {
676
17.5k
      s = cfd->AddDirectories(&created_dirs);
677
17.5k
      if (!s.ok()) {
678
0
        return s;
679
0
      }
680
17.5k
    }
681
11.5k
  }
682
683
11.5k
  std::vector<std::string> files_in_wal_dir;
684
11.5k
  if (s.ok()) {
685
    // Initial max_total_in_memory_state_ before recovery wals. Log recovery
686
    // may check this value to decide whether to flush.
687
11.5k
    max_total_in_memory_state_ = 0;
688
17.5k
    for (auto cfd : *versions_->GetColumnFamilySet()) {
689
17.5k
      auto* mutable_cf_options = cfd->GetLatestMutableCFOptions();
690
17.5k
      max_total_in_memory_state_ += mutable_cf_options->write_buffer_size *
691
17.5k
                                    mutable_cf_options->max_write_buffer_number;
692
17.5k
    }
693
694
11.5k
    SequenceNumber next_sequence(kMaxSequenceNumber);
695
11.5k
    default_cf_handle_ = new ColumnFamilyHandleImpl(
696
11.5k
        versions_->GetColumnFamilySet()->GetDefault(), this, &mutex_);
697
11.5k
    default_cf_internal_stats_ = default_cf_handle_->cfd()->internal_stats();
698
699
    // Recover from all newer log files than the ones named in the
700
    // descriptor (new log files may have been added by the previous
701
    // incarnation without registering them in the descriptor).
702
    //
703
    // Note that prev_log_number() is no longer used, but we pay
704
    // attention to it in case we are recovering a database
705
    // produced by an older version of rocksdb.
706
11.5k
    auto wal_dir = immutable_db_options_.GetWalDir();
707
11.5k
    if (!immutable_db_options_.best_efforts_recovery) {
708
11.5k
      IOOptions io_opts;
709
11.5k
      io_opts.do_not_recurse = true;
710
11.5k
      s = immutable_db_options_.fs->GetChildren(
711
11.5k
          wal_dir, io_opts, &files_in_wal_dir, /*IODebugContext*=*/nullptr);
712
11.5k
    }
713
11.5k
    if (s.IsNotFound()) {
714
0
      return Status::InvalidArgument("wal_dir not found", wal_dir);
715
11.5k
    } else if (!s.ok()) {
716
0
      return s;
717
0
    }
718
719
11.5k
    std::unordered_map<uint64_t, std::string> wal_files;
720
293k
    for (const auto& file : files_in_wal_dir) {
721
293k
      uint64_t number;
722
293k
      FileType type;
723
293k
      if (ParseFileName(file, &number, &type) && type == kWalFile) {
724
10.0k
        if (is_new_db) {
725
0
          return Status::Corruption(
726
0
              "While creating a new Db, wal_dir contains "
727
0
              "existing log file: ",
728
0
              file);
729
10.0k
        } else {
730
10.0k
          wal_files[number] = LogFileName(wal_dir, number);
731
10.0k
        }
732
10.0k
      }
733
293k
    }
734
735
11.5k
    if (immutable_db_options_.track_and_verify_wals_in_manifest) {
736
0
      if (!immutable_db_options_.best_efforts_recovery) {
737
        // Verify WALs in MANIFEST.
738
0
        s = versions_->GetWalSet().CheckWals(env_, wal_files);
739
0
      }  // else since best effort recovery does not recover from WALs, no need
740
         // to check WALs.
741
11.5k
    } else if (!versions_->GetWalSet().GetWals().empty()) {
742
      // Tracking is disabled, clear previously tracked WALs from MANIFEST,
743
      // otherwise, in the future, if WAL tracking is enabled again,
744
      // since the WALs deleted when WAL tracking is disabled are not persisted
745
      // into MANIFEST, WAL check may fail.
746
0
      VersionEdit edit;
747
0
      WalNumber max_wal_number =
748
0
          versions_->GetWalSet().GetWals().rbegin()->first;
749
0
      edit.DeleteWalsBefore(max_wal_number + 1);
750
0
      assert(recovery_ctx != nullptr);
751
0
      assert(versions_->GetColumnFamilySet() != nullptr);
752
0
      recovery_ctx->UpdateVersionEdits(
753
0
          versions_->GetColumnFamilySet()->GetDefault(), edit);
754
0
    }
755
11.5k
    if (!s.ok()) {
756
0
      return s;
757
0
    }
758
759
11.5k
    if (!wal_files.empty()) {
760
10.0k
      if (error_if_wal_file_exists) {
761
0
        return Status::Corruption(
762
0
            "The db was opened in readonly mode with error_if_wal_file_exists"
763
0
            "flag but a WAL file already exists");
764
10.0k
      } else if (error_if_data_exists_in_wals) {
765
0
        for (auto& wal_file : wal_files) {
766
0
          uint64_t bytes;
767
0
          s = env_->GetFileSize(wal_file.second, &bytes);
768
0
          if (s.ok()) {
769
0
            if (bytes > 0) {
770
0
              return Status::Corruption(
771
0
                  "error_if_data_exists_in_wals is set but there are data "
772
0
                  " in WAL files.");
773
0
            }
774
0
          }
775
0
        }
776
0
      }
777
10.0k
    }
778
779
11.5k
    if (!wal_files.empty()) {
780
      // Recover in the order in which the wals were generated
781
10.0k
      std::vector<uint64_t> wals;
782
10.0k
      wals.reserve(wal_files.size());
783
10.0k
      for (const auto& wal_file : wal_files) {
784
10.0k
        wals.push_back(wal_file.first);
785
10.0k
      }
786
10.0k
      std::sort(wals.begin(), wals.end());
787
788
10.0k
      bool corrupted_wal_found = false;
789
10.0k
      s = RecoverLogFiles(wals, &next_sequence, read_only, is_retry,
790
10.0k
                          &corrupted_wal_found, recovery_ctx);
791
10.0k
      if (corrupted_wal_found && recovered_seq != nullptr) {
792
0
        *recovered_seq = next_sequence;
793
0
      }
794
10.0k
      if (!s.ok()) {
795
        // Clear memtables if recovery failed
796
0
        for (auto cfd : *versions_->GetColumnFamilySet()) {
797
0
          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
798
0
                                 kMaxSequenceNumber);
799
0
        }
800
0
      }
801
10.0k
    }
802
11.5k
  }
803
804
11.5k
  if (read_only) {
805
    // If we are opening as read-only, we need to update options_file_number_
806
    // to reflect the most recent OPTIONS file. It does not matter for regular
807
    // read-write db instance because options_file_number_ will later be
808
    // updated to versions_->NewFileNumber() in RenameTempFileToOptionsFile.
809
0
    std::vector<std::string> filenames;
810
0
    if (s.ok()) {
811
0
      const std::string normalized_dbname = NormalizePath(dbname_);
812
0
      const std::string normalized_wal_dir =
813
0
          NormalizePath(immutable_db_options_.GetWalDir());
814
0
      if (immutable_db_options_.best_efforts_recovery) {
815
0
        filenames = std::move(files_in_dbname);
816
0
      } else if (normalized_dbname == normalized_wal_dir) {
817
0
        filenames = std::move(files_in_wal_dir);
818
0
      } else {
819
0
        IOOptions io_opts;
820
0
        io_opts.do_not_recurse = true;
821
0
        s = immutable_db_options_.fs->GetChildren(
822
0
            GetName(), io_opts, &filenames, /*IODebugContext*=*/nullptr);
823
0
      }
824
0
    }
825
0
    if (s.ok()) {
826
0
      uint64_t number = 0;
827
0
      uint64_t options_file_number = 0;
828
0
      FileType type;
829
0
      for (const auto& fname : filenames) {
830
0
        if (ParseFileName(fname, &number, &type) && type == kOptionsFile) {
831
0
          options_file_number = std::max(number, options_file_number);
832
0
        }
833
0
      }
834
0
      versions_->options_file_number_ = options_file_number;
835
0
      uint64_t options_file_size = 0;
836
0
      if (options_file_number > 0) {
837
0
        s = env_->GetFileSize(OptionsFileName(GetName(), options_file_number),
838
0
                              &options_file_size);
839
0
      }
840
0
      versions_->options_file_size_ = options_file_size;
841
0
    }
842
0
  }
843
11.5k
  return s;
844
11.5k
}
845
846
0
// Validates (and if necessary repairs) the on-disk format version of the
// persistent-stats column family during DB open, then persists the current
// format/compatible version keys when the CF is fresh (or was just recreated).
//
// Preconditions: mutex_ held on entry; reacquired before return.
// NOTE(review): the DB mutex is deliberately released for the duration of the
// version-key reads and CF drop/recreate, since those perform I/O. This runs
// during Open(), before clients have access to the DB — presumably that is
// what makes the unlocked window safe; confirm against callers.
Status DBImpl::PersistentStatsProcessFormatVersion() {
  mutex_.AssertHeld();
  Status s;
  // persist version when stats CF doesn't exist
  bool should_persist_format_version = !persistent_stats_cfd_exists_;
  mutex_.Unlock();
  if (persistent_stats_cfd_exists_) {
    // Check persistent stats format version compatibility. Drop and recreate
    // persistent stats CF if format version is incompatible
    uint64_t format_version_recovered = 0;
    Status s_format = DecodePersistentStatsVersionNumber(
        this, StatsVersionKeyType::kFormatVersion, &format_version_recovered);
    uint64_t compatible_version_recovered = 0;
    Status s_compatible = DecodePersistentStatsVersionNumber(
        this, StatsVersionKeyType::kCompatibleVersion,
        &compatible_version_recovered);
    // abort reading from existing stats CF if any of following is true:
    // 1. failed to read format version or compatible version from disk
    // 2. sst's format version is greater than current format version, meaning
    // this sst is encoded with a newer RocksDB release, and current compatible
    // version is below the sst's compatible version
    if (!s_format.ok() || !s_compatible.ok() ||
        (kStatsCFCurrentFormatVersion < format_version_recovered &&
         kStatsCFCompatibleFormatVersion < compatible_version_recovered)) {
      if (!s_format.ok() || !s_compatible.ok()) {
        ROCKS_LOG_WARN(
            immutable_db_options_.info_log,
            "Recreating persistent stats column family since reading "
            "persistent stats version key failed. Format key: %s, compatible "
            "key: %s",
            s_format.ToString().c_str(), s_compatible.ToString().c_str());
      } else {
        ROCKS_LOG_WARN(
            immutable_db_options_.info_log,
            "Recreating persistent stats column family due to corrupted or "
            "incompatible format version. Recovered format: %" PRIu64
            "; recovered format compatible since: %" PRIu64 "\n",
            format_version_recovered, compatible_version_recovered);
      }
      // Incompatible or unreadable: discard the old stats CF entirely and
      // recreate it empty, chaining each step on the previous one succeeding.
      s = DropColumnFamily(persist_stats_cf_handle_);
      if (s.ok()) {
        s = DestroyColumnFamilyHandle(persist_stats_cf_handle_);
      }
      ColumnFamilyHandle* handle = nullptr;
      if (s.ok()) {
        ColumnFamilyOptions cfo;
        OptimizeForPersistentStats(&cfo);
        s = CreateColumnFamilyImpl(ReadOptions(Env::IOActivity::kDBOpen),
                                   WriteOptions(Env::IOActivity::kDBOpen), cfo,
                                   kPersistentStatsColumnFamilyName, &handle);
      }
      if (s.ok()) {
        persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
        // should also persist version here because old stats CF is discarded
        should_persist_format_version = true;
      }
    }
  }
  if (should_persist_format_version) {
    // Persistent stats CF being created for the first time, need to write
    // format version key
    WriteBatch batch;
    if (s.ok()) {
      s = batch.Put(persist_stats_cf_handle_, kFormatVersionKeyString,
                    std::to_string(kStatsCFCurrentFormatVersion));
    }
    if (s.ok()) {
      s = batch.Put(persist_stats_cf_handle_, kCompatibleVersionKeyString,
                    std::to_string(kStatsCFCompatibleFormatVersion));
    }
    if (s.ok()) {
      // TODO: plumb Env::IOActivity, Env::IOPriority
      // Best-effort, unsynced write: stats metadata is not worth stalling or
      // slowing down the open path for.
      WriteOptions wo;
      wo.low_pri = true;
      wo.no_slowdown = true;
      wo.sync = false;
      s = Write(wo, &batch);
    }
  }
  // Restore the lock the caller expects to still hold.
  mutex_.Lock();
  return s;
}
928
929
0
// Locates or creates the persistent-stats column family during recovery and
// installs persist_stats_cf_handle_. Records in persistent_stats_cfd_exists_
// whether the CF already existed on disk (consumed later by
// PersistentStatsProcessFormatVersion).
//
// Preconditions: mutex_ held on entry and on return; persist_stats_cf_handle_
// must not already be set.
Status DBImpl::InitPersistStatsColumnFamily() {
  mutex_.AssertHeld();
  assert(!persist_stats_cf_handle_);
  ColumnFamilyData* persistent_stats_cfd =
      versions_->GetColumnFamilySet()->GetColumnFamily(
          kPersistentStatsColumnFamilyName);
  persistent_stats_cfd_exists_ = persistent_stats_cfd != nullptr;

  Status s;
  if (persistent_stats_cfd != nullptr) {
    // We are recovering from a DB which already contains persistent stats CF,
    // the CF is already created in VersionSet::ApplyOneVersionEdit, but
    // column family handle was not. Need to explicitly create handle here.
    persist_stats_cf_handle_ =
        new ColumnFamilyHandleImpl(persistent_stats_cfd, this, &mutex_);
  } else {
    // CF does not exist yet: create it. CreateColumnFamilyImpl performs I/O,
    // so release the DB mutex around the call and reacquire it afterwards.
    mutex_.Unlock();
    ColumnFamilyHandle* handle = nullptr;
    ColumnFamilyOptions cfo;
    OptimizeForPersistentStats(&cfo);
    s = CreateColumnFamilyImpl(ReadOptions(Env::IOActivity::kDBOpen),
                               WriteOptions(Env::IOActivity::kDBOpen), cfo,
                               kPersistentStatsColumnFamilyName, &handle);
    // NOTE(review): on failure `handle` stays nullptr, so the member is set
    // to nullptr here — callers appear to rely on checking `s` first.
    persist_stats_cf_handle_ = static_cast<ColumnFamilyHandleImpl*>(handle);
    mutex_.Lock();
  }
  return s;
}
957
958
11.5k
// Applies all version edits accumulated in `recovery_ctx` to the MANIFEST in
// a single LogAndApply call, tagged with the kDBOpen IO activity.
//
// Preconditions: mutex_ held; no descriptor log may be open yet (recovery
// writes the first one).
Status DBImpl::LogAndApplyForRecovery(const RecoveryContext& recovery_ctx) {
  mutex_.AssertHeld();
  assert(versions_->descriptor_log_ == nullptr);
  const ReadOptions read_opts(Env::IOActivity::kDBOpen);
  const WriteOptions write_opts(Env::IOActivity::kDBOpen);
  return versions_->LogAndApply(recovery_ctx.cfds_,
                                recovery_ctx.mutable_cf_opts_, read_opts,
                                write_opts, recovery_ctx.edit_lists_, &mutex_,
                                directories_.GetDbDir());
}
970
971
10.0k
void DBImpl::InvokeWalFilterIfNeededOnColumnFamilyToWalNumberMap() {
972
10.0k
  if (immutable_db_options_.wal_filter == nullptr) {
973
10.0k
    return;
974
10.0k
  }
975
0
  assert(immutable_db_options_.wal_filter != nullptr);
976
0
  WalFilter& wal_filter = *(immutable_db_options_.wal_filter);
977
978
0
  std::map<std::string, uint32_t> cf_name_id_map;
979
0
  std::map<uint32_t, uint64_t> cf_lognumber_map;
980
0
  assert(versions_);
981
0
  assert(versions_->GetColumnFamilySet());
982
0
  for (auto cfd : *versions_->GetColumnFamilySet()) {
983
0
    assert(cfd);
984
0
    cf_name_id_map.insert(std::make_pair(cfd->GetName(), cfd->GetID()));
985
0
    cf_lognumber_map.insert(std::make_pair(cfd->GetID(), cfd->GetLogNumber()));
986
0
  }
987
988
0
  wal_filter.ColumnFamilyLogNumberMap(cf_lognumber_map, cf_name_id_map);
989
0
}
990
991
bool DBImpl::InvokeWalFilterIfNeededOnWalRecord(uint64_t wal_number,
992
                                                const std::string& wal_fname,
993
                                                log::Reader::Reporter& reporter,
994
                                                Status& status,
995
                                                bool& stop_replay,
996
14.7k
                                                WriteBatch& batch) {
997
14.7k
  if (immutable_db_options_.wal_filter == nullptr) {
998
14.7k
    return true;
999
14.7k
  }
1000
0
  assert(immutable_db_options_.wal_filter != nullptr);
1001
0
  WalFilter& wal_filter = *(immutable_db_options_.wal_filter);
1002
1003
0
  WriteBatch new_batch;
1004
0
  bool batch_changed = false;
1005
1006
0
  bool process_current_record = true;
1007
1008
0
  WalFilter::WalProcessingOption wal_processing_option =
1009
0
      wal_filter.LogRecordFound(wal_number, wal_fname, batch, &new_batch,
1010
0
                                &batch_changed);
1011
1012
0
  switch (wal_processing_option) {
1013
0
    case WalFilter::WalProcessingOption::kContinueProcessing:
1014
      // do nothing, proceeed normally
1015
0
      break;
1016
0
    case WalFilter::WalProcessingOption::kIgnoreCurrentRecord:
1017
      // skip current record
1018
0
      process_current_record = false;
1019
0
      break;
1020
0
    case WalFilter::WalProcessingOption::kStopReplay:
1021
      // skip current record and stop replay
1022
0
      process_current_record = false;
1023
0
      stop_replay = true;
1024
0
      break;
1025
0
    case WalFilter::WalProcessingOption::kCorruptedRecord: {
1026
0
      status = Status::Corruption("Corruption reported by Wal Filter ",
1027
0
                                  wal_filter.Name());
1028
0
      MaybeIgnoreError(&status);
1029
0
      if (!status.ok()) {
1030
0
        process_current_record = false;
1031
0
        reporter.Corruption(batch.GetDataSize(), status);
1032
0
      }
1033
0
      break;
1034
0
    }
1035
0
    default: {
1036
      // logical error which should not happen. If RocksDB throws, we would
1037
      // just do `throw std::logic_error`.
1038
0
      assert(false);
1039
0
      status = Status::NotSupported(
1040
0
          "Unknown WalProcessingOption returned by Wal Filter ",
1041
0
          wal_filter.Name());
1042
0
      MaybeIgnoreError(&status);
1043
0
      if (!status.ok()) {
1044
        // Ignore the error with current record processing.
1045
0
        stop_replay = true;
1046
0
      }
1047
0
      break;
1048
0
    }
1049
0
  }
1050
1051
0
  if (!process_current_record) {
1052
0
    return false;
1053
0
  }
1054
1055
0
  if (batch_changed) {
1056
    // Make sure that the count in the new batch is
1057
    // within the orignal count.
1058
0
    int new_count = WriteBatchInternal::Count(&new_batch);
1059
0
    int original_count = WriteBatchInternal::Count(&batch);
1060
0
    if (new_count > original_count) {
1061
0
      ROCKS_LOG_FATAL(
1062
0
          immutable_db_options_.info_log,
1063
0
          "Recovering log #%" PRIu64
1064
0
          " mode %d log filter %s returned "
1065
0
          "more records (%d) than original (%d) which is not allowed. "
1066
0
          "Aborting recovery.",
1067
0
          wal_number, static_cast<int>(immutable_db_options_.wal_recovery_mode),
1068
0
          wal_filter.Name(), new_count, original_count);
1069
0
      status = Status::NotSupported(
1070
0
          "More than original # of records "
1071
0
          "returned by Wal Filter ",
1072
0
          wal_filter.Name());
1073
0
      return false;
1074
0
    }
1075
    // Set the same sequence number in the new_batch
1076
    // as the original batch.
1077
0
    WriteBatchInternal::SetSequence(&new_batch,
1078
0
                                    WriteBatchInternal::Sequence(&batch));
1079
0
    batch = new_batch;
1080
0
  }
1081
0
  return true;
1082
0
}
1083
1084
// REQUIRES: wal_numbers are sorted in ascending order
1085
Status DBImpl::RecoverLogFiles(const std::vector<uint64_t>& wal_numbers,
1086
                               SequenceNumber* next_sequence, bool read_only,
1087
                               bool is_retry, bool* corrupted_wal_found,
1088
10.0k
                               RecoveryContext* recovery_ctx) {
1089
10.0k
  struct LogReporter : public log::Reader::Reporter {
1090
10.0k
    Env* env;
1091
10.0k
    Logger* info_log;
1092
10.0k
    const char* fname;
1093
10.0k
    Status* status;  // nullptr if immutable_db_options_.paranoid_checks==false
1094
10.0k
    bool* old_log_record;
1095
10.0k
    void Corruption(size_t bytes, const Status& s) override {
1096
0
      ROCKS_LOG_WARN(info_log, "%s%s: dropping %d bytes; %s",
1097
0
                     (status == nullptr ? "(ignoring error) " : ""), fname,
1098
0
                     static_cast<int>(bytes), s.ToString().c_str());
1099
0
      if (status != nullptr && status->ok()) {
1100
0
        *status = s;
1101
0
      }
1102
0
    }
1103
1104
10.0k
    void OldLogRecord(size_t bytes) override {
1105
0
      if (old_log_record != nullptr) {
1106
0
        *old_log_record = true;
1107
0
      }
1108
0
      ROCKS_LOG_WARN(info_log, "%s: dropping %d bytes; possibly recycled",
1109
0
                     fname, static_cast<int>(bytes));
1110
0
    }
1111
10.0k
  };
1112
1113
10.0k
  mutex_.AssertHeld();
1114
10.0k
  Status status;
1115
10.0k
  bool old_log_record = false;
1116
10.0k
  std::unordered_map<int, VersionEdit> version_edits;
1117
  // no need to refcount because iteration is under mutex
1118
16.0k
  for (auto cfd : *versions_->GetColumnFamilySet()) {
1119
16.0k
    VersionEdit edit;
1120
16.0k
    edit.SetColumnFamily(cfd->GetID());
1121
16.0k
    version_edits.insert({cfd->GetID(), edit});
1122
16.0k
  }
1123
10.0k
  int job_id = next_job_id_.fetch_add(1);
1124
10.0k
  {
1125
10.0k
    auto stream = event_logger_.Log();
1126
10.0k
    stream << "job" << job_id << "event"
1127
10.0k
           << "recovery_started";
1128
10.0k
    stream << "wal_files";
1129
10.0k
    stream.StartArray();
1130
10.0k
    for (auto wal_number : wal_numbers) {
1131
10.0k
      stream << wal_number;
1132
10.0k
    }
1133
10.0k
    stream.EndArray();
1134
10.0k
  }
1135
1136
  // No-op for immutable_db_options_.wal_filter == nullptr.
1137
10.0k
  InvokeWalFilterIfNeededOnColumnFamilyToWalNumberMap();
1138
1139
10.0k
  bool stop_replay_by_wal_filter = false;
1140
10.0k
  bool stop_replay_for_corruption = false;
1141
10.0k
  bool flushed = false;
1142
10.0k
  uint64_t corrupted_wal_number = kMaxSequenceNumber;
1143
10.0k
  uint64_t min_wal_number = MinLogNumberToKeep();
1144
10.0k
  if (!allow_2pc()) {
1145
    // In non-2pc mode, we skip WALs that do not back unflushed data.
1146
10.0k
    min_wal_number =
1147
10.0k
        std::max(min_wal_number, versions_->MinLogNumberWithUnflushedData());
1148
10.0k
  }
1149
10.0k
  for (auto wal_number : wal_numbers) {
1150
10.0k
    if (wal_number < min_wal_number) {
1151
0
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
1152
0
                     "Skipping log #%" PRIu64
1153
0
                     " since it is older than min log to keep #%" PRIu64,
1154
0
                     wal_number, min_wal_number);
1155
0
      continue;
1156
0
    }
1157
    // The previous incarnation may not have written any MANIFEST
1158
    // records after allocating this log number.  So we manually
1159
    // update the file number allocation counter in VersionSet.
1160
10.0k
    versions_->MarkFileNumberUsed(wal_number);
1161
    // Open the log file
1162
10.0k
    std::string fname =
1163
10.0k
        LogFileName(immutable_db_options_.GetWalDir(), wal_number);
1164
1165
10.0k
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
1166
10.0k
                   "Recovering log #%" PRIu64 " mode %d", wal_number,
1167
10.0k
                   static_cast<int>(immutable_db_options_.wal_recovery_mode));
1168
10.0k
    auto logFileDropped = [this, &fname]() {
1169
0
      uint64_t bytes;
1170
0
      if (env_->GetFileSize(fname, &bytes).ok()) {
1171
0
        auto info_log = immutable_db_options_.info_log.get();
1172
0
        ROCKS_LOG_WARN(info_log, "%s: dropping %d bytes", fname.c_str(),
1173
0
                       static_cast<int>(bytes));
1174
0
      }
1175
0
    };
1176
10.0k
    if (stop_replay_by_wal_filter) {
1177
0
      logFileDropped();
1178
0
      continue;
1179
0
    }
1180
1181
10.0k
    std::unique_ptr<SequentialFileReader> file_reader;
1182
10.0k
    {
1183
10.0k
      std::unique_ptr<FSSequentialFile> file;
1184
10.0k
      status = fs_->NewSequentialFile(
1185
10.0k
          fname, fs_->OptimizeForLogRead(file_options_), &file, nullptr);
1186
10.0k
      if (!status.ok()) {
1187
0
        MaybeIgnoreError(&status);
1188
0
        if (!status.ok()) {
1189
0
          return status;
1190
0
        } else {
1191
          // Fail with one log file, but that's ok.
1192
          // Try next one.
1193
0
          continue;
1194
0
        }
1195
0
      }
1196
10.0k
      file_reader.reset(new SequentialFileReader(
1197
10.0k
          std::move(file), fname, immutable_db_options_.log_readahead_size,
1198
10.0k
          io_tracer_, /*listeners=*/{}, /*rate_limiter=*/nullptr, is_retry));
1199
10.0k
    }
1200
1201
    // Create the log reader.
1202
0
    LogReporter reporter;
1203
10.0k
    reporter.env = env_;
1204
10.0k
    reporter.info_log = immutable_db_options_.info_log.get();
1205
10.0k
    reporter.fname = fname.c_str();
1206
10.0k
    reporter.old_log_record = &old_log_record;
1207
10.0k
    if (!immutable_db_options_.paranoid_checks ||
1208
10.0k
        immutable_db_options_.wal_recovery_mode ==
1209
10.0k
            WALRecoveryMode::kSkipAnyCorruptedRecords) {
1210
0
      reporter.status = nullptr;
1211
10.0k
    } else {
1212
10.0k
      reporter.status = &status;
1213
10.0k
    }
1214
    // We intentially make log::Reader do checksumming even if
1215
    // paranoid_checks==false so that corruptions cause entire commits
1216
    // to be skipped instead of propagating bad information (like overly
1217
    // large sequence numbers).
1218
10.0k
    log::Reader reader(immutable_db_options_.info_log, std::move(file_reader),
1219
10.0k
                       &reporter, true /*checksum*/, wal_number);
1220
1221
    // Determine if we should tolerate incomplete records at the tail end of the
1222
    // Read all the records and add to a memtable
1223
10.0k
    std::string scratch;
1224
10.0k
    Slice record;
1225
1226
10.0k
    const UnorderedMap<uint32_t, size_t>& running_ts_sz =
1227
10.0k
        versions_->GetRunningColumnFamiliesTimestampSize();
1228
1229
10.0k
    TEST_SYNC_POINT_CALLBACK("DBImpl::RecoverLogFiles:BeforeReadWal",
1230
10.0k
                             /*arg=*/nullptr);
1231
10.0k
    uint64_t record_checksum;
1232
24.7k
    while (!stop_replay_by_wal_filter &&
1233
24.7k
           reader.ReadRecord(&record, &scratch,
1234
24.7k
                             immutable_db_options_.wal_recovery_mode,
1235
24.7k
                             &record_checksum) &&
1236
24.7k
           status.ok()) {
1237
14.7k
      if (record.size() < WriteBatchInternal::kHeader) {
1238
0
        reporter.Corruption(record.size(),
1239
0
                            Status::Corruption("log record too small"));
1240
0
        continue;
1241
0
      }
1242
      // We create a new batch and initialize with a valid prot_info_ to store
1243
      // the data checksums
1244
14.7k
      WriteBatch batch;
1245
14.7k
      std::unique_ptr<WriteBatch> new_batch;
1246
1247
14.7k
      status = WriteBatchInternal::SetContents(&batch, record);
1248
14.7k
      if (!status.ok()) {
1249
0
        return status;
1250
0
      }
1251
1252
14.7k
      const UnorderedMap<uint32_t, size_t>& record_ts_sz =
1253
14.7k
          reader.GetRecordedTimestampSize();
1254
14.7k
      status = HandleWriteBatchTimestampSizeDifference(
1255
14.7k
          &batch, running_ts_sz, record_ts_sz,
1256
14.7k
          TimestampSizeConsistencyMode::kReconcileInconsistency, &new_batch);
1257
14.7k
      if (!status.ok()) {
1258
0
        return status;
1259
0
      }
1260
1261
14.7k
      bool batch_updated = new_batch != nullptr;
1262
14.7k
      WriteBatch* batch_to_use = batch_updated ? new_batch.get() : &batch;
1263
14.7k
      TEST_SYNC_POINT_CALLBACK(
1264
14.7k
          "DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:batch",
1265
14.7k
          batch_to_use);
1266
14.7k
      TEST_SYNC_POINT_CALLBACK(
1267
14.7k
          "DBImpl::RecoverLogFiles:BeforeUpdateProtectionInfo:checksum",
1268
14.7k
          &record_checksum);
1269
14.7k
      status = WriteBatchInternal::UpdateProtectionInfo(
1270
14.7k
          batch_to_use, 8 /* bytes_per_key */,
1271
14.7k
          batch_updated ? nullptr : &record_checksum);
1272
14.7k
      if (!status.ok()) {
1273
0
        return status;
1274
0
      }
1275
1276
14.7k
      SequenceNumber sequence = WriteBatchInternal::Sequence(batch_to_use);
1277
14.7k
      if (sequence > kMaxSequenceNumber) {
1278
0
        reporter.Corruption(
1279
0
            record.size(),
1280
0
            Status::Corruption("sequence " + std::to_string(sequence) +
1281
0
                               " is too large"));
1282
0
        continue;
1283
0
      }
1284
1285
14.7k
      if (immutable_db_options_.wal_recovery_mode ==
1286
14.7k
          WALRecoveryMode::kPointInTimeRecovery) {
1287
        // In point-in-time recovery mode, if sequence id of log files are
1288
        // consecutive, we continue recovery despite corruption. This could
1289
        // happen when we open and write to a corrupted DB, where sequence id
1290
        // will start from the last sequence id we recovered.
1291
14.7k
        if (sequence == *next_sequence) {
1292
7.60k
          stop_replay_for_corruption = false;
1293
7.60k
        }
1294
14.7k
        if (stop_replay_for_corruption) {
1295
0
          logFileDropped();
1296
0
          break;
1297
0
        }
1298
14.7k
      }
1299
1300
      // For the default case of wal_filter == nullptr, always performs no-op
1301
      // and returns true.
1302
14.7k
      if (!InvokeWalFilterIfNeededOnWalRecord(wal_number, fname, reporter,
1303
14.7k
                                              status, stop_replay_by_wal_filter,
1304
14.7k
                                              *batch_to_use)) {
1305
0
        continue;
1306
0
      }
1307
1308
      // If column family was not found, it might mean that the WAL write
1309
      // batch references to the column family that was dropped after the
1310
      // insert. We don't want to fail the whole write batch in that case --
1311
      // we just ignore the update.
1312
      // That's why we set ignore missing column families to true
1313
14.7k
      bool has_valid_writes = false;
1314
14.7k
      status = WriteBatchInternal::InsertInto(
1315
14.7k
          batch_to_use, column_family_memtables_.get(), &flush_scheduler_,
1316
14.7k
          &trim_history_scheduler_, true, wal_number, this,
1317
14.7k
          false /* concurrent_memtable_writes */, next_sequence,
1318
14.7k
          &has_valid_writes, seq_per_batch_, batch_per_txn_);
1319
14.7k
      MaybeIgnoreError(&status);
1320
14.7k
      if (!status.ok()) {
1321
        // We are treating this as a failure while reading since we read valid
1322
        // blocks that do not form coherent data
1323
0
        reporter.Corruption(record.size(), status);
1324
0
        continue;
1325
0
      }
1326
1327
14.7k
      if (has_valid_writes && !read_only) {
1328
        // we can do this because this is called before client has access to the
1329
        // DB and there is only a single thread operating on DB
1330
9.59k
        ColumnFamilyData* cfd;
1331
1332
9.59k
        while ((cfd = flush_scheduler_.TakeNextColumnFamily()) != nullptr) {
1333
0
          cfd->UnrefAndTryDelete();
1334
          // If this asserts, it means that InsertInto failed in
1335
          // filtering updates to already-flushed column families
1336
0
          assert(cfd->GetLogNumber() <= wal_number);
1337
0
          auto iter = version_edits.find(cfd->GetID());
1338
0
          assert(iter != version_edits.end());
1339
0
          VersionEdit* edit = &iter->second;
1340
0
          status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
1341
0
          if (!status.ok()) {
1342
            // Reflect errors immediately so that conditions like full
1343
            // file-systems cause the DB::Open() to fail.
1344
0
            return status;
1345
0
          }
1346
0
          flushed = true;
1347
1348
0
          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
1349
0
                                 *next_sequence - 1);
1350
0
        }
1351
9.59k
      }
1352
14.7k
    }
1353
1354
10.0k
    if (!status.ok() || old_log_record) {
1355
0
      if (status.IsNotSupported()) {
1356
        // We should not treat NotSupported as corruption. It is rather a clear
1357
        // sign that we are processing a WAL that is produced by an incompatible
1358
        // version of the code.
1359
0
        return status;
1360
0
      }
1361
0
      if (immutable_db_options_.wal_recovery_mode ==
1362
0
          WALRecoveryMode::kSkipAnyCorruptedRecords) {
1363
        // We should ignore all errors unconditionally
1364
0
        status = Status::OK();
1365
0
      } else if (immutable_db_options_.wal_recovery_mode ==
1366
0
                 WALRecoveryMode::kPointInTimeRecovery) {
1367
0
        if (status.IsIOError()) {
1368
0
          ROCKS_LOG_ERROR(immutable_db_options_.info_log,
1369
0
                          "IOError during point-in-time reading log #%" PRIu64
1370
0
                          " seq #%" PRIu64
1371
0
                          ". %s. This likely mean loss of synced WAL, "
1372
0
                          "thus recovery fails.",
1373
0
                          wal_number, *next_sequence,
1374
0
                          status.ToString().c_str());
1375
0
          return status;
1376
0
        }
1377
        // We should ignore the error but not continue replaying
1378
0
        status = Status::OK();
1379
0
        old_log_record = false;
1380
0
        stop_replay_for_corruption = true;
1381
0
        corrupted_wal_number = wal_number;
1382
0
        if (corrupted_wal_found != nullptr) {
1383
0
          *corrupted_wal_found = true;
1384
0
        }
1385
0
        ROCKS_LOG_INFO(immutable_db_options_.info_log,
1386
0
                       "Point in time recovered to log #%" PRIu64
1387
0
                       " seq #%" PRIu64,
1388
0
                       wal_number, *next_sequence);
1389
0
      } else {
1390
0
        assert(immutable_db_options_.wal_recovery_mode ==
1391
0
                   WALRecoveryMode::kTolerateCorruptedTailRecords ||
1392
0
               immutable_db_options_.wal_recovery_mode ==
1393
0
                   WALRecoveryMode::kAbsoluteConsistency);
1394
0
        return status;
1395
0
      }
1396
0
    }
1397
1398
10.0k
    flush_scheduler_.Clear();
1399
10.0k
    trim_history_scheduler_.Clear();
1400
10.0k
    auto last_sequence = *next_sequence - 1;
1401
10.0k
    if ((*next_sequence != kMaxSequenceNumber) &&
1402
10.0k
        (versions_->LastSequence() <= last_sequence)) {
1403
7.09k
      versions_->SetLastAllocatedSequence(last_sequence);
1404
7.09k
      versions_->SetLastPublishedSequence(last_sequence);
1405
7.09k
      versions_->SetLastSequence(last_sequence);
1406
7.09k
    }
1407
10.0k
  }
1408
  // Compare the corrupted log number to all columnfamily's current log number.
1409
  // Abort Open() if any column family's log number is greater than
1410
  // the corrupted log number, which means CF contains data beyond the point of
1411
  // corruption. This could happen during PIT recovery when the WAL is corrupted and
1412
  // some (but not all) CFs are flushed
1413
  // Exclude the PIT case where no log is dropped after the corruption point.
1414
  // This is to cover the case for empty wals after corrupted log, in which we
1415
  // don't reset stop_replay_for_corruption.
1416
10.0k
  if (stop_replay_for_corruption == true &&
1417
10.0k
      (immutable_db_options_.wal_recovery_mode ==
1418
0
           WALRecoveryMode::kPointInTimeRecovery ||
1419
0
       immutable_db_options_.wal_recovery_mode ==
1420
0
           WALRecoveryMode::kTolerateCorruptedTailRecords)) {
1421
0
    for (auto cfd : *versions_->GetColumnFamilySet()) {
1422
      // One special case cause cfd->GetLogNumber() > corrupted_wal_number but
1423
      // the CF is still consistent: If a new column family is created during
1424
      // the flush and the WAL sync fails at the same time, the new CF points to
1425
      // the new WAL but the old WAL is corrupted. Since the new CF is empty, it
1426
      // is still consistent. We add the check of CF sst file size to avoid the
1427
      // false positive alert.
1428
1429
      // Note that, the check of (cfd->GetLiveSstFilesSize() > 0) may lead to
1430
      // the ignorance of a very rare inconsistency case caused in data
1431
      // cancellation. One CF is empty due to KV deletion. But those operations
1432
      // are in the WAL. If the WAL is corrupted, the status of this CF might
1433
      // not be consistent with others. However, the consistency check will be
1434
      // bypassed due to empty CF.
1435
      // TODO: a better and complete implementation is needed to ensure strict
1436
      // consistency check in WAL recovery including handling the tailing
1437
      // issues.
1438
0
      if (cfd->GetLogNumber() > corrupted_wal_number &&
1439
0
          cfd->GetLiveSstFilesSize() > 0) {
1440
0
        ROCKS_LOG_ERROR(immutable_db_options_.info_log,
1441
0
                        "Column family inconsistency: SST file contains data"
1442
0
                        " beyond the point of corruption.");
1443
0
        return Status::Corruption("SST file is ahead of WALs in CF " +
1444
0
                                  cfd->GetName());
1445
0
      }
1446
0
    }
1447
0
  }
1448
1449
  // True if there's any data in the WALs; if not, we can skip re-processing
1450
  // them later
1451
10.0k
  bool data_seen = false;
1452
10.0k
  if (!read_only) {
1453
    // no need to refcount since client still doesn't have access
1454
    // to the DB and can not drop column families while we iterate
1455
10.0k
    const WalNumber max_wal_number = wal_numbers.back();
1456
16.0k
    for (auto cfd : *versions_->GetColumnFamilySet()) {
1457
16.0k
      auto iter = version_edits.find(cfd->GetID());
1458
16.0k
      assert(iter != version_edits.end());
1459
16.0k
      VersionEdit* edit = &iter->second;
1460
1461
16.0k
      if (cfd->GetLogNumber() > max_wal_number) {
1462
        // Column family cfd has already flushed the data
1463
        // from all wals. Memtable has to be empty because
1464
        // we filter the updates based on wal_number
1465
        // (in WriteBatch::InsertInto)
1466
0
        assert(cfd->mem()->GetFirstSequenceNumber() == 0);
1467
0
        assert(edit->NumEntries() == 0);
1468
0
        continue;
1469
0
      }
1470
1471
16.0k
      TEST_SYNC_POINT_CALLBACK(
1472
16.0k
          "DBImpl::RecoverLogFiles:BeforeFlushFinalMemtable", /*arg=*/nullptr);
1473
1474
      // flush the final memtable (if non-empty)
1475
16.0k
      if (cfd->mem()->GetFirstSequenceNumber() != 0) {
1476
        // If flush happened in the middle of recovery (e.g. due to memtable
1477
        // being full), we flush at the end. Otherwise we'll need to record
1478
        // where we were on last flush, which make the logic complicated.
1479
2.83k
        if (flushed || !immutable_db_options_.avoid_flush_during_recovery) {
1480
2.83k
          status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
1481
2.83k
          if (!status.ok()) {
1482
            // Recovery failed
1483
0
            break;
1484
0
          }
1485
2.83k
          flushed = true;
1486
1487
2.83k
          cfd->CreateNewMemtable(*cfd->GetLatestMutableCFOptions(),
1488
2.83k
                                 versions_->LastSequence());
1489
2.83k
        }
1490
2.83k
        data_seen = true;
1491
2.83k
      }
1492
1493
      // Update the log number info in the version edit corresponding to this
1494
      // column family. Note that the version edits will be written to MANIFEST
1495
      // together later.
1496
      // writing wal_number in the manifest means that any log file
1497
      // with number strongly less than (wal_number + 1) is already
1498
      // recovered and should be ignored on next reincarnation.
1499
      // Since we already recovered max_wal_number, we want all wals
1500
      // with numbers `<= max_wal_number` (includes this one) to be ignored
1501
16.0k
      if (flushed || cfd->mem()->GetFirstSequenceNumber() == 0) {
1502
16.0k
        edit->SetLogNumber(max_wal_number + 1);
1503
16.0k
      }
1504
16.0k
    }
1505
10.0k
    if (status.ok()) {
1506
      // we must mark the next log number as used, even though it's
1507
      // not actually used. that is because VersionSet assumes
1508
      // VersionSet::next_file_number_ always to be strictly greater than any
1509
      // log number
1510
10.0k
      versions_->MarkFileNumberUsed(max_wal_number + 1);
1511
10.0k
      assert(recovery_ctx != nullptr);
1512
1513
16.0k
      for (auto* cfd : *versions_->GetColumnFamilySet()) {
1514
16.0k
        auto iter = version_edits.find(cfd->GetID());
1515
16.0k
        assert(iter != version_edits.end());
1516
16.0k
        recovery_ctx->UpdateVersionEdits(cfd, iter->second);
1517
16.0k
      }
1518
1519
10.0k
      if (flushed || !data_seen) {
1520
10.0k
        VersionEdit wal_deletion;
1521
10.0k
        if (immutable_db_options_.track_and_verify_wals_in_manifest) {
1522
0
          wal_deletion.DeleteWalsBefore(max_wal_number + 1);
1523
0
        }
1524
10.0k
        if (!allow_2pc()) {
1525
          // In non-2pc mode, flushing the memtables of the column families
1526
          // means we can advance min_log_number_to_keep.
1527
10.0k
          wal_deletion.SetMinLogNumberToKeep(max_wal_number + 1);
1528
10.0k
        }
1529
10.0k
        assert(versions_->GetColumnFamilySet() != nullptr);
1530
10.0k
        recovery_ctx->UpdateVersionEdits(
1531
10.0k
            versions_->GetColumnFamilySet()->GetDefault(), wal_deletion);
1532
10.0k
      }
1533
10.0k
    }
1534
10.0k
  }
1535
1536
10.0k
  if (status.ok()) {
1537
10.0k
    if (data_seen && !flushed) {
1538
0
      status = RestoreAliveLogFiles(wal_numbers);
1539
10.0k
    } else if (!wal_numbers.empty()) {  // If there's no data in the WAL, or we
1540
                                        // flushed all the data, still
1541
      // truncate the log file. If the process goes into a crash loop before
1542
      // the file is deleted, the preallocated space will never get freed.
1543
10.0k
      const bool truncate = !read_only;
1544
10.0k
      GetLogSizeAndMaybeTruncate(wal_numbers.back(), truncate, nullptr)
1545
10.0k
          .PermitUncheckedError();
1546
10.0k
    }
1547
10.0k
  }
1548
1549
10.0k
  event_logger_.Log() << "job" << job_id << "event"
1550
10.0k
                      << "recovery_finished";
1551
1552
10.0k
  return status;
1553
10.0k
}
1554
1555
// Looks up the current on-disk size of WAL file `wal_number` and, when
// `truncate` is true, truncates the file down to that size so that any
// preallocated-but-unwritten space is released back to the filesystem.
//
// @param wal_number  number of the WAL file to inspect.
// @param truncate    if true, reopen the file for write and truncate it to
//                    its apparent size; truncation failures are logged but
//                    deliberately not treated as fatal.
// @param log_ptr     optional out-parameter; when non-null, receives the
//                    (number, size) record for the WAL.
// @return status of the size lookup (GetFileSize); truncation errors are
//         swallowed (see comment below).
Status DBImpl::GetLogSizeAndMaybeTruncate(uint64_t wal_number, bool truncate,
                                          LogFileNumberSize* log_ptr) {
  LogFileNumberSize log(wal_number);
  std::string fname =
      LogFileName(immutable_db_options_.GetWalDir(), wal_number);
  Status s;
  // This gets the apparent size of the wals, not including preallocated space.
  s = env_->GetFileSize(fname, &log.size);
  TEST_SYNC_POINT_CALLBACK("DBImpl::GetLogSizeAndMaybeTruncate:0", /*arg=*/&s);
  if (s.ok() && truncate) {
    std::unique_ptr<FSWritableFile> last_log;
    // Reopen (not create) so the existing contents are preserved; the file
    // options are tuned for WAL writes the same way as at WAL creation time.
    Status truncate_status = fs_->ReopenWritableFile(
        fname,
        fs_->OptimizeForLogWrite(
            file_options_,
            BuildDBOptions(immutable_db_options_, mutable_db_options_)),
        &last_log, nullptr);
    if (truncate_status.ok()) {
      truncate_status = last_log->Truncate(log.size, IOOptions(), nullptr);
    }
    if (truncate_status.ok()) {
      truncate_status = last_log->Close(IOOptions(), nullptr);
    }
    // Not a critical error if fail to truncate: the WAL remains readable,
    // only the preallocated space is not reclaimed. NotSupported is expected
    // on filesystems without truncation support, so it is not even logged.
    if (!truncate_status.ok() && !truncate_status.IsNotSupported()) {
      ROCKS_LOG_WARN(immutable_db_options_.info_log,
                     "Failed to truncate log #%" PRIu64 ": %s", wal_number,
                     truncate_status.ToString().c_str());
    }
  }
  if (log_ptr) {
    *log_ptr = log;
  }
  return s;
}
1590
1591
0
// Re-registers the WALs listed in `wal_numbers` as "alive" after a recovery
// that did not flush their data (avoid_flush_during_recovery). Rebuilds
// total_log_size_ / alive_log_files_ so FindObsoleteFiles() can later decide
// when these files become deletable, and truncates the last WAL's
// preallocated space.
//
// Preconditions: DB mutex held; only called before the client has access to
// the DB (recovery path).
//
// @param wal_numbers  WAL file numbers in ascending order (back() is newest).
// @return first error from GetLogSizeAndMaybeTruncate, or OK.
Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
  if (wal_numbers.empty()) {
    return Status::OK();
  }
  Status s;
  mutex_.AssertHeld();
  // This path is only reached when recovery skipped flushing memtables.
  assert(immutable_db_options_.avoid_flush_during_recovery);
  // Mark these as alive so they'll be considered for deletion later by
  // FindObsoleteFiles()
  total_log_size_ = 0;
  log_empty_ = false;
  uint64_t min_wal_with_unflushed_data =
      versions_->MinLogNumberWithUnflushedData();
  for (auto wal_number : wal_numbers) {
    if (!allow_2pc() && wal_number < min_wal_with_unflushed_data) {
      // In non-2pc mode, the WAL files not backing unflushed data are not
      // alive, thus should not be added to the alive_log_files_.
      continue;
    }
    // We preallocate space for wals, but then after a crash and restart, those
    // preallocated space are not needed anymore. It is likely only the last
    // log has such preallocated space, so we only truncate for the last log.
    LogFileNumberSize log;
    s = GetLogSizeAndMaybeTruncate(
        wal_number, /*truncate=*/(wal_number == wal_numbers.back()), &log);
    if (!s.ok()) {
      break;
    }
    total_log_size_ += log.size;
    alive_log_files_.push_back(log);
  }
  return s;
}
1624
1625
// Flushes memtable `mem` of column family `cfd` into a brand-new level-0 SST
// during WAL recovery, recording the new file (and any blob files) in `edit`.
// Unlike a regular flush, this operates on the live memtable directly — the
// immutable memtable list must be empty (asserted below).
//
// Locking: called with the DB mutex held; the mutex is released around the
// actual table build (BuildTable) and re-acquired afterwards.
//
// @param job_id  recovery job id, used for logging/events.
// @param cfd     column family being recovered (not null).
// @param mem     live memtable whose contents are written out.
// @param edit    receives AddFile/AddBlobFile records for the new output.
// @return BuildTable status, possibly overridden by an IO error or a
//         Corruption when the produced entry count mismatches the memtable
//         (with flush_verify_memtable_count enabled).
Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
                                           MemTable* mem, VersionEdit* edit) {
  mutex_.AssertHeld();
  assert(cfd);
  assert(cfd->imm());
  // The immutable memtable list must be empty.
  assert(std::numeric_limits<uint64_t>::max() ==
         cfd->imm()->GetEarliestMemTableID());

  const uint64_t start_micros = immutable_db_options_.clock->NowMicros();

  FileMetaData meta;
  std::vector<BlobFileAddition> blob_file_additions;

  // Keep the new file number registered as a pending output so it is not
  // garbage-collected while the table is being built.
  std::unique_ptr<std::list<uint64_t>::iterator> pending_outputs_inserted_elem(
      new std::list<uint64_t>::iterator(
          CaptureCurrentFileNumberInPendingOutputs()));
  meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0);
  ReadOptions ro;
  ro.total_order_seek = true;
  ro.io_activity = Env::IOActivity::kDBOpen;
  Arena arena;
  Status s;
  TableProperties table_properties;
  {
    ScopedArenaPtr<InternalIterator> iter(
        mem->NewIterator(ro, /*seqno_to_time_mapping=*/nullptr, &arena));
    ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                    "[%s] [WriteLevel0TableForRecovery]"
                    " Level-0 table #%" PRIu64 ": started",
                    cfd->GetName().c_str(), meta.fd.GetNumber());

    // Get the latest mutable cf options while the mutex is still locked
    const MutableCFOptions mutable_cf_options =
        *cfd->GetLatestMutableCFOptions();
    bool paranoid_file_checks =
        cfd->GetLatestMutableCFOptions()->paranoid_file_checks;

    int64_t _current_time = 0;
    immutable_db_options_.clock->GetCurrentTime(&_current_time)
        .PermitUncheckedError();  // ignore error
    const uint64_t current_time = static_cast<uint64_t>(_current_time);
    meta.oldest_ancester_time = current_time;
    meta.epoch_number = cfd->NewEpochNumber();
    {
      auto write_hint = cfd->CalculateSSTWriteHint(0);
      // Drop the DB mutex for the (potentially long) table build below.
      mutex_.Unlock();

      SequenceNumber earliest_write_conflict_snapshot;
      std::vector<SequenceNumber> snapshot_seqs =
          snapshots_.GetAll(&earliest_write_conflict_snapshot);
      SequenceNumber earliest_snapshot =
          (snapshot_seqs.empty() ? kMaxSequenceNumber : snapshot_seqs.at(0));
      auto snapshot_checker = snapshot_checker_.get();
      if (use_custom_gc_ && snapshot_checker == nullptr) {
        snapshot_checker = DisableGCSnapshotChecker::Instance();
      }
      std::vector<std::unique_ptr<FragmentedRangeTombstoneIterator>>
          range_del_iters;
      auto range_del_iter =
          // This is called during recovery, where a live memtable is flushed
          // directly. In this case, no fragmented tombstone list is cached in
          // this memtable yet.
          mem->NewRangeTombstoneIterator(ro, kMaxSequenceNumber,
                                         false /* immutable_memtable */);
      if (range_del_iter != nullptr) {
        range_del_iters.emplace_back(range_del_iter);
      }

      IOStatus io_s;
      const ReadOptions read_option(Env::IOActivity::kDBOpen);
      const WriteOptions write_option(Env::IO_HIGH, Env::IOActivity::kDBOpen);

      TableBuilderOptions tboptions(
          *cfd->ioptions(), mutable_cf_options, read_option, write_option,
          cfd->internal_comparator(), cfd->internal_tbl_prop_coll_factories(),
          GetCompressionFlush(*cfd->ioptions(), mutable_cf_options),
          mutable_cf_options.compression_opts, cfd->GetID(), cfd->GetName(),
          0 /* level */, false /* is_bottommost */,
          TableFileCreationReason::kRecovery, 0 /* oldest_key_time */,
          0 /* file_creation_time */, db_id_, db_session_id_,
          0 /* target_file_size */, meta.fd.GetNumber(), kMaxSequenceNumber);
      Version* version = cfd->current();
      version->Ref();
      uint64_t num_input_entries = 0;
      s = BuildTable(dbname_, versions_.get(), immutable_db_options_, tboptions,
                     file_options_for_compaction_, cfd->table_cache(),
                     iter.get(), std::move(range_del_iters), &meta,
                     &blob_file_additions, snapshot_seqs, earliest_snapshot,
                     earliest_write_conflict_snapshot, kMaxSequenceNumber,
                     snapshot_checker, paranoid_file_checks,
                     cfd->internal_stats(), &io_s, io_tracer_,
                     BlobFileCreationReason::kRecovery,
                     nullptr /* seqno_to_time_mapping */, &event_logger_,
                     job_id, nullptr /* table_properties */, write_hint,
                     nullptr /*full_history_ts_low*/, &blob_callback_, version,
                     &num_input_entries);
      version->Unref();
      LogFlush(immutable_db_options_.info_log);
      ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
                      "[%s] [WriteLevel0TableForRecovery]"
                      " Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s",
                      cfd->GetName().c_str(), meta.fd.GetNumber(),
                      meta.fd.GetFileSize(), s.ToString().c_str());
      mutex_.Lock();

      // TODO(AR) is this ok?
      // An IO error takes precedence over a (possibly OK) build status.
      if (!io_s.ok() && s.ok()) {
        s = io_s;
      }

      // Sanity check: the table build must have consumed exactly the number
      // of entries the memtable reports; a mismatch indicates corruption.
      uint64_t total_num_entries = mem->num_entries();
      if (s.ok() && total_num_entries != num_input_entries) {
        std::string msg = "Expected " + std::to_string(total_num_entries) +
                          " entries in memtable, but read " +
                          std::to_string(num_input_entries);
        ROCKS_LOG_WARN(immutable_db_options_.info_log,
                       "[%s] [JOB %d] Level-0 flush during recover: %s",
                       cfd->GetName().c_str(), job_id, msg.c_str());
        if (immutable_db_options_.flush_verify_memtable_count) {
          s = Status::Corruption(msg);
        }
      }
    }
  }
  ReleaseFileNumberFromPendingOutputs(pending_outputs_inserted_elem);

  // Note that if file_size is zero, the file has been deleted and
  // should not be added to the manifest.
  const bool has_output = meta.fd.GetFileSize() > 0;

  constexpr int level = 0;

  if (s.ok() && has_output) {
    edit->AddFile(level, meta.fd.GetNumber(), meta.fd.GetPathId(),
                  meta.fd.GetFileSize(), meta.smallest, meta.largest,
                  meta.fd.smallest_seqno, meta.fd.largest_seqno,
                  meta.marked_for_compaction, meta.temperature,
                  meta.oldest_blob_file_number, meta.oldest_ancester_time,
                  meta.file_creation_time, meta.epoch_number,
                  meta.file_checksum, meta.file_checksum_func_name,
                  meta.unique_id, meta.compensated_range_deletion_size,
                  meta.tail_size, meta.user_defined_timestamps_persisted);

    for (const auto& blob : blob_file_additions) {
      edit->AddBlobFile(blob);
    }

    // For UDT in memtable only feature, move up the cutoff timestamp whenever
    // a flush happens.
    const Comparator* ucmp = cfd->user_comparator();
    size_t ts_sz = ucmp->timestamp_size();
    if (ts_sz > 0 && !cfd->ioptions()->persist_user_defined_timestamps) {
      Slice mem_newest_udt = mem->GetNewestUDT();
      std::string full_history_ts_low = cfd->GetFullHistoryTsLow();
      if (full_history_ts_low.empty() ||
          ucmp->CompareTimestamp(mem_newest_udt, full_history_ts_low) >= 0) {
        std::string new_full_history_ts_low;
        GetFullHistoryTsLowFromU64CutoffTs(&mem_newest_udt,
                                           &new_full_history_ts_low);
        edit->SetFullHistoryTsLow(new_full_history_ts_low);
      }
    }
  }

  // Record flush statistics (bytes/files for both SST and blob outputs).
  InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
  stats.micros = immutable_db_options_.clock->NowMicros() - start_micros;

  if (has_output) {
    stats.bytes_written = meta.fd.GetFileSize();
    stats.num_output_files = 1;
  }

  const auto& blobs = edit->GetBlobFileAdditions();
  for (const auto& blob : blobs) {
    stats.bytes_written_blob += blob.GetTotalBlobBytes();
  }

  stats.num_output_files_blob = static_cast<int>(blobs.size());

  cfd->internal_stats()->AddCompactionStats(level, Env::Priority::USER, stats);
  cfd->internal_stats()->AddCFStats(
      InternalStats::BYTES_FLUSHED,
      stats.bytes_written + stats.bytes_written_blob);
  RecordTick(stats_, COMPACT_WRITE_BYTES, meta.fd.GetFileSize());
  return s;
}
1812
1813
5.64k
// Convenience overload: opens a DB with a single (combined) Options object by
// delegating to the column-family-aware DB::Open. Requests the default column
// family (plus the persistent-stats CF when persist_stats_to_disk is set) and
// discards the returned handles, since DBImpl keeps its own reference to the
// default column family.
Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
  DBOptions db_options(options);
  ColumnFamilyOptions cf_options(options);

  const bool with_stats_cf = db_options.persist_stats_to_disk;

  std::vector<ColumnFamilyDescriptor> cf_descs;
  cf_descs.emplace_back(kDefaultColumnFamilyName, cf_options);
  if (with_stats_cf) {
    cf_descs.emplace_back(kPersistentStatsColumnFamilyName, cf_options);
  }

  std::vector<ColumnFamilyHandle*> cf_handles;
  Status s = DB::Open(db_options, dbname, cf_descs, &cf_handles, dbptr);
  if (s.ok()) {
    assert(cf_handles.size() == (with_stats_cf ? 2u : 1u));
    // The handles can be deleted here since DBImpl always holds a reference
    // to the default column family.
    if (with_stats_cf && cf_handles[1] != nullptr) {
      delete cf_handles[1];
    }
    delete cf_handles[0];
  }
  return s;
}
1838
1839
// Full DB::Open: opens the database with explicit column family descriptors,
// retrying DBImpl::Open while it reports that a retry may succeed. Thread
// status tracking is enabled around the open and reset before returning.
Status DB::Open(const DBOptions& db_options, const std::string& dbname,
                const std::vector<ColumnFamilyDescriptor>& column_families,
                std::vector<ColumnFamilyHandle*>* handles, DB** dbptr) {
  ThreadStatusUtil::SetEnableTracking(db_options.enable_thread_tracking);
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType::OP_DBOPEN);

  Status s;
  bool retry_allowed = false;
  for (;;) {
    // Non-seq-per-batch, batch-per-txn: the regular (non-write-prepared)
    // write path.
    s = DBImpl::Open(db_options, dbname, column_families, handles, dbptr,
                     /*seq_per_batch=*/false, /*batch_per_txn=*/true,
                     retry_allowed, &retry_allowed);
    if (s.ok() || !retry_allowed) {
      break;
    }
  }
  ThreadStatusUtil::ResetThreadStatus();
  return s;
}
1855
1856
// TODO: Implement the trimming in flush code path.
// TODO: Perform trimming before inserting into memtable during recovery.
// TODO: Pick files with max_timestamp > trim_ts by each file's timestamp meta
// info, and handle only these files to reduce io.
//
// Opens the DB and then trims history newer than `trim_ts` by running a
// bottommost-forced compaction (with the trim timestamp) on every column
// family that has user-defined timestamps enabled. On any failure after the
// open succeeds, all handles and the DB are destroyed before returning.
//
// @param db_options       DB options; avoid_flush_during_recovery must be
//                         false (rejected up front).
// @param dbname           path of the DB.
// @param column_families  column families to open.
// @param handles          out: opened CF handles (cleared on failure).
// @param dbptr            out: the opened DB (set only on full success).
// @param trim_ts          timestamp above which history is trimmed.
Status DB::OpenAndTrimHistory(
    const DBOptions& db_options, const std::string& dbname,
    const std::vector<ColumnFamilyDescriptor>& column_families,
    std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
    std::string trim_ts) {
  assert(dbptr != nullptr);
  assert(handles != nullptr);
  // Trimming relies on all recovered data being flushed, so the
  // avoid-flush-during-recovery mode is incompatible with this API.
  auto validate_options = [&db_options] {
    if (db_options.avoid_flush_during_recovery) {
      return Status::InvalidArgument(
          "avoid_flush_during_recovery incompatible with "
          "OpenAndTrimHistory");
    }
    return Status::OK();
  };
  auto s = validate_options();
  if (!s.ok()) {
    return s;
  }

  DB* db = nullptr;
  s = DB::Open(db_options, dbname, column_families, handles, &db);
  if (!s.ok()) {
    return s;
  }
  assert(db);
  CompactRangeOptions options;
  options.bottommost_level_compaction =
      BottommostLevelCompaction::kForceOptimized;
  auto db_impl = static_cast_with_check<DBImpl>(db);
  for (auto handle : *handles) {
    assert(handle != nullptr);
    auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(handle);
    auto cfd = cfh->cfd();
    assert(cfd != nullptr);
    // Only compact column families with timestamp enabled
    if (cfd->user_comparator() != nullptr &&
        cfd->user_comparator()->timestamp_size() > 0) {
      // Full-range compaction with trim_ts drops history newer than trim_ts.
      s = db_impl->CompactRangeInternal(options, handle, nullptr, nullptr,
                                        trim_ts);
      if (!s.ok()) {
        break;
      }
    }
  }
  // On failure, tear down everything we opened so the caller gets no
  // half-initialized state.
  auto clean_op = [&handles, &db] {
    for (auto handle : *handles) {
      auto temp_s = db->DestroyColumnFamilyHandle(handle);
      assert(temp_s.ok());
    }
    handles->clear();
    delete db;
  };
  if (!s.ok()) {
    clean_op();
    return s;
  }

  *dbptr = db;
  return s;
}
1921
1922
// Creates a new WAL file (or reuses a recycled one) and wraps it in a
// log::Writer, returned via `new_log` on success.
//
// @param write_options          write options used for the initial
//                               compression-type record.
// @param log_file_num           file number of the WAL to create.
// @param recycle_log_number     if non-zero, the file number of an old WAL to
//                               recycle (rename/reuse) instead of creating a
//                               fresh file.
// @param preallocate_block_size preallocation hint for the new file.
// @param new_log                out: heap-allocated writer; set only when the
//                               returned status is OK (caller takes
//                               ownership).
// @return IO status of file creation/reuse and of writing the compression
//         type record.
IOStatus DBImpl::CreateWAL(const WriteOptions& write_options,
                           uint64_t log_file_num, uint64_t recycle_log_number,
                           size_t preallocate_block_size,
                           log::Writer** new_log) {
  IOStatus io_s;
  std::unique_ptr<FSWritableFile> lfile;

  DBOptions db_options =
      BuildDBOptions(immutable_db_options_, mutable_db_options_);
  FileOptions opt_file_options =
      fs_->OptimizeForLogWrite(file_options_, db_options);
  std::string wal_dir = immutable_db_options_.GetWalDir();
  std::string log_fname = LogFileName(wal_dir, log_file_num);

  if (recycle_log_number) {
    // Reuse an obsolete WAL file under the new name instead of allocating a
    // fresh one.
    ROCKS_LOG_INFO(immutable_db_options_.info_log,
                   "reusing log %" PRIu64 " from recycle list\n",
                   recycle_log_number);
    std::string old_log_fname = LogFileName(wal_dir, recycle_log_number);
    TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile1");
    TEST_SYNC_POINT("DBImpl::CreateWAL:BeforeReuseWritableFile2");
    io_s = fs_->ReuseWritableFile(log_fname, old_log_fname, opt_file_options,
                                  &lfile, /*dbg=*/nullptr);
  } else {
    io_s = NewWritableFile(fs_.get(), log_fname, &lfile, opt_file_options);
  }

  if (io_s.ok()) {
    lfile->SetWriteLifeTimeHint(CalculateWALWriteHint());
    lfile->SetPreallocationBlockSize(preallocate_block_size);

    const auto& listeners = immutable_db_options_.listeners;
    // Checksum handoff is enabled for the WAL only if kWalFile is in the
    // configured file-type set (same flag passed twice: handoff + verify).
    FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types;
    std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
        std::move(lfile), log_fname, opt_file_options,
        immutable_db_options_.clock, io_tracer_, nullptr /* stats */,
        Histograms::HISTOGRAM_ENUM_MAX /* hist_type */, listeners, nullptr,
        tmp_set.Contains(FileType::kWalFile),
        tmp_set.Contains(FileType::kWalFile)));
    *new_log = new log::Writer(std::move(file_writer), log_file_num,
                               immutable_db_options_.recycle_log_file_num > 0,
                               immutable_db_options_.manual_wal_flush,
                               immutable_db_options_.wal_compression);
    // Record the WAL compression type up front so readers know how to decode.
    io_s = (*new_log)->AddCompressionTypeRecord(write_options);
  }
  return io_s;
}
1969
1970
void DBImpl::TrackExistingDataFiles(
1971
11.5k
    const std::vector<std::string>& existing_data_files) {
1972
11.5k
  auto sfm = static_cast<SstFileManagerImpl*>(
1973
11.5k
      immutable_db_options_.sst_file_manager.get());
1974
11.5k
  assert(sfm);
1975
11.5k
  std::vector<ColumnFamilyMetaData> metadata;
1976
11.5k
  GetAllColumnFamilyMetaData(&metadata);
1977
1978
11.5k
  std::unordered_set<std::string> referenced_files;
1979
17.5k
  for (const auto& md : metadata) {
1980
122k
    for (const auto& lmd : md.levels) {
1981
122k
      for (const auto& fmd : lmd.files) {
1982
        // We're assuming that each sst file name exists in at most one of
1983
        // the paths.
1984
24.0k
        std::string file_path =
1985
24.0k
            fmd.directory + kFilePathSeparator + fmd.relative_filename;
1986
24.0k
        sfm->OnAddFile(file_path, fmd.size).PermitUncheckedError();
1987
24.0k
        referenced_files.insert(file_path);
1988
24.0k
      }
1989
122k
    }
1990
17.5k
    for (const auto& bmd : md.blob_files) {
1991
0
      std::string name = bmd.blob_file_name;
1992
      // The BlobMetaData.blob_file_name may start with "/".
1993
0
      if (!name.empty() && name[0] == kFilePathSeparator) {
1994
0
        name = name.substr(1);
1995
0
      }
1996
      // We're assuming that each blob file name exists in at most one of
1997
      // the paths.
1998
0
      std::string file_path = bmd.blob_file_path + kFilePathSeparator + name;
1999
0
      sfm->OnAddFile(file_path, bmd.blob_file_size).PermitUncheckedError();
2000
0
      referenced_files.insert(file_path);
2001
0
    }
2002
17.5k
  }
2003
2004
21.2k
  for (const auto& file_path : existing_data_files) {
2005
21.2k
    if (referenced_files.find(file_path) != referenced_files.end()) {
2006
21.2k
      continue;
2007
21.2k
    }
2008
    // There shouldn't be any duplicated files. In case there is, SstFileManager
2009
    // will take care of deduping it.
2010
0
    sfm->OnAddFile(file_path).PermitUncheckedError();
2011
0
  }
2012
11.5k
}
2013
2014
// Opens (or creates) a DB: validates options, creates directories, recovers
// from the MANIFEST/WALs, creates a fresh WAL, installs column family
// handles and super-versions, persists the options file, and finally starts
// background work (obsolete-file cleanup, flush/compaction scheduling,
// periodic tasks). On any failure, all partially-created state (handles,
// the DBImpl itself) is torn down and a non-OK status is returned.
//
// NOTE(review): `seq_per_batch`/`batch_per_txn` are forwarded to the DBImpl
// constructor untouched; their semantics are defined elsewhere (write-prepared
// transactions) — not visible from this function.
Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname,
                    const std::vector<ColumnFamilyDescriptor>& column_families,
                    std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
                    const bool seq_per_batch, const bool batch_per_txn,
                    const bool is_retry, bool* can_retry) {
  // All I/O issued during open is tagged with kDBOpen for tracing/tracking.
  const WriteOptions write_options(Env::IOActivity::kDBOpen);
  const ReadOptions read_options(Env::IOActivity::kDBOpen);

  Status s = ValidateOptionsByTable(db_options, column_families);
  if (!s.ok()) {
    return s;
  }

  s = ValidateOptions(db_options, column_families);
  if (!s.ok()) {
    return s;
  }

  *dbptr = nullptr;
  assert(handles);
  handles->clear();

  // Largest write buffer across all CFs; used below to size WAL
  // preallocation and the SstFileManager's reserved disk buffer.
  size_t max_write_buffer_size = 0;
  for (const auto& cf : column_families) {
    max_write_buffer_size =
        std::max(max_write_buffer_size, cf.options.write_buffer_size);
  }

  DBImpl* impl = new DBImpl(db_options, dbname, seq_per_batch, batch_per_txn);
  // A null info_log means logger creation failed in the constructor; the
  // saved status explains why.
  if (!impl->immutable_db_options_.info_log) {
    s = impl->init_logger_creation_s_;
    delete impl;
    return s;
  } else {
    assert(impl->init_logger_creation_s_.ok());
  }
  s = impl->env_->CreateDirIfMissing(impl->immutable_db_options_.GetWalDir());
  if (s.ok()) {
    // Create every DB path and CF path up front so later file creation
    // cannot fail on a missing directory.
    std::vector<std::string> paths;
    for (auto& db_path : impl->immutable_db_options_.db_paths) {
      paths.emplace_back(db_path.path);
    }
    for (auto& cf : column_families) {
      for (auto& cf_path : cf.options.cf_paths) {
        paths.emplace_back(cf_path.path);
      }
    }
    for (const auto& path : paths) {
      s = impl->env_->CreateDirIfMissing(path);
      if (!s.ok()) {
        break;
      }
    }

    // For recovery from NoSpace() error, we can only handle
    // the case where the database is stored in a single path
    if (paths.size() <= 1) {
      impl->error_handler_.EnableAutoRecovery();
    }
  }
  if (s.ok()) {
    s = impl->CreateArchivalDirectory();
  }
  if (!s.ok()) {
    delete impl;
    return s;
  }

  impl->wal_in_db_path_ = impl->immutable_db_options_.IsWalDirSameAsDBPath();
  RecoveryContext recovery_ctx;
  // options_mutex_ is held for the entire remainder of Open (released at the
  // bottom); mutex_ is released/reacquired around unlocked sections below.
  impl->options_mutex_.Lock();
  impl->mutex_.Lock();

  // Handles create_if_missing, error_if_exists
  uint64_t recovered_seq(kMaxSequenceNumber);
  s = impl->Recover(column_families, false /* read_only */,
                    false /* error_if_wal_file_exists */,
                    false /* error_if_data_exists_in_wals */, is_retry,
                    &recovered_seq, &recovery_ctx, can_retry);
  if (s.ok()) {
    // Start a brand-new WAL for the recovered DB.
    uint64_t new_log_number = impl->versions_->NewFileNumber();
    log::Writer* new_log = nullptr;
    const size_t preallocate_block_size =
        impl->GetWalPreallocateBlockSize(max_write_buffer_size);
    s = impl->CreateWAL(write_options, new_log_number, 0 /*recycle_log_number*/,
                        preallocate_block_size, &new_log);
    if (s.ok()) {
      // Prevent log files created by previous instance from being recycled.
      // They might be in alive_log_file_, and might get recycled otherwise.
      impl->min_log_number_to_recycle_ = new_log_number;
    }
    if (s.ok()) {
      InstrumentedMutexLock wl(&impl->log_write_mutex_);
      impl->logfile_number_ = new_log_number;
      assert(new_log != nullptr);
      assert(impl->logs_.empty());
      impl->logs_.emplace_back(new_log_number, new_log);
    }

    if (s.ok()) {
      impl->alive_log_files_.emplace_back(impl->logfile_number_);
      // In WritePrepared there could be gap in sequence numbers. This breaks
      // the trick we use in kPointInTimeRecovery which assumes the first seq in
      // the log right after the corrupted log is one larger than the last seq
      // we read from the wals. To let this trick keep working, we add a dummy
      // entry with the expected sequence to the first log right after recovery.
      // In non-WritePrepared case also the new log after recovery could be
      // empty, and thus missing the consecutive seq hint to distinguish
      // middle-log corruption to corrupted-log-remained-after-recovery. This
      // case also will be addressed by a dummy write.
      if (recovered_seq != kMaxSequenceNumber) {
        WriteBatch empty_batch;
        WriteBatchInternal::SetSequence(&empty_batch, recovered_seq);
        uint64_t log_used, log_size;
        log::Writer* log_writer = impl->logs_.back().writer;
        LogFileNumberSize& log_file_number_size = impl->alive_log_files_.back();

        assert(log_writer->get_log_number() == log_file_number_size.number);
        impl->mutex_.AssertHeld();
        s = impl->WriteToWAL(empty_batch, write_options, log_writer, &log_used,
                             &log_size, log_file_number_size);
        if (s.ok()) {
          // Need to fsync, otherwise it might get lost after a power reset.
          s = impl->FlushWAL(write_options, false);
          TEST_SYNC_POINT_CALLBACK("DBImpl::Open::BeforeSyncWAL", /*arg=*/&s);
          IOOptions opts;
          if (s.ok()) {
            s = WritableFileWriter::PrepareIOOptions(write_options, opts);
          }
          if (s.ok()) {
            s = log_writer->file()->Sync(opts,
                                         impl->immutable_db_options_.use_fsync);
          }
        }
      }
    }
  }
  if (s.ok()) {
    // Commit the recovered version edits to the MANIFEST.
    s = impl->LogAndApplyForRecovery(recovery_ctx);
  }

  if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) {
    impl->mutex_.AssertHeld();
    s = impl->InitPersistStatsColumnFamily();
  }

  if (s.ok()) {
    // set column family handles
    for (const auto& cf : column_families) {
      auto cfd =
          impl->versions_->GetColumnFamilySet()->GetColumnFamily(cf.name);
      if (cfd != nullptr) {
        handles->push_back(
            new ColumnFamilyHandleImpl(cfd, impl, &impl->mutex_));
        impl->NewThreadStatusCfInfo(cfd);
      } else {
        if (db_options.create_missing_column_families) {
          // missing column family, create it
          ColumnFamilyHandle* handle = nullptr;
          // mutex_ must be dropped: CreateColumnFamilyImpl acquires it itself.
          impl->mutex_.Unlock();
          // NOTE: the work normally done in WrapUpCreateColumnFamilies will
          // be done separately below.
          s = impl->CreateColumnFamilyImpl(read_options, write_options,
                                           cf.options, cf.name, &handle);
          impl->mutex_.Lock();
          if (s.ok()) {
            handles->push_back(handle);
          } else {
            break;
          }
        } else {
          s = Status::InvalidArgument("Column family not found", cf.name);
          break;
        }
      }
    }
  }

  if (s.ok()) {
    // Install a fresh super-version per CF and kick off any work the new
    // state requires (flush/compaction scheduling happens inside).
    SuperVersionContext sv_context(/* create_superversion */ true);
    for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
      impl->InstallSuperVersionAndScheduleWork(
          cfd, &sv_context, *cfd->GetLatestMutableCFOptions());
    }
    sv_context.Clean();
  }

  if (s.ok() && impl->immutable_db_options_.persist_stats_to_disk) {
    // try to read format version
    s = impl->PersistentStatsProcessFormatVersion();
  }

  if (s.ok()) {
    // Reject configurations the chosen memtable implementations can't honor.
    for (auto cfd : *impl->versions_->GetColumnFamilySet()) {
      if (!cfd->mem()->IsSnapshotSupported()) {
        impl->is_snapshot_supported_ = false;
      }
      if (cfd->ioptions()->merge_operator != nullptr &&
          !cfd->mem()->IsMergeOperatorSupported()) {
        s = Status::InvalidArgument(
            "The memtable of column family %s does not support merge operator "
            "its options.merge_operator is non-null",
            cfd->GetName().c_str());
      }
      if (!s.ok()) {
        break;
      }
    }
  }
  TEST_SYNC_POINT("DBImpl::Open:Opened");
  Status persist_options_status;
  if (s.ok()) {
    // Persist RocksDB Options before scheduling the compaction.
    // The WriteOptionsFile() will release and lock the mutex internally.
    persist_options_status =
        impl->WriteOptionsFile(write_options, true /*db_mutex_already_held*/);
    *dbptr = impl;
    impl->opened_successfully_ = true;
  } else {
    persist_options_status.PermitUncheckedError();
  }
  impl->mutex_.Unlock();

  auto sfm = static_cast<SstFileManagerImpl*>(
      impl->immutable_db_options_.sst_file_manager.get());
  if (s.ok() && sfm) {
    // Set Statistics ptr for SstFileManager to dump the stats of
    // DeleteScheduler.
    sfm->SetStatisticsPtr(impl->immutable_db_options_.statistics);
    ROCKS_LOG_INFO(impl->immutable_db_options_.info_log,
                   "SstFileManager instance %p", sfm);

    impl->TrackExistingDataFiles(recovery_ctx.existing_data_files_);

    // Reserve some disk buffer space. This is a heuristic - when we run out
    // of disk space, this ensures that there is at least write_buffer_size
    // amount of free space before we resume DB writes. In low disk space
    // conditions, we want to avoid a lot of small L0 files due to frequent
    // WAL write failures and resultant forced flushes
    sfm->ReserveDiskBuffer(max_write_buffer_size,
                           impl->immutable_db_options_.db_paths[0].path);
  }

  if (s.ok()) {
    // When the DB is stopped, it's possible that there are some .trash files
    // that were not deleted yet, when we open the DB we will find these .trash
    // files and schedule them to be deleted (or delete immediately if
    // SstFileManager was not used).
    // Note that we only start doing this and below delete obsolete file after
    // `TrackExistingDataFiles` are called, the `max_trash_db_ratio` is
    // ineffective otherwise and these files' deletion won't be rate limited
    // which can cause discard stall.
    for (const auto& path : impl->CollectAllDBPaths()) {
      DeleteScheduler::CleanupDirectory(impl->immutable_db_options_.env, sfm,
                                        path)
          .PermitUncheckedError();
    }
    impl->mutex_.Lock();
    // This will do a full scan.
    impl->DeleteObsoleteFiles();
    TEST_SYNC_POINT("DBImpl::Open:AfterDeleteFiles");
    impl->MaybeScheduleFlushOrCompaction();
    impl->mutex_.Unlock();
  }

  if (s.ok()) {
    ROCKS_LOG_HEADER(impl->immutable_db_options_.info_log, "DB pointer %p",
                     impl);
    LogFlush(impl->immutable_db_options_.info_log);
    if (!impl->WALBufferIsEmpty()) {
      s = impl->FlushWAL(write_options, false);
      if (s.ok()) {
        // Sync is needed otherwise WAL buffered data might get lost after a
        // power reset.
        log::Writer* log_writer = impl->logs_.back().writer;
        IOOptions opts;
        s = WritableFileWriter::PrepareIOOptions(write_options, opts);
        if (s.ok()) {
          s = log_writer->file()->Sync(opts,
                                       impl->immutable_db_options_.use_fsync);
        }
      }
    }
    // A failure to persist the options file is surfaced only after the DB
    // itself opened cleanly; it turns an otherwise-successful open into an
    // IOError.
    if (s.ok() && !persist_options_status.ok()) {
      s = Status::IOError(
          "DB::Open() failed --- Unable to persist Options file",
          persist_options_status.ToString());
    }
  }
  if (!s.ok()) {
    ROCKS_LOG_WARN(impl->immutable_db_options_.info_log,
                   "DB::Open() failed: %s", s.ToString().c_str());
  }
  if (s.ok()) {
    s = impl->StartPeriodicTaskScheduler();
  }
  if (s.ok()) {
    s = impl->RegisterRecordSeqnoTimeWorker(read_options, write_options,
                                            recovery_ctx.is_new_db_);
  }
  impl->options_mutex_.Unlock();
  // Failure at any point after handle creation: destroy the handles and the
  // DBImpl so the caller receives no live objects alongside a non-OK status.
  if (!s.ok()) {
    for (auto* h : *handles) {
      delete h;
    }
    handles->clear();
    delete impl;
    *dbptr = nullptr;
  }
  return s;
}
2325
}  // namespace ROCKSDB_NAMESPACE