Coverage Report

Created: 2024-07-27 06:53

/src/rocksdb/db/log_writer.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10
#include "db/log_writer.h"
11
12
#include <cstdint>
13
14
#include "file/writable_file_writer.h"
15
#include "rocksdb/env.h"
16
#include "rocksdb/io_status.h"
17
#include "util/coding.h"
18
#include "util/crc32c.h"
19
#include "util/udt_util.h"
20
21
namespace ROCKSDB_NAMESPACE::log {
22
23
// Creates a log writer that appends physical records to `dest`.
//
// `log_number` is embedded (low 32 bits) in recyclable record headers;
// `recycle_log_files` selects the recyclable header layout; when
// `manual_flush` is true AddRecord() does not flush `dest` itself;
// `compression_type` is acted upon by AddCompressionTypeRecord().
// The CRC32C of every one-byte record type is precomputed so that
// EmitPhysicalRecord() only has to extend it with the rest of the record.
Writer::Writer(std::unique_ptr<WritableFileWriter>&& dest, uint64_t log_number,
               bool recycle_log_files, bool manual_flush,
               CompressionType compression_type)
    : dest_(std::move(dest)),
      block_offset_(0),
      log_number_(log_number),
      recycle_log_files_(recycle_log_files),
      // Recyclable headers additionally carry the log number, so they are
      // larger than legacy headers.
      header_size_(recycle_log_files ? kRecyclableHeaderSize : kHeaderSize),
      manual_flush_(manual_flush),
      compression_type_(compression_type),
      compress_(nullptr) {
  for (int record_type = 0; record_type <= kMaxRecordType; ++record_type) {
    const char type_byte = static_cast<char>(record_type);
    type_crc_[record_type] = crc32c::Value(&type_byte, 1);
  }
}
40
41
53.7k
// Flushes whatever is still buffered in dest_ (if dest_ has not been
// released) and frees the streaming compressor. While doing so the thread's
// reported operation type is temporarily set to OP_UNKNOWN; the previous
// value is restored before returning.
Writer::~Writer() {
  const ThreadStatus::OperationType saved_op_type =
      ThreadStatusUtil::GetThreadOperation();
  ThreadStatusUtil::SetThreadOperation(ThreadStatus::OperationType::OP_UNKNOWN);

  if (dest_ != nullptr) {
    // Best effort: a destructor has no channel to report a flush failure.
    WriteBuffer(WriteOptions()).PermitUncheckedError();
  }

  // compress_ may be null (no compression); deleting nullptr is a no-op.
  delete compress_;

  ThreadStatusUtil::SetThreadOperation(saved_op_type);
}
53
54
53.7k
// Flushes the data buffered in dest_. Once dest_ has recorded an earlier
// error nothing is written and an IOError is returned instead (in debug
// builds a distinct message is used when that error was injected).
// Otherwise returns the IO-option preparation error, if any, or the result
// of the flush.
IOStatus Writer::WriteBuffer(const WriteOptions& write_options) {
  if (dest_->seen_error()) {
#ifndef NDEBUG
    if (dest_->seen_injected_error()) {
      return IOStatus::IOError("Seen injected error. Skip writing buffer.");
    }
#endif  // NDEBUG
    return IOStatus::IOError("Seen error. Skip writing buffer.");
  }

  IOOptions io_options;
  IOStatus status =
      WritableFileWriter::PrepareIOOptions(write_options, io_options);
  if (status.ok()) {
    status = dest_->Flush(io_options);
  }
  return status;
}
70
71
24.2k
// Closes dest_ and releases it. If preparing the IO options fails, that
// error is returned and dest_ is left untouched; if dest_ has already been
// released, the (OK) preparation status is returned. Otherwise returns the
// result of closing the file; dest_ is released regardless of that result.
IOStatus Writer::Close(const WriteOptions& write_options) {
  IOOptions io_options;
  IOStatus status =
      WritableFileWriter::PrepareIOOptions(write_options, io_options);
  if (!status.ok() || dest_ == nullptr) {
    return status;
  }

  status = dest_->Close(io_options);
  dest_.reset();
  return status;
}
81
82
1.24k
bool Writer::PublishIfClosed() {
83
1.24k
  if (dest_->IsClosed()) {
84
1.24k
    dest_.reset();
85
1.24k
    return true;
86
1.24k
  } else {
87
0
    return false;
88
0
  }
89
1.24k
}
90
91
// Appends `slice` to the log as one logical record.
//
// The payload is split into physical fragments so that no fragment crosses a
// kBlockSize boundary: a record that fits is written as a single FULL
// fragment, otherwise as FIRST, zero or more MIDDLE, and LAST fragments (the
// recyclable variants are used when recycle_log_files_ is set). When a
// streaming compressor was set up by AddCompressionTypeRecord(), the payload
// is compressed chunk by chunk and the compressed bytes are what get
// fragmented. Unless manual_flush_ is set, dest_ is flushed after all
// fragments have been emitted.
//
// Returns an IOError without writing anything if dest_ has already seen an
// error. A compression failure is reported as an IOError marked as data loss.
IOStatus Writer::AddRecord(const WriteOptions& write_options,
92
1.55M
                           const Slice& slice) {
93
1.55M
  if (dest_->seen_error()) {
94
#ifndef NDEBUG
95
    if (dest_->seen_injected_error()) {
96
      return IOStatus::IOError("Seen injected error. Skip writing buffer.");
97
    }
98
#endif  // NDEBUG
99
0
    return IOStatus::IOError("Seen error. Skip writing buffer.");
100
0
  }
101
1.55M
  const char* ptr = slice.data();
102
1.55M
  size_t left = slice.size();
103
104
  // Fragment the record if necessary and emit it.  Note that if slice
105
  // is empty, we still want to iterate once to emit a single
106
  // zero-length record
107
1.55M
  bool begin = true;
108
1.55M
  int compress_remaining = 0;
109
1.55M
  bool compress_start = false;
110
1.55M
  if (compress_) {
111
0
    compress_->Reset();
112
0
    compress_start = true;
113
0
  }
114
115
1.55M
  IOStatus s;
116
1.55M
  IOOptions opts;
117
1.55M
  s = WritableFileWriter::PrepareIOOptions(write_options, opts);
118
1.55M
  if (s.ok()) {
119
1.56M
    do {
120
1.56M
      const int64_t leftover = kBlockSize - block_offset_;
121
1.56M
      assert(leftover >= 0);
122
1.56M
      if (leftover < header_size_) {
123
        // Switch to a new block
        // (zero-fill whatever remains of the current block so that the next
        // header starts exactly at a block boundary).
124
11.3k
        if (leftover > 0) {
125
          // Fill the trailer (literal below relies on kHeaderSize and
126
          // kRecyclableHeaderSize being <= 11)
127
252
          assert(header_size_ <= 11);
128
252
          s = dest_->Append(opts,
129
252
                            Slice("\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00",
130
252
                                  static_cast<size_t>(leftover)),
131
252
                            0 /* crc32c_checksum */);
132
252
          if (!s.ok()) {
133
0
            break;
134
0
          }
135
252
        }
136
11.3k
        block_offset_ = 0;
137
11.3k
      }
138
139
      // Invariant: we never leave < header_size bytes in a block.
140
1.56M
      assert(static_cast<int64_t>(kBlockSize - block_offset_) >= header_size_);
141
142
1.56M
      const size_t avail = kBlockSize - block_offset_ - header_size_;
143
144
      // Compress the record if compression is enabled.
145
      // Compress() is called at least once (compress_start=true) and after the
146
      // previous generated compressed chunk is written out as one or more
147
      // physical records (left=0).
      // Compress() writes the next chunk of compressed output into
      // compressed_buffer_ and reports its length through `left`. A negative
      // result is an error; a positive one keeps the loop going (see the
      // while condition) so that further chunks are produced.
148
1.56M
      if (compress_ && (compress_start || left == 0)) {
149
0
        compress_remaining = compress_->Compress(
150
0
            slice.data(), slice.size(), compressed_buffer_.get(), &left);
151
152
0
        if (compress_remaining < 0) {
153
          // Set failure status
154
0
          s = IOStatus::IOError("Unexpected WAL compression error");
155
0
          s.SetDataLoss(true);
156
0
          break;
157
0
        } else if (left == 0) {
158
          // Nothing left to compress
          // (a follow-up call produced no output, so stop emitting fragments;
          // on the very first call an empty chunk is still emitted below).
159
0
          if (!compress_start) {
160
0
            break;
161
0
          }
162
0
        }
163
0
        compress_start = false;
164
0
        ptr = compressed_buffer_.get();
165
0
      }
166
167
1.56M
      const size_t fragment_length = (left < avail) ? left : avail;
168
169
1.56M
      RecordType type;
      // This fragment ends the logical record only if it consumes everything
      // left in the current (possibly compressed) chunk and the compressor
      // has no further output pending.
170
1.56M
      const bool end = (left == fragment_length && compress_remaining == 0);
171
1.56M
      if (begin && end) {
172
1.54M
        type = recycle_log_files_ ? kRecyclableFullType : kFullType;
173
1.54M
      } else if (begin) {
174
7.18k
        type = recycle_log_files_ ? kRecyclableFirstType : kFirstType;
175
11.0k
      } else if (end) {
176
7.18k
        type = recycle_log_files_ ? kRecyclableLastType : kLastType;
177
7.18k
      } else {
178
3.87k
        type = recycle_log_files_ ? kRecyclableMiddleType : kMiddleType;
179
3.87k
      }
180
181
1.56M
      s = EmitPhysicalRecord(write_options, type, ptr, fragment_length);
182
1.56M
      ptr += fragment_length;
183
1.56M
      left -= fragment_length;
184
1.56M
      begin = false;
185
1.56M
    } while (s.ok() && (left > 0 || compress_remaining > 0));
186
1.55M
  }
  // Unless manual_flush_ is set, flush dest_ after every successful record.
187
1.55M
  if (s.ok()) {
188
1.55M
    if (!manual_flush_) {
189
1.55M
      s = dest_->Flush(opts);
190
1.55M
    }
191
1.55M
  }
192
193
1.55M
  return s;
194
1.55M
}
195
196
24.2k
// Announces the WAL compression type by writing a kSetCompressionType record
// and sets up the streaming compressor that AddRecord() uses from then on.
// Must be called before any other record is added to this log.
//
// Returns OK without writing anything when compression_type_ is
// kNoCompression, or when no streaming compressor can be created for it (in
// which case compression is disabled for this writer). Returns an IOError if
// dest_ has already seen an error. If the record cannot be emitted,
// compression is disabled and the emit error is returned. Once the record
// has been emitted, compression stays enabled even if the subsequent flush
// fails; that flush status is returned.
IOStatus Writer::AddCompressionTypeRecord(const WriteOptions& write_options) {
  // Should be the first record
  assert(block_offset_ == 0);

  if (compression_type_ == kNoCompression) {
    // No need to add a record
    return IOStatus::OK();
  }

  if (dest_->seen_error()) {
#ifndef NDEBUG
    if (dest_->seen_injected_error()) {
      return IOStatus::IOError("Seen injected error. Skip writing buffer.");
    }
#endif  // NDEBUG
    return IOStatus::IOError("Seen error. Skip writing buffer.");
  }

  // Create the compressor BEFORE announcing compression in the log.
  // StreamingCompress::Create() yields nullptr for types it cannot stream;
  // writing the kSetCompressionType record anyway would make readers try to
  // decompress records that AddRecord() then writes uncompressed.
  const size_t max_output_buffer_len = kBlockSize - header_size_;
  CompressionOptions opts;
  constexpr uint32_t compression_format_version = 2;
  std::unique_ptr<StreamingCompress> compressor(
      StreamingCompress::Create(compression_type_, opts,
                                compression_format_version,
                                max_output_buffer_len));
  if (compressor == nullptr) {
    // Nothing was written, so the log stays consistently uncompressed.
    compression_type_ = kNoCompression;
    return IOStatus::OK();
  }

  CompressionTypeRecord record(compression_type_);
  std::string encode;
  record.EncodeTo(&encode);
  IOStatus s = EmitPhysicalRecord(write_options, kSetCompressionType,
                                  encode.data(), encode.size());
  if (!s.ok()) {
    // Disable compression if the record could not be added. `compressor`
    // is freed when it goes out of scope.
    compression_type_ = kNoCompression;
    return s;
  }

  // The record is in the log now, so subsequent records must be compressed.
  delete compress_;  // Defensive: never leak a compressor from an earlier call.
  compress_ = compressor.release();
  compressed_buffer_ =
      std::unique_ptr<char[]>(new char[max_output_buffer_len]);

  if (!manual_flush_) {
    IOOptions io_opts;
    s = WritableFileWriter::PrepareIOOptions(write_options, io_opts);
    if (s.ok()) {
      s = dest_->Flush(io_opts);
    }
  }
  return s;
}
244
245
// Writes one user-defined timestamp size record covering every column family
// in `cf_to_ts_sz` that has a non-zero timestamp size and has not been
// recorded by this writer yet. Column families that were already recorded
// are skipped (their size is asserted to be unchanged); when nothing new
// needs recording, no record is written and OK is returned.
//
// If the record does not fit in the current block, the rest of the block is
// zero-filled and the record starts a new block.
//
// Column families are remembered as recorded only after the record has been
// emitted successfully, so a failed attempt does not suppress the record on
// later calls.
IOStatus Writer::MaybeAddUserDefinedTimestampSizeRecord(
    const WriteOptions& write_options,
    const UnorderedMap<uint32_t, size_t>& cf_to_ts_sz) {
  std::vector<std::pair<uint32_t, size_t>> ts_sz_to_record;
  for (const auto& [cf_id, ts_sz] : cf_to_ts_sz) {
    const auto recorded = recorded_cf_to_ts_sz_.find(cf_id);
    if (recorded != recorded_cf_to_ts_sz_.end()) {
      // A column family's user-defined timestamp size should not be
      // updated while DB is running.
      assert(recorded->second == ts_sz);
    } else if (ts_sz != 0) {
      ts_sz_to_record.emplace_back(cf_id, ts_sz);
    }
  }
  if (ts_sz_to_record.empty()) {
    return IOStatus::OK();
  }

  // The record takes ownership of its entries; give it a copy because
  // ts_sz_to_record is still needed to update recorded_cf_to_ts_sz_ once the
  // write has succeeded.
  std::vector<std::pair<uint32_t, size_t>> record_entries = ts_sz_to_record;
  UserDefinedTimestampSizeRecord record(std::move(record_entries));
  std::string encoded;
  record.EncodeTo(&encoded);
  const RecordType type = recycle_log_files_
                              ? kRecyclableUserDefinedTimestampSizeType
                              : kUserDefinedTimestampSizeType;

  // If there's not enough space for this record, switch to a new block.
  const int64_t leftover = kBlockSize - block_offset_;
  if (leftover < static_cast<int64_t>(header_size_) +
                     static_cast<int64_t>(encoded.size())) {
    IOOptions opts;
    IOStatus s = WritableFileWriter::PrepareIOOptions(write_options, opts);
    if (!s.ok()) {
      return s;
    }

    std::vector<char> trailer(leftover, '\x00');
    s = dest_->Append(opts, Slice(trailer.data(), trailer.size()));
    if (!s.ok()) {
      return s;
    }

    block_offset_ = 0;
  }

  IOStatus s = EmitPhysicalRecord(write_options, type, encoded.data(),
                                  encoded.size());
  if (s.ok()) {
    // Only now are these sizes actually in the log. Recording them before
    // the write would make a failed attempt skip the record forever.
    for (const auto& [cf_id, ts_sz] : ts_sz_to_record) {
      recorded_cf_to_ts_sz_.insert(std::make_pair(cf_id, ts_sz));
    }
  }
  return s;
}
290
291
23.0k
// Reports whether dest_'s write buffer is empty; forwards directly to
// WritableFileWriter::BufferIsEmpty().
bool Writer::BufferIsEmpty() {
  const bool buffer_empty = dest_->BufferIsEmpty();
  return buffer_empty;
}
292
293
// Appends one physical record: a header followed by the `n` payload bytes at
// `ptr`. `n` must fit in 16 bits.
//
// Legacy record types (everything below kRecyclableFullType, plus
// kSetCompressionType and kUserDefinedTimestampSizeType) use a kHeaderSize
// header:
//   checksum (4 bytes) | length (2 bytes, low byte first) | type (1 byte)
// Recyclable types append the low 32 bits of log_number_ (4 bytes), giving a
// kRecyclableHeaderSize header.
//
// The stored checksum is the masked CRC32C of the type byte, the log number
// bytes (recyclable only) and the payload. block_offset_ is advanced by the
// header and payload sizes whether or not the appends succeed.
IOStatus Writer::EmitPhysicalRecord(const WriteOptions& write_options,
                                    RecordType t, const char* ptr, size_t n) {
  assert(n <= 0xffff);  // Must fit in two bytes

  const bool legacy_format = t < kRecyclableFullType ||
                             t == kSetCompressionType ||
                             t == kUserDefinedTimestampSizeType;
  const size_t header_size =
      legacy_format ? kHeaderSize : kRecyclableHeaderSize;
  assert(block_offset_ + header_size + n <= kBlockSize);

  // Bytes 0-3 receive the checksum once it is known.
  char header[kRecyclableHeaderSize];
  header[4] = static_cast<char>(n & 0xff);
  header[5] = static_cast<char>(n >> 8);
  header[6] = static_cast<char>(t);

  uint32_t crc = type_crc_[t];
  if (!legacy_format) {
    // Only the low 32 bits of the 64-bit log number are stored. A stale
    // record from a log recycled ~4 billion logs ago would therefore go
    // undetected, but that is effectively impossible, and even then a false
    // positive on the 32-bit CRC would be far more likely.
    EncodeFixed32(header + 7, static_cast<uint32_t>(log_number_));
    crc = crc32c::Extend(crc, header + 7, 4);
  }

  // Fold the payload checksum into the header checksum, then mask it for
  // storage.
  const uint32_t payload_crc = crc32c::Value(ptr, n);
  crc = crc32c::Mask(crc32c::Crc32cCombine(crc, payload_crc, n));
  TEST_SYNC_POINT_CALLBACK("LogWriter::EmitPhysicalRecord:BeforeEncodeChecksum",
                           &crc);
  EncodeFixed32(header, crc);

  // Header first, then payload; stop at the first failure.
  IOOptions io_options;
  IOStatus status =
      WritableFileWriter::PrepareIOOptions(write_options, io_options);
  if (status.ok()) {
    status = dest_->Append(io_options, Slice(header, header_size),
                           0 /* crc32c_checksum */);
  }
  if (status.ok()) {
    status = dest_->Append(io_options, Slice(ptr, n), payload_crc);
  }
  block_offset_ += header_size + n;
  return status;
}
345
346
}  // namespace ROCKSDB_NAMESPACE::log