Coverage Report

Created: 2025-10-26 07:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/transaction_log_impl.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#include "db/transaction_log_impl.h"
7
8
#include <cinttypes>
9
10
#include "db/write_batch_internal.h"
11
#include "file/sequence_file_reader.h"
12
#include "util/defer.h"
13
14
namespace ROCKSDB_NAMESPACE {
15
16
// Constructs an iterator over the WAL files listed in `files` and immediately
// tries to position it at the batch containing sequence number `seq` by
// calling SeekToStartSequence().
//
// `files` and `versions` must be non-null, and `seq_per_batch` must be false
// (all three are enforced with assertions). `options` is dereferenced to
// configure the log reporter.
TransactionLogIteratorImpl::TransactionLogIteratorImpl(
    const std::string& dir, const ImmutableDBOptions* options,
    const TransactionLogIterator::ReadOptions& read_options,
    const EnvOptions& soptions, const SequenceNumber seq,
    std::unique_ptr<VectorWalPtr> files, VersionSet const* const versions,
    const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer)
    : dir_(dir),
      options_(options),
      read_options_(read_options),
      soptions_(soptions),
      starting_sequence_number_(seq),
      files_(std::move(files)),
      versions_(versions),
      seq_per_batch_(seq_per_batch),
      io_tracer_(io_tracer),
      started_(false),
      is_valid_(false),
      current_file_index_(0),
      current_batch_seq_(0),
      current_last_seq_(0) {
  // Preconditions on the constructor arguments.
  assert(files_);
  assert(versions_);
  assert(!seq_per_batch_);

  // The freshly constructed status does not have to be inspected by anyone.
  current_status_.PermitUncheckedError();

  // Route reader diagnostics through the environment and info log configured
  // in the DB options.
  reporter_.info_log = options_->info_log.get();
  reporter_.env = options_->env;

  // Try to land on the requested starting sequence right away.
  SeekToStartSequence();
}
44
45
// Opens the WAL file described by `log_file` for sequential reading.
//
// A file whose type is kArchivedLogFile is opened from its archived name
// only. Any other file is first tried under its regular log file name and,
// if that fails, under its archived name, since it may have been moved in the
// meantime.
//
// On success `*file_reader` is replaced with a reader for the opened file.
// On failure `*file_reader` is left untouched and the status of the last open
// attempt is returned.
Status TransactionLogIteratorImpl::OpenLogFile(
    const WalFile* log_file,
    std::unique_ptr<SequentialFileReader>* file_reader) {
  FileSystemPtr fs(options_->fs, io_tracer_);
  std::unique_ptr<FSSequentialFile> file;
  EnvOptions optimized_env_options = fs->OptimizeForLogRead(soptions_);

  const bool is_archived = (log_file->Type() == kArchivedLogFile);
  std::string fname = is_archived
                          ? ArchivedLogFileName(dir_, log_file->LogNumber())
                          : LogFileName(dir_, log_file->LogNumber());
  Status s =
      fs->NewSequentialFile(fname, optimized_env_options, &file, nullptr);

  if (!is_archived && !s.ok()) {
    // Not found in the DB directory: fall back to the archive directory.
    fname = ArchivedLogFileName(dir_, log_file->LogNumber());
    s = fs->NewSequentialFile(fname, optimized_env_options, &file, nullptr);
  }

  if (!s.ok()) {
    return s;
  }
  *file_reader = std::make_unique<SequentialFileReader>(
      std::move(file), fname, io_tracer_, options_->listeners,
      options_->rate_limiter.get());
  return s;
}
73
74
0
// Returns the batch the iterator is currently positioned on, together with
// its starting sequence number. Ownership of the batch is moved into the
// result, so current_batch_ is left in a moved-from state afterwards.
// Must only be called while is_valid_ is true.
BatchResult TransactionLogIteratorImpl::GetBatch() {
  assert(is_valid_);  // Not callable while the iterator is invalid.
  BatchResult batch_result;
  batch_result.writeBatchPtr = std::move(current_batch_);
  batch_result.sequence = current_batch_seq_;
  return batch_result;
}
81
82
0
// Returns a copy of the status recorded by the most recent seek or advance.
Status TransactionLogIteratorImpl::status() {
  return current_status_;
}
83
84
0
// The iterator is valid only once the seek to the starting sequence has
// completed (started_) and it is currently positioned on a batch (is_valid_).
bool TransactionLogIteratorImpl::Valid() {
  if (!started_) {
    return false;
  }
  return is_valid_;
}
85
86
0
// Reads the next record from the current log reader into `*record`, but only
// while the last sequence number consumed so far is still below
// versions_->LastSequence(). Returns false without touching the reader once
// that limit has been reached; otherwise returns the result of ReadRecord().
bool TransactionLogIteratorImpl::RestrictedRead(Slice* record) {
  const bool more_to_read = current_last_seq_ < versions_->LastSequence();
  return more_to_read && current_log_reader_->ReadRecord(record, &scratch_);
}
93
94
void TransactionLogIteratorImpl::SeekToStartSequence(uint64_t start_file_index,
95
0
                                                     bool strict) {
96
0
  Slice record;
97
0
  started_ = false;
98
0
  is_valid_ = false;
99
  // Check invariant of TransactionLogIterator when SeekToStartSequence()
100
  // succeeds.
101
0
  const Defer defer([this]() {
102
0
    if (is_valid_) {
103
0
      assert(current_status_.ok());
104
0
      if (starting_sequence_number_ > current_batch_seq_) {
105
0
        assert(current_batch_seq_ < current_last_seq_);
106
0
        assert(current_last_seq_ >= starting_sequence_number_);
107
0
      }
108
0
    }
109
0
  });
110
0
  if (files_->size() <= start_file_index) {
111
0
    return;
112
0
  } else if (!current_status_.ok()) {
113
0
    return;
114
0
  }
115
0
  Status s =
116
0
      OpenLogReader(files_->at(static_cast<size_t>(start_file_index)).get());
117
0
  if (!s.ok()) {
118
0
    current_status_ = s;
119
0
    reporter_.Info(current_status_.ToString().c_str());
120
0
    return;
121
0
  }
122
0
  while (RestrictedRead(&record)) {
123
0
    if (record.size() < WriteBatchInternal::kHeader) {
124
0
      reporter_.Corruption(record.size(),
125
0
                           Status::Corruption("very small log record"));
126
0
      continue;
127
0
    }
128
0
    UpdateCurrentWriteBatch(record);
129
0
    if (current_last_seq_ >= starting_sequence_number_) {
130
0
      if (strict && current_batch_seq_ != starting_sequence_number_) {
131
0
        current_status_ = Status::Corruption(
132
0
            "Gap in sequence number. Could not "
133
0
            "seek to required sequence number");
134
0
        reporter_.Info(current_status_.ToString().c_str());
135
0
        return;
136
0
      } else if (strict) {
137
0
        reporter_.Info(
138
0
            "Could seek required sequence number. Iterator will "
139
0
            "continue.");
140
0
      }
141
0
      is_valid_ = true;
142
0
      started_ = true;  // set started_ as we could seek till starting sequence
143
0
      return;
144
0
    } else {
145
0
      is_valid_ = false;
146
0
    }
147
0
  }
148
149
  // Could not find start sequence in first file. Normally this must be the
150
  // only file. Otherwise log the error and let the iterator return next entry
151
  // If strict is set, we want to seek exactly till the start sequence and it
152
  // should have been present in the file we scanned above
153
0
  if (strict) {
154
0
    current_status_ = Status::Corruption(
155
0
        "Gap in sequence number. Could not "
156
0
        "seek to required sequence number");
157
0
    reporter_.Info(current_status_.ToString().c_str());
158
0
  } else if (files_->size() != 1) {
159
0
    current_status_ = Status::Corruption(
160
0
        "Start sequence was not found, "
161
0
        "skipping to the next available");
162
0
    reporter_.Info(current_status_.ToString().c_str());
163
    // Let NextImpl find the next available entry. started_ remains false
164
    // because we don't want to check for gaps while moving to start sequence
165
0
    NextImpl(true);
166
0
  }
167
0
}
168
169
0
void TransactionLogIteratorImpl::Next() {
170
0
  if (!current_status_.ok()) {
171
0
    return;
172
0
  }
173
0
  return NextImpl(false);
174
0
}
175
176
0
void TransactionLogIteratorImpl::NextImpl(bool internal) {
177
0
  Slice record;
178
0
  is_valid_ = false;
179
0
  if (!internal && !started_) {
180
    // Runs every time until we can seek to the start sequence
181
0
    SeekToStartSequence();
182
0
  }
183
0
  while (true) {
184
0
    assert(current_log_reader_);
185
0
    if (current_log_reader_->IsEOF()) {
186
0
      current_log_reader_->UnmarkEOF();
187
0
    }
188
0
    while (RestrictedRead(&record)) {
189
0
      if (record.size() < WriteBatchInternal::kHeader) {
190
0
        reporter_.Corruption(record.size(),
191
0
                             Status::Corruption("very small log record"));
192
0
        continue;
193
0
      } else {
194
        // started_ should be true if called by application
195
0
        assert(internal || started_);
196
        // started_ should be false if called internally
197
0
        assert(!internal || !started_);
198
0
        UpdateCurrentWriteBatch(record);
199
0
        if (internal && !started_) {
200
0
          started_ = true;
201
0
        }
202
0
        return;
203
0
      }
204
0
    }
205
206
    // Open the next file
207
0
    if (current_file_index_ < files_->size() - 1) {
208
0
      ++current_file_index_;
209
0
      Status s = OpenLogReader(files_->at(current_file_index_).get());
210
0
      if (!s.ok()) {
211
0
        is_valid_ = false;
212
0
        current_status_ = s;
213
0
        return;
214
0
      }
215
0
    } else {
216
0
      is_valid_ = false;
217
0
      if (current_last_seq_ == versions_->LastSequence()) {
218
0
        current_status_ = Status::OK();
219
0
      } else {
220
0
        const char* msg = "Create a new iterator to fetch the new tail.";
221
0
        current_status_ = Status::TryAgain(msg);
222
0
      }
223
0
      return;
224
0
    }
225
0
  }
226
0
}
227
228
bool TransactionLogIteratorImpl::IsBatchExpected(
229
0
    const WriteBatch* batch, const SequenceNumber expected_seq) {
230
0
  assert(batch);
231
0
  SequenceNumber batchSeq = WriteBatchInternal::Sequence(batch);
232
0
  if (batchSeq != expected_seq) {
233
0
    std::ostringstream oss;
234
0
    oss << "Discontinuity in log records. " << "Got seq=" << batchSeq << ", "
235
0
        << "Expected seq=" << expected_seq << ", "
236
0
        << "Last flushed seq=" << versions_->LastSequence() << ". "
237
0
        << "Log iterator will reseek the correct batch.";
238
239
0
    reporter_.Info(oss.str().c_str());
240
0
    return false;
241
0
  }
242
0
  return true;
243
0
}
244
245
0
void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
246
0
  std::unique_ptr<WriteBatch> batch(new WriteBatch());
247
0
  Status s = WriteBatchInternal::SetContents(batch.get(), record);
248
0
  s.PermitUncheckedError();  // TODO: What should we do with this error?
249
250
0
  SequenceNumber expected_seq = current_last_seq_ + 1;
251
  // If the iterator has started, then confirm that we get continuous batches
252
0
  if (started_ && !IsBatchExpected(batch.get(), expected_seq)) {
253
    // Seek to the batch having expected sequence number
254
0
    if (expected_seq < files_->at(current_file_index_)->StartSequence()) {
255
      // Expected batch must lie in the previous log file
256
      // Avoid underflow.
257
0
      if (current_file_index_ != 0) {
258
0
        current_file_index_--;
259
0
      }
260
0
    }
261
0
    starting_sequence_number_ = expected_seq;
262
    // currentStatus_ will be set to Ok if reseek succeeds
263
    // Note: this is still ok in seq_pre_batch_ && two_write_queuesp_ mode
264
    // that allows gaps in the WAL since it will still skip over the gap.
265
0
    current_status_ = Status::NotFound("Gap in sequence numbers");
266
    // In seq_per_batch_ mode, gaps in the seq are possible so the strict mode
267
    // should be disabled
268
0
    return SeekToStartSequence(current_file_index_, !seq_per_batch_);
269
0
  }
270
271
0
  current_batch_seq_ = WriteBatchInternal::Sequence(batch.get());
272
0
  assert(!seq_per_batch_);
273
0
  current_last_seq_ =
274
0
      current_batch_seq_ + WriteBatchInternal::Count(batch.get()) - 1;
275
  // currentBatchSeq_ can only change here
276
0
  assert(current_last_seq_ <= versions_->LastSequence());
277
278
0
  current_batch_ = std::move(batch);
279
0
  is_valid_ = true;
280
0
  current_status_ = Status::OK();
281
0
}
282
283
0
// Opens `log_file` through OpenLogFile() and, on success, replaces
// current_log_reader_ with a log::Reader over it that reports through
// reporter_ and uses read_options_.verify_checksums_ as its checksum flag.
// Returns the open error unchanged on failure, leaving current_log_reader_
// as it was.
Status TransactionLogIteratorImpl::OpenLogReader(const WalFile* log_file) {
  std::unique_ptr<SequentialFileReader> sequential_file;
  Status open_status = OpenLogFile(log_file, &sequential_file);
  if (open_status.ok()) {
    assert(sequential_file);
    current_log_reader_ = std::make_unique<log::Reader>(
        options_->info_log, std::move(sequential_file), &reporter_,
        read_options_.verify_checksums_, log_file->LogNumber());
    return Status::OK();
  }
  return open_status;
}
295
}  // namespace ROCKSDB_NAMESPACE