Coverage Report

Created: 2024-07-27 06:53

/src/rocksdb/db/wal_manager.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10
#include "db/wal_manager.h"
11
12
#include <algorithm>
13
#include <cinttypes>
14
#include <memory>
15
#include <vector>
16
17
#include "db/log_reader.h"
18
#include "db/log_writer.h"
19
#include "db/transaction_log_impl.h"
20
#include "db/write_batch_internal.h"
21
#include "file/file_util.h"
22
#include "file/filename.h"
23
#include "file/sequence_file_reader.h"
24
#include "logging/logging.h"
25
#include "port/port.h"
26
#include "rocksdb/env.h"
27
#include "rocksdb/options.h"
28
#include "rocksdb/write_batch.h"
29
#include "test_util/sync_point.h"
30
#include "util/cast_util.h"
31
#include "util/coding.h"
32
#include "util/mutexlock.h"
33
#include "util/string_util.h"
34
35
namespace ROCKSDB_NAMESPACE {
36
37
38
0
// Deletes the file `fname` (a name relative to wal_dir_). Only when the
// deletion succeeds is the cached first-record sequence number for WAL
// `number` dropped from read_first_record_cache_. Returns the deletion status.
Status WalManager::DeleteFile(const std::string& fname, uint64_t number) {
  const std::string full_path = wal_dir_ + "/" + fname;
  Status status = env_->DeleteFile(full_path);
  if (!status.ok()) {
    return status;
  }
  {
    MutexLock cache_lock(&read_first_record_cache_mutex_);
    read_first_record_cache_.erase(number);
  }
  return status;
}
46
// Collects the WAL files of this DB into `files`.
//
// Alive WALs in wal_dir_ are listed first and archived WALs second. This
// avoids a race where a WAL is moved to the archive between the two listings:
// such a WAL may then be seen in both places, and the alive duplicate is
// dropped below.
//
// need_seqnos:      forwarded to GetSortedWalsOfType (when true, the first
//                   sequence number of each WAL is read and empty WALs are
//                   skipped).
// include_archived: when false, only the alive WALs from wal_dir_ are
//                   returned.
//
// On success `files` is replaced by the result: archived WALs followed by the
// alive WALs whose log number is newer than the latest archived one. On
// failure the error status is returned and `files` should not be relied on.
Status WalManager::GetSortedWalFiles(VectorWalPtr& files, bool need_seqnos,
                                     bool include_archived) {
  // list wal files in main db dir.
  VectorWalPtr logs;
  Status s = GetSortedWalsOfType(wal_dir_, logs, kAliveLogFile, need_seqnos);
  if (!s.ok()) {
    return s;
  }

  if (!include_archived) {
    // Only the alive WALs were requested: hand them to the caller rather
    // than discarding them.
    files = std::move(logs);
    return s;
  }

  // Reproduce the race condition where a log file is moved
  // to archived dir, between these two sync points, used in
  // (DBTest,TransactionLogIteratorRace)
  TEST_SYNC_POINT("WalManager::GetSortedWalFiles:1");
  TEST_SYNC_POINT("WalManager::GetSortedWalFiles:2");

  files.clear();
  // list wal files in archive dir.
  std::string archivedir = ArchivalDirectory(wal_dir_);
  Status exists = env_->FileExists(archivedir);
  if (exists.ok()) {
    s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile, need_seqnos);
    if (!s.ok()) {
      return s;
    }
  } else if (!exists.IsNotFound()) {
    // A missing archive dir is fine; any other error is reported.
    assert(s.ok());
    return exists;
  }

  uint64_t latest_archived_log_number = 0;
  if (!files.empty()) {
    latest_archived_log_number = files.back()->LogNumber();
    ROCKS_LOG_INFO(db_options_.info_log, "Latest Archived log: %" PRIu64,
                   latest_archived_log_number);
  }

  files.reserve(files.size() + logs.size());
  for (auto& log : logs) {
    if (log->LogNumber() > latest_archived_log_number) {
      files.push_back(std::move(log));
    } else {
      // When the race condition happens, we could see the
      // same log in both db dir and archived dir. Simply
      // ignore the one in db dir. Note that, if we read
      // archived dir first, we would have missed the log file.
      ROCKS_LOG_WARN(db_options_.info_log, "%s already moved to archive",
                     log->PathName().c_str());
    }
  }

