Coverage Report

Created: 2026-05-31 07:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/wal_manager.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10
#include "db/wal_manager.h"
11
12
#include <algorithm>
13
#include <cinttypes>
14
#include <memory>
15
#include <vector>
16
17
#include "db/log_reader.h"
18
#include "db/log_writer.h"
19
#include "db/transaction_log_impl.h"
20
#include "db/write_batch_internal.h"
21
#include "file/file_util.h"
22
#include "file/filename.h"
23
#include "file/sequence_file_reader.h"
24
#include "logging/logging.h"
25
#include "port/port.h"
26
#include "rocksdb/env.h"
27
#include "rocksdb/options.h"
28
#include "rocksdb/write_batch.h"
29
#include "test_util/sync_point.h"
30
#include "util/cast_util.h"
31
#include "util/coding.h"
32
#include "util/mutexlock.h"
33
#include "util/string_util.h"
34
35
namespace ROCKSDB_NAMESPACE {
36
37
0
// Deletes the WAL file `fname` (a path appended to wal_dir_ with a '/'
// separator) and, only if the deletion succeeded, forgets any cached
// first-record sequence number for WAL `number`.
//
// @return the status of the underlying Env::DeleteFile() call.
Status WalManager::DeleteFile(const std::string& fname, uint64_t number) {
  const std::string full_path = wal_dir_ + "/" + fname;
  Status status = env_->DeleteFile(full_path);
  if (!status.ok()) {
    return status;
  }
  {
    MutexLock cache_lock(&read_first_record_cache_mutex_);
    read_first_record_cache_.erase(number);
  }
  return status;
}
45
// Collects WAL files into `files`, sorted.
//
// Live WALs in wal_dir_ are listed first and archived WALs second. A WAL
// that is moved to the archive between the two listings then still shows up
// (in the archive listing); listing the archive first could miss it.
//
// @param files            Output. On success it is replaced with the result.
// @param need_seqnos      Forwarded to GetSortedWalsOfType(); when true, each
//                         WAL's first sequence number is read.
// @param include_archived When false, only the live WALs are returned.
// @return OK, or the first error encountered. On error the contents of
//         `files` should not be relied upon.
Status WalManager::GetSortedWalFiles(VectorWalPtr& files, bool need_seqnos,
                                     bool include_archived) {
  // list wal files in main db dir.
  VectorWalPtr logs;
  Status s = GetSortedWalsOfType(wal_dir_, logs, kAliveLogFile, need_seqnos);
  if (!s.ok()) {
    return s;
  }

  if (!include_archived) {
    // Only live WALs were requested: hand back exactly what was just listed
    // instead of discarding it.
    files = std::move(logs);
    return s;
  }

  // Reproduce the race condition where a log file is moved
  // to archived dir, between these two sync points, used in
  // (DBTest,TransactionLogIteratorRace)
  TEST_SYNC_POINT("WalManager::GetSortedWalFiles:1");
  TEST_SYNC_POINT("WalManager::GetSortedWalFiles:2");

  files.clear();
  // list wal files in archive dir.
  std::string archivedir = ArchivalDirectory(wal_dir_);
  Status exists = env_->FileExists(archivedir);
  if (exists.ok()) {
    s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile, need_seqnos);
    if (!s.ok()) {
      return s;
    }
  } else if (!exists.IsNotFound()) {
    assert(s.ok());
    return exists;
  }

  uint64_t latest_archived_log_number = 0;
  if (!files.empty()) {
    latest_archived_log_number = files.back()->LogNumber();
    ROCKS_LOG_INFO(db_options_.info_log, "Latest Archived log: %" PRIu64,
                   latest_archived_log_number);
  }

  files.reserve(files.size() + logs.size());
  for (auto& log : logs) {
    if (log->LogNumber() > latest_archived_log_number) {
      files.push_back(std::move(log));
    } else {
      // When the race condition happens, we could see the
      // same log in both db dir and archived dir. Simply
      // ignore the one in db dir. Note that, if we read
      // archived dir first, we would have missed the log file.
      ROCKS_LOG_WARN(db_options_.info_log, "%s already moved to archive",
                     log->PathName().c_str());
    }
  }

