/src/rocksdb/db/log_reader.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | // |
6 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
7 | | // Use of this source code is governed by a BSD-style license that can be |
8 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
9 | | |
10 | | #include "db/log_reader.h" |
11 | | |
12 | | #include <cstdio> |
13 | | |
14 | | #include "file/sequence_file_reader.h" |
15 | | #include "port/lang.h" |
16 | | #include "rocksdb/env.h" |
17 | | #include "test_util/sync_point.h" |
18 | | #include "util/coding.h" |
19 | | #include "util/crc32c.h" |
20 | | |
21 | | namespace ROCKSDB_NAMESPACE::log { |
22 | | |
23 | 87.6k | Reader::Reporter::~Reporter() = default; |
24 | | |
25 | | Reader::Reader(std::shared_ptr<Logger> info_log, |
26 | | std::unique_ptr<SequentialFileReader>&& _file, |
27 | | Reporter* reporter, bool checksum, uint64_t log_num, |
28 | | bool track_and_verify_wals, bool stop_replay_for_corruption, |
29 | | uint64_t min_wal_number_to_keep, |
30 | | const PredecessorWALInfo& observed_predecessor_wal_info) |
31 | 87.6k | : info_log_(info_log), |
32 | 87.6k | file_(std::move(_file)), |
33 | 87.6k | reporter_(reporter), |
34 | 87.6k | checksum_(checksum), |
35 | 87.6k | backing_store_(new char[kBlockSize]), |
36 | 87.6k | buffer_(), |
37 | 87.6k | eof_(false), |
38 | 87.6k | read_error_(false), |
39 | 87.6k | eof_offset_(0), |
40 | 87.6k | last_record_offset_(0), |
41 | 87.6k | end_of_buffer_offset_(0), |
42 | 87.6k | log_number_(log_num), |
43 | 87.6k | track_and_verify_wals_(track_and_verify_wals), |
44 | 87.6k | stop_replay_for_corruption_(stop_replay_for_corruption), |
45 | 87.6k | min_wal_number_to_keep_(min_wal_number_to_keep), |
46 | 87.6k | observed_predecessor_wal_info_(observed_predecessor_wal_info), |
47 | 87.6k | recycled_(false), |
48 | 87.6k | first_record_read_(false), |
49 | 87.6k | compression_type_(kNoCompression), |
50 | 87.6k | compression_type_record_read_(false), |
51 | 87.6k | uncompress_(nullptr), |
52 | 87.6k | hash_state_(nullptr), |
53 | 87.6k | uncompress_hash_state_(nullptr) {} |
54 | | |
55 | 87.6k | Reader::~Reader() { |
56 | 87.6k | delete[] backing_store_; |
57 | 87.6k | if (uncompress_) { |
58 | 0 | delete uncompress_; |
59 | 0 | } |
60 | 87.6k | if (hash_state_) { |
61 | 39.2k | XXH3_freeState(hash_state_); |
62 | 39.2k | } |
63 | 87.6k | if (uncompress_hash_state_) { |
64 | 0 | XXH3_freeState(uncompress_hash_state_); |
65 | 0 | } |
66 | 87.6k | } |
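Corruption and data drops found by Reader are not surfaced through return codes; they are forwarded to the Reporter passed into the constructor (see ReportCorruption(), ReportDrop(), and ReportOldLogRecord() further down). A minimal reporter might look like the hypothetical sketch below (not part of log_reader.cc); the virtual signatures live in db/log_reader.h, and the three-argument Corruption() overload is assumed from the calls made in this file.

    #include "db/log_reader.h"

    // Hypothetical reporter sketch: counts dropped bytes and remembers the
    // last reported reason. Signatures assumed from the ReportDrop() /
    // ReportOldLogRecord() calls in this file.
    struct CountingReporter : public ROCKSDB_NAMESPACE::log::Reader::Reporter {
      size_t dropped_bytes = 0;
      ROCKSDB_NAMESPACE::Status last_status;

      void Corruption(size_t bytes, const ROCKSDB_NAMESPACE::Status& status,
                      uint64_t /*log_number*/) override {
        dropped_bytes += bytes;  // approximate number of bytes skipped
        last_status = status;    // why they were skipped
      }
      void OldLogRecord(size_t /*bytes*/) override {}
    };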
67 | | |
68 | | // For kAbsoluteConsistency, on clean shutdown we don't expect any error |
69 | | // in the log files. For other modes, we can ignore only incomplete records |
70 | | // in the last log file, which are presumably due to a write in progress |
71 | | // during restart (or from log recycling). |
72 | | // |
73 | | // TODO krad: Evaluate if we need to move to a more strict mode where we |
74 | | // restrict the inconsistency to only the last log |
75 | | // TODO (hx235): move `wal_recovery_mode` to be member data like other |
76 | | // information (e.g., `stop_replay_for_corruption`) to decide whether to |
77 | | // check for and surface corruption in `ReadRecord()` |
78 | | bool Reader::ReadRecord(Slice* record, std::string* scratch, |
79 | | WALRecoveryMode wal_recovery_mode, |
80 | 1.47M | uint64_t* record_checksum) { |
81 | 1.47M | scratch->clear(); |
82 | 1.47M | record->clear(); |
83 | 1.47M | if (record_checksum != nullptr) { |
84 | 1.04M | if (hash_state_ == nullptr) { |
85 | 39.2k | hash_state_ = XXH3_createState(); |
86 | 39.2k | } |
87 | 1.04M | XXH3_64bits_reset(hash_state_); |
88 | 1.04M | } |
89 | 1.47M | if (uncompress_) { |
90 | 0 | uncompress_->Reset(); |
91 | 0 | } |
92 | 1.47M | bool in_fragmented_record = false; |
93 | | // Record offset of the logical record that we're reading |
94 | | // 0 is a dummy value to make compilers happy |
95 | 1.47M | uint64_t prospective_record_offset = 0; |
96 | | |
97 | 1.47M | Slice fragment; |
98 | 1.48M | for (;;) { |
99 | 1.48M | uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size(); |
100 | 1.48M | size_t drop_size = 0; |
101 | 1.48M | const uint8_t record_type = |
102 | 1.48M | ReadPhysicalRecord(&fragment, &drop_size, record_checksum); |
103 | 1.48M | switch (record_type) { |
104 | 1.38M | case kFullType: |
105 | 1.38M | case kRecyclableFullType: |
106 | 1.38M | if (in_fragmented_record && !scratch->empty()) { |
107 | | // Handle bug in earlier versions of log::Writer where |
108 | | // it could emit an empty kFirstType record at the tail end |
109 | | // of a block followed by a kFullType or kFirstType record |
110 | | // at the beginning of the next block. |
111 | 0 | ReportCorruption(scratch->size(), "partial record without end(1)"); |
112 | 0 | } |
113 | | // No need to compute record_checksum since the record |
114 | | // consists of a single fragment and the checksum is computed |
115 | | // in ReadPhysicalRecord() if WAL compression is enabled |
116 | 1.38M | if (record_checksum != nullptr && uncompress_ == nullptr) { |
117 | | // No need to stream since the record is a single fragment |
118 | 1.00M | *record_checksum = XXH3_64bits(fragment.data(), fragment.size()); |
119 | 1.00M | } |
120 | 1.38M | prospective_record_offset = physical_record_offset; |
121 | 1.38M | scratch->clear(); |
122 | 1.38M | *record = fragment; |
123 | 1.38M | last_record_offset_ = prospective_record_offset; |
124 | 1.38M | first_record_read_ = true; |
125 | 1.38M | return true; |
126 | | |
127 | 4.05k | case kFirstType: |
128 | 4.05k | case kRecyclableFirstType: |
129 | 4.05k | if (in_fragmented_record && !scratch->empty()) { |
130 | | // Handle bug in earlier versions of log::Writer where |
131 | | // it could emit an empty kFirstType record at the tail end |
132 | | // of a block followed by a kFullType or kFirstType record |
133 | | // at the beginning of the next block. |
134 | 0 | ReportCorruption(scratch->size(), "partial record without end(2)"); |
135 | 0 | XXH3_64bits_reset(hash_state_); |
136 | 0 | } |
137 | 4.05k | if (record_checksum != nullptr) { |
138 | 4.05k | XXH3_64bits_update(hash_state_, fragment.data(), fragment.size()); |
139 | 4.05k | } |
140 | 4.05k | prospective_record_offset = physical_record_offset; |
141 | 4.05k | scratch->assign(fragment.data(), fragment.size()); |
142 | 4.05k | in_fragmented_record = true; |
143 | 4.05k | break; // switch |
144 | | |
145 | 1.86k | case kMiddleType: |
146 | 1.86k | case kRecyclableMiddleType: |
147 | 1.86k | if (!in_fragmented_record) { |
148 | 0 | ReportCorruption(fragment.size(), |
149 | 0 | "missing start of fragmented record(1)"); |
150 | 1.86k | } else { |
151 | 1.86k | if (record_checksum != nullptr) { |
152 | 1.86k | XXH3_64bits_update(hash_state_, fragment.data(), fragment.size()); |
153 | 1.86k | } |
154 | 1.86k | scratch->append(fragment.data(), fragment.size()); |
155 | 1.86k | } |
156 | 1.86k | break; // switch |
157 | | |
158 | 4.05k | case kLastType: |
159 | 4.05k | case kRecyclableLastType: |
160 | 4.05k | if (!in_fragmented_record) { |
161 | 0 | ReportCorruption(fragment.size(), |
162 | 0 | "missing start of fragmented record(2)"); |
163 | 4.05k | } else { |
164 | 4.05k | if (record_checksum != nullptr) { |
165 | 4.05k | XXH3_64bits_update(hash_state_, fragment.data(), fragment.size()); |
166 | 4.05k | *record_checksum = XXH3_64bits_digest(hash_state_); |
167 | 4.05k | } |
168 | 4.05k | scratch->append(fragment.data(), fragment.size()); |
169 | 4.05k | *record = Slice(*scratch); |
170 | 4.05k | last_record_offset_ = prospective_record_offset; |
171 | 4.05k | first_record_read_ = true; |
172 | 4.05k | return true; |
173 | 4.05k | } |
174 | 0 | break; // switch |
175 | | |
176 | 0 | case kSetCompressionType: { |
177 | 0 | if (compression_type_record_read_) { |
178 | 0 | ReportCorruption(fragment.size(), |
179 | 0 | "read multiple SetCompressionType records"); |
180 | 0 | } |
181 | 0 | if (first_record_read_) { |
182 | 0 | ReportCorruption(fragment.size(), |
183 | 0 | "SetCompressionType not the first record"); |
184 | 0 | } |
185 | 0 | prospective_record_offset = physical_record_offset; |
186 | 0 | scratch->clear(); |
187 | 0 | last_record_offset_ = prospective_record_offset; |
188 | 0 | CompressionTypeRecord compression_record(kNoCompression); |
189 | 0 | Status s = compression_record.DecodeFrom(&fragment); |
190 | 0 | if (!s.ok()) { |
191 | 0 | ReportCorruption(fragment.size(), |
192 | 0 | "could not decode SetCompressionType record"); |
193 | 0 | } else { |
194 | 0 | InitCompression(compression_record); |
195 | 0 | } |
196 | 0 | break; // switch |
197 | 4.05k | } |
198 | 0 | case kPredecessorWALInfoType: |
199 | 0 | case kRecyclePredecessorWALInfoType: { |
200 | 0 | prospective_record_offset = physical_record_offset; |
201 | 0 | scratch->clear(); |
202 | 0 | last_record_offset_ = prospective_record_offset; |
203 | |
204 | 0 | PredecessorWALInfo recorded_predecessor_wal_info; |
205 | 0 | Status s = recorded_predecessor_wal_info.DecodeFrom(&fragment); |
206 | 0 | if (!s.ok()) { |
207 | 0 | ReportCorruption(fragment.size(), |
208 | 0 | "could not decode PredecessorWALInfoType record"); |
209 | 0 | } else { |
210 | 0 | MaybeVerifyPredecessorWALInfo(wal_recovery_mode, fragment, |
211 | 0 | recorded_predecessor_wal_info); |
212 | 0 | } |
213 | 0 | break; // switch |
214 | 0 | } |
215 | 0 | case kUserDefinedTimestampSizeType: |
216 | 0 | case kRecyclableUserDefinedTimestampSizeType: { |
217 | 0 | if (in_fragmented_record && !scratch->empty()) { |
218 | 0 | ReportCorruption( |
219 | 0 | scratch->size(), |
220 | 0 | "user-defined timestamp size record interspersed partial record"); |
221 | 0 | } |
222 | 0 | prospective_record_offset = physical_record_offset; |
223 | 0 | scratch->clear(); |
224 | 0 | last_record_offset_ = prospective_record_offset; |
225 | 0 | UserDefinedTimestampSizeRecord ts_record; |
226 | 0 | Status s = ts_record.DecodeFrom(&fragment); |
227 | 0 | if (!s.ok()) { |
228 | 0 | ReportCorruption( |
229 | 0 | fragment.size(), |
230 | 0 | "could not decode user-defined timestamp size record"); |
231 | 0 | } else { |
232 | 0 | s = UpdateRecordedTimestampSize( |
233 | 0 | ts_record.GetUserDefinedTimestampSize()); |
234 | 0 | if (!s.ok()) { |
235 | 0 | ReportCorruption(fragment.size(), s.getState()); |
236 | 0 | } |
237 | 0 | } |
238 | 0 | break; // switch |
239 | 0 | } |
240 | | |
241 | 0 | case kBadHeader: |
242 | 0 | if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency || |
243 | 0 | wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) { |
244 | | // In clean shutdown we don't expect any error in the log files. |
245 | | // In point-in-time recovery an incomplete record at the end could |
246 | | // produce a hole in the recovered data. Report an error here, which |
247 | | // higher layers can choose to ignore when it's provable there is no |
248 | | // hole. |
249 | 0 | ReportCorruption(drop_size, "truncated header"); |
250 | 0 | } |
251 | 0 | FALLTHROUGH_INTENDED; |
252 | |
253 | 87.6k | case kEof: |
254 | 87.6k | if (in_fragmented_record) { |
255 | 0 | if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency || |
256 | 0 | wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) { |
257 | | // In clean shutdown we don't expect any error in the log files. |
258 | | // In point-in-time recovery an incomplete record at the end could |
259 | | // produce a hole in the recovered data. Report an error here, which |
260 | | // higher layers can choose to ignore when it's provable there is no |
261 | | // hole. |
262 | 0 | ReportCorruption( |
263 | 0 | scratch->size(), |
264 | 0 | "error reading trailing data due to encountering EOF"); |
265 | 0 | } |
266 | | // This can be caused by the writer dying immediately after |
267 | | // writing a physical record but before completing the next; don't |
268 | | // treat it as a corruption, just ignore the entire logical record. |
269 | 0 | scratch->clear(); |
270 | 0 | } |
271 | 87.6k | return false; |
272 | | |
273 | 0 | case kOldRecord: |
274 | 0 | if (wal_recovery_mode != WALRecoveryMode::kSkipAnyCorruptedRecords) { |
275 | | // Treat a record from a previous instance of the log as EOF. |
276 | 0 | if (in_fragmented_record) { |
277 | 0 | if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency || |
278 | 0 | wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) { |
279 | | // In clean shutdown we don't expect any error in the log files. |
280 | | // In point-in-time recovery an incomplete record at the end could |
281 | | // produce a hole in the recovered data. Report an error here, |
282 | | // which higher layers can choose to ignore when it's provable |
283 | | // there is no hole. |
284 | 0 | ReportCorruption( |
285 | 0 | scratch->size(), |
286 | 0 | "error reading trailing data due to encountering old record"); |
287 | 0 | } |
288 | | // This can be caused by the writer dying immediately after |
289 | | // writing a physical record but before completing the next; don't |
290 | | // treat it as a corruption, just ignore the entire logical record. |
291 | 0 | scratch->clear(); |
292 | 0 | } else { |
293 | 0 | if (wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) { |
294 | 0 | ReportOldLogRecord(scratch->size()); |
295 | 0 | } |
296 | 0 | } |
297 | 0 | return false; |
298 | 0 | } |
299 | 0 | FALLTHROUGH_INTENDED; |
300 | |
301 | 0 | case kBadRecord: |
302 | 0 | if (in_fragmented_record) { |
303 | 0 | ReportCorruption(scratch->size(), "error in middle of record"); |
304 | 0 | in_fragmented_record = false; |
305 | 0 | scratch->clear(); |
306 | 0 | } |
307 | 0 | break; // switch |
308 | | |
309 | 0 | case kBadRecordLen: |
310 | 0 | if (eof_) { |
311 | 0 | if (wal_recovery_mode == WALRecoveryMode::kAbsoluteConsistency || |
312 | 0 | wal_recovery_mode == WALRecoveryMode::kPointInTimeRecovery) { |
313 | | // In clean shutdown we don't expect any error in the log files. |
314 | | // In point-in-time recovery an incomplete record at the end could |
315 | | // produce a hole in the recovered data. Report an error here, which |
316 | | // higher layers can choose to ignore when it's provable there is no |
317 | | // hole. |
318 | 0 | ReportCorruption(drop_size, "truncated record body"); |
319 | 0 | } |
320 | 0 | return false; |
321 | 0 | } |
322 | 0 | FALLTHROUGH_INTENDED; |
323 | |
324 | 0 | case kBadRecordChecksum: |
325 | 0 | if (recycled_ && wal_recovery_mode == |
326 | 0 | WALRecoveryMode::kTolerateCorruptedTailRecords) { |
327 | 0 | scratch->clear(); |
328 | 0 | return false; |
329 | 0 | } |
330 | 0 | if (record_type == kBadRecordLen) { |
331 | 0 | ReportCorruption(drop_size, "bad record length"); |
332 | 0 | } else { |
333 | 0 | ReportCorruption(drop_size, "checksum mismatch"); |
334 | 0 | } |
335 | 0 | if (in_fragmented_record) { |
336 | 0 | ReportCorruption(scratch->size(), "error in middle of record"); |
337 | 0 | in_fragmented_record = false; |
338 | 0 | scratch->clear(); |
339 | 0 | } |
340 | 0 | break; // switch |
341 | | |
342 | 0 | default: { |
343 | 0 | if ((record_type & kRecordTypeSafeIgnoreMask) == 0) { |
344 | 0 | std::string reason = |
345 | 0 | "unknown record type " + std::to_string(record_type); |
346 | 0 | ReportCorruption( |
347 | 0 | (fragment.size() + (in_fragmented_record ? scratch->size() : 0)), |
348 | 0 | reason.c_str()); |
349 | 0 | } |
350 | 0 | in_fragmented_record = false; |
351 | 0 | scratch->clear(); |
352 | 0 | break; // switch |
353 | 0 | } |
354 | 1.48M | } |
355 | 1.48M | } |
356 | | // unreachable |
357 | 1.47M | } |
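For orientation, callers such as WAL recovery typically drive ReadRecord() in a loop until it returns false; per-record corruption handling happens through the Reporter and the chosen WALRecoveryMode rather than through the return value. A minimal hypothetical sketch, assuming a Reader named `reader` and a consumer ProcessRecord() that are not part of this file:

    // Hypothetical driver loop; `reader` and ProcessRecord() are placeholders.
    // Assumes <string> plus the RocksDB headers already included above.
    std::string scratch;
    ROCKSDB_NAMESPACE::Slice record;
    uint64_t record_checksum = 0;
    while (reader.ReadRecord(
        &record, &scratch,
        ROCKSDB_NAMESPACE::WALRecoveryMode::kPointInTimeRecovery,
        &record_checksum)) {
      // `record` may point into `scratch` or into the reader's block buffer,
      // so it is only valid until the next ReadRecord() call.
      ProcessRecord(record, record_checksum);
    }
    // A false return means EOF, an old/recycled record, or a tail the
    // recovery mode refuses to read past; details went to the Reporter.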
358 | | |
359 | | void Reader::MaybeVerifyPredecessorWALInfo( |
360 | | WALRecoveryMode wal_recovery_mode, Slice fragment, |
361 | 0 | const PredecessorWALInfo& recorded_predecessor_wal_info) { |
362 | 0 | if (!track_and_verify_wals_ || |
363 | 0 | wal_recovery_mode == WALRecoveryMode::kSkipAnyCorruptedRecords || |
364 | 0 | stop_replay_for_corruption_) { |
365 | 0 | return; |
366 | 0 | } |
367 | 0 | assert(recorded_predecessor_wal_info.IsInitialized()); |
368 | 0 | uint64_t recorded_predecessor_log_number = |
369 | 0 | recorded_predecessor_wal_info.GetLogNumber(); |
370 | | |
371 | | // This is the first WAL recovered, so no predecessor WAL info has been |
372 | | // initialized. |
373 | 0 | if (!observed_predecessor_wal_info_.IsInitialized()) { |
374 | 0 | if (recorded_predecessor_log_number >= min_wal_number_to_keep_) { |
375 | 0 | std::string reason = "Missing WAL of log number " + |
376 | 0 | std::to_string(recorded_predecessor_log_number); |
377 | 0 | ReportCorruption(fragment.size(), reason.c_str(), |
378 | 0 | recorded_predecessor_log_number); |
379 | 0 | } |
380 | 0 | } else { |
381 | 0 | if (observed_predecessor_wal_info_.GetLogNumber() != |
382 | 0 | recorded_predecessor_log_number) { |
383 | 0 | std::string reason = |
384 | 0 | "Mismatched predecessor log number of WAL file " + |
385 | 0 | file_->file_name() + " Recorded " + |
386 | 0 | std::to_string(recorded_predecessor_log_number) + ". Observed " + |
387 | 0 | std::to_string(observed_predecessor_wal_info_.GetLogNumber()); |
388 | 0 | ReportCorruption(fragment.size(), reason.c_str(), |
389 | 0 | recorded_predecessor_log_number); |
390 | 0 | } else if (observed_predecessor_wal_info_.GetLastSeqnoRecorded() != |
391 | 0 | recorded_predecessor_wal_info.GetLastSeqnoRecorded()) { |
392 | 0 | std::string reason = |
393 | 0 | "Mismatched last sequence number recorded in the WAL of log number " + |
394 | 0 | std::to_string(recorded_predecessor_log_number) + ". Recorded " + |
395 | 0 | std::to_string(recorded_predecessor_wal_info.GetLastSeqnoRecorded()) + |
396 | 0 | ". Observed " + |
397 | 0 | std::to_string( |
398 | 0 | observed_predecessor_wal_info_.GetLastSeqnoRecorded()) + |
399 | 0 | ". (Last sequence number equal to 0 indicates no WAL records)"; |
400 | 0 | ReportCorruption(fragment.size(), reason.c_str(), |
401 | 0 | recorded_predecessor_log_number); |
402 | 0 | } else if (observed_predecessor_wal_info_.GetSizeBytes() != |
403 | 0 | recorded_predecessor_wal_info.GetSizeBytes()) { |
404 | 0 | std::string reason = |
405 | 0 | "Mismatched size of the WAL of log number " + |
406 | 0 | std::to_string(recorded_predecessor_log_number) + ". Recorded " + |
407 | 0 | std::to_string(recorded_predecessor_wal_info.GetSizeBytes()) + |
408 | 0 | " bytes. Observed " + |
409 | 0 | std::to_string(observed_predecessor_wal_info_.GetSizeBytes()) + |
410 | 0 | " bytes."; |
411 | 0 | ReportCorruption(fragment.size(), reason.c_str(), |
412 | 0 | recorded_predecessor_log_number); |
413 | 0 | } |
414 | 0 | } |
415 | 0 | } |
416 | | |
417 | 0 | uint64_t Reader::LastRecordOffset() { return last_record_offset_; } |
418 | | |
419 | 430k | uint64_t Reader::LastRecordEnd() { |
420 | 430k | return end_of_buffer_offset_ - buffer_.size(); |
421 | 430k | } |
422 | | |
423 | 0 | void Reader::UnmarkEOF() { |
424 | 0 | if (read_error_) { |
425 | 0 | return; |
426 | 0 | } |
427 | 0 | eof_ = false; |
428 | 0 | if (eof_offset_ == 0) { |
429 | 0 | return; |
430 | 0 | } |
431 | 0 | UnmarkEOFInternal(); |
432 | 0 | } |
433 | | |
434 | 0 | void Reader::UnmarkEOFInternal() { |
435 | | // If the EOF was in the middle of a block (a partial block was read) we have |
436 | | // to read the rest of the block as ReadPhysicalRecord can only read full |
437 | | // blocks and expects the file position indicator to be aligned to the start |
438 | | // of a block. |
439 | | // |
440 | | // consumed_bytes + buffer_size() + remaining == kBlockSize |
441 | |
442 | 0 | size_t consumed_bytes = eof_offset_ - buffer_.size(); |
443 | 0 | size_t remaining = kBlockSize - eof_offset_; |
444 | | |
445 | | // backing_store_ is used to concatenate what is left in buffer_ and |
446 | | // the remainder of the block. If buffer_ already uses backing_store_, |
447 | | // we just append the new data. |
448 | 0 | if (buffer_.data() != backing_store_ + consumed_bytes) { |
449 | | // buffer_ does not use backing_store_ for storage. |
450 | | // Copy what is left in buffer_ to backing_store_. |
451 | 0 | memmove(backing_store_ + consumed_bytes, buffer_.data(), buffer_.size()); |
452 | 0 | } |
453 | |
|
454 | 0 | Slice read_buffer; |
455 | | // TODO: rate limit log reader with appropriate priority. |
456 | | // TODO: avoid overcharging rate limiter: |
457 | | // Note that the Read here might overcharge SequentialFileReader's internal |
458 | | // rate limiter if priority is not IO_TOTAL, e.g., when there is not enough |
459 | | // content left until EOF to read. |
460 | 0 | Status status = |
461 | 0 | file_->Read(remaining, &read_buffer, backing_store_ + eof_offset_, |
462 | 0 | Env::IO_TOTAL /* rate_limiter_priority */); |
463 | |
|
464 | 0 | size_t added = read_buffer.size(); |
465 | 0 | end_of_buffer_offset_ += added; |
466 | |
|
467 | 0 | if (!status.ok()) { |
468 | 0 | if (added > 0) { |
469 | 0 | ReportDrop(added, status); |
470 | 0 | } |
471 | |
|
472 | 0 | read_error_ = true; |
473 | 0 | return; |
474 | 0 | } |
475 | | |
476 | 0 | if (read_buffer.data() != backing_store_ + eof_offset_) { |
477 | | // Read did not write to backing_store_ |
478 | 0 | memmove(backing_store_ + eof_offset_, read_buffer.data(), |
479 | 0 | read_buffer.size()); |
480 | 0 | } |
481 | |
|
482 | 0 | buffer_ = Slice(backing_store_ + consumed_bytes, |
483 | 0 | eof_offset_ + added - consumed_bytes); |
484 | |
|
485 | 0 | if (added < remaining) { |
486 | 0 | eof_ = true; |
487 | 0 | eof_offset_ += added; |
488 | 0 | } else { |
489 | 0 | eof_offset_ = 0; |
490 | 0 | } |
491 | 0 | } |
492 | | |
493 | | void Reader::ReportCorruption(size_t bytes, const char* reason, |
494 | 0 | uint64_t log_number) { |
495 | 0 | ReportDrop(bytes, Status::Corruption(reason), log_number); |
496 | 0 | } |
497 | | |
498 | | void Reader::ReportDrop(size_t bytes, const Status& reason, |
499 | 0 | uint64_t log_number) { |
500 | 0 | if (reporter_ != nullptr) { |
501 | 0 | reporter_->Corruption(bytes, reason, log_number); |
502 | 0 | } |
503 | 0 | } |
504 | | |
505 | 0 | void Reader::ReportOldLogRecord(size_t bytes) { |
506 | 0 | if (reporter_ != nullptr) { |
507 | 0 | reporter_->OldLogRecord(bytes); |
508 | 0 | } |
509 | 0 | } |
510 | | |
511 | 181k | bool Reader::ReadMore(size_t* drop_size, uint8_t* error) { |
512 | 181k | if (!eof_ && !read_error_) { |
513 | | // Last read was a full read, so this is a trailer to skip |
514 | 93.7k | buffer_.clear(); |
515 | | // TODO: rate limit log reader with appropriate priority. |
516 | | // TODO: avoid overcharging rate limiter: |
517 | | // Note that the Read here might overcharge SequentialFileReader's internal |
518 | | // rate limiter if priority is not IO_TOTAL, e.g., when there is not enough |
519 | | // content left until EOF to read. |
520 | 93.7k | Status status = file_->Read(kBlockSize, &buffer_, backing_store_, |
521 | 93.7k | Env::IO_TOTAL /* rate_limiter_priority */); |
522 | 93.7k | TEST_SYNC_POINT_CALLBACK("LogReader::ReadMore:AfterReadFile", &status); |
523 | 93.7k | end_of_buffer_offset_ += buffer_.size(); |
524 | 93.7k | if (!status.ok()) { |
525 | 0 | buffer_.clear(); |
526 | 0 | ReportDrop(kBlockSize, status); |
527 | 0 | read_error_ = true; |
528 | 0 | *error = kEof; |
529 | 0 | return false; |
530 | 93.7k | } else if (buffer_.size() < static_cast<size_t>(kBlockSize)) { |
531 | 87.6k | eof_ = true; |
532 | 87.6k | eof_offset_ = buffer_.size(); |
533 | 87.6k | } |
534 | 93.7k | return true; |
535 | 93.7k | } else { |
536 | | // Note that if buffer_ is non-empty, we have a truncated header at the |
537 | | // end of the file, which can be caused by the writer crashing in the |
538 | | // middle of writing the header. Unless explicitly requested, we don't |
539 | | // consider this an error; we just report EOF. |
540 | 87.6k | if (buffer_.size()) { |
541 | 0 | *drop_size = buffer_.size(); |
542 | 0 | buffer_.clear(); |
543 | 0 | *error = kBadHeader; |
544 | 0 | return false; |
545 | 0 | } |
546 | 87.6k | buffer_.clear(); |
547 | 87.6k | *error = kEof; |
548 | 87.6k | return false; |
549 | 87.6k | } |
550 | 181k | } |
551 | | |
552 | | uint8_t Reader::ReadPhysicalRecord(Slice* result, size_t* drop_size, |
553 | 1.48M | uint64_t* fragment_checksum) { |
554 | 1.57M | while (true) { |
555 | | // We need at least the minimum header size |
556 | 1.57M | if (buffer_.size() < static_cast<size_t>(kHeaderSize)) { |
557 | | // the default value of r is meaningless because ReadMore will overwrite |
558 | | // it if it returns false; in case it returns true, the return value will |
559 | | // not be used anyway |
560 | 181k | uint8_t r = kEof; |
561 | 181k | if (!ReadMore(drop_size, &r)) { |
562 | 87.6k | return r; |
563 | 87.6k | } |
564 | 93.7k | continue; |
565 | 181k | } |
566 | | |
567 | | // Parse the header |
568 | 1.39M | const char* header = buffer_.data(); |
569 | 1.39M | const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff; |
570 | 1.39M | const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff; |
571 | 1.39M | const uint8_t type = static_cast<uint8_t>(header[6]); |
572 | 1.39M | const uint32_t length = a | (b << 8); |
573 | 1.39M | int header_size = kHeaderSize; |
574 | 1.39M | const bool is_recyclable_type = |
575 | 1.39M | ((type >= kRecyclableFullType && type <= kRecyclableLastType) || |
576 | 1.39M | type == kRecyclableUserDefinedTimestampSizeType || |
577 | 1.39M | type == kRecyclePredecessorWALInfoType); |
578 | 1.39M | if (is_recyclable_type) { |
579 | 0 | header_size = kRecyclableHeaderSize; |
580 | 0 | if (first_record_read_ && !recycled_) { |
581 | | // A recycled log should have started with a recycled record |
582 | 0 | return kBadRecord; |
583 | 0 | } |
584 | 0 | recycled_ = true; |
585 | | // We need enough for the larger header |
586 | 0 | if (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) { |
587 | 0 | uint8_t r = kEof; |
588 | 0 | if (!ReadMore(drop_size, &r)) { |
589 | 0 | return r; |
590 | 0 | } |
591 | 0 | continue; |
592 | 0 | } |
593 | 0 | } |
594 | | |
595 | 1.39M | if (header_size + length > buffer_.size()) { |
596 | 0 | assert(buffer_.size() >= static_cast<size_t>(header_size)); |
597 | 0 | *drop_size = buffer_.size(); |
598 | 0 | buffer_.clear(); |
599 | | // If the end of the read has been reached without seeing |
600 | | // `header_size + length` bytes of payload, report a corruption. The |
601 | | // higher layers can decide how to handle it based on the recovery mode, |
602 | | // whether this occurred at EOF, whether this is the final WAL, etc. |
603 | 0 | return kBadRecordLen; |
604 | 0 | } |
605 | | |
606 | 1.39M | if (is_recyclable_type) { |
607 | 0 | const uint32_t log_num = DecodeFixed32(header + 7); |
608 | 0 | if (log_num != log_number_) { |
609 | 0 | buffer_.remove_prefix(header_size + length); |
610 | 0 | return kOldRecord; |
611 | 0 | } |
612 | 0 | } |
613 | | |
614 | 1.39M | if (type == kZeroType && length == 0) { |
615 | | // Skip zero length record without reporting any drops since |
616 | | // such records are produced by the mmap based writing code in |
617 | | // env_posix.cc that preallocates file regions. |
618 | | // NOTE: this should never happen in DBs written by new RocksDB versions, |
619 | | // since we turn off mmap writes to manifest and log files |
620 | 0 | buffer_.clear(); |
621 | 0 | return kBadRecord; |
622 | 0 | } |
623 | | |
624 | | // Check crc |
625 | 1.39M | if (checksum_) { |
626 | 1.39M | uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header)); |
627 | 1.39M | uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6); |
628 | 1.39M | if (actual_crc != expected_crc) { |
629 | | // Drop the rest of the buffer since "length" itself may have |
630 | | // been corrupted and if we trust it, we could find some |
631 | | // fragment of a real log record that just happens to look |
632 | | // like a valid log record. |
633 | 0 | *drop_size = buffer_.size(); |
634 | 0 | buffer_.clear(); |
635 | 0 | return kBadRecordChecksum; |
636 | 0 | } |
637 | 1.39M | } |
638 | | |
639 | 1.39M | buffer_.remove_prefix(header_size + length); |
640 | | |
641 | 1.39M | if (!uncompress_ || type == kSetCompressionType || |
642 | 0 | type == kPredecessorWALInfoType || |
643 | 0 | type == kRecyclePredecessorWALInfoType || |
644 | 0 | type == kUserDefinedTimestampSizeType || |
645 | 1.39M | type == kRecyclableUserDefinedTimestampSizeType) { |
646 | 1.39M | *result = Slice(header + header_size, length); |
647 | 1.39M | return type; |
648 | 1.39M | } else { |
649 | | // Uncompress compressed records |
650 | 0 | uncompressed_record_.clear(); |
651 | 0 | if (fragment_checksum != nullptr) { |
652 | 0 | if (uncompress_hash_state_ == nullptr) { |
653 | 0 | uncompress_hash_state_ = XXH3_createState(); |
654 | 0 | } |
655 | 0 | XXH3_64bits_reset(uncompress_hash_state_); |
656 | 0 | } |
657 | |
|
658 | 0 | size_t uncompressed_size = 0; |
659 | 0 | int remaining = 0; |
660 | 0 | const char* input = header + header_size; |
661 | 0 | do { |
662 | 0 | remaining = uncompress_->Uncompress( |
663 | 0 | input, length, uncompressed_buffer_.get(), &uncompressed_size); |
664 | 0 | input = nullptr; |
665 | 0 | if (remaining < 0) { |
666 | 0 | buffer_.clear(); |
667 | 0 | return kBadRecord; |
668 | 0 | } |
669 | 0 | if (uncompressed_size > 0) { |
670 | 0 | if (fragment_checksum != nullptr) { |
671 | 0 | XXH3_64bits_update(uncompress_hash_state_, |
672 | 0 | uncompressed_buffer_.get(), uncompressed_size); |
673 | 0 | } |
674 | 0 | uncompressed_record_.append(uncompressed_buffer_.get(), |
675 | 0 | uncompressed_size); |
676 | 0 | } |
677 | 0 | } while (remaining > 0 || uncompressed_size == kBlockSize); |
678 | | |
679 | 0 | if (fragment_checksum != nullptr) { |
680 | | // We can remove this check by updating hash_state_ directly, |
681 | | // but that requires resetting hash_state_ for full and first types |
682 | | // for edge cases like consecutive first type records. |
683 | | // Leaving the check as is since it is cleaner; we can revert to the |
684 | | // above approach if it causes a performance impact. |
685 | 0 | *fragment_checksum = XXH3_64bits_digest(uncompress_hash_state_); |
686 | 0 | uint64_t actual_checksum = XXH3_64bits(uncompressed_record_.data(), |
687 | 0 | uncompressed_record_.size()); |
688 | 0 | if (*fragment_checksum != actual_checksum) { |
689 | | // uncompressed_record_ contains bad content that does not match |
690 | | // actual decompressed content |
691 | 0 | return kBadRecord; |
692 | 0 | } |
693 | 0 | } |
694 | 0 | *result = Slice(uncompressed_record_); |
695 | 0 | return type; |
696 | 0 | } |
697 | 1.39M | } |
698 | 1.48M | } |
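The parsing above reflects the on-disk layout of a physical record: a masked CRC32C over the type byte (plus the log number for recyclable types) and the payload, a little-endian 16-bit payload length, and a type byte, followed by the payload. A small sketch of decoding just the 7-byte legacy header, mirroring the field extraction in ReadPhysicalRecord() (DecodeFixed32 is the same helper from util/coding.h used above; the struct and function names are illustrative):

    #include <cstdint>

    #include "util/coding.h"  // DecodeFixed32

    // Decoded view of the legacy (non-recyclable) record header. `header`
    // must point at at least kHeaderSize (7) bytes.
    struct LegacyRecordHeader {
      uint32_t masked_crc;  // crc32c of type + payload, masked by the writer
      uint32_t length;      // payload length (16-bit, little-endian)
      uint8_t type;         // kFullType, kFirstType, kMiddleType, kLastType, ...
    };

    inline LegacyRecordHeader DecodeLegacyRecordHeader(const char* header) {
      LegacyRecordHeader h;
      h.masked_crc = ROCKSDB_NAMESPACE::DecodeFixed32(header);
      const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
      const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
      h.length = a | (b << 8);
      h.type = static_cast<uint8_t>(header[6]);
      return h;
    }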
699 | | |
700 | | // Initialize uncompress related fields |
701 | 0 | void Reader::InitCompression(const CompressionTypeRecord& compression_record) { |
702 | 0 | compression_type_ = compression_record.GetCompressionType(); |
703 | 0 | compression_type_record_read_ = true; |
704 | 0 | constexpr uint32_t compression_format_version = 2; |
705 | 0 | uncompress_ = StreamingUncompress::Create( |
706 | 0 | compression_type_, compression_format_version, kBlockSize); |
707 | 0 | assert(uncompress_ != nullptr); |
708 | 0 | uncompressed_buffer_ = std::unique_ptr<char[]>(new char[kBlockSize]); |
709 | 0 | assert(uncompressed_buffer_); |
710 | 0 | } |
711 | | |
712 | | Status Reader::UpdateRecordedTimestampSize( |
713 | 0 | const std::vector<std::pair<uint32_t, size_t>>& cf_to_ts_sz) { |
714 | 0 | for (const auto& [cf, ts_sz] : cf_to_ts_sz) { |
715 | | // A zero user-defined timestamp size is not recorded. |
716 | 0 | if (ts_sz == 0) { |
717 | 0 | return Status::Corruption( |
718 | 0 | "User-defined timestamp size record contains zero timestamp size."); |
719 | 0 | } |
720 | | // The user-defined timestamp size record for a column family should not be |
721 | | // updated in the same log file. |
722 | 0 | if (recorded_cf_to_ts_sz_.count(cf) != 0) { |
723 | 0 | return Status::Corruption( |
724 | 0 | "User-defined timestamp size record contains update to " |
725 | 0 | "recorded column family."); |
726 | 0 | } |
727 | 0 | recorded_cf_to_ts_sz_.insert(std::make_pair(cf, ts_sz)); |
728 | 0 | } |
729 | 0 | return Status::OK(); |
730 | 0 | } |
731 | | |
732 | | bool FragmentBufferedReader::ReadRecord(Slice* record, std::string* scratch, |
733 | | WALRecoveryMode wal_recovery_mode, |
734 | |
735 | |
736 | 0 | uint64_t* /* checksum */) { |
737 | 0 | assert(record != nullptr); |
738 | 0 | assert(scratch != nullptr); |
739 | 0 | record->clear(); |
740 | 0 | scratch->clear(); |
741 | 0 | if (uncompress_) { |
742 | 0 | uncompress_->Reset(); |
743 | 0 | } |
744 | |
|
745 | 0 | uint64_t prospective_record_offset = 0; |
746 | 0 | uint64_t physical_record_offset = end_of_buffer_offset_ - buffer_.size(); |
747 | 0 | size_t drop_size = 0; |
748 | 0 | uint8_t fragment_type_or_err = 0; // Initialize to make compiler happy |
749 | 0 | Slice fragment; |
750 | 0 | while (TryReadFragment(&fragment, &drop_size, &fragment_type_or_err)) { |
751 | 0 | switch (fragment_type_or_err) { |
752 | 0 | case kFullType: |
753 | 0 | case kRecyclableFullType: |
754 | 0 | if (in_fragmented_record_ && !fragments_.empty()) { |
755 | 0 | ReportCorruption(fragments_.size(), "partial record without end(1)"); |
756 | 0 | } |
757 | 0 | fragments_.clear(); |
758 | 0 | *record = fragment; |
759 | 0 | prospective_record_offset = physical_record_offset; |
760 | 0 | last_record_offset_ = prospective_record_offset; |
761 | 0 | first_record_read_ = true; |
762 | 0 | in_fragmented_record_ = false; |
763 | 0 | return true; |
764 | | |
765 | 0 | case kFirstType: |
766 | 0 | case kRecyclableFirstType: |
767 | 0 | if (in_fragmented_record_ || !fragments_.empty()) { |
768 | 0 | ReportCorruption(fragments_.size(), "partial record without end(2)"); |
769 | 0 | } |
770 | 0 | prospective_record_offset = physical_record_offset; |
771 | 0 | fragments_.assign(fragment.data(), fragment.size()); |
772 | 0 | in_fragmented_record_ = true; |
773 | 0 | break; |
774 | | |
775 | 0 | case kMiddleType: |
776 | 0 | case kRecyclableMiddleType: |
777 | 0 | if (!in_fragmented_record_) { |
778 | 0 | ReportCorruption(fragment.size(), |
779 | 0 | "missing start of fragmented record(1)"); |
780 | 0 | } else { |
781 | 0 | fragments_.append(fragment.data(), fragment.size()); |
782 | 0 | } |
783 | 0 | break; |
784 | | |
785 | 0 | case kLastType: |
786 | 0 | case kRecyclableLastType: |
787 | 0 | if (!in_fragmented_record_) { |
788 | 0 | ReportCorruption(fragment.size(), |
789 | 0 | "missing start of fragmented record(2)"); |
790 | 0 | } else { |
791 | 0 | fragments_.append(fragment.data(), fragment.size()); |
792 | 0 | scratch->assign(fragments_.data(), fragments_.size()); |
793 | 0 | fragments_.clear(); |
794 | 0 | *record = Slice(*scratch); |
795 | 0 | last_record_offset_ = prospective_record_offset; |
796 | 0 | first_record_read_ = true; |
797 | 0 | in_fragmented_record_ = false; |
798 | 0 | return true; |
799 | 0 | } |
800 | 0 | break; |
801 | | |
802 | 0 | case kSetCompressionType: { |
803 | 0 | if (compression_type_record_read_) { |
804 | 0 | ReportCorruption(fragment.size(), |
805 | 0 | "read multiple SetCompressionType records"); |
806 | 0 | } |
807 | 0 | if (first_record_read_) { |
808 | 0 | ReportCorruption(fragment.size(), |
809 | 0 | "SetCompressionType not the first record"); |
810 | 0 | } |
811 | 0 | fragments_.clear(); |
812 | 0 | prospective_record_offset = physical_record_offset; |
813 | 0 | last_record_offset_ = prospective_record_offset; |
814 | 0 | in_fragmented_record_ = false; |
815 | 0 | CompressionTypeRecord compression_record(kNoCompression); |
816 | 0 | Status s = compression_record.DecodeFrom(&fragment); |
817 | 0 | if (!s.ok()) { |
818 | 0 | ReportCorruption(fragment.size(), |
819 | 0 | "could not decode SetCompressionType record"); |
820 | 0 | } else { |
821 | 0 | InitCompression(compression_record); |
822 | 0 | } |
823 | 0 | break; |
824 | 0 | } |
825 | 0 | case kPredecessorWALInfoType: |
826 | 0 | case kRecyclePredecessorWALInfoType: { |
827 | 0 | fragments_.clear(); |
828 | 0 | prospective_record_offset = physical_record_offset; |
829 | 0 | last_record_offset_ = prospective_record_offset; |
830 | 0 | in_fragmented_record_ = false; |
831 | |
|
832 | 0 | PredecessorWALInfo recorded_predecessor_wal_info; |
833 | 0 | Status s = recorded_predecessor_wal_info.DecodeFrom(&fragment); |
834 | 0 | if (!s.ok()) { |
835 | 0 | ReportCorruption(fragment.size(), |
836 | 0 | "could not decode PredecessorWALInfoType record"); |
837 | 0 | } else { |
838 | 0 | MaybeVerifyPredecessorWALInfo(wal_recovery_mode, fragment, |
839 | 0 | recorded_predecessor_wal_info); |
840 | 0 | } |
841 | 0 | break; |
842 | 0 | } |
843 | 0 | case kUserDefinedTimestampSizeType: |
844 | 0 | case kRecyclableUserDefinedTimestampSizeType: { |
845 | 0 | if (in_fragmented_record_ && !scratch->empty()) { |
846 | 0 | ReportCorruption( |
847 | 0 | scratch->size(), |
848 | 0 | "user-defined timestamp size record interspersed partial record"); |
849 | 0 | } |
850 | 0 | fragments_.clear(); |
851 | 0 | prospective_record_offset = physical_record_offset; |
852 | 0 | last_record_offset_ = prospective_record_offset; |
853 | 0 | in_fragmented_record_ = false; |
854 | 0 | UserDefinedTimestampSizeRecord ts_record; |
855 | 0 | Status s = ts_record.DecodeFrom(&fragment); |
856 | 0 | if (!s.ok()) { |
857 | 0 | ReportCorruption( |
858 | 0 | fragment.size(), |
859 | 0 | "could not decode user-defined timestamp size record"); |
860 | 0 | } else { |
861 | 0 | s = UpdateRecordedTimestampSize( |
862 | 0 | ts_record.GetUserDefinedTimestampSize()); |
863 | 0 | if (!s.ok()) { |
864 | 0 | ReportCorruption(fragment.size(), s.getState()); |
865 | 0 | } |
866 | 0 | } |
867 | 0 | break; |
868 | 0 | } |
869 | | |
870 | 0 | case kBadHeader: |
871 | 0 | case kBadRecord: |
872 | 0 | case kEof: |
873 | 0 | case kOldRecord: |
874 | 0 | if (in_fragmented_record_) { |
875 | 0 | ReportCorruption(fragments_.size(), "error in middle of record"); |
876 | 0 | in_fragmented_record_ = false; |
877 | 0 | fragments_.clear(); |
878 | 0 | } |
879 | 0 | break; |
880 | | |
881 | 0 | case kBadRecordChecksum: |
882 | 0 | if (recycled_) { |
883 | 0 | fragments_.clear(); |
884 | 0 | return false; |
885 | 0 | } |
886 | 0 | ReportCorruption(drop_size, "checksum mismatch"); |
887 | 0 | if (in_fragmented_record_) { |
888 | 0 | ReportCorruption(fragments_.size(), "error in middle of record"); |
889 | 0 | in_fragmented_record_ = false; |
890 | 0 | fragments_.clear(); |
891 | 0 | } |
892 | 0 | break; |
893 | | |
894 | 0 | default: { |
895 | 0 | if ((fragment_type_or_err & kRecordTypeSafeIgnoreMask) == 0) { |
896 | 0 | std::string reason = |
897 | 0 | "unknown record type " + std::to_string(fragment_type_or_err); |
898 | 0 | ReportCorruption( |
899 | 0 | fragment.size() + (in_fragmented_record_ ? fragments_.size() : 0), |
900 | 0 | reason.c_str()); |
901 | 0 | } |
902 | 0 | in_fragmented_record_ = false; |
903 | 0 | fragments_.clear(); |
904 | 0 | break; |
905 | 0 | } |
906 | 0 | } |
907 | 0 | } |
908 | 0 | return false; |
909 | 0 | } |
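Unlike Reader, FragmentBufferedReader keeps partial fragments in fragments_ across calls and, through TryReadMore(), clears a previously observed EOF, so a false return from its ReadRecord() can simply mean that no complete record has been appended yet. That makes it usable for tailing a WAL that is still being written. A hedged, hypothetical sketch of such a polling loop; the reader object, the consumer, and the backoff are all placeholders:

    // Hypothetical tailing loop; `fragment_reader` and ProcessRecord() are
    // placeholders. Assumes <chrono>, <string>, and <thread> are included.
    std::string scratch;
    ROCKSDB_NAMESPACE::Slice record;
    for (;;) {
      while (fragment_reader.ReadRecord(
          &record, &scratch,
          ROCKSDB_NAMESPACE::WALRecoveryMode::kTolerateCorruptedTailRecords,
          /*checksum=*/nullptr)) {
        ProcessRecord(record);
      }
      // No complete record buffered yet (or a real EOF); wait for the writer
      // to append more data, then poll again.
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }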
910 | | |
911 | 0 | void FragmentBufferedReader::UnmarkEOF() { |
912 | 0 | if (read_error_) { |
913 | 0 | return; |
914 | 0 | } |
915 | 0 | eof_ = false; |
916 | 0 | UnmarkEOFInternal(); |
917 | 0 | } |
918 | | |
919 | 0 | bool FragmentBufferedReader::TryReadMore(size_t* drop_size, uint8_t* error) { |
920 | 0 | if (!eof_ && !read_error_) { |
921 | | // Last read was a full read, so this is a trailer to skip |
922 | 0 | buffer_.clear(); |
923 | | // TODO: rate limit log reader with appropriate priority. |
924 | | // TODO: avoid overcharging rate limiter: |
925 | | // Note that the Read here might overcharge SequentialFileReader's internal |
926 | | // rate limiter if priority is not IO_TOTAL, e.g., when there is not enough |
927 | | // content left until EOF to read. |
928 | 0 | Status status = file_->Read(kBlockSize, &buffer_, backing_store_, |
929 | 0 | Env::IO_TOTAL /* rate_limiter_priority */); |
930 | 0 | end_of_buffer_offset_ += buffer_.size(); |
931 | 0 | if (!status.ok()) { |
932 | 0 | buffer_.clear(); |
933 | 0 | ReportDrop(kBlockSize, status); |
934 | 0 | read_error_ = true; |
935 | 0 | *error = kEof; |
936 | 0 | return false; |
937 | 0 | } else if (buffer_.size() < static_cast<size_t>(kBlockSize)) { |
938 | 0 | eof_ = true; |
939 | 0 | eof_offset_ = buffer_.size(); |
940 | 0 | TEST_SYNC_POINT_CALLBACK( |
941 | 0 | "FragmentBufferedLogReader::TryReadMore:FirstEOF", nullptr); |
942 | 0 | } |
943 | 0 | return true; |
944 | 0 | } else if (!read_error_) { |
945 | 0 | UnmarkEOF(); |
946 | 0 | } |
947 | 0 | if (!read_error_) { |
948 | 0 | return true; |
949 | 0 | } |
950 | 0 | *error = kEof; |
951 | 0 | *drop_size = buffer_.size(); |
952 | 0 | if (buffer_.size() > 0) { |
953 | 0 | *error = kBadHeader; |
954 | 0 | } |
955 | 0 | buffer_.clear(); |
956 | 0 | return false; |
957 | 0 | } |
958 | | |
959 | | // return true if the caller should process the fragment_type_or_err. |
960 | | bool FragmentBufferedReader::TryReadFragment(Slice* fragment, size_t* drop_size, |
961 | 0 | uint8_t* fragment_type_or_err) { |
962 | 0 | assert(fragment != nullptr); |
963 | 0 | assert(drop_size != nullptr); |
964 | 0 | assert(fragment_type_or_err != nullptr); |
965 | |
|
966 | 0 | while (buffer_.size() < static_cast<size_t>(kHeaderSize)) { |
967 | 0 | size_t old_size = buffer_.size(); |
968 | 0 | uint8_t error = kEof; |
969 | 0 | if (!TryReadMore(drop_size, &error)) { |
970 | 0 | *fragment_type_or_err = error; |
971 | 0 | return false; |
972 | 0 | } else if (old_size == buffer_.size()) { |
973 | 0 | return false; |
974 | 0 | } |
975 | 0 | } |
976 | 0 | const char* header = buffer_.data(); |
977 | 0 | const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff; |
978 | 0 | const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff; |
979 | 0 | const uint8_t type = static_cast<uint8_t>(header[6]); |
980 | 0 | const uint32_t length = a | (b << 8); |
981 | 0 | int header_size = kHeaderSize; |
982 | 0 | if ((type >= kRecyclableFullType && type <= kRecyclableLastType) || |
983 | 0 | type == kRecyclableUserDefinedTimestampSizeType || |
984 | 0 | type == kRecyclePredecessorWALInfoType) { |
985 | 0 | if (first_record_read_ && !recycled_) { |
986 | | // A recycled log should have started with a recycled record |
987 | 0 | *fragment_type_or_err = kBadRecord; |
988 | 0 | return true; |
989 | 0 | } |
990 | 0 | recycled_ = true; |
991 | 0 | header_size = kRecyclableHeaderSize; |
992 | 0 | while (buffer_.size() < static_cast<size_t>(kRecyclableHeaderSize)) { |
993 | 0 | size_t old_size = buffer_.size(); |
994 | 0 | uint8_t error = kEof; |
995 | 0 | if (!TryReadMore(drop_size, &error)) { |
996 | 0 | *fragment_type_or_err = error; |
997 | 0 | return false; |
998 | 0 | } else if (old_size == buffer_.size()) { |
999 | 0 | return false; |
1000 | 0 | } |
1001 | 0 | } |
1002 | 0 | const uint32_t log_num = DecodeFixed32(header + 7); |
1003 | 0 | if (log_num != log_number_) { |
1004 | 0 | *fragment_type_or_err = kOldRecord; |
1005 | 0 | return true; |
1006 | 0 | } |
1007 | 0 | } |
1008 | | |
1009 | 0 | while (header_size + length > buffer_.size()) { |
1010 | 0 | size_t old_size = buffer_.size(); |
1011 | 0 | uint8_t error = kEof; |
1012 | 0 | if (!TryReadMore(drop_size, &error)) { |
1013 | 0 | *fragment_type_or_err = error; |
1014 | 0 | return false; |
1015 | 0 | } else if (old_size == buffer_.size()) { |
1016 | 0 | return false; |
1017 | 0 | } |
1018 | 0 | } |
1019 | | |
1020 | 0 | if (type == kZeroType && length == 0) { |
1021 | 0 | buffer_.clear(); |
1022 | 0 | *fragment_type_or_err = kBadRecord; |
1023 | 0 | return true; |
1024 | 0 | } |
1025 | | |
1026 | 0 | if (checksum_) { |
1027 | 0 | uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header)); |
1028 | 0 | uint32_t actual_crc = crc32c::Value(header + 6, length + header_size - 6); |
1029 | 0 | if (actual_crc != expected_crc) { |
1030 | 0 | *drop_size = buffer_.size(); |
1031 | 0 | buffer_.clear(); |
1032 | 0 | *fragment_type_or_err = kBadRecordChecksum; |
1033 | 0 | return true; |
1034 | 0 | } |
1035 | 0 | } |
1036 | | |
1037 | 0 | buffer_.remove_prefix(header_size + length); |
1038 | |
|
1039 | 0 | if (!uncompress_ || type == kSetCompressionType || |
1040 | 0 | type == kPredecessorWALInfoType || |
1041 | 0 | type == kRecyclePredecessorWALInfoType || |
1042 | 0 | type == kUserDefinedTimestampSizeType || |
1043 | 0 | type == kRecyclableUserDefinedTimestampSizeType) { |
1044 | 0 | *fragment = Slice(header + header_size, length); |
1045 | 0 | *fragment_type_or_err = type; |
1046 | 0 | return true; |
1047 | 0 | } else { |
1048 | | // Uncompress compressed records |
1049 | 0 | uncompressed_record_.clear(); |
1050 | 0 | size_t uncompressed_size = 0; |
1051 | 0 | int remaining = 0; |
1052 | 0 | const char* input = header + header_size; |
1053 | 0 | do { |
1054 | 0 | remaining = uncompress_->Uncompress( |
1055 | 0 | input, length, uncompressed_buffer_.get(), &uncompressed_size); |
1056 | 0 | input = nullptr; |
1057 | 0 | if (remaining < 0) { |
1058 | 0 | buffer_.clear(); |
1059 | 0 | *fragment_type_or_err = kBadRecord; |
1060 | 0 | return true; |
1061 | 0 | } |
1062 | 0 | if (uncompressed_size > 0) { |
1063 | 0 | uncompressed_record_.append(uncompressed_buffer_.get(), |
1064 | 0 | uncompressed_size); |
1065 | 0 | } |
1066 | 0 | } while (remaining > 0 || uncompressed_size == kBlockSize); |
1067 | 0 | *fragment = Slice(std::move(uncompressed_record_)); |
1068 | 0 | *fragment_type_or_err = type; |
1069 | 0 | return true; |
1070 | 0 | } |
1071 | 0 | } |
1072 | | |
1073 | | } // namespace ROCKSDB_NAMESPACE::log |