  return s;
}
103
104
// Creates in *iter a TransactionLogIterator that reads updates starting at
// sequence number `seq`.
//
// Returns NotSupported when seq_per_batch_ is set. Otherwise the WAL files
// are listed with GetSortedWalFiles (default arguments) and trimmed by
// RetainProbableWalFiles to those that may contain `seq`. They are then
// handed over to the new iterator, and the iterator's initial status is
// returned.
Status WalManager::GetUpdatesSince(
    SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter,
    const TransactionLogIterator::ReadOptions& read_options,
    VersionSet* version_set) {
  if (seq_per_batch_) {
    return Status::NotSupported();
  }
  assert(!seq_per_batch_);

  // Binary search over the sorted WALs avoids opening every file.
  auto wal_files = std::make_unique<VectorWalPtr>();
  Status status = GetSortedWalFiles(*wal_files);
  if (status.ok()) {
    status = RetainProbableWalFiles(*wal_files, seq);
  }
  if (!status.ok()) {
    return status;
  }

  iter->reset(new TransactionLogIteratorImpl(
      wal_dir_, &db_options_, read_options, file_options_, seq,
      std::move(wal_files), version_set, seq_per_batch_, io_tracer_));
  return (*iter)->status();
}
132
133
// 1. Go through all archived files and
134
//    a. if ttl is enabled, delete outdated files
135
//    b. if archive size limit is enabled, delete empty files,
136
//        compute file number and size.
137
// 2. If size limit is enabled:
138
//    a. compute how many files should be deleted
139
//    b. get sorted non-empty archived logs
140
//    c. delete what should be deleted
141
49.9k
void WalManager::PurgeObsoleteWALFiles() {
142
49.9k
  bool const ttl_enabled = db_options_.WAL_ttl_seconds > 0;
143
49.9k
  bool const size_limit_enabled = db_options_.WAL_size_limit_MB > 0;
144
49.9k
  if (!ttl_enabled && !size_limit_enabled) {
145
49.9k
    return;
146
49.9k
  }
147
148
0
  int64_t current_time = 0;
149
0
  Status s = db_options_.clock->GetCurrentTime(&current_time);
150
0
  if (!s.ok()) {
151
0
    ROCKS_LOG_ERROR(db_options_.info_log, "Can't get current time: %s",
152
0
                    s.ToString().c_str());
153
0
    assert(false);
154
0
    return;
155
0
  }
156
0
  uint64_t const now_seconds = static_cast<uint64_t>(current_time);
157
0
  uint64_t const time_to_check =
158
0
      ttl_enabled
159
0
          ? std::min(kDefaultIntervalToDeleteObsoleteWAL,
160
0
                     std::max(uint64_t{1}, db_options_.WAL_ttl_seconds / 2))
161
0
          : kDefaultIntervalToDeleteObsoleteWAL;
162
0
  uint64_t old_last_run_time = purge_wal_files_last_run_.LoadRelaxed();
163
0
  do {
164
0
    if (old_last_run_time + time_to_check > now_seconds) {
165
      // last run is recent enough, no need to purge
166
0
      return;
167
0
    }
168
0
  } while (!purge_wal_files_last_run_.CasWeakRelaxed(
169
0
      /*expected=*/old_last_run_time, /*desired=*/now_seconds));
170
171
0
  std::string archival_dir = ArchivalDirectory(wal_dir_);
172
0
  std::vector<std::string> files;
173
0
  s = env_->GetChildren(archival_dir, &files);
174
0
  if (!s.ok()) {
175
0
    ROCKS_LOG_ERROR(db_options_.info_log, "Can't get archive files: %s",
176
0
                    s.ToString().c_str());
177
0
    return;
178
0
  }
179
180
0
  size_t log_files_num = 0;
181
0
  uint64_t log_file_size = 0;
182
0
  for (auto& f : files) {
183
0
    uint64_t number;
184
0
    FileType type;
185
0
    if (ParseFileName(f, &number, &type) && type == kWalFile) {
186
0
      std::string const file_path = archival_dir + "/" + f;
187
0
      if (ttl_enabled) {
188
0
        uint64_t file_m_time;
189
0
        s = env_->GetFileModificationTime(file_path, &file_m_time);
190
0
        if (!s.ok()) {
191
0
          ROCKS_LOG_WARN(db_options_.info_log,
192
0
                         "Can't get file mod time: %s: %s", file_path.c_str(),
193
0
                         s.ToString().c_str());
194
0
          continue;
195
0
        }
196
0
        if (now_seconds - file_m_time > db_options_.WAL_ttl_seconds) {
197
0
          s = DeleteDBFile(&db_options_, file_path, archival_dir, false,
198
0
                           /*force_fg=*/!wal_in_db_path_);
199
0
          if (!s.ok()) {
200
0
            ROCKS_LOG_WARN(db_options_.info_log, "Can't delete file: %s: %s",
201
0
                           file_path.c_str(), s.ToString().c_str());
202
0
            continue;
203
0
          } else {
204
0
            MutexLock l(&read_first_record_cache_mutex_);
205
0
            read_first_record_cache_.erase(number);
206
0
          }
207
0
          continue;
208
0
        }
209
0
      }
210
211
0
      if (size_limit_enabled) {
212
0
        uint64_t file_size;
213
0
        s = env_->GetFileSize(file_path, &file_size);
214
0
        if (!s.ok()) {
215
0
          ROCKS_LOG_ERROR(db_options_.info_log,
216
0
                          "Unable to get file size: %s: %s", file_path.c_str(),
217
0
                          s.ToString().c_str());
218
0
          return;
219
0
        } else {
220
0
          if (file_size > 0) {
221
0
            log_file_size = std::max(log_file_size, file_size);
222
0
            ++log_files_num;
223
0
          } else {
224
0
            s = DeleteDBFile(&db_options_, file_path, archival_dir, false,
225
0
                             /*force_fg=*/!wal_in_db_path_);
226
0
            if (!s.ok()) {
227
0
              ROCKS_LOG_WARN(db_options_.info_log,
228
0
                             "Unable to delete file: %s: %s", file_path.c_str(),
229
0
                             s.ToString().c_str());
230
0
              continue;
231
0
            } else {
232
0
              MutexLock l(&read_first_record_cache_mutex_);
233
0
              read_first_record_cache_.erase(number);
234
0
            }
235
0
          }
236
0
        }
237
0
      }
238
0
    }
239
0
  }
240
241
0
  if (0 == log_files_num || !size_limit_enabled) {
242
0
    return;
243
0
  }
244
245
0
  size_t const files_keep_num = static_cast<size_t>(
246
0
      db_options_.WAL_size_limit_MB * 1024 * 1024 / log_file_size);
247
0
  if (log_files_num <= files_keep_num) {
248
0
    return;
249
0
  }
250
251
0
  size_t files_del_num = log_files_num - files_keep_num;
252
0
  VectorWalPtr archived_logs;
253
0
  s = GetSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile,
254
0
                          /*need_seqno=*/false);
