/src/rocksdb/db/log_reader.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | // |
6 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
7 | | // Use of this source code is governed by a BSD-style license that can be |
8 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
9 | | |
10 | | #include "db/log_reader.h" |
11 | | |
12 | | #include <cstdio> |
13 | | |
14 | | #include "file/sequence_file_reader.h" |
15 | | #include "port/lang.h" |
16 | | #include "rocksdb/env.h" |
17 | | #include "test_util/sync_point.h" |
18 | | #include "util/coding.h" |
19 | | #include "util/crc32c.h" |
20 | | |
21 | | namespace ROCKSDB_NAMESPACE::log { |
22 | | |
23 | 106k | Reader::Reporter::~Reporter() = default; |
24 | | |
25 | | Reader::Reader(std::shared_ptr<Logger> info_log, |
26 | | std::unique_ptr<SequentialFileReader>&& _file, |
27 | | Reporter* reporter, bool checksum, uint64_t log_num, |
28 | | bool track_and_verify_wals, bool stop_replay_for_corruption, |
29 | | uint64_t min_wal_number_to_keep, |
30 | | const PredecessorWALInfo& observed_predecessor_wal_info) |
31 | 106k | : info_log_(info_log), |
32 | 106k | file_(std::move(_file)), |
33 | 106k | reporter_(reporter), |
34 | 106k | checksum_(checksum), |
35 | 106k | backing_store_(new char[kBlockSize]), |
36 | 106k | buffer_(), |
37 | 106k | eof_(false), |
38 | 106k | read_error_(false), |
39 | 106k | eof_offset_(0), |
40 | 106k | last_record_offset_(0), |
41 | 106k | end_of_buffer_offset_(0), |
42 | 106k | log_number_(log_num), |
43 | 106k | track_and_verify_wals_(track_and_verify_wals), |
44 | 106k | stop_replay_for_corruption_(stop_replay_for_corruption), |
45 | 106k | min_wal_number_to_keep_(min_wal_number_to_keep), |
46 | 106k | observed_predecessor_wal_info_(observed_predecessor_wal_info), |
47 | 106k | recycled_(false), |
48 | 106k | first_record_read_(false), |
49 | 106k | compression_type_(kNoCompression), |
50 | 106k | compression_type_record_read_(false), |
51 | 106k | uncompress_(nullptr), |
52 | 106k | hash_state_(nullptr), |
53 | 106k | uncompress_hash_state_(nullptr) {} |
54 | | |
55 | 106k | Reader::~Reader() { |
56 | 106k | delete[] backing_store_; |
57 | 106k | if (uncompress_) { |
58 | 0 | delete uncompress_; |
59 | 0 | } |
60 | 106k | if (hash_state_) { |
61 | 48.1k | XXH3_freeState(hash_state_); |
62 | 48.1k | } |
63 | 106k | if (uncompress_hash_state_) { |
64 | 0 | XXH3_freeState(uncompress_hash_state_); |
65 | 0 | } |
66 | 106k | } |
67 | | |
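// ===========================================================================
// Editorial sketch (not part of log_reader.cc): a minimal example of how a
// caller typically drives Reader::ReadRecord(), defined below. Only the
// ReadRecord() signature is taken from this file; ReadAllRecordsSketch and
// HandleRecord are hypothetical names introduced for illustration.
void HandleRecord(const Slice& record);  // hypothetical consumer

void ReadAllRecordsSketch(Reader& reader) {
  Slice record;
  std::string scratch;
  // Reuse one scratch string across calls; fragmented records are assembled
  // into it and `record` then points at its contents.
  while (reader.ReadRecord(&record, &scratch,
                           WALRecoveryMode::kPointInTimeRecovery,
                           /*record_checksum=*/nullptr)) {
    // `record` is only valid until the next ReadRecord() call; copy it out
    // if it must live longer.
    HandleRecord(record);
  }
  // A false return means EOF (or a condition treated as the end of this log);
  // corruption is surfaced through the Reporter passed to the constructor.
}
// ===========================================================================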
68 | | // For kAbsoluteConsistency, on clean shutdown we don't expect any error |
69 | | // in the log files. For other modes, we can ignore only incomplete records |
70 | | // in the last log file, which are presumably due to a write in progress |
71 | | // during restart (or from log recycling). |
72 | | // |
73 | | // TODO krad: Evaluate if we need to move to a more strict mode where we |
74 | | // restrict the inconsistency to only the last log |
75 | | // TODO (hx235): move `wal_recovery_mode` to be a data member like other |
76 | | // information (e.g., `stop_replay_for_corruption`) to decide whether to |
77 | | // check for and surface corruption in `ReadRecord()` |
78 | | bool Reader::ReadRecord(Slice* record, std::string* scratch, |
79 | | WALRecoveryMode wal_recovery_mode, |
80 | 1.60M | uint64_t* record_checksum) { |
81 | 1.60M | scratch->clear(); |
82 | 1.60M | record->clear(); |
83 | 1.60M | if (record_checksum != nullptr) { |
84 | 1.06M | if (hash_state_ == nullptr) { |
85 | 48.1k | hash_state_ = XXH3_createState(); |
86 | 48.1k | } |
87 | 1.06M | XXH3_64bits_reset(hash_state_); |
88 | 1.06M | } |
89 | 1.60M | if (uncompress_) { |
90 | 0 | uncompress_->Reset(); |
91 | 0 | } |
92 | 1.60M | bool in_fragmented_record = false; |
93 | | // Record offset of the logical record that we're reading |
94 | | // 0 is a dummy value to make compilers happy |
95 | 1.60M | uint64_t prospective_record_offset = 0; |
96 | | |
97 | 1.60M | Slice fragment; |
98 | 1.60M | for (;;) { |
99 | 1.60M | uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size(); |
100 | 1.60M | size_t drop_size = 0; |
101 | 1.60M | const uint8_t record_type = |
102 | 1.60M | ReadPhysicalRecord(&fragment, &drop_size, record_checksum); |
103 | 1.60M | switch (record_type) { |
104 | 1.48M | case kFullType: |
105 | 1.48M | case kRecyclableFullType: |
106 | 1.48M | if (in_fragmented_record && !scratch->empty()) { |
107 | | // Handle bug in earlier versions of log::Writer where |
108 | | // it could emit an empty kFirstType record at the tail end |
109 | | // of a block followed by a kFullType or kFirstType record |
110 | | // at the beginning of the next block. |
111 | 0 | ReportCorruption(scratch->size(), "partial record without end(1)"); |
112 | 0 | } |
113 | | // No need to compute record_checksum since the record |
114 | | // consists of a single fragment and the checksum is computed |
115 | | // in ReadPhysicalRecord() if WAL compression is enabled |
116 | 1.48M | if (record_checksum != nullptr && uncompress_ == nullptr) { |
117 | | // No need to stream since the record is a single fragment |
118 | 1.01M | *record_checksum = XXH3_64bits(fragment.data(), fragment.size()); |
119 | 1.01M | } |
120 | 1.48M | prospective_record_offset = physical_record_offset; |
121 | 1.48M | scratch->clear(); |
122 | 1.48M | *record = fragment; |
123 | 1.48M | last_record_offset_ = prospective_record_offset; |
124 | 1.48M | first_record_read_ = true; |
125 | 1.48M | return true; |
126 | | |
127 | 4.06k | case kFirstType: |
128 | 4.06k | case kRecyclableFirstType: |
129 | 4.06k | if (in_fragmented_record && !scratch->empty()) { |
130 | | // Handle bug in earlier versions of log::Writer where |
131 | | // it could emit an empty kFirstType record at the tail end |
132 | | // of a block followed by a kFullType or kFirstType record |
133 | | // at the beginning of the next block. |
134 | 0 | ReportCorruption(scratch->size(), "partial record without end(2)"); |
135 | 0 | XXH3_64bits_reset(hash_state_); |
136 | 0 | } |
137 | 4.06k | if (record_checksum != nullptr) { |
138 | 4.05k | XXH3_64bits_update(hash_state_, fragment.data(), fragment.size()); |
139 | 4.05k | } |
140 | 4.06k | prospective_record_offset = physical_record_offset; |
141 | 4.06k | scratch->assign(fragment.data(), fragment.size()); |
142 | 4.06k | in_fragmented_record = true; |
143 | 4.06k | break; // switch |
144 | | |
145 | 1.86k | case kMiddleType: |
146 | 1.86k | case kRecyclableMiddleType: |
147 | 1.86k | if (!in_fragmented_record) { |
148 | 0 | ReportCorruption(fragment.size(), |
149 | 0 | "missing start of fragmented record(1)"); |
150 | 1.86k | } else { |
151 | 1.86k | if (record_checksum != nullptr) { |
152 | 1.86k | XXH3_64bits_update(hash_state_, fragment.data(), fragment.size()); |
153 | 1.86k | } |
154 | 1.86k | scratch->append(fragment.data(), fragment.size()); |
155 | 1.86k | } |
156 | 1.86k | break; // switch |
157 | | |
158 | 4.06k | case kLastType: |
159 | 4.06k | case kRecyclableLastType: |
160 | 4.06k | if (!in_fragmented_record) { |
161 | 0 | ReportCorruption(fragment.size(), |
162 | 0 | "missing start of fragmented record(2)"); |
163 | 4.06k | } else { |
164 | 4.06k | if (record_checksum != nullptr) { |
165 | 4.05k | XXH3_64bits_update(hash_state_, fragment.data(), fragment.size()); |
166 | 4.05k | *record_checksum = XXH3_64bits_digest(hash_state_); |
167 | 4.05k | } |
168 | 4.06k | scratch->append(fragment.data(), fragment.size()); |
169 | 4.06k | *record = Slice(*scratch); |
170 | 4.06k | last_record_offset_ = prospective_record_offset; |
171 | 4.06k | first_record_read_ = true; |
172 | 4.06k | return true; |
173 | 4.06k | } |
174 | 0 | break; // switch |
175 | | |
176 | 0 | case kSetCompressionType: { |
177 | 0 | if (compression_type_record_read_) { |
178 | 0 | ReportCorruption(fragment.size(), |
179 | 0 | "read multiple SetCompressionType records"); |
180 | 0 | } |
181 | 0 | if (first_record_read_) { |
182 | 0 | ReportCorruption(fragment.size(), |
183 | 0 | "SetCompressionType not the first record"); |
184 | 0 | } |
185 | 0 | prospective_record_offset = physical_record_offset; |
186 | 0 | scratch->clear(); |
187 | 0 | last_record_offset_ = prospective_record_offset; |
188 | 0 | CompressionTypeRecord compression_record(kNoCompression); |
189 | 0 | Status s = compression_record.DecodeFrom(&fragment); |
190 | 0 | if (!s.ok()) { |
191 | 0 | ReportCorruption(fragment.size(), |
192 | 0 | "could not decode SetCompressionType record"); |
193 | 0 | } else { |
194 | 0 | InitCompression(compression_record); |
195 | 0 | } |
196 | 0 | break; // switch |
197 | 4.06k | } |
198 | 0 | case kPredecessorWALInfoType: |
199 | 0 | case kRecyclePredecessorWALInfoType: { |
200 | 0 | prospective_record_offset = physical_record_offset; |
201 | 0 | scratch->clear(); |
202 | 0 | last_record_offset_ = prospective_record_offset; |
203 | |
204 | 0 | PredecessorWALInfo recorded_predecessor_wal_info; |
205 | 0 | Status s = recorded_predecessor_wal_info.DecodeFrom(&fragment); |
206 | 0 | if (!s.ok()) { |
207 | 0 | ReportCorruption(fragment.size(), |
208 | 0 | "could not decode PredecessorWALInfoType record"); |
209 | 0 | } else { |
210 | 0 | MaybeVerifyPredecessorWALInfo(wal_recovery_mode, fragment, |
211 | 0 | recorded_predecessor_wal_info); |
212 | 0 | } |
213 | 0 | break; // switch |
214 | 0 | } |
215 | 0 | case kUserDefinedTimestampSizeType: |
216 | 0 | case kRecyclableUserDefinedTimestampSizeType: { |
217 | 0 | if (in_fragmented_record && !scratch->empty()) { |
218 | 0 | ReportCorruption( |
219 | 0 | scratch->size(), |
220 | 0 | "user-defined timestamp size record interspersed partial record"); |
221 | 0 | } |
222 | 0 | prospective_record_offset = physical_record_offset; |
223 | 0 | scratch->clear(); |
224 | 0 | last_record_offset_ = prospective_record_offset; |
225 | 0 | UserDefinedTimestampSizeRecord ts_record; |
226 | 0 | Status s = ts_record.DecodeFrom(&fragment); |
227 | 0 | if (!s.ok()) { |
228 | 0 | ReportCorruption( |
229 | 0 | fragment.size(), |
230 | 0 | "could not decode user-defined timestamp size record"); |
231 | 0 | } else { |
232 | 0 | s = UpdateRecordedTimestampSize( |
233 | 0 | ts_record.GetUserDefinedTimestampSize()); |
234 | 0 | if (!s.ok()) { |
235 | 0 | ReportCorruption(fragment.size(), s.getState()); |
236 | 0 | } |
237 | 0 | } |
238 | 0 | break; // switch |
239 | 0 | } |
240 | | |
241 | 0 | case kBadHeader: |
242 | 0 | if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency || |
243 | 0 | wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) { |
244 | | // In clean shutdown we don't expect any error in the log files. |
245 | | // In point-in-time recovery an incomplete record at the end could |
246 | | // produce a hole in the recovered data. Report an error here, which |
247 | | // higher layers can choose to ignore when it's provable there is no |
248 | | // hole. |
249 | 0 | ReportCorruption(drop_size, "truncated header"); |
250 | 0 | } |
251 | 0 | FALLTHROUGH_INTENDED; |
252 | |
253 | 106k | case kEof: |
254 | 106k | if (in_fragmented_record) { |
255 | 0 | if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency || |
256 | 0 | wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) { |
257 | | // In clean shutdown we don't expect any error in the log files. |
258 | | // In point-in-time recovery an incomplete record at the end could |
259 | | // produce a hole in the recovered data. Report an error here, which |
260 | | // higher layers can choose to ignore when it's provable there is no |
261 | | // hole. |
262 | 0 | ReportCorruption( |
263 | 0 | scratch->size(), |
264 | 0 | "error reading trailing data due to encountering EOF"); |
265 | 0 | } |
266 | | // This can be caused by the writer dying immediately after |
267 | | // writing a physical record but before completing the next; don't |
268 | | // treat it as a corruption, just ignore the entire logical record. |
269 | 0 | scratch->clear(); |
270 | 0 | } |
271 | 106k | return false; |
272 | | |
273 | 0 | case kOldRecord: |
274 | 0 | if (wal_recovery_mode != WALRecoveryMode::kSkipAnyCorruptedRecords) { |
275 | | // Treat a record from a previous instance of the log as EOF. |
276 | 0 | if (in_fragmented_record) { |
277 | 0 | if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency || |
278 | 0 | wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) { |
279 | | // In clean shutdown we don't expect any error in the log files. |
280 | | // In point-in-time recovery an incomplete record at the end could |
281 | | // produce a hole in the recovered data. Report an error here, |
282 | | // which higher layers can choose to ignore when it's provable |
283 | | // there is no hole. |
284 | 0 | ReportCorruption( |
285 | 0 | scratch->size(), |
286 | 0 | "error reading trailing data due to encountering old record"); |
287 | 0 | } |
288 | | // This can be caused by the writer dying immediately after |
289 | | // writing a physical record but before completing the next; don't |
290 | | // treat it as a corruption, just ignore the entire logical record. |
291 | 0 | scratch->clear(); |
292 | 0 | } else { |
293 | 0 | if (wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) { |
294 | 0 | ReportOldLogRecord(scratch->size()); |
295 | 0 | } |
296 | 0 | } |
297 | 0 | return false; |
298 | 0 | } |
299 | 0 | FALLTHROUGH_INTENDED; |
300 | |
301 | 0 | case kBadRecord: |
302 | 0 | if (in_fragmented_record) { |
303 | 0 | ReportCorruption(scratch->size(), "error in middle of record"); |
304 | 0 | in_fragmented_record = false; |
305 | 0 | scratch->clear(); |
306 | 0 | } |
307 | 0 | break; // switch |
308 | | |
309 | 0 | case kBadRecordLen: |
310 | 0 | if (eof_) { |
311 | 0 | if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency || |
312 | 0 | wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) { |
313 | | // In clean shutdown we don't expect any error in the log files. |
314 | | // In point-in-time recovery an incomplete record at the end could |
315 | | // produce a hole in the recovered data. Report an error here, which |
316 | | // higher layers can choose to ignore when it's provable there is no |
317 | | // hole. |
318 | 0 | ReportCorruption(drop_size, "truncated record body"); |
319 | 0 | } |
320 | 0 | return false; |
321 | 0 | } |
322 | 0 | FALLTHROUGH_INTENDED; |
323 | |
324 | 0 | case kBadRecordChecksum: |
325 | 0 | if (recycled_ && wal_recovery_mode == |
326 | 0 | WALRecoveryMode::kTolerateCorruptedTailRecords) { |
327 | 0 | scratch->clear(); |
328 | 0 | return false; |
329 | 0 | } |
330 | 0 | if (record_type == kBadRecordLen) { |
331 | 0 | ReportCorruption(drop_size, "bad record length"); |
332 | 0 | } else { |
333 | 0 | ReportCorruption(drop_size, "checksum mismatch"); |
334 | 0 | } |
335 | 0 | if (in_fragmented_record) { |
336 | 0 | ReportCorruption(scratch->size(), "error in middle of record"); |
337 | 0 | in_fragmented_record = false; |
338 | 0 | scratch->clear(); |
339 | 0 | } |
340 | 0 | break; // switch |
341 | | |
342 | 0 | default: { |
343 | 0 | if ((record_type & kRecordTypeSafeIgnoreMask) == 0) { |
344 | 0 | std::string reason = |
345 | 0 | "unknown record type " + std::to_string(record_type); |
346 | 0 | ReportCorruption( |
347 | 0 | (fragment.size() + (in_fragmented_record ? scratch->size() : 0)), |
348 | 0 | reason.c_str()); |
349 | 0 | } |
350 | 0 | in_fragmented_record = false; |
351 | 0 | scratch->clear(); |
352 | 0 | break; // switch |
353 | 0 | } |
354 | 1.60M | } |
355 | 1.60M | } |
356 | | // unreachable |
357 | 1.60M | } |
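// ===========================================================================
// Editorial sketch (not part of log_reader.cc): for a record split across
// kFirstType / kMiddleType / kLastType fragments, the streaming XXH3 digest
// built in the switch above equals a one-shot XXH3 over the reassembled
// payload. A self-contained illustration of that equivalence; the function
// name and the `fragments` parameter are hypothetical.
uint64_t StreamedChecksumSketch(const std::vector<Slice>& fragments) {
  XXH3_state_t* st = XXH3_createState();
  XXH3_64bits_reset(st);
  std::string full_payload;
  for (const Slice& frag : fragments) {
    XXH3_64bits_update(st, frag.data(), frag.size());
    full_payload.append(frag.data(), frag.size());
  }
  uint64_t streamed = XXH3_64bits_digest(st);
  XXH3_freeState(st);
  // The streamed digest matches hashing the whole payload at once, which is
  // why ReadRecord() can report a single checksum for a fragmented record.
  assert(streamed == XXH3_64bits(full_payload.data(), full_payload.size()));
  return streamed;
}
// ===========================================================================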
358 | | |
359 | | void Reader::MaybeVerifyPredecessorWALInfo( |
360 | | WALRecoveryMode wal_recovery_mode, Slice fragment, |
361 | 0 | const PredecessorWALInfo& recorded_predecessor_wal_info) { |
362 | 0 | if (!track_and_verify_wals_ || |
363 | 0 | wal_recovery_mode == WALRecoveryMode::kSkipAnyCorruptedRecords || |
364 | 0 | stop_replay_for_corruption_) { |
365 | 0 | return; |
366 | 0 | } |
367 | 0 | assert(recorded_predecessor_wal_info.IsInitialized()); |
368 | 0 | uint64_t recorded_predecessor_log_number = |
369 | 0 | recorded_predecessor_wal_info.GetLogNumber(); |
370 | | |
371 | | // This is the first WAL recovered, so no predecessor WAL info has been |
372 | | // initialized yet. |
373 | 0 | if (!observed_predecessor_wal_info_.IsInitialized()) { |
374 | 0 | if (recorded_predecessor_log_number >= min_wal_number_to_keep_) { |
375 | 0 | std::string reason = "Missing WAL of log number " + |
376 | 0 | std::to_string(recorded_predecessor_log_number); |
377 | 0 | ReportCorruption(fragment.size(), reason.c_str(), |
378 | 0 | recorded_predecessor_log_number); |
379 | 0 | } |
380 | 0 | } else { |
381 | 0 | if (observed_predecessor_wal_info_.GetLogNumber() != |
382 | 0 | recorded_predecessor_log_number) { |
383 | 0 | std::string reason = "Missing WAL of log number " + |
384 | 0 | std::to_string(recorded_predecessor_log_number); |
385 | 0 | ReportCorruption(fragment.size(), reason.c_str(), |
386 | 0 | recorded_predecessor_log_number); |
387 | 0 | } else if (observed_predecessor_wal_info_.GetLastSeqnoRecorded() != |
388 | 0 | recorded_predecessor_wal_info.GetLastSeqnoRecorded()) { |
389 | 0 | std::string reason = |
390 | 0 | "Mismatched last sequence number recorded in the WAL of log number " + |
391 | 0 | std::to_string(recorded_predecessor_log_number) + ". Recorded " + |
392 | 0 | std::to_string(recorded_predecessor_wal_info.GetLastSeqnoRecorded()) + |
393 | 0 | ". Observed " + |
394 | 0 | std::to_string( |
395 | 0 | observed_predecessor_wal_info_.GetLastSeqnoRecorded()) + |
396 | 0 | ". (Last sequence number equal to 0 indicates no WAL records)"; |
397 | 0 | ReportCorruption(fragment.size(), reason.c_str(), |
398 | 0 | recorded_predecessor_log_number); |
399 | 0 | } else if (observed_predecessor_wal_info_.GetSizeBytes() != |
400 | 0 | recorded_predecessor_wal_info.GetSizeBytes()) { |
401 | 0 | std::string reason = |
402 | 0 | "Mismatched size of the WAL of log number " + |
403 | 0 | std::to_string(recorded_predecessor_log_number) + ". Recorded " + |
404 | 0 | std::to_string(recorded_predecessor_wal_info.GetSizeBytes()) + |
405 | 0 | " bytes. Observed " + |
406 | 0 | std::to_string(observed_predecessor_wal_info_.GetSizeBytes()) + |
407 | 0 | " bytes."; |
408 | 0 | ReportCorruption(fragment.size(), reason.c_str(), |
409 | 0 | recorded_predecessor_log_number); |
410 | 0 | } |
411 | 0 | } |
412 | 0 | } |
413 | | |
414 | 0 | uint64_t Reader::LastRecordOffset() { return last_record_offset_; } |
415 | | |
416 | 534k | uint64_t Reader::LastRecordEnd() { |
417 | 534k | return end_of_buffer_offset_ - buffer_.size(); |
418 | 534k | } |
419 | | |
420 | 0 | void Reader::UnmarkEOF() { |
421 | 0 | if (read_error_) { |
422 | 0 | return; |
423 | 0 | } |
424 | 0 | eof_ = false; |
425 | 0 | if (eof_offset_ == 0) { |
426 | 0 | return; |
427 | 0 | } |
428 | 0 | UnmarkEOFInternal(); |
429 | 0 | } |
430 | | |
431 | 0 | void Reader::UnmarkEOFInternal() { |
432 | | // If the EOF was in the middle of a block (a partial block was read) we have |
433 | | // to read the rest of the block as ReadPhysicalRecord can only read full |
434 | | // blocks and expects the file position indicator to be aligned to the start |
435 | | // of a block. |
436 | | // |
437 | | // consumed_bytes + buffer_size() + remaining == kBlockSize |
438 | |
439 | 0 | size_t consumed_bytes = eof_offset_ - buffer_.size(); |
440 | 0 | size_t remaining = kBlockSize - eof_offset_; |
441 | | |
442 | | // backing_store_ is used to concatenate what is left in buffer_ and |
443 | | // the remainder of the block. If buffer_ already uses backing_store_, |
444 | | // we just append the new data. |
445 | 0 | if (buffer_.data() != backing_store_ + consumed_bytes) { |
446 | | // Buffer_ does not use backing_store_ for storage. |
447 | | // Copy what is left in buffer_ to backing_store_. |
448 | 0 | memmove(backing_store_ + consumed_bytes, buffer_.data(), buffer_.size()); |
449 | 0 | } |
450 | |
451 | 0 | Slice read_buffer; |
452 | | // TODO: rate limit log reader with appropriate priority. |
453 | | // TODO: avoid overcharging rate limiter: |
454 | | // Note that the Read here might overcharge SequentialFileReader's internal |
455 | | // rate limiter if priority is not IO_TOTAL, e.g., when there is not enough |
456 | | // content left until EOF to read. |
457 | 0 | Status status = |
458 | 0 | file_->Read(remaining, &read_buffer, backing_store_ + eof_offset_, |
459 | 0 | Env::IO_TOTAL /* rate_limiter_priority */); |
460 | |
461 | 0 | size_t added = read_buffer.size(); |
462 | 0 | end_of_buffer_offset_ += added; |
463 | |
464 | 0 | if (!status.ok()) { |
465 | 0 | if (added > 0) { |
466 | 0 | ReportDrop(added, status); |
467 | 0 | } |
468 | |
469 | 0 | read_error_ = true; |
470 | 0 | return; |
471 | 0 | } |
472 | | |
473 | 0 | if (read_buffer.data() != backing_store_ + eof_offset_) { |
474 | | // Read did not write to backing_store_ |
475 | 0 | memmove(backing_store_ + eof_offset_, read_buffer.data(), |
476 | 0 | read_buffer.size()); |
477 | 0 | } |
478 | |
479 | 0 | buffer_ = Slice(backing_store_ + consumed_bytes, |
480 | 0 | eof_offset_ + added - consumed_bytes); |
481 | |
482 | 0 | if (added < remaining) { |
483 | 0 | eof_ = true; |
484 | 0 | eof_offset_ += added; |
485 | 0 | } else { |
486 | 0 | eof_offset_ = 0; |
487 | 0 | } |
488 | 0 | } |
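// ===========================================================================
// Editorial note (not part of log_reader.cc): a worked instance of the
// invariant `consumed_bytes + buffer_.size() + remaining == kBlockSize` used
// in UnmarkEOFInternal() above, assuming kBlockSize is 32768. If the earlier
// partial read stopped at eof_offset_ = 1000 and 200 of those bytes are still
// unconsumed in buffer_, then
//
//   consumed_bytes = 1000 - 200   = 800
//   remaining      = 32768 - 1000 = 31768
//
// and after the follow-up Read() returns `added` bytes, buffer_ becomes
// Slice(backing_store_ + 800, 1000 + added - 800): the unconsumed tail plus
// everything newly read toward the block boundary.
// ===========================================================================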
489 | | |
490 | | void Reader::ReportCorruption(size_t bytes, const char* reason, |
491 | 0 | uint64_t log_number) { |
492 | 0 | ReportDrop(bytes, Status::Corruption(reason), log_number); |
493 | 0 | } |
494 | | |
495 | | void Reader::ReportDrop(size_t bytes, const Status& reason, |
496 | 0 | uint64_t log_number) { |
497 | 0 | if (reporter_ != nullptr) { |
498 | 0 | reporter_->Corruption(bytes, reason, log_number); |
499 | 0 | } |
500 | 0 | } |
501 | | |
502 | 0 | void Reader::ReportOldLogRecord(size_t bytes) { |
503 | 0 | if (reporter_ != nullptr) { |
504 | 0 | reporter_->OldLogRecord(bytes); |
505 | 0 | } |
506 | 0 | } |
507 | | |
508 | 219k | bool Reader::ReadMore(size_t* drop_size, uint8_t* error) { |
509 | 219k | if (!eof_ && !read_error_) { |
510 | | // Last read was a full read, so this is a trailer to skip |
511 | 112k | buffer_.clear(); |
512 | | // TODO: rate limit log reader with appropriate priority. |
513 | | // TODO: avoid overcharging rate limiter: |
514 | | // Note that the Read here might overcharge SequentialFileReader's internal |
515 | | // rate limiter if priority is not IO_TOTAL, e.g., when there is not enough |
516 | | // content left until EOF to read. |
517 | 112k | Status status = file_->Read(kBlockSize, &buffer_, backing_store_, |
518 | 112k | Env::IO_TOTAL /* rate_limiter_priority */); |
519 | 112k | TEST_SYNC_POINT_CALLBACK("LogReader::ReadMore:AfterReadFile", &status); |
520 | 112k | end_of_buffer_offset_ += buffer_.size(); |
521 | 112k | if (!status.ok()) { |
522 | 0 | buffer_.clear(); |
523 | 0 | ReportDrop(kBlockSize, status); |
524 | 0 | read_error_ = true; |
525 | 0 | *error = kEof; |
526 | 0 | return false; |
527 | 112k | } else if (buffer_.size() < static_cast<size_t>(kBlockSize)) { |
528 | 106k | eof_ = true; |
529 | 106k | eof_offset_ = buffer_.size(); |
530 | 106k | } |
531 | 112k | return true; |
532 | 112k | } else { |
533 | | // Note that if buffer_ is non-empty, we have a truncated header at the |
534 | | // end of the file, which can be caused by the writer crashing in the |
535 | | // middle of writing the header. Unless explicitly requested we don't |
536 | | // consider this an error, just report EOF. |
537 | 106k | if (buffer_.size()) { |
538 | 0 | *drop_size = buffer_.size(); |
539 | 0 | buffer_.clear(); |
540 | 0 | *error = kBadHeader; |
541 | 0 | return false; |
542 | 0 | } |
543 | 106k | buffer_.clear(); |
544 | 106k | *error = kEof; |
545 | 106k | return false; |
546 | 106k | } |
547 | 219k | } |
548 | | |
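// ===========================================================================
// Editorial sketch (not part of log_reader.cc): the physical record header
// parsed by ReadPhysicalRecord() below is laid out as
//   masked crc32c (4 bytes) | length (2 bytes, little-endian) | type (1 byte)
// with recyclable record types appending a 4-byte log number (hence
// kRecyclableHeaderSize). A standalone decoder for the legacy layout, written
// under those assumptions; the struct and function names are hypothetical.
struct ParsedLegacyHeader {
  uint32_t masked_crc;
  uint32_t length;
  uint8_t type;
};

inline ParsedLegacyHeader ParseLegacyHeaderSketch(const char* header) {
  ParsedLegacyHeader h;
  h.masked_crc = DecodeFixed32(header);
  h.length = (static_cast<uint32_t>(header[4]) & 0xff) |
             ((static_cast<uint32_t>(header[5]) & 0xff) << 8);
  h.type = static_cast<uint8_t>(header[6]);
  return h;
}
// ===========================================================================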
549 | | uint8_t Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size, |
550 | 1.60M | uint64_t* fragment_checksum) { |
551 | 1.71M | while (true) { |
552 | | // We need at least the minimum header size |
553 | 1.71M | if (buffer_.size() < static_cast<size_t>(kHeaderSize)) { |
554 | | // the default value of r is meaningless because ReadMore will overwrite |
555 | | // it if it returns false; in case it returns true, the return value will |
556 | | // not be used anyway |
557 | 219k | uint8_t r = kEof; |
558 | 219k | if (!ReadMore(drop_size, &r)) { |
559 | 106k | return r; |
560 | 106k | } |
561 | 112k | continue; |
562 | 219k | } |
563 | | |
564 | | // Parse the header |
565 | 1.49M | const char* header = buffer_.data(); |
566 | 1.49M | const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff; |
567 | 1.49M | const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff; |
568 | 1.49M | const uint8_t type = static_cast<uint8_t>(header[6]); |
569 | 1.49M | const uint32_t length = a | (b << 8); |
570 | 1.49M | int header_size = kHeaderSize; |
571 | 1.49M | const bool is_recyclable_type = |
572 | 1.49M | ((type >= kRecyclableFullType && type <= kRecyclableLastType) || |
573 | 1.49M | type == kRecyclableUserDefinedTimestampSizeType || |
574 | 1.49M | type == kRecyclePredecessorWALInfoType); |
575 | 1.49M | if (is_recyclable_type) { |
576 | 0 | header_size = kRecyclableHeaderSize; |
577 | 0 | if (first_record_read_ && !recycled_) { |
578 | | // A recycled log should have started with a recycled record |
579 | 0 | return kBadRecord; |
580 | 0 | } |
581 | 0 | recycled_ = true; |
582 | | // We need enough for the larger header |
583 | 0 | if (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) { |
584 | 0 | uint8_t r = kEof; |
585 | 0 | if (!ReadMore(drop_size, &r)) { |
586 | 0 | return r; |
587 | 0 | } |
588 | 0 | continue; |
589 | 0 | } |
590 | 0 | } |
591 | | |
592 | 1.49M | if (header_size + length > buffer_.size()) { |
593 | 0 | assert(buffer_.size() >= static_cast<size_t>(header_size)); |
594 | 0 | *drop_size = buffer_.size(); |
595 | 0 | buffer_.clear(); |
596 | | // If the end of the read has been reached without seeing |
597 | | // `header_size + length` bytes of payload, report a corruption. The |
598 | | // higher layers can decide how to handle it based on the recovery mode, |
599 | | // whether this occurred at EOF, whether this is the final WAL, etc. |
600 | 0 | return kBadRecordLen; |
601 | 0 | } |
602 | | |
603 | 1.49M | if (is_recyclable_type) { |
604 | 0 | const uint32_t log_num = DecodeFixed32(header + 7); |
605 | 0 | if (log_num != log_number_) { |
606 | 0 | buffer_.remove_prefix(header_size + length); |
607 | 0 | return kOldRecord; |
608 | 0 | } |
609 | 0 | } |
610 | | |
611 | 1.49M | if (type == kZeroType && length == 0) { |
612 | | // Skip zero length record without reporting any drops since |
613 | | // such records are produced by the mmap based writing code in |
614 | | // env_posix.cc that preallocates file regions. |
615 | | // NOTE: this should never happen in DBs written by new RocksDB versions, |
616 | | // since we turn off mmap writes to manifest and log files |
617 | 0 | buffer_.clear(); |
618 | 0 | return kBadRecord; |
619 | 0 | } |
620 | | |
621 | | // Check crc |
622 | 1.49M | if (checksum_) { |
623 | 1.49M | uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header)); |
624 | 1.49M | uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6); |
625 | 1.49M | if (actual_crc != expected_crc) { |
626 | | // Drop the rest of the buffer since "length" itself may have |
627 | | // been corrupted and if we trust it, we could find some |
628 | | // fragment of a real log record that just happens to look |
629 | | // like a valid log record. |
630 | 0 | *drop_size = buffer_.size(); |
631 | 0 | buffer_.clear(); |
632 | 0 | return kBadRecordChecksum; |
633 | 0 | } |
634 | 1.49M | } |
635 | | |
636 | 1.49M | buffer_.remove_prefix(header_size + length); |
637 | | |
638 | 1.49M | if (!uncompress_ || type == kSetCompressionType || |
639 | 1.49M | type == kPredecessorWALInfoType || |
640 | 1.49M | type == kRecyclePredecessorWALInfoType || |
641 | 1.49M | type == kUserDefinedTimestampSizeType || |
642 | 1.49M | type == kRecyclableUserDefinedTimestampSizeType) { |
643 | 1.49M | *result = Slice(header + header_size, length); |
644 | 1.49M | return type; |
645 | 1.49M | } else { |
646 | | // Uncompress compressed records |
647 | 0 | uncompressed_record_.clear(); |
648 | 0 | if (fragment_checksum != nullptr) { |
649 | 0 | if (uncompress_hash_state_ == nullptr) { |
650 | 0 | uncompress_hash_state_ = XXH3_createState(); |
651 | 0 | } |
652 | 0 | XXH3_64bits_reset(uncompress_hash_state_); |
653 | 0 | } |
654 | |
655 | 0 | size_t uncompressed_size = 0; |
656 | 0 | int remaining = 0; |
657 | 0 | const char* input = header + header_size; |
658 | 0 | do { |
659 | 0 | remaining = uncompress_->Uncompress( |
660 | 0 | input, length, uncompressed_buffer_.get(), &uncompressed_size); |
661 | 0 | input = nullptr; |
662 | 0 | if (remaining < 0) { |
663 | 0 | buffer_.clear(); |
664 | 0 | return kBadRecord; |
665 | 0 | } |
666 | 0 | if (uncompressed_size > 0) { |
667 | 0 | if (fragment_checksum != nullptr) { |
668 | 0 | XXH3_64bits_update(uncompress_hash_state_, |
669 | 0 | uncompressed_buffer_.get(), uncompressed_size); |
670 | 0 | } |
671 | 0 | uncompressed_record_.append(uncompressed_buffer_.get(), |
672 | 0 | uncompressed_size); |
673 | 0 | } |
674 | 0 | } while (remaining > 0 || uncompressed_size == kBlockSize); |
675 | | |
676 | 0 | if (fragment_checksum != nullptr) { |
677 | | // We can remove this check by updating hash_state_ directly, |
678 | | // but that requires resetting hash_state_ for full and first types |
679 | | // for edge cases like consecutive first type records. |
680 | | // Leaving the check as is since it is cleaner and can revert to the |
681 | | // above approach if it causes performance impact. |
682 | 0 | *fragment_checksum = XXH3_64bits_digest(uncompress_hash_state_); |
683 | 0 | uint64_t actual_checksum = XXH3_64bits(uncompressed_record_.data(), |
684 | 0 | uncompressed_record_.size()); |
685 | 0 | if (*fragment_checksum != actual_checksum) { |
686 | | // uncompressed_record_ contains bad content that does not match |
687 | | // actual decompressed content |
688 | 0 | return kBadRecord; |
689 | 0 | } |
690 | 0 | } |
691 | 0 | *result = Slice(uncompressed_record_); |
692 | 0 | return type; |
693 | 0 | } |
694 | 1.49M | } |
695 | 1.60M | } |
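// ===========================================================================
// Editorial sketch (not part of log_reader.cc): the checksum check in
// ReadPhysicalRecord() above unmasks the stored crc32c and recomputes it over
// everything from the type byte onward (type, optional log number, payload),
// i.e. `length + header_size - 6` bytes starting at header + 6. Assuming the
// legacy header is 7 bytes, the non-recyclable case reduces to the sketch
// below; the function name is hypothetical.
inline bool VerifyLegacyRecordCrcSketch(const char* header, uint32_t length) {
  uint32_t expected = crc32c::Unmask(DecodeFixed32(header));
  uint32_t actual = crc32c::Value(header + 6, 1 /* type byte */ + length);
  return actual == expected;
}
// ===========================================================================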
696 | | |
697 | | // Initialize uncompress related fields |
698 | 0 | void Reader::InitCompression(const CompressionTypeRecord& compression_record) { |
699 | 0 | compression_type_ = compression_record.GetCompressionType(); |
700 | 0 | compression_type_record_read_ = true; |
701 | 0 | constexpr uint32_t compression_format_version = 2; |
702 | 0 | uncompress_ = StreamingUncompress::Create( |
703 | 0 | compression_type_, compression_format_version, kBlockSize); |
704 | 0 | assert(uncompress_ != nullptr); |
705 | 0 | uncompressed_buffer_ = std::unique_ptr<char[]>(new char[kBlockSize]); |
706 | 0 | assert(uncompressed_buffer_); |
707 | 0 | } |
708 | | |
709 | | Status Reader::UpdateRecordedTimestampSize( |
710 | 0 | const std::vector<std::pair<uint32_t, size_t>>& cf_to_ts_sz) { |
711 | 0 | for (const auto& [cf, ts_sz] : cf_to_ts_sz) { |
712 | | // Zero user-defined timestamp size are not recorded. |
713 | 0 | if (ts_sz == 0) { |
714 | 0 | return Status::Corruption( |
715 | 0 | "User-defined timestamp size record contains zero timestamp size."); |
716 | 0 | } |
717 | | // The user-defined timestamp size record for a column family should not be |
718 | | // updated in the same log file. |
719 | 0 | if (recorded_cf_to_ts_sz_.count(cf) != 0) { |
720 | 0 | return Status::Corruption( |
721 | 0 | "User-defined timestamp size record contains update to " |
722 | 0 | "recorded column family."); |
723 | 0 | } |
724 | 0 | recorded_cf_to_ts_sz_.insert(std::make_pair(cf, ts_sz)); |
725 | 0 | } |
726 | 0 | return Status::OK(); |
727 | 0 | } |
728 | | |
729 | | bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch, |
730 | | WALRecoveryMode wal_recovery_mode |
731 | | |
732 | | , |
733 | 0 | uint64_t* /* checksum */) { |
734 | 0 | assert(record != nullptr); |
735 | 0 | assert(scratch != nullptr); |
736 | 0 | record->clear(); |
737 | 0 | scratch->clear(); |
738 | 0 | if (uncompress_) { |
739 | 0 | uncompress_->Reset(); |
740 | 0 | } |
741 | |
742 | 0 | uint64_t prospective_record_offset = 0; |
743 | 0 | uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size(); |
744 | 0 | size_t drop_size = 0; |
745 | 0 | uint8_t fragment_type_or_err = 0; // Initialize to make compiler happy |
746 | 0 | Slice fragment; |
747 | 0 | while (TryReadFragment(&fragment, &drop_size, &fragment_type_or_err)) { |
748 | 0 | switch (fragment_type_or_err) { |
749 | 0 | case kFullType: |
750 | 0 | case kRecyclableFullType: |
751 | 0 | if (in_fragmented_record_ && !fragments_.empty()) { |
752 | 0 | ReportCorruption(fragments_.size(), "partial record without end(1)"); |
753 | 0 | } |
754 | 0 | fragments_.clear(); |
755 | 0 | *record = fragment; |
756 | 0 | prospective_record_offset = physical_record_offset; |
757 | 0 | last_record_offset_ = prospective_record_offset; |
758 | 0 | first_record_read_ = true; |
759 | 0 | in_fragmented_record_ = false; |
760 | 0 | return true; |
761 | | |
762 | 0 | case kFirstType: |
763 | 0 | case kRecyclableFirstType: |
764 | 0 | if (in_fragmented_record_ || !fragments_.empty()) { |
765 | 0 | ReportCorruption(fragments_.size(), "partial record without end(2)"); |
766 | 0 | } |
767 | 0 | prospective_record_offset = physical_record_offset; |
768 | 0 | fragments_.assign(fragment.data(), fragment.size()); |
769 | 0 | in_fragmented_record_ = true; |
770 | 0 | break; |
771 | | |
772 | 0 | case kMiddleType: |
773 | 0 | case kRecyclableMiddleType: |
774 | 0 | if (!in_fragmented_record_) { |
775 | 0 | ReportCorruption(fragment.size(), |
776 | 0 | "missing start of fragmented record(1)"); |
777 | 0 | } else { |
778 | 0 | fragments_.append(fragment.data(), fragment.size()); |
779 | 0 | } |
780 | 0 | break; |
781 | | |
782 | 0 | case kLastType: |
783 | 0 | case kRecyclableLastType: |
784 | 0 | if (!in_fragmented_record_) { |
785 | 0 | ReportCorruption(fragment.size(), |
786 | 0 | "missing start of fragmented record(2)"); |
787 | 0 | } else { |
788 | 0 | fragments_.append(fragment.data(), fragment.size()); |
789 | 0 | scratch->assign(fragments_.data(), fragments_.size()); |
790 | 0 | fragments_.clear(); |
791 | 0 | *record = Slice(*scratch); |
792 | 0 | last_record_offset_ = prospective_record_offset; |
793 | 0 | first_record_read_ = true; |
794 | 0 | in_fragmented_record_ = false; |
795 | 0 | return true; |
796 | 0 | } |
797 | 0 | break; |
798 | | |
799 | 0 | case kSetCompressionType: { |
800 | 0 | if (compression_type_record_read_) { |
801 | 0 | ReportCorruption(fragment.size(), |
802 | 0 | "read multiple SetCompressionType records"); |
803 | 0 | } |
804 | 0 | if (first_record_read_) { |
805 | 0 | ReportCorruption(fragment.size(), |
806 | 0 | "SetCompressionType not the first record"); |
807 | 0 | } |
808 | 0 | fragments_.clear(); |
809 | 0 | prospective_record_offset = physical_record_offset; |
810 | 0 | last_record_offset_ = prospective_record_offset; |
811 | 0 | in_fragmented_record_ = false; |
812 | 0 | CompressionTypeRecord compression_record(kNoCompression); |
813 | 0 | Status s = compression_record.DecodeFrom(&fragment); |
814 | 0 | if (!s.ok()) { |
815 | 0 | ReportCorruption(fragment.size(), |
816 | 0 | "could not decode SetCompressionType record"); |
817 | 0 | } else { |
818 | 0 | InitCompression(compression_record); |
819 | 0 | } |
820 | 0 | break; |
821 | 0 | } |
822 | 0 | case kPredecessorWALInfoType: |
823 | 0 | case kRecyclePredecessorWALInfoType: { |
824 | 0 | fragments_.clear(); |
825 | 0 | prospective_record_offset = physical_record_offset; |
826 | 0 | last_record_offset_ = prospective_record_offset; |
827 | 0 | in_fragmented_record_ = false; |
828 | |
829 | 0 | PredecessorWALInfo recorded_predecessor_wal_info; |
830 | 0 | Status s = recorded_predecessor_wal_info.DecodeFrom(&fragment); |
831 | 0 | if (!s.ok()) { |
832 | 0 | ReportCorruption(fragment.size(), |
833 | 0 | "could not decode PredecessorWALInfoType record"); |
834 | 0 | } else { |
835 | 0 | MaybeVerifyPredecessorWALInfo(wal_recovery_mode, fragment, |
836 | 0 | recorded_predecessor_wal_info); |
837 | 0 | } |
838 | 0 | break; |
839 | 0 | } |
840 | 0 | case kUserDefinedTimestampSizeType: |
841 | 0 | case kRecyclableUserDefinedTimestampSizeType: { |
842 | 0 | if (in_fragmented_record_ && !scratch->empty()) { |
843 | 0 | ReportCorruption( |
844 | 0 | scratch->size(), |
845 | 0 | "user-defined timestamp size record interspersed partial record"); |
846 | 0 | } |
847 | 0 | fragments_.clear(); |
848 | 0 | prospective_record_offset = physical_record_offset; |
849 | 0 | last_record_offset_ = prospective_record_offset; |
850 | 0 | in_fragmented_record_ = false; |
851 | 0 | UserDefinedTimestampSizeRecord ts_record; |
852 | 0 | Status s = ts_record.DecodeFrom(&fragment); |
853 | 0 | if (!s.ok()) { |
854 | 0 | ReportCorruption( |
855 | 0 | fragment.size(), |
856 | 0 | "could not decode user-defined timestamp size record"); |
857 | 0 | } else { |
858 | 0 | s = UpdateRecordedTimestampSize( |
859 | 0 | ts_record.GetUserDefinedTimestampSize()); |
860 | 0 | if (!s.ok()) { |
861 | 0 | ReportCorruption(fragment.size(), s.getState()); |
862 | 0 | } |
863 | 0 | } |
864 | 0 | break; |
865 | 0 | } |
866 | | |
867 | 0 | case kBadHeader: |
868 | 0 | case kBadRecord: |
869 | 0 | case kEof: |
870 | 0 | case kOldRecord: |
871 | 0 | if (in_fragmented_record_) { |
872 | 0 | ReportCorruption(fragments_.size(), "error in middle of record"); |
873 | 0 | in_fragmented_record_ = false; |
874 | 0 | fragments_.clear(); |
875 | 0 | } |
876 | 0 | break; |
877 | | |
878 | 0 | case kBadRecordChecksum: |
879 | 0 | if (recycled_) { |
880 | 0 | fragments_.clear(); |
881 | 0 | return false; |
882 | 0 | } |
883 | 0 | ReportCorruption(drop_size, "checksum mismatch"); |
884 | 0 | if (in_fragmented_record_) { |
885 | 0 | ReportCorruption(fragments_.size(), "error in middle of record"); |
886 | 0 | in_fragmented_record_ = false; |
887 | 0 | fragments_.clear(); |
888 | 0 | } |
889 | 0 | break; |
890 | | |
891 | 0 | default: { |
892 | 0 | if ((fragment_type_or_err & kRecordTypeSafeIgnoreMask) == 0) { |
893 | 0 | std::string reason = |
894 | 0 | "unknown record type " + std::to_string(fragment_type_or_err); |
895 | 0 | ReportCorruption( |
896 | 0 | fragment.size() + (in_fragmented_record_ ? fragments_.size() : 0), |
897 | 0 | reason.c_str()); |
898 | 0 | } |
899 | 0 | in_fragmented_record_ = false; |
900 | 0 | fragments_.clear(); |
901 | 0 | break; |
902 | 0 | } |
903 | 0 | } |
904 | 0 | } |
905 | 0 | return false; |
906 | 0 | } |
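// ===========================================================================
// Editorial sketch (not part of log_reader.cc): unlike Reader::ReadRecord(),
// a false return from FragmentBufferedReader::ReadRecord() above need not be
// final: partial fragments stay buffered in fragments_, so a tailing caller
// can retry once more data has been appended to the file. A rough sketch;
// TailLogSketch, keep_tailing and ApplyRecord are hypothetical names.
void ApplyRecord(const Slice& record);  // hypothetical consumer
bool keep_tailing();                    // hypothetical stop condition

void TailLogSketch(FragmentBufferedReader& reader) {
  Slice record;
  std::string scratch;
  while (keep_tailing()) {
    while (reader.ReadRecord(&record, &scratch,
                             WALRecoveryMode::kTolerateCorruptedTailRecords,
                             /*checksum=*/nullptr)) {
      ApplyRecord(record);
    }
    // Back off / wait for the writer to append more data, then loop and
    // retry; UnmarkEOF() is handled internally by TryReadMore().
  }
}
// ===========================================================================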
907 | | |
908 | 0 | void FragmentBufferedReader::UnmarkEOF() { |
909 | 0 | if (read_error_) { |
910 | 0 | return; |
911 | 0 | } |
912 | 0 | eof_ = false; |
913 | 0 | UnmarkEOFInternal(); |
914 | 0 | } |
915 | | |
916 | 0 | bool FragmentBufferedReader::TryReadMore(size_t* drop_size, uint8_t* error) { |
917 | 0 | if (!eof_ && !read_error_) { |
918 | | // Last read was a full read, so this is a trailer to skip |
919 | 0 | buffer_.clear(); |
920 | | // TODO: rate limit log reader with appropriate priority. |
921 | | // TODO: avoid overcharging rate limiter: |
922 | | // Note that the Read here might overcharge SequentialFileReader's internal |
923 | | // rate limiter if priority is not IO_TOTAL, e.g., when there is not enough |
924 | | // content left until EOF to read. |
925 | 0 | Status status = file_->Read(kBlockSize, &buffer_, backing_store_, |
926 | 0 | Env::IO_TOTAL /* rate_limiter_priority */); |
927 | 0 | end_of_buffer_offset_ += buffer_.size(); |
928 | 0 | if (!status.ok()) { |
929 | 0 | buffer_.clear(); |
930 | 0 | ReportDrop(kBlockSize, status); |
931 | 0 | read_error_ = true; |
932 | 0 | *error = kEof; |
933 | 0 | return false; |
934 | 0 | } else if (buffer_.size() < static_cast<size_t>(kBlockSize)) { |
935 | 0 | eof_ = true; |
936 | 0 | eof_offset_ = buffer_.size(); |
937 | 0 | TEST_SYNC_POINT_CALLBACK( |
938 | 0 | "FragmentBufferedLogReader::TryReadMore:FirstEOF", nullptr); |
939 | 0 | } |
940 | 0 | return true; |
941 | 0 | } else if (!read_error_) { |
942 | 0 | UnmarkEOF(); |
943 | 0 | } |
944 | 0 | if (!read_error_) { |
945 | 0 | return true; |
946 | 0 | } |
947 | 0 | *error = kEof; |
948 | 0 | *drop_size = buffer_.size(); |
949 | 0 | if (buffer_.size() > 0) { |
950 | 0 | *error = kBadHeader; |
951 | 0 | } |
952 | 0 | buffer_.clear(); |
953 | 0 | return false; |
954 | 0 | } |
955 | | |
956 | | // return true if the caller should process the fragment_type_or_err. |
957 | | bool FragmentBufferedReader::TryReadFragment(Slice* fragment, size_t* drop_size, |
958 | 0 | uint8_t* fragment_type_or_err) { |
959 | 0 | assert(fragment != nullptr); |
960 | 0 | assert(drop_size != nullptr); |
961 | 0 | assert(fragment_type_or_err != nullptr); |
962 | |
963 | 0 | while (buffer_.size() < static_cast<size_t>(kHeaderSize)) { |
964 | 0 | size_t old_size = buffer_.size(); |
965 | 0 | uint8_t error = kEof; |
966 | 0 | if (!TryReadMore(drop_size, &error)) { |
967 | 0 | *fragment_type_or_err = error; |
968 | 0 | return false; |
969 | 0 | } else if (old_size == buffer_.size()) { |
970 | 0 | return false; |
971 | 0 | } |
972 | 0 | } |
973 | 0 | const char* header = buffer_.data(); |
974 | 0 | const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff; |
975 | 0 | const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff; |
976 | 0 | const uint8_t type = static_cast<uint8_t>(header[6]); |
977 | 0 | const uint32_t length = a | (b << 8); |
978 | 0 | int header_size = kHeaderSize; |
979 | 0 | if ((type >= kRecyclableFullType && type <= kRecyclableLastType) || |
980 | 0 | type == kRecyclableUserDefinedTimestampSizeType || |
981 | 0 | type == kRecyclePredecessorWALInfoType) { |
982 | 0 | if (first_record_read_ && !recycled_) { |
983 | | // A recycled log should have started with a recycled record |
984 | 0 | *fragment_type_or_err = kBadRecord; |
985 | 0 | return true; |
986 | 0 | } |
987 | 0 | recycled_ = true; |
988 | 0 | header_size = kRecyclableHeaderSize; |
989 | 0 | while (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) { |
990 | 0 | size_t old_size = buffer_.size(); |
991 | 0 | uint8_t error = kEof; |
992 | 0 | if (!TryReadMore(drop_size, &error)) { |
993 | 0 | *fragment_type_or_err = error; |
994 | 0 | return false; |
995 | 0 | } else if (old_size == buffer_.size()) { |
996 | 0 | return false; |
997 | 0 | } |
998 | 0 | } |
999 | 0 | const uint32_t log_num = DecodeFixed32(header + 7); |
1000 | 0 | if (log_num != log_number_) { |
1001 | 0 | *fragment_type_or_err = kOldRecord; |
1002 | 0 | return true; |
1003 | 0 | } |
1004 | 0 | } |
1005 | | |
1006 | 0 | while (header_size + length > buffer_.size()) { |
1007 | 0 | size_t old_size = buffer_.size(); |
1008 | 0 | uint8_t error = kEof; |
1009 | 0 | if (!TryReadMore(drop_size, &error)) { |
1010 | 0 | *fragment_type_or_err = error; |
1011 | 0 | return false; |
1012 | 0 | } else if (old_size == buffer_.size()) { |
1013 | 0 | return false; |
1014 | 0 | } |
1015 | 0 | } |
1016 | | |
1017 | 0 | if (type == kZeroType && length == 0) { |
1018 | 0 | buffer_.clear(); |
1019 | 0 | *fragment_type_or_err = kBadRecord; |
1020 | 0 | return true; |
1021 | 0 | } |
1022 | | |
1023 | 0 | if (checksum_) { |
1024 | 0 | uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header)); |
1025 | 0 | uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6); |
1026 | 0 | if (actual_crc != expected_crc) { |
1027 | 0 | *drop_size = buffer_.size(); |
1028 | 0 | buffer_.clear(); |
1029 | 0 | *fragment_type_or_err = kBadRecordChecksum; |
1030 | 0 | return true; |
1031 | 0 | } |
1032 | 0 | } |
1033 | | |
1034 | 0 | buffer_.remove_prefix(header_size + length); |
1035 | |
1036 | 0 | if (!uncompress_ || type == kSetCompressionType || |
1037 | 0 | type == kPredecessorWALInfoType || |
1038 | 0 | type == kRecyclePredecessorWALInfoType || |
1039 | 0 | type == kUserDefinedTimestampSizeType || |
1040 | 0 | type == kRecyclableUserDefinedTimestampSizeType) { |
1041 | 0 | *fragment = Slice(header + header_size, length); |
1042 | 0 | *fragment_type_or_err = type; |
1043 | 0 | return true; |
1044 | 0 | } else { |
1045 | | // Uncompress compressed records |
1046 | 0 | uncompressed_record_.clear(); |
1047 | 0 | size_t uncompressed_size = 0; |
1048 | 0 | int remaining = 0; |
1049 | 0 | const char* input = header + header_size; |
1050 | 0 | do { |
1051 | 0 | remaining = uncompress_->Uncompress( |
1052 | 0 | input, length, uncompressed_buffer_.get(), &uncompressed_size); |
1053 | 0 | input = nullptr; |
1054 | 0 | if (remaining < 0) { |
1055 | 0 | buffer_.clear(); |
1056 | 0 | *fragment_type_or_err = kBadRecord; |
1057 | 0 | return true; |
1058 | 0 | } |
1059 | 0 | if (uncompressed_size > 0) { |
1060 | 0 | uncompressed_record_.append(uncompressed_buffer_.get(), |
1061 | 0 | uncompressed_size); |
1062 | 0 | } |
1063 | 0 | } while (remaining > 0 || uncompressed_size == kBlockSize); |
1064 | 0 | *fragment = Slice(std::move(uncompressed_record_)); |
1065 | 0 | *fragment_type_or_err = type; |
1066 | 0 | return true; |
1067 | 0 | } |
1068 | 0 | } |
1069 | | |
1070 | | } // namespace ROCKSDB_NAMESPACE::log |