Coverage Report

Created: 2026-03-31 07:51

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/db/blob/blob_log_writer.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#include "db/blob/blob_log_writer.h"
7
8
#include <cstdint>
9
#include <string>
10
11
#include "db/blob/blob_log_format.h"
12
#include "file/writable_file_writer.h"
13
#include "monitoring/statistics_impl.h"
14
#include "rocksdb/system_clock.h"
15
#include "test_util/sync_point.h"
16
#include "util/coding.h"
17
#include "util/stop_watch.h"
18
19
namespace ROCKSDB_NAMESPACE {
20
21
BlobLogWriter::BlobLogWriter(std::unique_ptr<WritableFileWriter>&& dest,
22
                             SystemClock* clock, Statistics* statistics,
23
                             uint64_t log_number, bool use_fs, bool do_flush,
24
                             uint64_t boffset)
25
0
    : dest_(std::move(dest)),
26
0
      clock_(clock),
27
0
      statistics_(statistics),
28
0
      log_number_(log_number),
29
0
      block_offset_(boffset),
30
0
      use_fsync_(use_fs),
31
0
      do_flush_(do_flush),
32
0
      last_elem_type_(kEtNone) {}
33
34
0
BlobLogWriter::~BlobLogWriter() = default;
35
36
0
Status BlobLogWriter::Sync(const WriteOptions& write_options) {
37
0
  TEST_SYNC_POINT("BlobLogWriter::Sync");
38
39
0
  StopWatch sync_sw(clock_, statistics_, BLOB_DB_BLOB_FILE_SYNC_MICROS);
40
0
  IOOptions opts;
41
0
  Status s = WritableFileWriter::PrepareIOOptions(write_options, opts);
42
0
  if (s.ok()) {
43
0
    s = dest_->Sync(opts, use_fsync_);
44
0
  }
45
0
  if (s.ok()) {
46
0
    RecordTick(statistics_, BLOB_DB_BLOB_FILE_SYNCED);
47
0
  }
48
0
  return s;
49
0
}
50
51
Status BlobLogWriter::WriteHeader(const WriteOptions& write_options,
52
0
                                  BlobLogHeader& header) {
53
0
  assert(block_offset_ == 0);
54
0
  assert(last_elem_type_ == kEtNone);
55
0
  std::string str;
56
0
  header.EncodeTo(&str);
57
58
0
  IOOptions opts;
59
0
  Status s = WritableFileWriter::PrepareIOOptions(write_options, opts);
60
0
  if (s.ok()) {
61
0
    s = dest_->Append(opts, Slice(str));
62
0
  }
63
0
  if (s.ok()) {
64
0
    block_offset_ += str.size();
65
0
    if (do_flush_) {
66
0
      s = dest_->Flush(opts);
67
0
    }
68
0
  }
69
0
  last_elem_type_ = kEtFileHdr;
70
0
  if (s.ok()) {
71
0
    RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
72
0
               BlobLogHeader::kSize);
73
0
  }
74
0
  return s;
75
0
}
76
77
Status BlobLogWriter::AppendFooter(const WriteOptions& write_options,
78
                                   BlobLogFooter& footer,
79
                                   std::string* checksum_method,
80
0
                                   std::string* checksum_value) {
81
0
  assert(block_offset_ != 0);
82
0
  assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
83
84
0
  std::string str;
85
0
  footer.EncodeTo(&str);
86
87
0
  Status s;
88
0
  if (dest_->seen_error()) {
89
0
    s.PermitUncheckedError();
90
0
    return Status::IOError("Seen Error. Skip closing.");
91
0
  } else {
92
0
    IOOptions opts;
93
0
    s = WritableFileWriter::PrepareIOOptions(write_options, opts);
94
0
    if (s.ok()) {
95
0
      s = dest_->Append(opts, Slice(str));
96
0
    }
97
0
    if (s.ok()) {
98
0
      block_offset_ += str.size();
99
0
      s = Sync(write_options);
100
101
0
      if (s.ok()) {
102
0
        s = dest_->Close(opts);
103
104
0
        if (s.ok()) {
105
0
          assert(!!checksum_method == !!checksum_value);
106
107
0
          if (checksum_method) {
108
0
            assert(checksum_method->empty());
109
110
0
            std::string method = dest_->GetFileChecksumFuncName();
111
0
            if (method != kUnknownFileChecksumFuncName) {
112
0
              *checksum_method = std::move(method);
113
0
            }
114
0
          }
115
0
          if (checksum_value) {
116
0
            assert(checksum_value->empty());
117
118
0
            std::string value = dest_->GetFileChecksum();
119
0
            if (value != kUnknownFileChecksum) {
120
0
              *checksum_value = std::move(value);
121
0
            }
122
0
          }
123
0
        }
124
0
      }
125
0
    }
126
127
0
    dest_.reset();
128
0
  }
129
130
0
  last_elem_type_ = kEtFileFooter;
131
0
  if (s.ok()) {
132
0
    RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
133
0
               BlobLogFooter::kSize);
134
0
  }