255
0
  if (!s.ok()) {
256
0
    ROCKS_LOG_WARN(db_options_.info_log,
257
0
                   "Unable to get archived WALs from: %s: %s",
258
0
                   archival_dir.c_str(), s.ToString().c_str());
259
0
    files_del_num = 0;
260
0
  } else if (files_del_num > archived_logs.size()) {
261
0
    ROCKS_LOG_WARN(db_options_.info_log,
262
0
                   "Trying to delete more archived log files than "
263
0
                   "exist. Deleting all");
264
0
    files_del_num = archived_logs.size();
265
0
  }
266
267
0
  for (size_t i = 0; i < files_del_num; ++i) {
268
0
    std::string const file_path = archived_logs[i]->PathName();
269
0
    s = DeleteDBFile(&db_options_, wal_dir_ + "/" + file_path, wal_dir_, false,
270
0
                     /*force_fg=*/!wal_in_db_path_);
271
0
    if (!s.ok()) {
272
0
      ROCKS_LOG_WARN(db_options_.info_log, "Unable to delete file: %s: %s",
273
0
                     file_path.c_str(), s.ToString().c_str());
274
0
      continue;
275
0
    } else {
276
0
      MutexLock l(&read_first_record_cache_mutex_);
277
0
      read_first_record_cache_.erase(archived_logs[i]->LogNumber());
278
0
    }
279
0
  }
