/src/rocksdb/db/log_writer.cc
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | // |
6 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
7 | | // Use of this source code is governed by a BSD-style license that can be |
8 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
9 | | |
10 | | #include "db/log_writer.h" |
11 | | |
12 | | #include <cstdint> |
13 | | |
14 | | #include "file/writable_file_writer.h" |
15 | | #include "rocksdb/env.h" |
16 | | #include "rocksdb/io_status.h" |
17 | | #include "util/coding.h" |
18 | | #include "util/crc32c.h" |
19 | | #include "util/udt_util.h" |
20 | | |
21 | | namespace ROCKSDB_NAMESPACE::log { |
22 | | |
23 | | Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, uint64_t log_number, |
24 | | bool recycle_log_files, bool manual_flush, |
25 | | CompressionType compression_type) |
26 | | : dest_(std::move(dest)), |
27 | | block_offset_(0), |
28 | | log_number_(log_number), |
29 | | recycle_log_files_(recycle_log_files), |
30 | | // Header size varies depending on whether we are recycling or not. |
31 | | header_size_(recycle_log_files ? kRecyclableHeaderSize : kHeaderSize), |
32 | | manual_flush_(manual_flush), |
33 | | compression_type_(compression_type), |
34 | 53.7k | compress_(nullptr) { |
35 | 699k | for (int i = 0; i <= kMaxRecordType; i++) { |
36 | 645k | char t = static_cast<char>(i); |
37 | 645k | type_crc_[i] = crc32c::Value(&t, 1); |
38 | 645k | } |
39 | 53.7k | } |
40 | | |
41 | 53.7k | Writer::~Writer() { |
42 | 53.7k | ThreadStatus::OperationType cur_op_type = |
43 | 53.7k | ThreadStatusUtil::GetThreadOperation(); |
44 | 53.7k | ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType::OP_UNKNOWN); |
45 | 53.7k | if (dest_) { |
46 | 29.5k | WriteBuffer(WriteOptions()).PermitUncheckedError(); |
47 | 29.5k | } |
48 | 53.7k | if (compress_) { |
49 | 0 | delete compress_; |
50 | 0 | } |
51 | 53.7k | ThreadStatusUtil::SetThreadOperation(cur_op_type); |
52 | 53.7k | } |
53 | | |
54 | 53.7k | IOStatus Writer::WriteBuffer(const WriteOptions& write_options) { |
55 | 53.7k | if (dest_->seen_error()) { |
56 | | #ifndef NDEBUG |
57 | | if (dest_->seen_injected_error()) { |
58 | | return IOStatus::IOError("Seen injected error. Skip writing buffer."); |
59 | | } |
60 | | #endif // NDEBUG |
61 | 0 | return IOStatus::IOError("Seen error. Skip writing buffer."); |
62 | 0 | } |
63 | 53.7k | IOOptions opts; |
64 | 53.7k | IOStatus s = WritableFileWriter::PrepareIOOptions(write_options, opts); |
65 | 53.7k | if (!s.ok()) { |
66 | 0 | return s; |
67 | 0 | } |
68 | 53.7k | return dest_->Flush(opts); |
69 | 53.7k | } |
70 | | |
71 | 24.2k | IOStatus Writer::Close(const WriteOptions& write_options) { |
72 | 24.2k | IOStatus s; |
73 | 24.2k | IOOptions opts; |
74 | 24.2k | s = WritableFileWriter::PrepareIOOptions(write_options, opts); |
75 | 24.2k | if (s.ok() && dest_) { |
76 | 23.0k | s = dest_->Close(opts); |
77 | 23.0k | dest_.reset(); |
78 | 23.0k | } |
79 | 24.2k | return s; |
80 | 24.2k | } |
81 | | |
82 | 1.24k | bool Writer::PublishIfClosed() { |
83 | 1.24k | if (dest_->IsClosed()) { |
84 | 1.24k | dest_.reset(); |
85 | 1.24k | return true; |
86 | 1.24k | } else { |
87 | 0 | return false; |
88 | 0 | } |
89 | 1.24k | } |
90 | | |
// Append one logical record to the log, fragmenting it into physical records
// (FULL / FIRST / MIDDLE / LAST, or their recyclable variants) at block
// boundaries. When WAL compression is active (compress_ != nullptr), the
// payload is streamed through the compressor and the compressed chunks are
// what get fragmented. Unless manual_flush_ is set, the buffered output is
// flushed before returning.
IOStatus Writer::AddRecord(const WriteOptions& write_options,
                           const Slice& slice) {
  if (dest_->seen_error()) {
#ifndef NDEBUG
    if (dest_->seen_injected_error()) {
      return IOStatus::IOError("Seen injected error. Skip writing buffer.");
    }
#endif  // NDEBUG
    return IOStatus::IOError("Seen error. Skip writing buffer.");
  }
  const char* ptr = slice.data();
  size_t left = slice.size();

  // Fragment the record if necessary and emit it. Note that if slice
  // is empty, we still want to iterate once to emit a single
  // zero-length record
  bool begin = true;
  // compress_remaining > 0 means the compressor still holds input that has
  // not been emitted as compressed output yet.
  int compress_remaining = 0;
  bool compress_start = false;
  if (compress_) {
    compress_->Reset();
    compress_start = true;
  }

  IOStatus s;
  IOOptions opts;
  s = WritableFileWriter::PrepareIOOptions(write_options, opts);
  if (s.ok()) {
    do {
      const int64_t leftover = kBlockSize - block_offset_;
      assert(leftover >= 0);
      if (leftover < header_size_) {
        // Switch to a new block: too little room remains in this block for
        // even a record header.
        if (leftover > 0) {
          // Fill the trailer (literal below relies on kHeaderSize and
          // kRecyclableHeaderSize being <= 11)
          assert(header_size_ <= 11);
          s = dest_->Append(opts,
                            Slice("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
                                  static_cast<size_t>(leftover)),
                            0 /* crc32c_checksum */);
          if (!s.ok()) {
            break;
          }
        }
        block_offset_ = 0;
      }

      // Invariant: we never leave < header_size bytes in a block.
      assert(static_cast<int64_t>(kBlockSize - block_offset_) >= header_size_);

      const size_t avail = kBlockSize - block_offset_ - header_size_;

      // Compress the record if compression is enabled.
      // Compress() is called at least once (compress_start=true) and after the
      // previous generated compressed chunk is written out as one or more
      // physical records (left=0).
      if (compress_ && (compress_start || left == 0)) {
        // On return, `left` holds the size of the compressed chunk now in
        // compressed_buffer_; a negative return value signals failure.
        compress_remaining = compress_->Compress(
            slice.data(), slice.size(), compressed_buffer_.get(), &left);

        if (compress_remaining < 0) {
          // Set failure status
          s = IOStatus::IOError("Unexpected WAL compression error");
          s.SetDataLoss(true);
          break;
        } else if (left == 0) {
          // Nothing left to compress
          if (!compress_start) {
            break;
          }
        }
        compress_start = false;
        // Fragments are now served from the compressed buffer, not `slice`.
        ptr = compressed_buffer_.get();
      }

      const size_t fragment_length = (left < avail) ? left : avail;

      RecordType type;
      // This is the final fragment only if the current chunk fits entirely
      // and the compressor has no pending input.
      const bool end = (left == fragment_length && compress_remaining == 0);
      if (begin && end) {
        type = recycle_log_files_ ? kRecyclableFullType : kFullType;
      } else if (begin) {
        type = recycle_log_files_ ? kRecyclableFirstType : kFirstType;
      } else if (end) {
        type = recycle_log_files_ ? kRecyclableLastType : kLastType;
      } else {
        type = recycle_log_files_ ? kRecyclableMiddleType : kMiddleType;
      }

      s = EmitPhysicalRecord(write_options, type, ptr, fragment_length);
      ptr += fragment_length;
      left -= fragment_length;
      begin = false;
    } while (s.ok() && (left > 0 || compress_remaining > 0));
  }
  if (s.ok()) {
    if (!manual_flush_) {
      s = dest_->Flush(opts);
    }
  }

  return s;
}
195 | | |
196 | 24.2k | IOStatus Writer::AddCompressionTypeRecord(const WriteOptions& write_options) { |
197 | | // Should be the first record |
198 | 24.2k | assert(block_offset_ == 0); |
199 | | |
200 | 24.2k | if (compression_type_ == kNoCompression) { |
201 | | // No need to add a record |
202 | 24.2k | return IOStatus::OK(); |
203 | 24.2k | } |
204 | | |
205 | 0 | if (dest_->seen_error()) { |
206 | | #ifndef NDEBUG |
207 | | if (dest_->seen_injected_error()) { |
208 | | return IOStatus::IOError("Seen injected error. Skip writing buffer."); |
209 | | } |
210 | | #endif // NDEBUG |
211 | 0 | return IOStatus::IOError("Seen error. Skip writing buffer."); |
212 | 0 | } |
213 | | |
214 | 0 | CompressionTypeRecord record(compression_type_); |
215 | 0 | std::string encode; |
216 | 0 | record.EncodeTo(&encode); |
217 | 0 | IOStatus s = EmitPhysicalRecord(write_options, kSetCompressionType, |
218 | 0 | encode.data(), encode.size()); |
219 | 0 | if (s.ok()) { |
220 | 0 | if (!manual_flush_) { |
221 | 0 | IOOptions io_opts; |
222 | 0 | s = WritableFileWriter::PrepareIOOptions(write_options, io_opts); |
223 | 0 | if (s.ok()) { |
224 | 0 | s = dest_->Flush(io_opts); |
225 | 0 | } |
226 | 0 | } |
227 | | // Initialize fields required for compression |
228 | 0 | const size_t max_output_buffer_len = kBlockSize - header_size_; |
229 | 0 | CompressionOptions opts; |
230 | 0 | constexpr uint32_t compression_format_version = 2; |
231 | 0 | compress_ = StreamingCompress::Create(compression_type_, opts, |
232 | 0 | compression_format_version, |
233 | 0 | max_output_buffer_len); |
234 | 0 | assert(compress_ != nullptr); |
235 | 0 | compressed_buffer_ = |
236 | 0 | std::unique_ptr<char[]>(new char[max_output_buffer_len]); |
237 | 0 | assert(compressed_buffer_); |
238 | 0 | } else { |
239 | | // Disable compression if the record could not be added. |
240 | 0 | compression_type_ = kNoCompression; |
241 | 0 | } |
242 | 0 | return s; |
243 | 0 | } |
244 | | |
245 | | IOStatus Writer::MaybeAddUserDefinedTimestampSizeRecord( |
246 | | const WriteOptions& write_options, |
247 | 1.38M | const UnorderedMap<uint32_t, size_t>& cf_to_ts_sz) { |
248 | 1.38M | std::vector<std::pair<uint32_t, size_t>> ts_sz_to_record; |
249 | 1.38M | for (const auto& [cf_id, ts_sz] : cf_to_ts_sz) { |
250 | 0 | if (recorded_cf_to_ts_sz_.count(cf_id) != 0) { |
251 | | // A column family's user-defined timestamp size should not be |
252 | | // updated while DB is running. |
253 | 0 | assert(recorded_cf_to_ts_sz_[cf_id] == ts_sz); |
254 | 0 | } else if (ts_sz != 0) { |
255 | 0 | ts_sz_to_record.emplace_back(cf_id, ts_sz); |
256 | 0 | recorded_cf_to_ts_sz_.insert(std::make_pair(cf_id, ts_sz)); |
257 | 0 | } |
258 | 0 | } |
259 | 1.38M | if (ts_sz_to_record.empty()) { |
260 | 1.38M | return IOStatus::OK(); |
261 | 1.38M | } |
262 | | |
263 | 0 | UserDefinedTimestampSizeRecord record(std::move(ts_sz_to_record)); |
264 | 0 | std::string encoded; |
265 | 0 | record.EncodeTo(&encoded); |
266 | 0 | RecordType type = recycle_log_files_ ? kRecyclableUserDefinedTimestampSizeType |
267 | 0 | : kUserDefinedTimestampSizeType; |
268 | | |
269 | | // If there's not enough space for this record, switch to a new block. |
270 | 0 | const int64_t leftover = kBlockSize - block_offset_; |
271 | 0 | if (leftover < header_size_ + (int)encoded.size()) { |
272 | 0 | IOOptions opts; |
273 | 0 | IOStatus s = WritableFileWriter::PrepareIOOptions(write_options, opts); |
274 | 0 | if (!s.ok()) { |
275 | 0 | return s; |
276 | 0 | } |
277 | | |
278 | 0 | std::vector<char> trailer(leftover, '\x00'); |
279 | 0 | s = dest_->Append(opts, Slice(trailer.data(), trailer.size())); |
280 | 0 | if (!s.ok()) { |
281 | 0 | return s; |
282 | 0 | } |
283 | | |
284 | 0 | block_offset_ = 0; |
285 | 0 | } |
286 | | |
287 | 0 | return EmitPhysicalRecord(write_options, type, encoded.data(), |
288 | 0 | encoded.size()); |
289 | 0 | } |
290 | | |
291 | 23.0k | bool Writer::BufferIsEmpty() { return dest_->BufferIsEmpty(); } |
292 | | |
// Write one physical record (header followed by `n` payload bytes at `ptr`)
// at the current block offset, then advance block_offset_. Callers guarantee
// the record fits within the remainder of the current block (asserted below).
//
// Header layout, little-endian:
//   legacy (kHeaderSize):      crc32c (4B) | length (2B) | type (1B)
//   recyclable (kRecyclableHeaderSize): legacy fields followed by the low
//                              32 bits of the log number (4B)
IOStatus Writer::EmitPhysicalRecord(const WriteOptions& write_options,
                                    RecordType t, const char* ptr, size_t n) {
  assert(n <= 0xffff);  // Must fit in two bytes

  size_t header_size;
  char buf[kRecyclableHeaderSize];

  // Format the header: length (2 bytes, LE) and type byte. The leading
  // 4-byte checksum slot (buf[0..3]) is filled in last, once the crc over
  // the type/payload (and log number, if recyclable) is known.
  buf[4] = static_cast<char>(n & 0xff);
  buf[5] = static_cast<char>(n >> 8);
  buf[6] = static_cast<char>(t);

  uint32_t crc = type_crc_[t];
  if (t < kRecyclableFullType || t == kSetCompressionType ||
      t == kUserDefinedTimestampSizeType) {
    // Legacy record format
    assert(block_offset_ + kHeaderSize + n <= kBlockSize);
    header_size = kHeaderSize;
  } else {
    // Recyclable record format
    assert(block_offset_ + kRecyclableHeaderSize + n <= kBlockSize);
    header_size = kRecyclableHeaderSize;

    // Only encode low 32-bits of the 64-bit log number. This means
    // we will fail to detect an old record if we recycled a log from
    // ~4 billion logs ago, but that is effectively impossible, and
    // even if it were we'd be far more likely to see a false positive
    // on the 32-bit CRC.
    EncodeFixed32(buf + 7, static_cast<uint32_t>(log_number_));
    crc = crc32c::Extend(crc, buf + 7, 4);
  }

  // Compute the crc of the record type and the payload.
  uint32_t payload_crc = crc32c::Value(ptr, n);
  crc = crc32c::Crc32cCombine(crc, payload_crc, n);
  crc = crc32c::Mask(crc);  // Adjust for storage
  TEST_SYNC_POINT_CALLBACK("LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum",
                           &crc);
  EncodeFixed32(buf, crc);

  // Write the header and the payload. The payload's precomputed crc is
  // handed to Append so the file writer need not recompute it.
  IOOptions opts;
  IOStatus s = WritableFileWriter::PrepareIOOptions(write_options, opts);
  if (s.ok()) {
    s = dest_->Append(opts, Slice(buf, header_size), 0 /* crc32c_checksum */);
  }
  if (s.ok()) {
    s = dest_->Append(opts, Slice(ptr, n), payload_crc);
  }
  // Advance even on failure, matching the original bookkeeping; the caller
  // checks `s` before emitting more records.
  block_offset_ += header_size + n;
  return s;
}
345 | | |
346 | | } // namespace ROCKSDB_NAMESPACE::log |