  return s;
}
102
103
// Creates a TransactionLogIterator that reads updates starting at sequence
// number `seq`, storing it in `*iter`.
//
// Returns NotSupported when seq_per_batch_ is set. Otherwise the WAL files
// are collected via GetSortedWalFiles() (with its default arguments), those
// RetainProbableWalFiles() rules out for `seq` are dropped, and the iterator
// is built over the remaining files.
//
// @return NotSupported, an error from collecting/trimming the WAL list, or
//         the status of the newly created iterator.
Status WalManager::GetUpdatesSince(
    SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter,
    const TransactionLogIterator::ReadOptions& read_options,
    VersionSet* version_set) {
  if (seq_per_batch_) {
    return Status::NotSupported();
  }
  assert(!seq_per_batch_);

  // Gather every sorted WAL, then binary-search away the ones that cannot
  // contain `seq` so the iterator does not open them needlessly.
  auto wal_files = std::make_unique<VectorWalPtr>();
  Status s = GetSortedWalFiles(*wal_files);
  if (s.ok()) {
    s = RetainProbableWalFiles(*wal_files, seq);
  }
  if (!s.ok()) {
    return s;
  }

  iter->reset(new TransactionLogIteratorImpl(
      wal_dir_, &db_options_, read_options, file_options_, seq,
      std::move(wal_files), version_set, seq_per_batch_, io_tracer_));
  return (*iter)->status();
}
131
132
// 1. Go through all archived files and
133
//    a. if ttl is enabled, delete outdated files
134
//    b. if archive size limit is enabled, delete empty files,
135
//        compute file number and size.
136
// 2. If size limit is enabled:
137
//    a. compute how many files should be deleted
138
//    b. get sorted non-empty archived logs
139
//    c. delete what should be deleted
140
107k
void WalManager::PurgeObsoleteWALFiles() {
141
107k
  bool const ttl_enabled = db_options_.WAL_ttl_seconds > 0;
142
107k
  bool const size_limit_enabled = db_options_.WAL_size_limit_MB > 0;
143
107k
  if (!ttl_enabled && !size_limit_enabled) {
144
107k
    return;
145
107k
  }
146
147
0
  int64_t current_time = 0;
148
0
  Status s = db_options_.clock->GetCurrentTime(&current_time);
149
0
  if (!s.ok()) {
150
0
    ROCKS_LOG_ERROR(db_options_.info_log, "Can't get current time: %s",
151
0
                    s.ToString().c_str());
152
0
    assert(false);
153
0
    return;
154
0
  }
155
0
  uint64_t const now_seconds = static_cast<uint64_t>(current_time);
156
0
  uint64_t const time_to_check =
157
0
      ttl_enabled
158
0
          ? std::min(kDefaultIntervalToDeleteObsoleteWAL,
159
0
                     std::max(uint64_t{1}, db_options_.WAL_ttl_seconds / 2))
160
0
          : kDefaultIntervalToDeleteObsoleteWAL;
161
0
  uint64_t old_last_run_time = purge_wal_files_last_run_.LoadRelaxed();
162
0
  do {
163
0
    if (old_last_run_time + time_to_check > now_seconds) {
164
      // last run is recent enough, no need to purge
165
0
      return;
166
0
    }
167
0
  } while (!purge_wal_files_last_run_.CasWeakRelaxed(
168
0
      /*expected=*/old_last_run_time, /*desired=*/now_seconds));
169
170
0
  std::string archival_dir = ArchivalDirectory(wal_dir_);
171
0
  std::vector<std::string> files;
172
0
  s = env_->GetChildren(archival_dir, &files);
173
0
  if (!s.ok()) {
174
0
    ROCKS_LOG_ERROR(db_options_.info_log, "Can't get archive files: %s",
175
0
                    s.ToString().c_str());
176
0
    return;
177
0
  }
178
179
0
  size_t log_files_num = 0;
180
0
  uint64_t log_file_size = 0;
181
0
  for (auto& f : files) {
182
0
    uint64_t number;
183
0
    FileType type;
184
0
    if (ParseFileName(f, &number, &type) && type == kWalFile) {
185
0
      std::string const file_path = archival_dir + "/" + f;
186
0
      if (ttl_enabled) {
187
0
        uint64_t file_m_time;
188
0
        s = env_->GetFileModificationTime(file_path, &file_m_time);
189
0
        if (!s.ok()) {
190
0
          ROCKS_LOG_WARN(db_options_.info_log,
191
0
                         "Can't get file mod time: %s: %s", file_path.c_str(),
192
0
                         s.ToString().c_str());
193
0
          continue;
194
0
        }
195
196
        // Avoid expression `now_seconds - file_m_time` when
197
        // `file_m_time > now_seconds` to prevent unsigned underflow in case
198
        // system clock goes backwards. Both timestamps are based on wall clock
199
        // time, which is not guaranteed to be monotonic.
200
0
        if (file_m_time <= now_seconds &&
201
0
            now_seconds - file_m_time > db_options_.WAL_ttl_seconds) {
202
0
          s = DeleteDBFile(&db_options_, file_path, archival_dir, false,
203
0
                           /*force_fg=*/!wal_in_db_path_);
204
0
          if (!s.ok()) {
205
0
            ROCKS_LOG_WARN(db_options_.info_log, "Can't delete file: %s: %s",
206
0
                           file_path.c_str(), s.ToString().c_str());
207
0
            continue;
208
0
          } else {
209
0
            MutexLock l(&read_first_record_cache_mutex_);
210
0
            read_first_record_cache_.erase(number);
211
0
          }
212
0
          continue;
213
0
        }
214
0
      }
215
216
0
      if (size_limit_enabled) {
217
0
        uint64_t file_size;
218
0
        s = env_->GetFileSize(file_path, &file_size);
219
0
        if (!s.ok()) {
220
0
          ROCKS_LOG_ERROR(db_options_.info_log,
221
0
                          "Unable to get file size: %s: %s", file_path.c_str(),
222
0
                          s.ToString().c_str());
223
0
          return;
224
0
        } else {
225
0
          if (file_size > 0) {
226
0
            log_file_size = std::max(log_file_size, file_size);
227
0
            ++log_files_num;
228
0
          } else {
229
0
            s = DeleteDBFile(&db_options_, file_path, archival_dir, false,
230
0
                             /*force_fg=*/!wal_in_db_path_);
231
0
            if (!s.ok()) {
232
0
              ROCKS_LOG_WARN(db_options_.info_log,
233
0
                             "Unable to delete file: %s: %s", file_path.c_str(),
234
0
                             s.ToString().c_str());
235
0
              continue;
236
0
            } else {
237
0
              MutexLock l(&read_first_record_cache_mutex_);
238
0
              read_first_record_cache_.erase(number);
239
0
            }
240
0
          }
241
0
        }
242
0
      }
243
0
    }
244
0
  }
245
246
0
  if (0 == log_files_num || !size_limit_enabled) {
247
0
    return;
248
0
  }
249
250
0
  size_t const files_keep_num = static_cast<size_t>(
251
0
      db_options_.WAL_size_limit_MB * 1024 * 1024 / log_file_size);
252
0
  if (log_files_num <= files_keep_num) {
253
0
    return;
254
0
  }
255
256
0
  size_t files_del_num = log_files_num - files_keep_num;
257
0
  VectorWalPtr archived_logs;
258
0
  s = GetSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile,
259
0
                          /*need_seqno=*/false);
