Coverage Report

Created: 2026-02-14 06:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/log_reader.h
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10
#pragma once
11
#include <stdint.h>
12
13
#include <cstdint>
14
#include <memory>
15
#include <unordered_map>
16
#include <vector>
17
18
#include "db/log_format.h"
19
#include "file/sequence_file_reader.h"
20
#include "rocksdb/options.h"
21
#include "rocksdb/slice.h"
22
#include "rocksdb/status.h"
23
#include "util/compression.h"
24
#include "util/hash_containers.h"
25
#include "util/udt_util.h"
26
#include "util/xxhash.h"
27
28
namespace ROCKSDB_NAMESPACE {
29
class Logger;
30
31
namespace log {
32
33
/**
34
 * Reader is a general purpose log stream reader implementation. The actual job
35
 * of reading from the device is implemented by the SequentialFile interface.
36
 *
37
 * Please see Writer for details on the file and record layout.
38
 */
39
class Reader {
40
 public:
41
  // Interface for reporting errors.
42
  class Reporter {
43
   public:
44
    virtual ~Reporter();
45
46
    // Some corruption was detected.  "bytes" is the approximate number
47
    // of bytes dropped due to the corruption.
    // NOTE(review): "log_number" defaults to kMaxSequenceNumber, presumably as
    // a "not specified" sentinel -- confirm against the implementers.
48
    virtual void Corruption(size_t bytes, const Status& status,
49
                            uint64_t log_number = kMaxSequenceNumber) = 0;
50
51
0
    // Hook with an empty default body, so implementers may ignore it.
    // NOTE(review): presumably reached via Reader::ReportOldLogRecord --
    // confirm in the .cc file.
    virtual void OldLogRecord(size_t /*bytes*/) {}
52
  };
53
54
  // Create a reader that will return log records from "*file".
55
  // "*file" must remain live while this Reader is in use.
56
  //
57
  // If "reporter" is non-nullptr, it is notified whenever some data is
58
  // dropped due to a detected corruption.  "*reporter" must remain
59
  // live while this Reader is in use.
60
  //
61
  // If "checksum" is true, verify checksums if available.
62
  // TODO(hx235): separate WAL related parameters from general `Reader`
63
  // parameters
  // NOTE(review): the default for `min_wal_number_to_keep` uses
  // std::numeric_limits, but <limits> is not among the headers included
  // directly above; presumably it arrives transitively -- confirm.
64
  Reader(std::shared_ptr<Logger> info_log,
65
         std::unique_ptr<SequentialFileReader>&& file, Reporter* reporter,
66
         bool checksum, uint64_t log_num, bool track_and_verify_wals = false,
67
         bool stop_replay_for_corruption = false,
68
         uint64_t min_wal_number_to_keep = std::numeric_limits<uint64_t>::max(),
69
         const PredecessorWALInfo& observed_predecessor_wal_info =
70
             PredecessorWALInfo());
71
  // No copying allowed
72
  Reader(const Reader&) = delete;
73
  void operator=(const Reader&) = delete;
74
75
  virtual ~Reader();
76
77
  // Read the next record into *record.  Returns true if read
78
  // successfully, false if we hit end of the input.  May use
79
  // "*scratch" as temporary storage. The contents filled in *record
80
  // will only be valid until the next mutating operation on this
81
  // reader or the next mutation to *scratch.
82
  // If record_checksum is not nullptr, then this function will calculate the
83
  // checksum of the record read and set record_checksum to it. The checksum is
84
  // calculated from the original buffers that contain the contents of the
85
  // record.
86
  virtual bool ReadRecord(Slice* record, std::string* scratch,
87
                          WALRecoveryMode wal_recovery_mode =
88
                              WALRecoveryMode::kTolerateCorruptedTailRecords,
89
                          uint64_t* record_checksum = nullptr);
90
91
  // Return the recorded user-defined timestamp sizes that have been read so
92
  // far. This only applies to WAL logs.
93
15.8k
  const UnorderedMap<uint32_t, size_t>& GetRecordedTimestampSize() const {
94
15.8k
    return recorded_cf_to_ts_sz_;
95
15.8k
  }
96
97
  // Returns the physical offset of the last record returned by ReadRecord.
98
  //
99
  // Undefined before the first call to ReadRecord.
100
  uint64_t LastRecordOffset();
101
102
  // Returns the first physical offset after the last record returned by
103
  // ReadRecord, or zero before first call to ReadRecord. This can also be
104
  // thought of as the "current" position in processing the file bytes.
105
  uint64_t LastRecordEnd();
106
107
  // Returns true if the reader has encountered an EOF condition.
108
0
  bool IsEOF() { return eof_; }
109
110
  // Returns true if the reader has encountered a read error.
111
0
  bool hasReadError() const { return read_error_; }
112
113
  // When we know more data has been written to the file, we can use this
114
  // function to force the reader to look again in the file.
115
  // Also aligns the file position indicator to the start of the next block
116
  // by reading the rest of the data from the EOF position to the end of the
117
  // block that was partially read.
118
  virtual void UnmarkEOF();
119
120
0
  // Returns a non-owning pointer to the file reader held by file_.
  SequentialFileReader* file() { return file_.get(); }
121
122
0
  // Returns reporter_; may be nullptr (see the constructor comment).
  Reporter* GetReporter() const { return reporter_; }
123
124
0
  // Returns log_number_ (which log number this is).
  uint64_t GetLogNumber() const { return log_number_; }
125
126
39.4k
  // Returns end_of_buffer_offset_ (the offset of the first location past the
  // end of buffer_) converted to size_t. NOTE(review): the cast narrows the
  // value wherever size_t is smaller than 64 bits.
  size_t GetReadOffset() const {
127
39.4k
    return static_cast<size_t>(end_of_buffer_offset_);
128
39.4k
  }
129
130
0
  // True when the compression type record has been read but the first record
  // has not been read yet.
  bool IsCompressedAndEmptyFile() {
131
0
    return !first_record_read_ && compression_type_record_read_;
132
0
  }
133
134
 protected:
135
  std::shared_ptr<Logger> info_log_;
136
  const std::unique_ptr<SequentialFileReader> file_;
137
  Reporter* const reporter_;
138
  bool const checksum_;
  // NOTE(review): presumably the storage that buffer_ points into; allocation
  // and release are not visible in this header -- confirm in the .cc file.
139
  char* const backing_store_;
140
141
  // Internal state variables used for reading records
142
  Slice buffer_;
143
  bool eof_;         // Last Read() indicated EOF by returning < kBlockSize
144
  bool read_error_;  // Error occurred while reading from file
145
146
  // Offset of the file position indicator within the last block when an
147
  // EOF was detected.
148
  size_t eof_offset_;
149
150
  // Offset of the last record returned by ReadRecord.
151
  uint64_t last_record_offset_;
152
  // Offset of the first location past the end of buffer_.
153
  uint64_t end_of_buffer_offset_;
154
155
  // which log number this is
156
  uint64_t const log_number_;
157
158
  // See `Options::track_and_verify_wals`
159
  bool track_and_verify_wals_;
160
  // Below variables are used for WAL verification
161
  // TODO(hx235): To revise `stop_replay_for_corruption_` inside `LogReader`
162
  // since we have `observed_predecessor_wal_info_` to verify against the
163
  // `recorded_predecessor_wal_info_` recorded in current WAL. If there is no
164
  // WAL hole, we can revise `stop_replay_for_corruption_` to be false.
165
  bool stop_replay_for_corruption_;
166
  uint64_t min_wal_number_to_keep_;
167
  PredecessorWALInfo observed_predecessor_wal_info_;
168
169
  // Whether this is a recycled log file
170
  bool recycled_;
171
172
  // Whether the first record has been read or not.
173
  bool first_record_read_;
174
  // Type of compression used
175
  CompressionType compression_type_;
176
  // Track whether the compression type record has been read or not.
177
  bool compression_type_record_read_;
  // NOTE(review): raw pointer; ownership and cleanup are not visible in this
  // header -- confirm in the .cc file.
178
  StreamingUncompress* uncompress_;
179
  // Reusable uncompressed output buffer
180
  std::unique_ptr<char[]> uncompressed_buffer_;
181
  // Reusable uncompressed record
182
  std::string uncompressed_record_;
183
  // Used for stream hashing fragment content in ReadRecord()
184
  XXH3_state_t* hash_state_;
185
  // Used for stream hashing uncompressed buffer in ReadPhysicalRecord()
186
  XXH3_state_t* uncompress_hash_state_;
187
188
  // The recorded user-defined timestamp sizes that have been read so far. This
189
  // is only for WAL logs.
190
  UnorderedMap<uint32_t, size_t> recorded_cf_to_ts_sz_;
191
192
  // Extend record types with the following special values
193
  enum : uint8_t {
194
    kEof = kMaxRecordType + 1,
195
    // Returned whenever we find an invalid physical record.
196
    // Currently there are two situations in which this happens:
197
    // * The record has an invalid CRC (ReadPhysicalRecord reports a drop)
198
    // * The record is a 0-length record (No drop is reported)
199
    kBadRecord = kMaxRecordType + 2,
200
    // Returned when we fail to read a valid header.
201
    kBadHeader = kMaxRecordType + 3,
202
    // Returned when we read an old record from a previous user of the log.
203
    kOldRecord = kMaxRecordType + 4,
204
    // Returned when we get a bad record length
205
    kBadRecordLen = kMaxRecordType + 5,
206
    // Returned when we get a bad record checksum
207
    kBadRecordChecksum = kMaxRecordType + 6,
208
  };
209
210
  // Return type, or one of the preceding special values
211
  // If WAL compression is enabled, fragment_checksum is the checksum of the
212
  // fragment computed from the original buffer containing uncompressed
213
  // fragment.
214
  uint8_t ReadPhysicalRecord(Slice* result, size_t* drop_size,
215
                             uint64_t* fragment_checksum = nullptr);
216
217
  // Read some more
218
  bool ReadMore(size_t* drop_size, uint8_t* error);
219
220
  void UnmarkEOFInternal();
221
222
  // Reports dropped bytes to the reporter.
223
  // buffer_ must be updated to remove the dropped bytes prior to invocation.
224
  void ReportCorruption(size_t bytes, const char* reason,
225
                        uint64_t log_number = kMaxSequenceNumber);
226
  void ReportDrop(size_t bytes, const Status& reason,
227
                  uint64_t log_number = kMaxSequenceNumber);
228
  void ReportOldLogRecord(size_t bytes);
229
230
  void InitCompression(const CompressionTypeRecord& compression_record);
231
232
  Status UpdateRecordedTimestampSize(
233
      const std::vector<std::pair<uint32_t, size_t>>& cf_to_ts_sz);
234
235
  void MaybeVerifyPredecessorWALInfo(
236
      WALRecoveryMode wal_recovery_mode, Slice fragment,
237
      const PredecessorWALInfo& recorded_predecessor_wal_info);
238
};
239
240
// Reader subclass that overrides ReadRecord() and UnmarkEOF().
// NOTE(review): the fragments_ / in_fragmented_record_ members suggest it
// keeps partially read records across calls -- confirm against the .cc file.
class FragmentBufferedReader : public Reader {
241
 public:
  // Forwards to the Reader constructor, passing explicitly the same values
  // that Reader declares as defaults for its WAL-verification parameters
  // (tracking and stop-replay flags off, max min_wal_number_to_keep, and a
  // default-constructed PredecessorWALInfo).
242
  FragmentBufferedReader(std::shared_ptr<Logger> info_log,
243
                         std::unique_ptr<SequentialFileReader>&& _file,
244
                         Reporter* reporter, bool checksum, uint64_t log_num)
245
0
      : Reader(info_log, std::move(_file), reporter, checksum, log_num,
246
0
               false /*track_and_verify_wals*/,
247
0
               false /*stop_replay_for_corruption*/,
248
0
               std::numeric_limits<uint64_t>::max() /*min_wal_number_to_keep*/,
249
0
               PredecessorWALInfo() /*observed_predecessor_wal_info*/),
250
0
        fragments_(),
251
0
        in_fragmented_record_(false) {}
252
0
  ~FragmentBufferedReader() override {}
253
  bool ReadRecord(Slice* record, std::string* scratch,
254
                  WALRecoveryMode wal_recovery_mode =
255
                      WALRecoveryMode::kTolerateCorruptedTailRecords,
256
                  uint64_t* record_checksum = nullptr) override;
257
  void UnmarkEOF() override;
258
259
 private:
  // NOTE(review): presumably accumulates the fragments of a record that spans
  // several physical records -- confirm in the .cc file.
260
  std::string fragments_;
  // Initialized to false by the constructor. NOTE(review): presumably true
  // while a multi-fragment record is only partially read -- confirm.
261
  bool in_fragmented_record_;
262
263
  bool TryReadFragment(Slice* result, size_t* drop_size,
264
                       uint8_t* fragment_type_or_err);
265
266
  bool TryReadMore(size_t* drop_size, uint8_t* error);
267
268
  // No copy allowed. These are declared private with no definition in this
  // header (the pre-C++11 idiom).
  // NOTE(review): the base Reader already deletes its copy operations with
  // `= delete`; consider doing the same here for consistency.
269
  FragmentBufferedReader(const FragmentBufferedReader&);
270
  void operator=(const FragmentBufferedReader&);
271
};
272
273
}  // namespace log
274
}  // namespace ROCKSDB_NAMESPACE