/src/rocksdb/db/wal_manager.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | // |
6 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
7 | | // Use of this source code is governed by a BSD-style license that can be |
8 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
9 | | |
10 | | #include "db/wal_manager.h" |
11 | | |
12 | | #include <algorithm> |
13 | | #include <cinttypes> |
14 | | #include <memory> |
15 | | #include <vector> |
16 | | |
17 | | #include "db/log_reader.h" |
18 | | #include "db/log_writer.h" |
19 | | #include "db/transaction_log_impl.h" |
20 | | #include "db/write_batch_internal.h" |
21 | | #include "file/file_util.h" |
22 | | #include "file/filename.h" |
23 | | #include "file/sequence_file_reader.h" |
24 | | #include "logging/logging.h" |
25 | | #include "port/port.h" |
26 | | #include "rocksdb/env.h" |
27 | | #include "rocksdb/options.h" |
28 | | #include "rocksdb/write_batch.h" |
29 | | #include "test_util/sync_point.h" |
30 | | #include "util/cast_util.h" |
31 | | #include "util/coding.h" |
32 | | #include "util/mutexlock.h" |
33 | | #include "util/string_util.h" |
34 | | |
35 | | namespace ROCKSDB_NAMESPACE { |
36 | | |
37 | | |
38 | 0 | Status WalManager::DeleteFile(const std::string& fname, uint64_t number) { |
39 | 0 | auto s = env_->DeleteFile(wal_dir_ + "/" + fname); |
40 | 0 | if (s.ok()) { |
41 | 0 | MutexLock l(&read_first_record_cache_mutex_); |
42 | 0 | read_first_record_cache_.erase(number); |
43 | 0 | } |
44 | 0 | return s; |
45 | 0 | } |
46 | | Status WalManager::GetSortedWalFiles(VectorWalPtr& files, bool need_seqnos, |
47 | 0 | bool include_archived) { |
48 | | // First get sorted files in db dir, then get sorted files from archived |
49 | | // dir, to avoid a race condition where a log file is moved to archived |
50 | | // dir in between. |
51 | 0 | Status s; |
52 | | // list wal files in main db dir. |
53 | 0 | VectorWalPtr logs; |
54 | 0 | s = GetSortedWalsOfType(wal_dir_, logs, kAliveLogFile, need_seqnos); |
55 | |
|
56 | 0 | if (!include_archived || !s.ok()) { |
57 | 0 | return s; |
58 | 0 | } |
59 | | |
60 | | // Reproduce the race condition where a log file is moved |
61 | | // to archived dir, between these two sync points, used in |
62 | | // (DBTest,TransactionLogIteratorRace) |
63 | 0 | TEST_SYNC_POINT("WalManager::GetSortedWalFiles:1"); |
64 | 0 | TEST_SYNC_POINT("WalManager::GetSortedWalFiles:2"); |
65 | |
|
66 | 0 | files.clear(); |
67 | | // list wal files in archive dir. |
68 | 0 | std::string archivedir = ArchivalDirectory(wal_dir_); |
69 | 0 | Status exists = env_->FileExists(archivedir); |
70 | 0 | if (exists.ok()) { |
71 | 0 | s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile, need_seqnos); |
72 | 0 | if (!s.ok()) { |
73 | 0 | return s; |
74 | 0 | } |
75 | 0 | } else if (!exists.IsNotFound()) { |
76 | 0 | assert(s.ok()); |
77 | 0 | return exists; |
78 | 0 | } |
79 | | |
80 | 0 | uint64_t latest_archived_log_number = 0; |
81 | 0 | if (!files.empty()) { |
82 | 0 | latest_archived_log_number = files.back()->LogNumber(); |
83 | 0 | ROCKS_LOG_INFO(db_options_.info_log, "Latest Archived log: %" PRIu64, |
84 | 0 | latest_archived_log_number); |
85 | 0 | } |
86 | |
|
87 | 0 | files.reserve(files.size() + logs.size()); |
88 | 0 | for (auto& log : logs) { |
89 | 0 | if (log->LogNumber() > latest_archived_log_number) { |
90 | 0 | files.push_back(std::move(log)); |
91 | 0 | } else { |
92 | | // When the race condition happens, we could see the |
93 | | // same log in both db dir and archived dir. Simply |
94 | | // ignore the one in db dir. Note that, if we read |
95 | | // archived dir first, we would have missed the log file. |
96 | 0 | ROCKS_LOG_WARN(db_options_.info_log, "%s already moved to archive", |
97 | 0 | log->PathName().c_str()); |
98 | 0 | } |
99 | 0 | } |
100 | |
|
101 | 0 | return s; |
102 | 0 | } |
103 | | |
104 | | Status WalManager::GetUpdatesSince( |
105 | | SequenceNumber seq, std::unique_ptr<TransactionLogIterator>* iter, |
106 | | const TransactionLogIterator::ReadOptions& read_options, |
107 | 0 | VersionSet* version_set) { |
108 | 0 | if (seq_per_batch_) { |
109 | 0 | return Status::NotSupported(); |
110 | 0 | } |
111 | | |
112 | 0 | assert(!seq_per_batch_); |
113 | | |
114 | | // Get all sorted Wal Files. |
115 | | // Do binary search and open files and find the seq number. |
116 | |
|
117 | 0 | std::unique_ptr<VectorWalPtr> wal_files(new VectorWalPtr); |
118 | 0 | Status s = GetSortedWalFiles(*wal_files); |
119 | 0 | if (!s.ok()) { |
120 | 0 | return s; |
121 | 0 | } |
122 | | |
123 | 0 | s = RetainProbableWalFiles(*wal_files, seq); |
124 | 0 | if (!s.ok()) { |
125 | 0 | return s; |
126 | 0 | } |
127 | 0 | iter->reset(new TransactionLogIteratorImpl( |
128 | 0 | wal_dir_, &db_options_, read_options, file_options_, seq, |
129 | 0 | std::move(wal_files), version_set, seq_per_batch_, io_tracer_)); |
130 | 0 | return (*iter)->status(); |
131 | 0 | } |
132 | | |
133 | | // 1. Go through all archived files and |
134 | | // a. if ttl is enabled, delete outdated files |
135 | | // b. if archive size limit is enabled, delete empty files, |
136 | | // compute file number and size. |
137 | | // 2. If size limit is enabled: |
138 | | // a. compute how many files should be deleted |
139 | | // b. get sorted non-empty archived logs |
140 | | // c. delete what should be deleted |
141 | 49.9k | void WalManager::PurgeObsoleteWALFiles() { |
142 | 49.9k | bool const ttl_enabled = db_options_.WAL_ttl_seconds > 0; |
143 | 49.9k | bool const size_limit_enabled = db_options_.WAL_size_limit_MB > 0; |
144 | 49.9k | if (!ttl_enabled && !size_limit_enabled) { |
145 | 49.9k | return; |
146 | 49.9k | } |
147 | | |
148 | 0 | int64_t current_time = 0; |
149 | 0 | Status s = db_options_.clock->GetCurrentTime(¤t_time); |
150 | 0 | if (!s.ok()) { |
151 | 0 | ROCKS_LOG_ERROR(db_options_.info_log, "Can't get current time: %s", |
152 | 0 | s.ToString().c_str()); |
153 | 0 | assert(false); |
154 | 0 | return; |
155 | 0 | } |
156 | 0 | uint64_t const now_seconds = static_cast<uint64_t>(current_time); |
157 | 0 | uint64_t const time_to_check = |
158 | 0 | ttl_enabled |
159 | 0 | ? std::min(kDefaultIntervalToDeleteObsoleteWAL, |
160 | 0 | std::max(uint64_t{1}, db_options_.WAL_ttl_seconds / 2)) |
161 | 0 | : kDefaultIntervalToDeleteObsoleteWAL; |
162 | 0 | uint64_t old_last_run_time = purge_wal_files_last_run_.LoadRelaxed(); |
163 | 0 | do { |
164 | 0 | if (old_last_run_time + time_to_check > now_seconds) { |
165 | | // last run is recent enough, no need to purge |
166 | 0 | return; |
167 | 0 | } |
168 | 0 | } while (!purge_wal_files_last_run_.CasWeakRelaxed( |
169 | 0 | /*expected=*/old_last_run_time, /*desired=*/now_seconds)); |
170 | | |
171 | 0 | std::string archival_dir = ArchivalDirectory(wal_dir_); |
172 | 0 | std::vector<std::string> files; |
173 | 0 | s = env_->GetChildren(archival_dir, &files); |
174 | 0 | if (!s.ok()) { |
175 | 0 | ROCKS_LOG_ERROR(db_options_.info_log, "Can't get archive files: %s", |
176 | 0 | s.ToString().c_str()); |
177 | 0 | return; |
178 | 0 | } |
179 | | |
180 | 0 | size_t log_files_num = 0; |
181 | 0 | uint64_t log_file_size = 0; |
182 | 0 | for (auto& f : files) { |
183 | 0 | uint64_t number; |
184 | 0 | FileType type; |
185 | 0 | if (ParseFileName(f, &number, &type) && type == kWalFile) { |
186 | 0 | std::string const file_path = archival_dir + "/" + f; |
187 | 0 | if (ttl_enabled) { |
188 | 0 | uint64_t file_m_time; |
189 | 0 | s = env_->GetFileModificationTime(file_path, &file_m_time); |
190 | 0 | if (!s.ok()) { |
191 | 0 | ROCKS_LOG_WARN(db_options_.info_log, |
192 | 0 | "Can't get file mod time: %s: %s", file_path.c_str(), |
193 | 0 | s.ToString().c_str()); |
194 | 0 | continue; |
195 | 0 | } |
196 | 0 | if (now_seconds - file_m_time > db_options_.WAL_ttl_seconds) { |
197 | 0 | s = DeleteDBFile(&db_options_, file_path, archival_dir, false, |
198 | 0 | /*force_fg=*/!wal_in_db_path_); |
199 | 0 | if (!s.ok()) { |
200 | 0 | ROCKS_LOG_WARN(db_options_.info_log, "Can't delete file: %s: %s", |
201 | 0 | file_path.c_str(), s.ToString().c_str()); |
202 | 0 | continue; |
203 | 0 | } else { |
204 | 0 | MutexLock l(&read_first_record_cache_mutex_); |
205 | 0 | read_first_record_cache_.erase(number); |
206 | 0 | } |
207 | 0 | continue; |
208 | 0 | } |
209 | 0 | } |
210 | | |
211 | 0 | if (size_limit_enabled) { |
212 | 0 | uint64_t file_size; |
213 | 0 | s = env_->GetFileSize(file_path, &file_size); |
214 | 0 | if (!s.ok()) { |
215 | 0 | ROCKS_LOG_ERROR(db_options_.info_log, |
216 | 0 | "Unable to get file size: %s: %s", file_path.c_str(), |
217 | 0 | s.ToString().c_str()); |
218 | 0 | return; |
219 | 0 | } else { |
220 | 0 | if (file_size > 0) { |
221 | 0 | log_file_size = std::max(log_file_size, file_size); |
222 | 0 | ++log_files_num; |
223 | 0 | } else { |
224 | 0 | s = DeleteDBFile(&db_options_, file_path, archival_dir, false, |
225 | 0 | /*force_fg=*/!wal_in_db_path_); |
226 | 0 | if (!s.ok()) { |
227 | 0 | ROCKS_LOG_WARN(db_options_.info_log, |
228 | 0 | "Unable to delete file: %s: %s", file_path.c_str(), |
229 | 0 | s.ToString().c_str()); |
230 | 0 | continue; |
231 | 0 | } else { |
232 | 0 | MutexLock l(&read_first_record_cache_mutex_); |
233 | 0 | read_first_record_cache_.erase(number); |
234 | 0 | } |
235 | 0 | } |
236 | 0 | } |
237 | 0 | } |
238 | 0 | } |
239 | 0 | } |
240 | | |
241 | 0 | if (0 == log_files_num || !size_limit_enabled) { |
242 | 0 | return; |
243 | 0 | } |
244 | | |
245 | 0 | size_t const files_keep_num = static_cast<size_t>( |
246 | 0 | db_options_.WAL_size_limit_MB * 1024 * 1024 / log_file_size); |
247 | 0 | if (log_files_num <= files_keep_num) { |
248 | 0 | return; |
249 | 0 | } |
250 | | |
251 | 0 | size_t files_del_num = log_files_num - files_keep_num; |
252 | 0 | VectorWalPtr archived_logs; |
253 | 0 | s = GetSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile, |
254 | 0 | /*need_seqno=*/false); |
255 | 0 | if (!s.ok()) { |
256 | 0 | ROCKS_LOG_WARN(db_options_.info_log, |
257 | 0 | "Unable to get archived WALs from: %s: %s", |
258 | 0 | archival_dir.c_str(), s.ToString().c_str()); |
259 | 0 | files_del_num = 0; |
260 | 0 | } else if (files_del_num > archived_logs.size()) { |
261 | 0 | ROCKS_LOG_WARN(db_options_.info_log, |
262 | 0 | "Trying to delete more archived log files than " |
263 | 0 | "exist. Deleting all"); |
264 | 0 | files_del_num = archived_logs.size(); |
265 | 0 | } |
266 | |
|
267 | 0 | for (size_t i = 0; i < files_del_num; ++i) { |
268 | 0 | std::string const file_path = archived_logs[i]->PathName(); |
269 | 0 | s = DeleteDBFile(&db_options_, wal_dir_ + "/" + file_path, wal_dir_, false, |
270 | 0 | /*force_fg=*/!wal_in_db_path_); |
271 | 0 | if (!s.ok()) { |
272 | 0 | ROCKS_LOG_WARN(db_options_.info_log, "Unable to delete file: %s: %s", |
273 | 0 | file_path.c_str(), s.ToString().c_str()); |
274 | 0 | continue; |
275 | 0 | } else { |
276 | 0 | MutexLock l(&read_first_record_cache_mutex_); |
277 | 0 | read_first_record_cache_.erase(archived_logs[i]->LogNumber()); |
278 | 0 | } |
279 | 0 | } |
280 | 0 | } |
281 | | |
282 | 0 | void WalManager::ArchiveWALFile(const std::string& fname, uint64_t number) { |
283 | 0 | auto archived_log_name = ArchivedLogFileName(wal_dir_, number); |
284 | | // The sync point below is used in (DBTest,TransactionLogIteratorRace) |
285 | 0 | TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:1"); |
286 | 0 | Status s = env_->RenameFile(fname, archived_log_name); |
287 | | // The sync point below is used in (DBTest,TransactionLogIteratorRace) |
288 | 0 | TEST_SYNC_POINT("WalManager::PurgeObsoleteFiles:2"); |
289 | | // The sync point below is used in |
290 | | // (CheckPointTest, CheckpointWithArchievedLog) |
291 | 0 | TEST_SYNC_POINT("WalManager::ArchiveWALFile"); |
292 | 0 | ROCKS_LOG_INFO(db_options_.info_log, "Move log file %s to %s -- %s\n", |
293 | 0 | fname.c_str(), archived_log_name.c_str(), |
294 | 0 | s.ToString().c_str()); |
295 | 0 | } |
296 | | |
297 | | Status WalManager::GetSortedWalsOfType(const std::string& path, |
298 | | VectorWalPtr& log_files, |
299 | 0 | WalFileType log_type, bool need_seqnos) { |
300 | 0 | std::vector<std::string> all_files; |
301 | 0 | const Status status = env_->GetChildren(path, &all_files); |
302 | 0 | if (!status.ok()) { |
303 | 0 | return status; |
304 | 0 | } |
305 | 0 | log_files.reserve(all_files.size()); |
306 | 0 | for (const auto& f : all_files) { |
307 | 0 | uint64_t number; |
308 | 0 | FileType type; |
309 | 0 | if (ParseFileName(f, &number, &type) && type == kWalFile) { |
310 | 0 | SequenceNumber sequence; |
311 | 0 | if (need_seqnos) { |
312 | 0 | Status s = ReadFirstRecord(log_type, number, &sequence); |
313 | 0 | if (!s.ok()) { |
314 | 0 | return s; |
315 | 0 | } |
316 | 0 | if (sequence == 0) { |
317 | | // empty file |
318 | 0 | continue; |
319 | 0 | } |
320 | 0 | } else { |
321 | 0 | sequence = 0; |
322 | 0 | } |
323 | | |
324 | | // Reproduce the race condition where a log file is moved |
325 | | // to archived dir, between these two sync points, used in |
326 | | // (DBTest,TransactionLogIteratorRace) |
327 | 0 | TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:1"); |
328 | 0 | TEST_SYNC_POINT("WalManager::GetSortedWalsOfType:2"); |
329 | |
|
330 | 0 | uint64_t size_bytes; |
331 | 0 | Status s = env_->GetFileSize(LogFileName(path, number), &size_bytes); |
332 | | // re-try in case the alive log file has been moved to archive. |
333 | 0 | if (!s.ok() && log_type == kAliveLogFile) { |
334 | 0 | std::string archived_file = ArchivedLogFileName(path, number); |
335 | 0 | if (env_->FileExists(archived_file).ok()) { |
336 | 0 | s = env_->GetFileSize(archived_file, &size_bytes); |
337 | 0 | if (!s.ok() && env_->FileExists(archived_file).IsNotFound()) { |
338 | | // oops, the file just got deleted from archived dir! move on |
339 | 0 | s = Status::OK(); |
340 | 0 | continue; |
341 | 0 | } |
342 | 0 | } |
343 | 0 | } |
344 | 0 | if (!s.ok()) { |
345 | 0 | return s; |
346 | 0 | } |
347 | | |
348 | 0 | log_files.emplace_back( |
349 | 0 | new WalFileImpl(number, log_type, sequence, size_bytes)); |
350 | 0 | } |
351 | 0 | } |
352 | 0 | std::sort( |
353 | 0 | log_files.begin(), log_files.end(), |
354 | 0 | [](const std::unique_ptr<WalFile>& a, const std::unique_ptr<WalFile>& b) { |
355 | 0 | WalFileImpl* a_impl = static_cast_with_check<WalFileImpl>(a.get()); |
356 | 0 | WalFileImpl* b_impl = static_cast_with_check<WalFileImpl>(b.get()); |
357 | 0 | return *a_impl < *b_impl; |
358 | 0 | }); |
359 | 0 | return status; |
360 | 0 | } |
361 | | |
362 | | Status WalManager::RetainProbableWalFiles(VectorWalPtr& all_logs, |
363 | 0 | const SequenceNumber target) { |
364 | 0 | int64_t start = 0; // signed to avoid overflow when target is < first file. |
365 | 0 | int64_t end = static_cast<int64_t>(all_logs.size()) - 1; |
366 | | // Binary Search. avoid opening all files. |
367 | 0 | while (end >= start) { |
368 | 0 | int64_t mid = start + (end - start) / 2; // Avoid overflow. |
369 | 0 | SequenceNumber current_seq_num = |
370 | 0 | all_logs.at(static_cast<size_t>(mid))->StartSequence(); |
371 | 0 | if (current_seq_num == target) { |
372 | 0 | end = mid; |
373 | 0 | break; |
374 | 0 | } else if (current_seq_num < target) { |
375 | 0 | start = mid + 1; |
376 | 0 | } else { |
377 | 0 | end = mid - 1; |
378 | 0 | } |
379 | 0 | } |
380 | | // end could be -ve. |
381 | 0 | size_t start_index = |
382 | 0 | static_cast<size_t>(std::max(static_cast<int64_t>(0), end)); |
383 | | // The last wal file is always included |
384 | 0 | all_logs.erase(all_logs.begin(), all_logs.begin() + start_index); |
385 | 0 | return Status::OK(); |
386 | 0 | } |
387 | | |
388 | | Status WalManager::ReadFirstRecord(const WalFileType type, |
389 | | const uint64_t number, |
390 | 0 | SequenceNumber* sequence) { |
391 | 0 | *sequence = 0; |
392 | 0 | if (type != kAliveLogFile && type != kArchivedLogFile) { |
393 | 0 | ROCKS_LOG_ERROR(db_options_.info_log, "[WalManger] Unknown file type %s", |
394 | 0 | std::to_string(type).c_str()); |
395 | 0 | return Status::NotSupported("File Type Not Known " + std::to_string(type)); |
396 | 0 | } |
397 | 0 | { |
398 | 0 | MutexLock l(&read_first_record_cache_mutex_); |
399 | 0 | auto itr = read_first_record_cache_.find(number); |
400 | 0 | if (itr != read_first_record_cache_.end()) { |
401 | 0 | *sequence = itr->second; |
402 | 0 | return Status::OK(); |
403 | 0 | } |
404 | 0 | } |
405 | 0 | Status s; |
406 | 0 | if (type == kAliveLogFile) { |
407 | 0 | std::string fname = LogFileName(wal_dir_, number); |
408 | 0 | s = ReadFirstLine(fname, number, sequence); |
409 | 0 | if (!s.ok() && env_->FileExists(fname).ok()) { |
410 | | // return any error that is not caused by non-existing file |
411 | 0 | return s; |
412 | 0 | } |
413 | 0 | } |
414 | | |
415 | 0 | if (type == kArchivedLogFile || !s.ok()) { |
416 | | // check if the file got moved to archive. |
417 | 0 | std::string archived_file = ArchivedLogFileName(wal_dir_, number); |
418 | 0 | s = ReadFirstLine(archived_file, number, sequence); |
419 | | // maybe the file was deleted from archive dir. If that's the case, return |
420 | | // Status::OK(). The caller with identify this as empty file because |
421 | | // *sequence == 0 |
422 | 0 | if (!s.ok() && env_->FileExists(archived_file).IsNotFound()) { |
423 | 0 | return Status::OK(); |
424 | 0 | } |
425 | 0 | } |
426 | | |
427 | 0 | if (s.ok() && *sequence != 0) { |
428 | 0 | MutexLock l(&read_first_record_cache_mutex_); |
429 | 0 | read_first_record_cache_.insert({number, *sequence}); |
430 | 0 | } |
431 | 0 | return s; |
432 | 0 | } |
433 | | |
434 | | Status WalManager::GetLiveWalFile(uint64_t number, |
435 | 0 | std::unique_ptr<WalFile>* log_file) { |
436 | 0 | if (!log_file) { |
437 | 0 | return Status::InvalidArgument("log_file not preallocated."); |
438 | 0 | } |
439 | | |
440 | 0 | if (!number) { |
441 | 0 | return Status::PathNotFound("log file not available"); |
442 | 0 | } |
443 | | |
444 | 0 | Status s; |
445 | |
|
446 | 0 | uint64_t size_bytes; |
447 | 0 | s = env_->GetFileSize(LogFileName(wal_dir_, number), &size_bytes); |
448 | |
|
449 | 0 | if (!s.ok()) { |
450 | 0 | return s; |
451 | 0 | } |
452 | | |
453 | 0 | log_file->reset(new WalFileImpl(number, kAliveLogFile, |
454 | 0 | 0, // SequenceNumber |
455 | 0 | size_bytes)); |
456 | |
|
457 | 0 | return Status::OK(); |
458 | 0 | } |
459 | | |
460 | | // the function returns status.ok() and sequence == 0 if the file exists, but is |
461 | | // empty |
462 | | Status WalManager::ReadFirstLine(const std::string& fname, |
463 | | const uint64_t number, |
464 | 0 | SequenceNumber* sequence) { |
465 | 0 | struct LogReporter : public log::Reader::Reporter { |
466 | 0 | Env* env; |
467 | 0 | Logger* info_log; |
468 | 0 | const char* fname; |
469 | |
|
470 | 0 | Status* status; |
471 | 0 | bool ignore_error; // true if db_options_.paranoid_checks==false |
472 | 0 | void Corruption(size_t bytes, const Status& s) override { |
473 | 0 | ROCKS_LOG_WARN(info_log, "[WalManager] %s%s: dropping %d bytes; %s", |
474 | 0 | (this->ignore_error ? "(ignoring error) " : ""), fname, |
475 | 0 | static_cast<int>(bytes), s.ToString().c_str()); |
476 | 0 | if (this->status->ok()) { |
477 | | // only keep the first error |
478 | 0 | *this->status = s; |
479 | 0 | } |
480 | 0 | } |
481 | 0 | }; |
482 | |
|
483 | 0 | std::unique_ptr<FSSequentialFile> file; |
484 | 0 | Status status = fs_->NewSequentialFile( |
485 | 0 | fname, fs_->OptimizeForLogRead(file_options_), &file, nullptr); |
486 | 0 | std::unique_ptr<SequentialFileReader> file_reader( |
487 | 0 | new SequentialFileReader(std::move(file), fname, io_tracer_)); |
488 | |
|
489 | 0 | if (!status.ok()) { |
490 | 0 | return status; |
491 | 0 | } |
492 | | |
493 | 0 | LogReporter reporter; |
494 | 0 | reporter.env = env_; |
495 | 0 | reporter.info_log = db_options_.info_log.get(); |
496 | 0 | reporter.fname = fname.c_str(); |
497 | 0 | reporter.status = &status; |
498 | 0 | reporter.ignore_error = !db_options_.paranoid_checks; |
499 | 0 | log::Reader reader(db_options_.info_log, std::move(file_reader), &reporter, |
500 | 0 | true /*checksum*/, number); |
501 | 0 | std::string scratch; |
502 | 0 | Slice record; |
503 | |
|
504 | 0 | if (reader.ReadRecord(&record, &scratch) && |
505 | 0 | (status.ok() || !db_options_.paranoid_checks)) { |
506 | 0 | if (record.size() < WriteBatchInternal::kHeader) { |
507 | 0 | reporter.Corruption(record.size(), |
508 | 0 | Status::Corruption("log record too small")); |
509 | | // TODO read record's till the first no corrupt entry? |
510 | 0 | } else { |
511 | 0 | WriteBatch batch; |
512 | | // We can overwrite an existing non-OK Status since it'd only reach here |
513 | | // with `paranoid_checks == false`. |
514 | 0 | status = WriteBatchInternal::SetContents(&batch, record); |
515 | 0 | if (status.ok()) { |
516 | 0 | *sequence = WriteBatchInternal::Sequence(&batch); |
517 | 0 | return status; |
518 | 0 | } |
519 | 0 | } |
520 | 0 | } |
521 | | |
522 | 0 | if (status.ok() && reader.IsCompressedAndEmptyFile()) { |
523 | | // In case of wal_compression, it writes a `kSetCompressionType` record |
524 | | // which is not associated with any sequence number. As result for an empty |
525 | | // file, GetSortedWalsOfType() will skip these WALs causing the operations |
526 | | // to fail. |
527 | | // Therefore, in order to avoid that failure, it sets sequence_number to 1 |
528 | | // indicating those WALs should be included. |
529 | 0 | *sequence = 1; |
530 | 0 | } else { |
531 | | // ReadRecord might have returned false on EOF, which means that the log |
532 | | // file is empty. Or, a failure may have occurred while processing the first |
533 | | // entry. In any case, return status and set sequence number to 0. |
534 | 0 | *sequence = 0; |
535 | 0 | } |
536 | 0 | return status; |
537 | 0 | } |
538 | | |
539 | | } // namespace ROCKSDB_NAMESPACE |