260
0
  if (!s.ok()) {
261
0
    ROCKS_LOG_WARN(db_options_.info_log,
262
0
                   "Unable to get archived WALs from: %s: %s",
263
0
                   archival_dir.c_str(), s.ToString().c_str());
264
0
    files_del_num = 0;
265
0
  } else if (files_del_num > archived_logs.size()) {
266
0
    ROCKS_LOG_WARN(db_options_.info_log,
267
0
                   "Trying to delete more archived log files than "
268
0
                   "exist. Deleting all");
269
0
    files_del_num = archived_logs.size();
270
0
  }
271
272
0
  for (size_t i = 0; i < files_del_num; ++i) {
273
0
    std::string const file_path = archived_logs[i]->PathName();
274
0
    s = DeleteDBFile(&db_options_, wal_dir_ + "/" + file_path, wal_dir_, false,
275
0
                     /*force_fg=*/!wal_in_db_path_);
276
0
    if (!s.ok()) {
277
0
      ROCKS_LOG_WARN(db_options_.info_log, "Unable to delete file: %s: %s",
278
0
                     file_path.c_str(), s.ToString().c_str());
279
0
      continue;
280
0
    } else {
281
0
      MutexLock l(&read_first_record_cache_mutex_);
282
0
      read_first_record_cache_.erase(archived_logs[i]->LogNumber());
283
0
    }
284
0
  }
