Coverage Report

Created: 2026-02-14 06:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/wal_edit.cc
Line
Count
Source
1
// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
// This source code is licensed under both the GPLv2 (found in the
3
// COPYING file in the root directory) and Apache 2.0 License
4
// (found in the LICENSE.Apache file in the root directory).
5
6
#include "db/wal_edit.h"
7
8
#include "rocksdb/slice.h"
9
#include "rocksdb/status.h"
10
#include "util/coding.h"
11
12
namespace ROCKSDB_NAMESPACE {
13
14
0
void WalAddition::EncodeTo(std::string* dst) const {
15
0
  PutVarint64(dst, number_);
16
17
0
  if (metadata_.HasSyncedSize()) {
18
0
    PutVarint32(dst, static_cast<uint32_t>(WalAdditionTag::kSyncedSize));
19
0
    PutVarint64(dst, metadata_.GetSyncedSizeInBytes());
20
0
  }
21
22
0
  PutVarint32(dst, static_cast<uint32_t>(WalAdditionTag::kTerminate));
23
0
}
24
25
0
Status WalAddition::DecodeFrom(Slice* src) {
26
0
  constexpr char class_name[] = "WalAddition";
27
28
0
  if (!GetVarint64(src, &number_)) {
29
0
    return Status::Corruption(class_name, "Error decoding WAL log number");
30
0
  }
31
32
0
  while (true) {
33
0
    uint32_t tag_value = 0;
34
0
    if (!GetVarint32(src, &tag_value)) {
35
0
      return Status::Corruption(class_name, "Error decoding tag");
36
0
    }
37
0
    WalAdditionTag tag = static_cast<WalAdditionTag>(tag_value);
38
0
    switch (tag) {
39
0
      case WalAdditionTag::kSyncedSize: {
40
0
        uint64_t size = 0;
41
0
        if (!GetVarint64(src, &size)) {
42
0
          return Status::Corruption(class_name, "Error decoding WAL file size");
43
0
        }
44
0
        metadata_.SetSyncedSizeInBytes(size);
45
0
        break;
46
0
      }
47
      // TODO: process future tags such as checksum.
48
0
      case WalAdditionTag::kTerminate:
49
0
        return Status::OK();
50
0
      default: {
51
0
        std::stringstream ss;
52
0
        ss << "Unknown tag " << tag_value;
53
0
        return Status::Corruption(class_name, ss.str());
54
0
      }
55
0
    }
56
0
  }
57
0
}
58
59
0
JSONWriter& operator<<(JSONWriter& jw, const WalAddition& wal) {
60
0
  jw << "LogNumber" << wal.GetLogNumber() << "SyncedSizeInBytes"
61
0
     << wal.GetMetadata().GetSyncedSizeInBytes();
62
0
  return jw;
63
0
}
64
65
0
std::ostream& operator<<(std::ostream& os, const WalAddition& wal) {
66
0
  os << "log_number: " << wal.GetLogNumber()
67
0
     << " synced_size_in_bytes: " << wal.GetMetadata().GetSyncedSizeInBytes();
68
0
  return os;
69
0
}
70
71
0
std::string WalAddition::DebugString() const {
72
0
  std::ostringstream oss;
73
0
  oss << *this;
74
0
  return oss.str();
75
0
}
76
77
14.5k
void WalDeletion::EncodeTo(std::string* dst) const {
78
14.5k
  PutVarint64(dst, number_);
79
14.5k
}
80
81
12.4k
Status WalDeletion::DecodeFrom(Slice* src) {
82
12.4k
  constexpr char class_name[] = "WalDeletion";
83
84
12.4k
  if (!GetVarint64(src, &number_)) {
85
0
    return Status::Corruption(class_name, "Error decoding WAL log number");
86
0
  }
87
88
12.4k
  return Status::OK();
89
12.4k
}
90
91
0
JSONWriter& operator<<(JSONWriter& jw, const WalDeletion& wal) {
92
0
  jw << "LogNumber" << wal.GetLogNumber();
93
0
  return jw;
94
0
}
95
96
0
std::ostream& operator<<(std::ostream& os, const WalDeletion& wal) {
97
0
  os << "log_number: " << wal.GetLogNumber();
98
0
  return os;
99
0
}
100
101
0
std::string WalDeletion::DebugString() const {
102
0
  std::ostringstream oss;
103
0
  oss << *this;
104
0
  return oss.str();
105
0
}
106
107
0
Status WalSet::AddWal(const WalAddition& wal) {
108
0
  if (wal.GetLogNumber() < min_wal_number_to_keep_) {
109
    // The WAL has been obsolete, ignore it.
110
0
    return Status::OK();
111
0
  }
112
113
0
  auto it = wals_.lower_bound(wal.GetLogNumber());
114
0
  bool existing = it != wals_.end() && it->first == wal.GetLogNumber();
115
116
0
  if (!existing) {
117
0
    wals_.insert(it, {wal.GetLogNumber(), wal.GetMetadata()});
118
0
    return Status::OK();
119
0
  }
120
121
0
  assert(existing);
122
0
  if (!wal.GetMetadata().HasSyncedSize()) {
123
0
    std::stringstream ss;
124
0
    ss << "WAL " << wal.GetLogNumber() << " is created more than once";
125
0
    return Status::Corruption("WalSet::AddWal", ss.str());
126
0
  }
127
128
0
  assert(wal.GetMetadata().HasSyncedSize());
129
0
  if (it->second.HasSyncedSize() && wal.GetMetadata().GetSyncedSizeInBytes() <=
130
0
                                        it->second.GetSyncedSizeInBytes()) {
131
    // This is possible because version edits with different synced WAL sizes
132
    // for the same WAL can be committed out-of-order. For example, thread
133
    // 1 synces the first 10 bytes of 1.log, while thread 2 synces the first 20
134
    // bytes of 1.log. It's possible that thread 1 calls LogAndApply() after
135
    // thread 2.
136
    // In this case, just return ok.
137
0
    return Status::OK();
138
0
  }
139
140
  // Update synced size for the given WAL.
141
0
  it->second.SetSyncedSizeInBytes(wal.GetMetadata().GetSyncedSizeInBytes());
142
0
  return Status::OK();
143
0
}
144
145
0
Status WalSet::AddWals(const WalAdditions& wals) {
146
0
  Status s;
147
0
  for (const WalAddition& wal : wals) {
148
0
    s = AddWal(wal);
149
0
    if (!s.ok()) {
150
0
      break;
151
0
    }
152
0
  }
153
0
  return s;
154
0
}
155
156
12.4k
Status WalSet::DeleteWalsBefore(WalNumber wal) {
157
12.4k
  if (wal > min_wal_number_to_keep_) {
158
12.4k
    min_wal_number_to_keep_ = wal;
159
12.4k
    wals_.erase(wals_.begin(), wals_.lower_bound(wal));
160
12.4k
  }
161
12.4k
  return Status::OK();
162
12.4k
}
163
164
0
void WalSet::Reset() {
165
0
  wals_.clear();
166
0
  min_wal_number_to_keep_ = 0;
167
0
}
168
169
Status WalSet::CheckWals(
170
    Env* env,
171
0
    const std::unordered_map<WalNumber, std::string>& logs_on_disk) const {
172
0
  assert(env != nullptr);
173
174
0
  Status s;
175
0
  for (const auto& wal : wals_) {
176
0
    const uint64_t log_number = wal.first;
177
0
    const WalMetadata& wal_meta = wal.second;
178
179
0
    if (!wal_meta.HasSyncedSize()) {
180
      // The WAL and WAL directory is not even synced,
181
      // so the WAL's inode may not be persisted,
182
      // then the WAL might not show up when listing WAL directory.
183
0
      continue;
184
0
    }
185
186
0
    if (logs_on_disk.find(log_number) == logs_on_disk.end()) {
187
0
      std::stringstream ss;
188
0
      ss << "Missing WAL with log number: " << log_number << ".";
189
0
      s = Status::Corruption(ss.str());
190
0
      break;
191
0
    }
192
193
0
    uint64_t log_file_size = 0;
194
0
    s = env->GetFileSize(logs_on_disk.at(log_number), &log_file_size);
195
0
    if (!s.ok()) {
196
0
      break;
197
0
    }
198
0
    if (log_file_size < wal_meta.GetSyncedSizeInBytes()) {
199
0
      std::stringstream ss;
200
0
      ss << "Size mismatch: WAL (log number: " << log_number
201
0
         << ") in MANIFEST is " << wal_meta.GetSyncedSizeInBytes()
202
0
         << " bytes , but actually is " << log_file_size << " bytes on disk.";
203
0
      s = Status::Corruption(ss.str());
204
0
      break;
205
0
    }
206
0
  }
207
208
0
  return s;
209
0
}
210
211
}  // namespace ROCKSDB_NAMESPACE