135
0
  return s;
136
0
}
137
138
Status BlobLogWriter::AddRecord(const WriteOptions& write_options,
139
                                const Slice& key, const Slice& val,
140
                                uint64_t expiration, uint64_t* key_offset,
141
0
                                uint64_t* blob_offset) {
142
0
  assert(block_offset_ != 0);
143
0
  assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
144
145
0
  std::string buf;
146
0
  ConstructBlobHeader(&buf, key, val, expiration);
147
148
0
  Status s =
149
0
      EmitPhysicalRecord(write_options, buf, key, val, key_offset, blob_offset);
150
0
  return s;
151
0
}
152
153
Status BlobLogWriter::AddRecord(const WriteOptions& write_options,
154
                                const Slice& key, const Slice& val,
155
0
                                uint64_t* key_offset, uint64_t* blob_offset) {
156
0
  assert(block_offset_ != 0);
157
0
  assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord);
158
159
0
  std::string buf;
160
0
  ConstructBlobHeader(&buf, key, val, 0);
161
162
0
  Status s =
163
0
      EmitPhysicalRecord(write_options, buf, key, val, key_offset, blob_offset);
164
0
  return s;
165
0
}
166
167
void BlobLogWriter::ConstructBlobHeader(std::string* buf, const Slice& key,
168
0
                                        const Slice& val, uint64_t expiration) {
169
0
  BlobLogRecord record;
170
0
  record.key = key;
171
0
  record.value = val;
172
0
  record.expiration = expiration;
173
0
  record.EncodeHeaderTo(buf);
174
0
}
175
176
Status BlobLogWriter::EmitPhysicalRecord(const WriteOptions& write_options,
177
                                         const std::string& headerbuf,
178
                                         const Slice& key, const Slice& val,
179
                                         uint64_t* key_offset,
180
0
                                         uint64_t* blob_offset) {
181
0
  IOOptions opts;
182
0
  Status s = WritableFileWriter::PrepareIOOptions(write_options, opts);
183
0
  if (s.ok()) {
184
0
    s = dest_->Append(opts, Slice(headerbuf));
185
0
  }
186
0
  if (s.ok()) {
187
0
    s = dest_->Append(opts, key);
188
0
  }
189
0
  if (s.ok()) {
190
0
    s = dest_->Append(opts, val);
191
0
  }
192
0
  if (do_flush_ && s.ok()) {
193
0
    s = dest_->Flush(opts);
194
0
  }
195
196
0
  *key_offset = block_offset_ + BlobLogRecord::kHeaderSize;
197
0
  *blob_offset = *key_offset + key.size();
198
0
  block_offset_ = *blob_offset + val.size();
199
0
  last_elem_type_ = kEtRecord;
200
0
  if (s.ok()) {
201
0
    RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN,
202
0
               BlobLogRecord::kHeaderSize + key.size() + val.size());
203
0
  }
204
0
  return s;
205
0
}
206
207
}  // namespace ROCKSDB_NAMESPACE