285
0
}
286
287
0
void WalManager::ArchiveWALFile(const std::string& fname, uint64_t number) {
288
0
  auto archived_log_name = ArchivedLogFileName(wal_dir_, number);
289
  // The sync point below is used in (DBTest,TransactionLogIteratorRace)
290
0
  TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:1");
291
0
  Status s = env_->RenameFile(fname, archived_log_name);
292
0
  IGNORE_STATUS_IF_ERROR(s);
293
  // The sync point below is used in (DBTest,TransactionLogIteratorRace)
294
0
  TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:2");
295
  // The sync point below is used in
296
  // (CheckPointTest, CheckpointWithArchievedLog)
297
0
  TEST_SYNC_POINT("WalManager::ArchiveWALFile");
298
0
  ROCKS_LOG_INFO(db_options_.info_log, "Move log file %s to %s -- %s\n",
299
0
                 fname.c_str(), archived_log_name.c_str(),
300
0
                 s.ToString().c_str());
301
0
}
302
303
// Lists the WAL files found directly in directory `path`, appends one
// WalFileImpl of type `log_type` per file to `log_files`, and then sorts the
// whole vector with WalFileImpl's operator<.
//
// @param need_seqnos If true, each WAL's first sequence number is read via
//                    ReadFirstRecord(), and WALs for which it comes back as 0
//                    (empty, or vanished from the archive) are skipped.
//                    Otherwise every WAL is recorded with sequence 0.
// @return OK, or the first error from listing `path`, reading a first
//         record, or getting a file size.
Status WalManager::GetSortedWalsOfType(const std::string& path,
                                       VectorWalPtr& log_files,
                                       WalFileType log_type, bool need_seqnos) {
  std::vector<std::string> all_files;
  const Status status = env_->GetChildren(path, &all_files);
  if (!status.ok()) {
    return status;
  }
  log_files.reserve(all_files.size());
  for (const auto& f : all_files) {
    uint64_t number;
    FileType type;
    if (ParseFileName(f, &number, &type) && type == kWalFile) {
      SequenceNumber sequence;
      if (need_seqnos) {
        Status s = ReadFirstRecord(log_type, number, &sequence);
        if (!s.ok()) {
          return s;
        }
        if (sequence == 0) {
          // empty file
          continue;
        }
      } else {
        sequence = 0;
      }

      // Reproduce the race condition where a log file is moved
      // to archived dir, between these two sync points, used in
      // (DBTest,TransactionLogIteratorRace)
      TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:1");
      TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:2");

      uint64_t size_bytes;
      Status s = env_->GetFileSize(LogFileName(path, number), &size_bytes);
      // re-try in case the alive log file has been moved to archive.
      if (!s.ok() && log_type == kAliveLogFile) {
        std::string archived_file = ArchivedLogFileName(path, number);
        if (env_->FileExists(archived_file).ok()) {
          s = env_->GetFileSize(archived_file, &size_bytes);
          if (!s.ok() && env_->FileExists(archived_file).IsNotFound()) {
            // oops, the file just got deleted from archived dir! move on.
            // `s` has already been checked and goes out of scope here, so
            // there is nothing to reset.
            continue;
          }
        }
      }
      if (!s.ok()) {
        return s;
      }

      log_files.emplace_back(
          new WalFileImpl(number, log_type, sequence, size_bytes));
    }
  }
  std::sort(
      log_files.begin(), log_files.end(),
      [](const std::unique_ptr<WalFile>& a, const std::unique_ptr<WalFile>& b) {
        WalFileImpl* a_impl = static_cast_with_check<WalFileImpl>(a.get());
        WalFileImpl* b_impl = static_cast_with_check<WalFileImpl>(b.get());
        return *a_impl < *b_impl;
      });
  return status;
}
367
368
// Drops WALs from the front of `all_logs` that cannot hold sequence `target`.
// NOTE(review): assumes `all_logs` is ordered by StartSequence(); confirm
// against callers.
//
// The first WAL kept is the one a binary search finds with a start sequence
// equal to `target`, or else the last one whose start sequence is below
// `target` (or index 0 when `target` precedes every WAL). That WAL and
// everything after it are retained, so the last WAL is never removed.
// Always returns OK.
Status WalManager::RetainProbableWalFiles(VectorWalPtr& all_logs,
                                          const SequenceNumber target) {
  // Signed bounds: `hi` may legitimately fall to -1 when `target` is smaller
  // than the first file's start sequence.
  int64_t lo = 0;
  int64_t hi = static_cast<int64_t>(all_logs.size()) - 1;
  // Binary search rather than inspecting every file.
  while (lo <= hi) {
    const int64_t mid = lo + (hi - lo) / 2;  // Avoid overflow.
    const SequenceNumber mid_seq =
        all_logs.at(static_cast<size_t>(mid))->StartSequence();
    if (mid_seq == target) {
      hi = mid;
      break;
    }
    if (mid_seq < target) {
      lo = mid + 1;
    } else {
      hi = mid - 1;
    }
  }
  // `hi` is now the index of the first WAL to keep, or negative.
  const size_t first_kept = hi < 0 ? size_t{0} : static_cast<size_t>(hi);
  all_logs.erase(all_logs.begin(), all_logs.begin() + first_kept);
  return Status::OK();
}
393
394
// Finds the sequence number of the first record in WAL `number`.
//
// Lookups go through read_first_record_cache_ first. Only successful,
// non-zero results are inserted into it. For an alive WAL, if reading fails
// and the file is not confirmed to still exist in wal_dir_, the archived copy
// is read instead.
//
// @param type     kAliveLogFile or kArchivedLogFile; anything else yields
//                 NotSupported.
// @param number   WAL (log) number.
// @param sequence Output. Set to 0 up front; stays 0 for an empty file or
//                 when the archived copy no longer exists.
// @return OK (possibly with *sequence == 0), NotSupported for an unknown
//         type, or the read error.
Status WalManager::ReadFirstRecord(const WalFileType type,
                                   const uint64_t number,
                                   SequenceNumber* sequence) {
  *sequence = 0;
  if (type != kAliveLogFile && type != kArchivedLogFile) {
    ROCKS_LOG_ERROR(db_options_.info_log, "[WalManager] Unknown file type %s",
                    std::to_string(type).c_str());
    return Status::NotSupported("File Type Not Known " + std::to_string(type));
  }
  {
    MutexLock l(&read_first_record_cache_mutex_);
    auto itr = read_first_record_cache_.find(number);
    if (itr != read_first_record_cache_.end()) {
      *sequence = itr->second;
      return Status::OK();
    }
  }
  Status s;
  if (type == kAliveLogFile) {
    std::string fname = LogFileName(wal_dir_, number);
    s = ReadFirstLine(fname, number, sequence);
    if (!s.ok() && env_->FileExists(fname).ok()) {
      // return any error that is not caused by non-existing file
      return s;
    }
  }

  if (type == kArchivedLogFile || !s.ok()) {
    //  check if the file got moved to archive.
    std::string archived_file = ArchivedLogFileName(wal_dir_, number);
    s = ReadFirstLine(archived_file, number, sequence);
    // maybe the file was deleted from archive dir. If that's the case, return
    // Status::OK(). The caller will identify this as an empty file because
    // *sequence == 0
    if (!s.ok() && env_->FileExists(archived_file).IsNotFound()) {
      return Status::OK();
    }
  }

  if (s.ok() && *sequence != 0) {
    MutexLock l(&read_first_record_cache_mutex_);
    read_first_record_cache_.insert({number, *sequence});
  }
  return s;
}
439
440
// Builds a WalFile descriptor of type kAliveLogFile for WAL `number` in
// wal_dir_. Only the file size is looked up; the sequence number is left 0.
//
// @param log_file Output; must be non-null.
// @return InvalidArgument if `log_file` is null, PathNotFound if `number` is
//         0, the GetFileSize() error if the size cannot be read, else OK.
Status WalManager::GetLiveWalFile(uint64_t number,
                                  std::unique_ptr<WalFile>* log_file) {
  if (log_file == nullptr) {
    return Status::InvalidArgument("log_file not preallocated.");
  }
  if (number == 0) {
    return Status::PathNotFound("log file not available");
  }

  uint64_t size_bytes;
  const Status size_status =
      env_->GetFileSize(LogFileName(wal_dir_, number), &size_bytes);
  if (!size_status.ok()) {
    return size_status;
  }

  constexpr SequenceNumber kUnknownSequence = 0;
  log_file->reset(
      new WalFileImpl(number, kAliveLogFile, kUnknownSequence, size_bytes));
  return Status::OK();
}
465
466
// the function returns status.ok() and sequence == 0 if the file exists, but is
467
// empty
468
Status WalManager::ReadFirstLine(const std::string& fname,
469
                                 const uint64_t number,
470
0
                                 SequenceNumber* sequence) {
471
0
  struct LogReporter : public log::Reader::Reporter {
472
0
    Env* env;
473
0
    Logger* info_log;
474
0
    const char* fname;
475
476
0
    Status* status;
477
0
    bool ignore_error;  // true if db_options_.paranoid_checks==false
478
0
    void Corruption(size_t bytes, const Status& s,
479
0
                    uint64_t /*log_number*/ = kMaxSequenceNumber) override {
480
0
      ROCKS_LOG_WARN(info_log, "[WalManager] %s%s: dropping %d bytes; %s",
481
0
                     (this->ignore_error ? "(ignoring error) " : ""), fname,
482
0
                     static_cast<int>(bytes), s.ToString().c_str());
483
0
      if (this->status->ok()) {
484
        // only keep the first error
485
0
        *this->status = s;
486
0
      }
487
0
    }
488
0
  };
489
490
0
  std::unique_ptr<FSSequentialFile> file;
491
0
  Status status = fs_->NewSequentialFile(
492
0
      fname, fs_->OptimizeForLogRead(file_options_), &file, nullptr);
493
0
  std::unique_ptr<SequentialFileReader> file_reader(
494
0
      new SequentialFileReader(std::move(file), fname, io_tracer_));
495
496
0
  if (!status.ok()) {
497
0
    return status;
498
0
  }
499
500
0
  LogReporter reporter;
501
0
  reporter.env = env_;
502
0
  reporter.info_log = db_options_.info_log.get();
503
0
  reporter.fname = fname.c_str();
504
0
  reporter.status = &status;
505
0
  reporter.ignore_error = !db_options_.paranoid_checks;
506
0
  log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
507
0
                     true /*checksum*/, number);
