Coverage Report

Created: 2024-09-08 07:17

/src/rocksdb/db/transaction_log_impl.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
7
#include "db/transaction_log_impl.h"
8
9
#include <cinttypes>
10
11
#include "db/write_batch_internal.h"
12
#include "file/sequence_file_reader.h"
13
#include "util/defer.h"
14
15
namespace ROCKSDB_NAMESPACE {
16
17
// Constructs an iterator over the WAL files in `files` and immediately tries
// to position it at starting sequence number `seq`.
//
// dir           directory used to build log / archived-log file names.
// options       DB options; supplies env, info_log, fs, listeners and the
//               rate limiter used by this class.
// read_options  per-iterator read options (verify_checksums_ is forwarded to
//               the log reader).
// soptions      env options used as the basis for opening log files.
// seq           sequence number the iterator should start from.
// files         WAL files to iterate; ownership is taken. Must not be null.
// versions      source of the last sequence number. Must not be null.
// seq_per_batch must be false (asserted below).
// io_tracer     tracer handed to the file system wrapper and file readers.
TransactionLogIteratorImpl::TransactionLogIteratorImpl(
    const std::string& dir, const ImmutableDBOptions* options,
    const TransactionLogIterator::ReadOptions& read_options,
    const EnvOptions& soptions, const SequenceNumber seq,
    std::unique_ptr<VectorWalPtr> files, VersionSet const* const versions,
    const bool seq_per_batch, const std::shared_ptr<IOTracer>& io_tracer)
    : dir_(dir),
      options_(options),
      read_options_(read_options),
      soptions_(soptions),
      starting_sequence_number_(seq),
      files_(std::move(files)),
      versions_(versions),
      seq_per_batch_(seq_per_batch),
      io_tracer_(io_tracer),
      started_(false),
      is_valid_(false),
      current_file_index_(0),
      current_batch_seq_(0),
      current_last_seq_(0) {
  // Preconditions on the constructor arguments.
  assert(files_);
  assert(versions_);
  assert(!seq_per_batch_);

  // The initial status is never inspected before being overwritten.
  // NOTE(review): presumably this silences the "unchecked Status" assertion
  // used in some build modes -- confirm against Status.
  current_status_.PermitUncheckedError();

  // Wire the corruption/info reporter to the DB's environment and logger.
  reporter_.env = options_->env;
  reporter_.info_log = options_->info_log.get();

  // Position the iterator on the requested starting sequence, if possible.
  SeekToStartSequence();
}
45
46
// Opens the WAL file described by `log_file` for sequential reading.
//
// Files of type kArchivedLogFile are opened under their archived name only.
// Any other file is first looked up under its live log name and, if that
// fails, under the archived name, because the file may have been moved to the
// archive in the meantime.
//
// On success `*file_reader` is replaced with a reader for the opened file.
// On failure `*file_reader` is left untouched and the status of the last open
// attempt is returned.
Status TransactionLogIteratorImpl::OpenLogFile(
    const WalFile* log_file,
    std::unique_ptr<SequentialFileReader>* file_reader) {
  FileSystemPtr fs(options_->fs, io_tracer_);
  std::unique_ptr<FSSequentialFile> file;
  EnvOptions optimized_env_options = fs->OptimizeForLogRead(soptions_);

  const bool is_archived = (log_file->Type() == kArchivedLogFile);

  // First attempt: the location implied by the file's recorded type.
  std::string fname = is_archived
                          ? ArchivedLogFileName(dir_, log_file->LogNumber())
                          : LogFileName(dir_, log_file->LogNumber());
  Status s =
      fs->NewSequentialFile(fname, optimized_env_options, &file, nullptr);

  if (!is_archived && !s.ok()) {
    // Not found in the DB directory: fall back to the archive directory.
    fname = ArchivedLogFileName(dir_, log_file->LogNumber());
    s = fs->NewSequentialFile(fname, optimized_env_options, &file, nullptr);
  }

  if (s.ok()) {
    file_reader->reset(new SequentialFileReader(std::move(file), fname,
                                                io_tracer_, options_->listeners,
                                                options_->rate_limiter.get()));
  }
  return s;
}
74
75
0
// Returns the batch the iterator is currently positioned on, together with
// the sequence number stored in current_batch_seq_.
//
// The batch is moved out of the iterator, so current_batch_ no longer owns it
// after this call. Must only be called while the iterator is valid.
BatchResult TransactionLogIteratorImpl::GetBatch() {
  assert(is_valid_);  // calling this in a non-valid state is a caller bug

  BatchResult out;
  out.writeBatchPtr = std::move(current_batch_);
  out.sequence = current_batch_seq_;
  return out;
}
82
83
0
// Returns a copy of the iterator's current status.
Status TransactionLogIteratorImpl::status() {
  return current_status_;
}
84
85
0
// The iterator is valid only once it has started (the seek to the starting
// sequence succeeded) and is currently positioned on a batch.
bool TransactionLogIteratorImpl::Valid() {
  if (!started_) {
    return false;
  }
  return is_valid_;
}
86
87
0
// Reads the next record from the current log reader into `*record`, but only
// while the iterator is still behind the last sequence number reported by
// versions_. Returns false without touching the reader once the iterator has
// caught up, and otherwise returns whatever ReadRecord() returns.
bool TransactionLogIteratorImpl::RestrictedRead(Slice* record) {
  // No more complete entries to read from the logs once we have caught up.
  const bool caught_up = current_last_seq_ >= versions_->LastSequence();
  return !caught_up && current_log_reader_->ReadRecord(record, &scratch_);
}
94
95
void TransactionLogIteratorImpl::SeekToStartSequence(uint64_t start_file_index,
96
0
                                                     bool strict) {
97
0
  Slice record;
98
0
  started_ = false;
99
0
  is_valid_ = false;
100
  // Check invariant of TransactionLogIterator when SeekToStartSequence()
101
  // succeeds.
102
0
  const Defer defer([this]() {
103
0
    if (is_valid_) {
104
0
      assert(current_status_.ok());
105
0
      if (starting_sequence_number_ > current_batch_seq_) {
106
0
        assert(current_batch_seq_ < current_last_seq_);
107
0
        assert(current_last_seq_ >= starting_sequence_number_);
108
0
      }
109
0
    }
110
0
  });
111
0
  if (files_->size() <= start_file_index) {
112
0
    return;
113
0
  } else if (!current_status_.ok()) {
114
0
    return;
115
0
  }
116
0
  Status s =
117
0
      OpenLogReader(files_->at(static_cast<size_t>(start_file_index)).get());
118
0
  if (!s.ok()) {
119
0
    current_status_ = s;
120
0
    reporter_.Info(current_status_.ToString().c_str());
121
0
    return;
122
0
  }
123
0
  while (RestrictedRead(&record)) {
124
0
    if (record.size() < WriteBatchInternal::kHeader) {
125
0
      reporter_.Corruption(record.size(),
126
0
                           Status::Corruption("very small log record"));
127
0
      continue;
128
0
    }
129
0
    UpdateCurrentWriteBatch(record);
130
0
    if (current_last_seq_ >= starting_sequence_number_) {
131
0
      if (strict && current_batch_seq_ != starting_sequence_number_) {
132
0
        current_status_ = Status::Corruption(
133
0
            "Gap in sequence number. Could not "
134
0
            "seek to required sequence number");
135
0
        reporter_.Info(current_status_.ToString().c_str());
136
0
        return;
137
0
      } else if (strict) {
138
0
        reporter_.Info(
139
0
            "Could seek required sequence number. Iterator will "
140
0
            "continue.");
141
0
      }
142
0
      is_valid_ = true;
143
0
      started_ = true;  // set started_ as we could seek till starting sequence
144
0
      return;
145
0
    } else {
146
0
      is_valid_ = false;
147
0
    }
148
0
  }
149
150
  // Could not find start sequence in first file. Normally this must be the
151
  // only file. Otherwise log the error and let the iterator return next entry
152
  // If strict is set, we want to seek exactly till the start sequence and it
153
  // should have been present in the file we scanned above
154
0
  if (strict) {
155
0
    current_status_ = Status::Corruption(
156
0
        "Gap in sequence number. Could not "
157
0
        "seek to required sequence number");
158
0
    reporter_.Info(current_status_.ToString().c_str());
159
0
  } else if (files_->size() != 1) {
160
0
    current_status_ = Status::Corruption(
161
0
        "Start sequence was not found, "
162
0
        "skipping to the next available");
163
0
    reporter_.Info(current_status_.ToString().c_str());
164
    // Let NextImpl find the next available entry. started_ remains false
165
    // because we don't want to check for gaps while moving to start sequence
166
0
    NextImpl(true);
167
0
  }
168
0
}
169
170
0
void TransactionLogIteratorImpl::Next() {
171
0
  if (!current_status_.ok()) {
172
0
    return;
173
0
  }
174
0
  return NextImpl(false);
175
0
}
176
177
0
void TransactionLogIteratorImpl::NextImpl(bool internal) {
178
0
  Slice record;
179
0
  is_valid_ = false;
180
0
  if (!internal && !started_) {
181
    // Runs every time until we can seek to the start sequence
182
0
    SeekToStartSequence();
183
0
  }
184
0
  while (true) {
185
0
    assert(current_log_reader_);
186
0
    if (current_log_reader_->IsEOF()) {
187
0
      current_log_reader_->UnmarkEOF();
188
0
    }
189
0
    while (RestrictedRead(&record)) {
190
0
      if (record.size() < WriteBatchInternal::kHeader) {
191
0
        reporter_.Corruption(record.size(),
192
0
                             Status::Corruption("very small log record"));
193
0
        continue;
194
0
      } else {
195
        // started_ should be true if called by application
196
0
        assert(internal || started_);
197
        // started_ should be false if called internally
198
0
        assert(!internal || !started_);
199
0
        UpdateCurrentWriteBatch(record);
200
0
        if (internal && !started_) {
201
0
          started_ = true;
202
0
        }
203
0
        return;
204
0
      }
205
0
    }
206
207
    // Open the next file
208
0
    if (current_file_index_ < files_->size() - 1) {
209
0
      ++current_file_index_;
210
0
      Status s = OpenLogReader(files_->at(current_file_index_).get());
211
0
      if (!s.ok()) {
212
0
        is_valid_ = false;
213
0
        current_status_ = s;
214
0
        return;
215
0
      }
216
0
    } else {
217
0
      is_valid_ = false;
218
0
      if (current_last_seq_ == versions_->LastSequence()) {
219
0
        current_status_ = Status::OK();
220
0
      } else {
221
0
        const char* msg = "Create a new iterator to fetch the new tail.";
222
0
        current_status_ = Status::TryAgain(msg);
223
0
      }
224
0
      return;
225
0
    }
226
0
  }
227
0
}
228
229
bool TransactionLogIteratorImpl::IsBatchExpected(
230
0
    const WriteBatch* batch, const SequenceNumber expected_seq) {
231
0
  assert(batch);
232
0
  SequenceNumber batchSeq = WriteBatchInternal::Sequence(batch);
233
0
  if (batchSeq != expected_seq) {
234
0
    char buf[200];
235
0
    snprintf(buf, sizeof(buf),
236
0
             "Discontinuity in log records. Got seq=%" PRIu64
237
0
             ", Expected seq=%" PRIu64 ", Last flushed seq=%" PRIu64
238
0
             ".Log iterator will reseek the correct batch.",
239
0
             batchSeq, expected_seq, versions_->LastSequence());
240
0
    reporter_.Info(buf);
241
0
    return false;
242
0
  }
243
0
  return true;
244
0
}
245
246
0
void TransactionLogIteratorImpl::UpdateCurrentWriteBatch(const Slice& record) {
247
0
  std::unique_ptr<WriteBatch> batch(new WriteBatch());
248
0
  Status s = WriteBatchInternal::SetContents(batch.get(), record);
249
0
  s.PermitUncheckedError();  // TODO: What should we do with this error?
250
251
0
  SequenceNumber expected_seq = current_last_seq_ + 1;
252
  // If the iterator has started, then confirm that we get continuous batches
253
0
  if (started_ && !IsBatchExpected(batch.get(), expected_seq)) {
254
    // Seek to the batch having expected sequence number
255
0
    if (expected_seq < files_->at(current_file_index_)->StartSequence()) {
256
      // Expected batch must lie in the previous log file
257
      // Avoid underflow.
258
0
      if (current_file_index_ != 0) {
259
0
        current_file_index_--;
260
0
      }
261
0
    }
262
0
    starting_sequence_number_ = expected_seq;
263
    // currentStatus_ will be set to Ok if reseek succeeds
264
    // Note: this is still ok in seq_pre_batch_ && two_write_queuesp_ mode
265
    // that allows gaps in the WAL since it will still skip over the gap.
266
0
    current_status_ = Status::NotFound("Gap in sequence numbers");
267
    // In seq_per_batch_ mode, gaps in the seq are possible so the strict mode
268
    // should be disabled
269
0
    return SeekToStartSequence(current_file_index_, !seq_per_batch_);
270
0
  }
271
272
0
  current_batch_seq_ = WriteBatchInternal::Sequence(batch.get());
273
0
  assert(!seq_per_batch_);
274
0
  current_last_seq_ =
275
0
      current_batch_seq_ + WriteBatchInternal::Count(batch.get()) - 1;
276
  // currentBatchSeq_ can only change here
277
0
  assert(current_last_seq_ <= versions_->LastSequence());
278
279
0
  current_batch_ = std::move(batch);
280
0
  is_valid_ = true;
281
0
  current_status_ = Status::OK();
282
0
}
283
284
0
// Opens `log_file` and, on success, replaces current_log_reader_ with a
// log::Reader over it. On failure the existing reader (if any) is left in
// place and the open error is returned.
Status TransactionLogIteratorImpl::OpenLogReader(const WalFile* log_file) {
  std::unique_ptr<SequentialFileReader> file;
  Status s = OpenLogFile(log_file, &file);
  if (s.ok()) {
    assert(file);
    current_log_reader_.reset(
        new log::Reader(options_->info_log, std::move(file), &reporter_,
                        read_options_.verify_checksums_, log_file->LogNumber()));
  }
  return s;
}
296
}  // namespace ROCKSDB_NAMESPACE