/src/rocksdb/db/blob/blob_log_writer.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | #include "db/blob/blob_log_writer.h" |
7 | | |
8 | | #include <cstdint> |
9 | | #include <string> |
10 | | |
11 | | #include "db/blob/blob_log_format.h" |
12 | | #include "file/writable_file_writer.h" |
13 | | #include "monitoring/statistics_impl.h" |
14 | | #include "rocksdb/system_clock.h" |
15 | | #include "test_util/sync_point.h" |
16 | | #include "util/coding.h" |
17 | | #include "util/stop_watch.h" |
18 | | |
19 | | namespace ROCKSDB_NAMESPACE { |
20 | | |
21 | | BlobLogWriter::BlobLogWriter(std::unique_ptr<WritableFileWriter>&& dest, |
22 | | SystemClock* clock, Statistics* statistics, |
23 | | uint64_t log_number, bool use_fs, bool do_flush, |
24 | | uint64_t boffset) |
25 | 0 | : dest_(std::move(dest)), |
26 | 0 | clock_(clock), |
27 | 0 | statistics_(statistics), |
28 | 0 | log_number_(log_number), |
29 | 0 | block_offset_(boffset), |
30 | 0 | use_fsync_(use_fs), |
31 | 0 | do_flush_(do_flush), |
32 | 0 | last_elem_type_(kEtNone) {} |
33 | | |
34 | 0 | BlobLogWriter::~BlobLogWriter() = default; |
35 | | |
36 | 0 | Status BlobLogWriter::Sync(const WriteOptions& write_options) { |
37 | 0 | TEST_SYNC_POINT("BlobLogWriter::Sync"); |
38 | |
|
39 | 0 | StopWatch sync_sw(clock_, statistics_, BLOB_DB_BLOB_FILE_SYNC_MICROS); |
40 | 0 | IOOptions opts; |
41 | 0 | Status s = WritableFileWriter::PrepareIOOptions(write_options, opts); |
42 | 0 | if (s.ok()) { |
43 | 0 | s = dest_->Sync(opts, use_fsync_); |
44 | 0 | } |
45 | 0 | if (s.ok()) { |
46 | 0 | RecordTick(statistics_, BLOB_DB_BLOB_FILE_SYNCED); |
47 | 0 | } |
48 | 0 | return s; |
49 | 0 | } |
50 | | |
51 | | Status BlobLogWriter::WriteHeader(const WriteOptions& write_options, |
52 | 0 | BlobLogHeader& header) { |
53 | 0 | assert(block_offset_ == 0); |
54 | 0 | assert(last_elem_type_ == kEtNone); |
55 | 0 | std::string str; |
56 | 0 | header.EncodeTo(&str); |
57 | |
|
58 | 0 | IOOptions opts; |
59 | 0 | Status s = WritableFileWriter::PrepareIOOptions(write_options, opts); |
60 | 0 | if (s.ok()) { |
61 | 0 | s = dest_->Append(opts, Slice(str)); |
62 | 0 | } |
63 | 0 | if (s.ok()) { |
64 | 0 | block_offset_ += str.size(); |
65 | 0 | if (do_flush_) { |
66 | 0 | s = dest_->Flush(opts); |
67 | 0 | } |
68 | 0 | } |
69 | 0 | last_elem_type_ = kEtFileHdr; |
70 | 0 | if (s.ok()) { |
71 | 0 | RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN, |
72 | 0 | BlobLogHeader::kSize); |
73 | 0 | } |
74 | 0 | return s; |
75 | 0 | } |
76 | | |
77 | | Status BlobLogWriter::AppendFooter(const WriteOptions& write_options, |
78 | | BlobLogFooter& footer, |
79 | | std::string* checksum_method, |
80 | 0 | std::string* checksum_value) { |
81 | 0 | assert(block_offset_ != 0); |
82 | 0 | assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord); |
83 | |
|
84 | 0 | std::string str; |
85 | 0 | footer.EncodeTo(&str); |
86 | |
|
87 | 0 | Status s; |
88 | 0 | if (dest_->seen_error()) { |
89 | 0 | s.PermitUncheckedError(); |
90 | 0 | return Status::IOError("Seen Error. Skip closing."); |
91 | 0 | } else { |
92 | 0 | IOOptions opts; |
93 | 0 | s = WritableFileWriter::PrepareIOOptions(write_options, opts); |
94 | 0 | if (s.ok()) { |
95 | 0 | s = dest_->Append(opts, Slice(str)); |
96 | 0 | } |
97 | 0 | if (s.ok()) { |
98 | 0 | block_offset_ += str.size(); |
99 | 0 | s = Sync(write_options); |
100 | |
|
101 | 0 | if (s.ok()) { |
102 | 0 | s = dest_->Close(opts); |
103 | |
|
104 | 0 | if (s.ok()) { |
105 | 0 | assert(!!checksum_method == !!checksum_value); |
106 | |
|
107 | 0 | if (checksum_method) { |
108 | 0 | assert(checksum_method->empty()); |
109 | |
|
110 | 0 | std::string method = dest_->GetFileChecksumFuncName(); |
111 | 0 | if (method != kUnknownFileChecksumFuncName) { |
112 | 0 | *checksum_method = std::move(method); |
113 | 0 | } |
114 | 0 | } |
115 | 0 | if (checksum_value) { |
116 | 0 | assert(checksum_value->empty()); |
117 | |
|
118 | 0 | std::string value = dest_->GetFileChecksum(); |
119 | 0 | if (value != kUnknownFileChecksum) { |
120 | 0 | *checksum_value = std::move(value); |
121 | 0 | } |
122 | 0 | } |
123 | 0 | } |
124 | 0 | } |
125 | 0 | } |
126 | |
|
127 | 0 | dest_.reset(); |
128 | 0 | } |
129 | | |
130 | 0 | last_elem_type_ = kEtFileFooter; |
131 | 0 | if (s.ok()) { |
132 | 0 | RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN, |
133 | 0 | BlobLogFooter::kSize); |
134 | 0 | } |
135 | 0 | return s; |
136 | 0 | } |
137 | | |
138 | | Status BlobLogWriter::AddRecord(const WriteOptions& write_options, |
139 | | const Slice& key, const Slice& val, |
140 | | uint64_t expiration, uint64_t* key_offset, |
141 | 0 | uint64_t* blob_offset) { |
142 | 0 | assert(block_offset_ != 0); |
143 | 0 | assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord); |
144 | |
|
145 | 0 | std::string buf; |
146 | 0 | ConstructBlobHeader(&buf, key, val, expiration); |
147 | |
|
148 | 0 | Status s = |
149 | 0 | EmitPhysicalRecord(write_options, buf, key, val, key_offset, blob_offset); |
150 | 0 | return s; |
151 | 0 | } |
152 | | |
153 | | Status BlobLogWriter::AddRecord(const WriteOptions& write_options, |
154 | | const Slice& key, const Slice& val, |
155 | 0 | uint64_t* key_offset, uint64_t* blob_offset) { |
156 | 0 | assert(block_offset_ != 0); |
157 | 0 | assert(last_elem_type_ == kEtFileHdr || last_elem_type_ == kEtRecord); |
158 | |
|
159 | 0 | std::string buf; |
160 | 0 | ConstructBlobHeader(&buf, key, val, 0); |
161 | |
|
162 | 0 | Status s = |
163 | 0 | EmitPhysicalRecord(write_options, buf, key, val, key_offset, blob_offset); |
164 | 0 | return s; |
165 | 0 | } |
166 | | |
167 | | void BlobLogWriter::ConstructBlobHeader(std::string* buf, const Slice& key, |
168 | 0 | const Slice& val, uint64_t expiration) { |
169 | 0 | BlobLogRecord record; |
170 | 0 | record.key = key; |
171 | 0 | record.value = val; |
172 | 0 | record.expiration = expiration; |
173 | 0 | record.EncodeHeaderTo(buf); |
174 | 0 | } |
175 | | |
176 | | Status BlobLogWriter::EmitPhysicalRecord(const WriteOptions& write_options, |
177 | | const std::string& headerbuf, |
178 | | const Slice& key, const Slice& val, |
179 | | uint64_t* key_offset, |
180 | 0 | uint64_t* blob_offset) { |
181 | 0 | IOOptions opts; |
182 | 0 | Status s = WritableFileWriter::PrepareIOOptions(write_options, opts); |
183 | 0 | if (s.ok()) { |
184 | 0 | s = dest_->Append(opts, Slice(headerbuf)); |
185 | 0 | } |
186 | 0 | if (s.ok()) { |
187 | 0 | s = dest_->Append(opts, key); |
188 | 0 | } |
189 | 0 | if (s.ok()) { |
190 | 0 | s = dest_->Append(opts, val); |
191 | 0 | } |
192 | 0 | if (do_flush_ && s.ok()) { |
193 | 0 | s = dest_->Flush(opts); |
194 | 0 | } |
195 | |
|
196 | 0 | *key_offset = block_offset_ + BlobLogRecord::kHeaderSize; |
197 | 0 | *blob_offset = *key_offset + key.size(); |
198 | 0 | block_offset_ = *blob_offset + val.size(); |
199 | 0 | last_elem_type_ = kEtRecord; |
200 | 0 | if (s.ok()) { |
201 | 0 | RecordTick(statistics_, BLOB_DB_BLOB_FILE_BYTES_WRITTEN, |
202 | 0 | BlobLogRecord::kHeaderSize + key.size() + val.size()); |
203 | 0 | } |
204 | 0 | return s; |
205 | 0 | } |
206 | | |
207 | | } // namespace ROCKSDB_NAMESPACE |