508
0
  std::string scratch;
509
0
  Slice record;
510
511
0
  if (reader.ReadRecord(&record, &scratch) &&
512
0
      (status.ok() || !db_options_.paranoid_checks)) {
513
0
    if (record.size() < WriteBatchInternal::kHeader) {
514
0
      reporter.Corruption(record.size(),
515
0
                          Status::Corruption("log record too small"));
516
      // TODO read record's till the first no corrupt entry?
517
0
    } else {
518
0
      WriteBatch batch;
519
      // We can overwrite an existing non-OK Status since it'd only reach here
520
      // with `paranoid_checks == false`.
521
0
      status = WriteBatchInternal::SetContents(&batch, record);
522
0
      if (status.ok()) {
523
0
        *sequence = WriteBatchInternal::Sequence(&batch);
524
0
        return status;
525
0
      }
526
0
    }
527
0
  }
528
529
0
  if (status.ok() && reader.IsCompressedAndEmptyFile()) {
530
    // In case of wal_compression, it writes a `kSetCompressionType` record
531
    // which is not associated with any sequence number. As result for an empty
532
    // file, GetSortedWalsOfType() will skip these WALs causing the operations
533
    // to fail.
534
    // Therefore, in order to avoid that failure, it sets sequence_number to 1
535
    // indicating those WALs should be included.
536
0
    *sequence = 1;
537
0
  } else {
538
    // ReadRecord might have returned false on EOF, which means that the log
539
    // file is empty. Or, a failure may have occurred while processing the first
540
    // entry. In any case, return status and set sequence number to 0.
541
0
    *sequence = 0;
542
0
  }
543
0
  return status;
544
0
}
545
546
}  // namespace ROCKSDB_NAMESPACE