280
0
}
281
282
0
void WalManager::ArchiveWALFile(const std::string& fname, uint64_t number) {
283
0
  auto archived_log_name = ArchivedLogFileName(wal_dir_, number);
284
  // The sync point below is used in (DBTest,TransactionLogIteratorRace)
285
0
  TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:1");
286
0
  Status s = env_->RenameFile(fname, archived_log_name);
287
  // The sync point below is used in (DBTest,TransactionLogIteratorRace)
288
0
  TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:2");
289
  // The sync point below is used in
290
  // (CheckPointTest, CheckpointWithArchievedLog)
291
0
  TEST_SYNC_POINT("WalManager::ArchiveWALFile");
292
0
  ROCKS_LOG_INFO(db_options_.info_log, "Move log file %s to %s -- %s\n",
293
0
                 fname.c_str(), archived_log_name.c_str(),
294
0
                 s.ToString().c_str());
295
0
}
296
297
// Appends one WalFileImpl (tagged with `log_type`) to `log_files` for every WAL
// found directly in `path`. It then sorts the whole of `log_files` using
// WalFileImpl's operator<.
//
// When `need_seqnos` is true, each WAL's first sequence number is obtained via
// ReadFirstRecord(). A read error is returned, and WALs reporting sequence 0
// (empty) are skipped. Otherwise every entry is recorded with sequence 0.
//
// For an alive WAL whose size cannot be read, the archived copy is tried,
// because the file may have been archived concurrently. If it has also
// vanished from the archive, the WAL is skipped.
Status WalManager::GetSortedWalsOfType(const std::string& path,
                                       VectorWalPtr& log_files,
                                       WalFileType log_type, bool need_seqnos) {
  std::vector<std::string> children;
  const Status status = env_->GetChildren(path, &children);
  if (!status.ok()) {
    return status;
  }
  log_files.reserve(children.size());

  for (const std::string& child : children) {
    uint64_t number;
    FileType type;
    if (!ParseFileName(child, &number, &type) || type != kWalFile) {
      continue;
    }

    SequenceNumber sequence = 0;
    if (need_seqnos) {
      Status read_status = ReadFirstRecord(log_type, number, &sequence);
      if (!read_status.ok()) {
        return read_status;
      }
      if (sequence == 0) {
        // empty file
        continue;
      }
    }

    // Reproduce the race condition where a log file is moved
    // to archived dir, between these two sync points, used in
    // (DBTest,TransactionLogIteratorRace)
    TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:1");
    TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:2");

    uint64_t size_bytes;
    Status size_status =
        env_->GetFileSize(LogFileName(path, number), &size_bytes);
    if (!size_status.ok() && log_type == kAliveLogFile) {
      // re-try in case the alive log file has been moved to archive.
      const std::string archived_file = ArchivedLogFileName(path, number);
      if (env_->FileExists(archived_file).ok()) {
        size_status = env_->GetFileSize(archived_file, &size_bytes);
        if (!size_status.ok() &&
            env_->FileExists(archived_file).IsNotFound()) {
          // oops, the file just got deleted from archived dir! move on
          continue;
        }
      }
    }
    if (!size_status.ok()) {
      return size_status;
    }

    log_files.emplace_back(
        new WalFileImpl(number, log_type, sequence, size_bytes));
  }

  std::sort(log_files.begin(), log_files.end(),
            [](const std::unique_ptr<WalFile>& lhs,
               const std::unique_ptr<WalFile>& rhs) {
              WalFileImpl* lhs_impl =
                  static_cast_with_check<WalFileImpl>(lhs.get());
              WalFileImpl* rhs_impl =
                  static_cast_with_check<WalFileImpl>(rhs.get());
              return *lhs_impl < *rhs_impl;
            });
  return status;
}
361
362
// Erases from the front of `all_logs` every WAL that cannot contain `target`.
//
// NOTE(review): assumes `all_logs` is ordered by ascending StartSequence();
// confirm with callers. A binary search, which avoids opening every file,
// locates the last WAL whose start sequence is <= `target`. On an exact match
// it uses whichever matching WAL the search lands on. All WALs before that
// one are erased. If `target` precedes every WAL (or the list is empty),
// nothing is erased. Always returns OK.
Status WalManager::RetainProbableWalFiles(VectorWalPtr& all_logs,
                                          const SequenceNumber target) {
  // Signed bounds: `hi` may legitimately become -1 when target is below the
  // first file's start sequence.
  int64_t lo = 0;
  int64_t hi = static_cast<int64_t>(all_logs.size()) - 1;
  while (lo <= hi) {
    const int64_t mid = lo + (hi - lo) / 2;  // Avoid overflow.
    const SequenceNumber mid_seq =
        all_logs.at(static_cast<size_t>(mid))->StartSequence();
    if (mid_seq == target) {
      hi = mid;
      break;
    }
    if (mid_seq < target) {
      lo = mid + 1;
    } else {
      hi = mid - 1;
    }
  }
  // `hi` now indexes the first WAL to keep; clamping -1 to 0 means the last
  // WAL file is always included.
  const size_t first_kept = hi > 0 ? static_cast<size_t>(hi) : size_t{0};
  all_logs.erase(all_logs.begin(), all_logs.begin() + first_kept);
  return Status::OK();
}
387
388
// Stores in *sequence the sequence number of the first record of WAL
// `number`. The value is 0 if the WAL is empty, or if its archived copy no
// longer exists.
//
// Results are memoized in read_first_record_cache_ (guarded by
// read_first_record_cache_mutex_); only non-zero sequence numbers are cached.
// For an alive WAL, a read failure is returned when the file still exists in
// wal_dir_. Otherwise the archive directory is tried, since the WAL may have
// been archived concurrently.
//
// Returns NotSupported for an unknown `type`. Otherwise it returns the status
// of the last read attempt, or OK when the archived file has disappeared.
Status WalManager::ReadFirstRecord(const WalFileType type,
                                   const uint64_t number,
                                   SequenceNumber* sequence) {
  *sequence = 0;
  if (type != kAliveLogFile && type != kArchivedLogFile) {
    ROCKS_LOG_ERROR(db_options_.info_log, "[WalManager] Unknown file type %s",
                    std::to_string(type).c_str());
    return Status::NotSupported("File Type Not Known " + std::to_string(type));
  }
  {
    MutexLock l(&read_first_record_cache_mutex_);
    auto itr = read_first_record_cache_.find(number);
    if (itr != read_first_record_cache_.end()) {
      *sequence = itr->second;
      return Status::OK();
    }
  }
  Status s;
  if (type == kAliveLogFile) {
    std::string fname = LogFileName(wal_dir_, number);
    s = ReadFirstLine(fname, number, sequence);
    if (!s.ok() && env_->FileExists(fname).ok()) {
      // return any error that is not caused by non-existing file
      return s;
    }
  }

  if (type == kArchivedLogFile || !s.ok()) {
    //  check if the file got moved to archive.
    std::string archived_file = ArchivedLogFileName(wal_dir_, number);
    s = ReadFirstLine(archived_file, number, sequence);
    // maybe the file was deleted from archive dir. If that's the case, return
    // Status::OK(). The caller will identify this as an empty file because
    // *sequence == 0
    if (!s.ok() && env_->FileExists(archived_file).IsNotFound()) {
      return Status::OK();
    }
  }

  if (s.ok() && *sequence != 0) {
    MutexLock l(&read_first_record_cache_mutex_);
    read_first_record_cache_.insert({number, *sequence});
  }
  return s;
}
433
434
// Builds in *log_file a WalFile describing alive WAL `number` in wal_dir_,
// with its current size. The start sequence is recorded as 0 and is not read
// from the file.
//
// Returns InvalidArgument if `log_file` is null, PathNotFound if `number` is
// 0, or the error from querying the file size.
Status WalManager::GetLiveWalFile(uint64_t number,
                                  std::unique_ptr<WalFile>* log_file) {
  if (log_file == nullptr) {
    return Status::InvalidArgument("log_file not preallocated.");
  }
  if (number == 0) {
    return Status::PathNotFound("log file not available");
  }

  uint64_t size_bytes;
  Status size_status =
      env_->GetFileSize(LogFileName(wal_dir_, number), &size_bytes);
  if (!size_status.ok()) {
    return size_status;
  }

  log_file->reset(new WalFileImpl(number, kAliveLogFile,
                                  /*SequenceNumber=*/0, size_bytes));
  return Status::OK();
}
459
460
// the function returns status.ok() and sequence == 0 if the file exists, but is
461
// empty
462
Status WalManager::ReadFirstLine(const std::string& fname,
463
                                 const uint64_t number,
464
0
                                 SequenceNumber* sequence) {
465
0
  struct LogReporter : public log::Reader::Reporter {
466
0
    Env* env;
467
0
    Logger* info_log;
468
0
    const char* fname;
469
470
0
    Status* status;
471
0
    bool ignore_error;  // true if db_options_.paranoid_checks==false
472
0
    void Corruption(size_t bytes, const Status& s) override {
473
0
      ROCKS_LOG_WARN(info_log, "[WalManager] %s%s: dropping %d bytes; %s",
474
0
                     (this->ignore_error ? "(ignoring error) " : ""), fname,
475
0
                     static_cast<int>(bytes), s.ToString().c_str());
476
0
      if (this->status->ok()) {
477
        // only keep the first error
478
0
        *this->status = s;
479
0
      }
480
0
    }
481
0
  };
482
483
0
  std::unique_ptr<FSSequentialFile> file;
484
0
  Status status = fs_->NewSequentialFile(
485
0
      fname, fs_->OptimizeForLogRead(file_options_), &file, nullptr);
486
0
  std::unique_ptr<SequentialFileReader> file_reader(
487
0
      new SequentialFileReader(std::move(file), fname, io_tracer_));
488
489
0
  if (!status.ok()) {
490
0
    return status;
491
0
  }
492
493
0
  LogReporter reporter;
494
0
  reporter.env = env_;
495
0
  reporter.info_log = db_options_.info_log.get();
496
0
  reporter.fname = fname.c_str();
497
0
  reporter.status = &status;
498
0
  reporter.ignore_error = !db_options_.paranoid_checks;
499
0
  log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter,
500
0
                     true /*checksum*/, number);
501
0
  std::string scratch;
502
0
  Slice record;
503
504
0
  if (reader.ReadRecord(&record, &scratch) &&
505
0
      (status.ok() || !db_options_.paranoid_checks)) {
506
0
    if (record.size() < WriteBatchInternal::kHeader) {
507
0
      reporter.Corruption(record.size(),
508
0
                          Status::Corruption("log record too small"));
509
      // TODO read record's till the first no corrupt entry?
510
0
    } else {
511
0
      WriteBatch batch;
512
      // We can overwrite an existing non-OK Status since it'd only reach here
513
      // with `paranoid_checks == false`.
514
0
      status = WriteBatchInternal::SetContents(&batch, record);
515
0
      if (status.ok()) {
516
0
        *sequence = WriteBatchInternal::Sequence(&batch);
517
0
        return status;
518
0
      }
519
0
    }
520
0
  }
521
522
0
  if (status.ok() && reader.IsCompressedAndEmptyFile()) {
523
    // In case of wal_compression, it writes a `kSetCompressionType` record
524
    // which is not associated with any sequence number. As result for an empty
525
    // file, GetSortedWalsOfType() will skip these WALs causing the operations
526
    // to fail.
527
    // Therefore, in order to avoid that failure, it sets sequence_number to 1
528
    // indicating those WALs should be included.
529
0
    *sequence = 1;
530
0
  } else {
531
    // ReadRecord might have returned false on EOF, which means that the log
532
    // file is empty. Or, a failure may have occurred while processing the first
533
    // entry. In any case, return status and set sequence number to 0.
534
0
    *sequence = 0;
535
0
  }
536
0
  return status;
537
0
}
538
539
}  // namespace ROCKSDB_NAMESPACE