/src/rocksdb/db/write_batch_internal.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | // |
6 | | // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
7 | | // Use of this source code is governed by a BSD-style license that can be |
8 | | // found in the LICENSE file. See the AUTHORS file for names of contributors. |
9 | | |
10 | | #pragma once |
11 | | #include <array> |
12 | | #include <vector> |
13 | | |
14 | | #include "db/flush_scheduler.h" |
15 | | #include "db/kv_checksum.h" |
16 | | #include "db/trim_history_scheduler.h" |
17 | | #include "db/write_thread.h" |
18 | | #include "rocksdb/db.h" |
19 | | #include "rocksdb/options.h" |
20 | | #include "rocksdb/types.h" |
21 | | #include "rocksdb/write_batch.h" |
22 | | #include "util/autovector.h" |
23 | | #include "util/cast_util.h" |
24 | | |
25 | | namespace ROCKSDB_NAMESPACE { |
26 | | |
27 | | class MemTable; |
28 | | class FlushScheduler; |
29 | | class ColumnFamilyData; |
30 | | |
31 | | class ColumnFamilyMemTables { |
32 | | public: |
33 | 11.0k | virtual ~ColumnFamilyMemTables() {} |
34 | | virtual bool Seek(uint32_t column_family_id) = 0; |
35 | | // returns true if the update to memtable should be ignored |
36 | | // (useful when recovering from log whose updates have already |
37 | | // been processed) |
38 | | virtual uint64_t GetLogNumber() const = 0; |
39 | | virtual MemTable* GetMemTable() const = 0; |
40 | | virtual ColumnFamilyHandle* GetColumnFamilyHandle() = 0; |
41 | 0 | virtual ColumnFamilyData* current() { return nullptr; } |
42 | | }; |
43 | | |
44 | | class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables { |
45 | | public: |
46 | | explicit ColumnFamilyMemTablesDefault(MemTable* mem) |
47 | 0 | : ok_(false), mem_(mem) {} |
48 | | |
49 | 0 | bool Seek(uint32_t column_family_id) override { |
50 | 0 | ok_ = (column_family_id == 0); |
51 | 0 | return ok_; |
52 | 0 | } |
53 | | |
54 | 0 | uint64_t GetLogNumber() const override { return 0; } |
55 | | |
56 | 0 | MemTable* GetMemTable() const override { |
57 | 0 | assert(ok_); |
58 | 0 | return mem_; |
59 | 0 | } |
60 | | |
61 | 0 | ColumnFamilyHandle* GetColumnFamilyHandle() override { return nullptr; } |
62 | | |
63 | | private: |
64 | | bool ok_; |
65 | | MemTable* mem_; |
66 | | }; |
67 | | |
68 | | struct WriteBatch::ProtectionInfo { |
69 | | // `WriteBatch` usually doesn't contain a huge number of keys so protecting |
70 | | // with a fixed, non-configurable eight bytes per key may work well enough. |
71 | | autovector<ProtectionInfoKVOC64> entries_; |
72 | | |
73 | 0 | size_t GetBytesPerKey() const { return 8; } |
74 | | }; |
75 | | |
76 | | // WriteBatchInternal provides static methods for manipulating a |
77 | | // WriteBatch that we don't want in the public WriteBatch interface. |
78 | | class WriteBatchInternal { |
79 | | public: |
80 | | // WriteBatch header has an 8-byte sequence number followed by a 4-byte count. |
81 | | static constexpr size_t kHeader = 12; |
82 | | |
83 | | // WriteBatch methods with column_family_id instead of ColumnFamilyHandle* |
84 | | static Status Put(WriteBatch* batch, uint32_t column_family_id, |
85 | | const Slice& key, const Slice& value); |
86 | | |
87 | | static Status Put(WriteBatch* batch, uint32_t column_family_id, |
88 | | const SliceParts& key, const SliceParts& value); |
89 | | |
90 | | static Status TimedPut(WriteBatch* batch, uint32_t column_family_id, |
91 | | const Slice& key, const Slice& value, |
92 | | uint64_t unix_write_time); |
93 | | |
94 | | static Status PutEntity(WriteBatch* batch, uint32_t column_family_id, |
95 | | const Slice& key, const WideColumns& columns); |
96 | | |
97 | | static Status Delete(WriteBatch* batch, uint32_t column_family_id, |
98 | | const SliceParts& key); |
99 | | |
100 | | static Status Delete(WriteBatch* batch, uint32_t column_family_id, |
101 | | const Slice& key); |
102 | | |
103 | | static Status SingleDelete(WriteBatch* batch, uint32_t column_family_id, |
104 | | const SliceParts& key); |
105 | | |
106 | | static Status SingleDelete(WriteBatch* batch, uint32_t column_family_id, |
107 | | const Slice& key); |
108 | | |
109 | | static Status DeleteRange(WriteBatch* b, uint32_t column_family_id, |
110 | | const Slice& begin_key, const Slice& end_key); |
111 | | |
112 | | static Status DeleteRange(WriteBatch* b, uint32_t column_family_id, |
113 | | const SliceParts& begin_key, |
114 | | const SliceParts& end_key); |
115 | | |
116 | | static Status Merge(WriteBatch* batch, uint32_t column_family_id, |
117 | | const Slice& key, const Slice& value); |
118 | | |
119 | | static Status Merge(WriteBatch* batch, uint32_t column_family_id, |
120 | | const SliceParts& key, const SliceParts& value); |
121 | | |
122 | | static Status PutBlobIndex(WriteBatch* batch, uint32_t column_family_id, |
123 | | const Slice& key, const Slice& value); |
124 | | |
125 | | static Status MarkEndPrepare(WriteBatch* batch, const Slice& xid, |
126 | | const bool write_after_commit = true, |
127 | | const bool unprepared_batch = false); |
128 | | |
129 | | static Status MarkRollback(WriteBatch* batch, const Slice& xid); |
130 | | |
131 | | static Status MarkCommit(WriteBatch* batch, const Slice& xid); |
132 | | |
133 | | static Status MarkCommitWithTimestamp(WriteBatch* batch, const Slice& xid, |
134 | | const Slice& commit_ts); |
135 | | |
136 | | static Status InsertNoop(WriteBatch* batch); |
137 | | |
138 | | // Return the number of entries in the batch. |
139 | | static uint32_t Count(const WriteBatch* batch); |
140 | | |
141 | | // Set the count for the number of entries in the batch. |
142 | | static void SetCount(WriteBatch* batch, uint32_t n); |
143 | | |
144 | | // Return the sequence number for the start of this batch. |
145 | | static SequenceNumber Sequence(const WriteBatch* batch); |
146 | | |
147 | | // Store the specified number as the sequence number for the start of |
148 | | // this batch. |
149 | | static void SetSequence(WriteBatch* batch, SequenceNumber seq); |
150 | | |
151 | | // Returns the offset of the first entry in the batch. |
152 | | // This offset is only valid if the batch is not empty. |
153 | | static size_t GetFirstOffset(WriteBatch* batch); |
154 | | |
155 | 1.16M | static Slice Contents(const WriteBatch* batch) { return Slice(batch->rep_); } |
156 | | |
157 | 2.32M | static size_t ByteSize(const WriteBatch* batch) { return batch->rep_.size(); } |
158 | | |
159 | | static Status SetContents(WriteBatch* batch, const Slice& contents); |
160 | | |
161 | | static Status CheckSlicePartsLength(const SliceParts& key, |
162 | | const SliceParts& value); |
163 | | |
164 | | // Inserts batches[i] into memtable, for i in 0..num_batches-1 inclusive. |
165 | | // |
166 | | // If ignore_missing_column_families == true. WriteBatch |
167 | | // referencing non-existing column family will be ignored. |
168 | | // If ignore_missing_column_families == false, processing of the |
169 | | // batches will be stopped if a reference is found to a non-existing |
170 | | // column family and InvalidArgument() will be returned. The writes |
171 | | // in batches may be only partially applied at that point. |
172 | | // |
173 | | // If log_number is non-zero, the memtable will be updated only if |
174 | | // memtables->GetLogNumber() >= log_number. |
175 | | // |
176 | | // If flush_scheduler is non-null, it will be invoked if the memtable |
177 | | // should be flushed. |
178 | | // |
179 | | // Under concurrent use, the caller is responsible for making sure that |
180 | | // the memtables object itself is thread-local. |
181 | | static Status InsertInto( |
182 | | WriteThread::WriteGroup& write_group, SequenceNumber sequence, |
183 | | ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, |
184 | | TrimHistoryScheduler* trim_history_scheduler, |
185 | | bool ignore_missing_column_families = false, uint64_t log_number = 0, |
186 | | DB* db = nullptr, bool concurrent_memtable_writes = false, |
187 | | bool seq_per_batch = false, bool batch_per_txn = true); |
188 | | |
189 | | // Convenience form of InsertInto when you have only one batch |
190 | | // next_seq returns the seq after last sequence number used in MemTable insert |
191 | | static Status InsertInto( |
192 | | const WriteBatch* batch, ColumnFamilyMemTables* memtables, |
193 | | FlushScheduler* flush_scheduler, |
194 | | TrimHistoryScheduler* trim_history_scheduler, |
195 | | bool ignore_missing_column_families = false, uint64_t log_number = 0, |
196 | | DB* db = nullptr, bool concurrent_memtable_writes = false, |
197 | | SequenceNumber* next_seq = nullptr, bool* has_valid_writes = nullptr, |
198 | | bool seq_per_batch = false, bool batch_per_txn = true); |
199 | | |
200 | | static Status InsertInto(WriteThread::Writer* writer, SequenceNumber sequence, |
201 | | ColumnFamilyMemTables* memtables, |
202 | | FlushScheduler* flush_scheduler, |
203 | | TrimHistoryScheduler* trim_history_scheduler, |
204 | | bool ignore_missing_column_families = false, |
205 | | uint64_t log_number = 0, DB* db = nullptr, |
206 | | bool concurrent_memtable_writes = false, |
207 | | bool seq_per_batch = false, size_t batch_cnt = 0, |
208 | | bool batch_per_txn = true, |
209 | | bool hint_per_batch = false); |
210 | | |
211 | | // Appends src write batch to dst write batch and updates count in dst |
212 | | // write batch. Returns OK if the append is successful. Checks number of |
213 | | // checksum against count in dst and src write batches, and returns Corruption |
214 | | // if the count is inconsistent. |
215 | | static Status Append(WriteBatch* dst, const WriteBatch* src, |
216 | | const bool WAL_only = false); |
217 | | |
218 | | // Returns the byte size of appending a WriteBatch with ByteSize |
219 | | // leftByteSize and a WriteBatch with ByteSize rightByteSize |
220 | | static size_t AppendedByteSize(size_t leftByteSize, size_t rightByteSize); |
221 | | |
222 | | // Iterate over [begin, end) range of a write batch |
223 | | static Status Iterate(const WriteBatch* wb, WriteBatch::Handler* handler, |
224 | | size_t begin, size_t end); |
225 | | |
226 | | // This write batch includes the latest state that should be persisted. Such |
227 | | // state meant to be used only during recovery. |
228 | | static void SetAsLatestPersistentState(WriteBatch* b); |
229 | | static bool IsLatestPersistentState(const WriteBatch* b); |
230 | | |
231 | | static void SetDefaultColumnFamilyTimestampSize(WriteBatch* wb, |
232 | | size_t default_cf_ts_sz); |
233 | | |
234 | | static std::tuple<Status, uint32_t, size_t> GetColumnFamilyIdAndTimestampSize( |
235 | | WriteBatch* b, ColumnFamilyHandle* column_family); |
236 | | |
237 | 1.16M | static bool TimestampsUpdateNeeded(const WriteBatch& wb) { |
238 | 1.16M | return wb.needs_in_place_update_ts_; |
239 | 1.16M | } |
240 | | |
241 | 0 | static bool HasKeyWithTimestamp(const WriteBatch& wb) { |
242 | 0 | return wb.has_key_with_ts_; |
243 | 0 | } |
244 | | |
245 | | // Update per-key value protection information on this write batch. |
246 | | // If checksum is provided, the batch content is verfied against the checksum. |
247 | | static Status UpdateProtectionInfo(WriteBatch* wb, size_t bytes_per_key, |
248 | | uint64_t* checksum = nullptr); |
249 | | }; |
250 | | |
251 | | // LocalSavePoint is similar to a scope guard |
252 | | class LocalSavePoint { |
253 | | public: |
254 | | explicit LocalSavePoint(WriteBatch* batch) |
255 | | : batch_(batch), |
256 | | savepoint_(batch->GetDataSize(), batch->Count(), |
257 | | batch->content_flags_.load(std::memory_order_relaxed)) |
258 | | #ifndef NDEBUG |
259 | | , |
260 | | committed_(false) |
261 | | #endif |
262 | 1.16M | { |
263 | 1.16M | } |
264 | | |
265 | | #ifndef NDEBUG |
266 | | ~LocalSavePoint() { assert(committed_); } |
267 | | #endif |
268 | 1.16M | Status commit() { |
269 | | #ifndef NDEBUG |
270 | | committed_ = true; |
271 | | #endif |
272 | 1.16M | if (batch_->max_bytes_ && batch_->rep_.size() > batch_->max_bytes_) { |
273 | 0 | batch_->rep_.resize(savepoint_.size); |
274 | 0 | WriteBatchInternal::SetCount(batch_, savepoint_.count); |
275 | 0 | if (batch_->prot_info_ != nullptr) { |
276 | 0 | batch_->prot_info_->entries_.resize(savepoint_.count); |
277 | 0 | } |
278 | 0 | batch_->content_flags_.store(savepoint_.content_flags, |
279 | 0 | std::memory_order_relaxed); |
280 | 0 | return Status::MemoryLimit(); |
281 | 0 | } |
282 | 1.16M | return Status::OK(); |
283 | 1.16M | } |
284 | | |
285 | | private: |
286 | | WriteBatch* batch_; |
287 | | SavePoint savepoint_; |
288 | | #ifndef NDEBUG |
289 | | bool committed_; |
290 | | #endif |
291 | | }; |
292 | | |
293 | | template <typename TimestampSizeFuncType> |
294 | | class TimestampUpdater : public WriteBatch::Handler { |
295 | | public: |
296 | | explicit TimestampUpdater(WriteBatch::ProtectionInfo* prot_info, |
297 | | TimestampSizeFuncType&& ts_sz_func, const Slice& ts) |
298 | | : prot_info_(prot_info), |
299 | | ts_sz_func_(std::move(ts_sz_func)), |
300 | 0 | timestamp_(ts) { |
301 | 0 | assert(!timestamp_.empty()); |
302 | 0 | } |
303 | | |
304 | 0 | ~TimestampUpdater() override {} |
305 | | |
306 | 0 | Status PutCF(uint32_t cf, const Slice& key, const Slice&) override { |
307 | 0 | return UpdateTimestamp(cf, key); |
308 | 0 | } |
309 | | |
310 | 0 | Status DeleteCF(uint32_t cf, const Slice& key) override { |
311 | 0 | return UpdateTimestamp(cf, key); |
312 | 0 | } |
313 | | |
314 | 0 | Status SingleDeleteCF(uint32_t cf, const Slice& key) override { |
315 | 0 | return UpdateTimestamp(cf, key); |
316 | 0 | } |
317 | | |
318 | | Status DeleteRangeCF(uint32_t cf, const Slice& begin_key, |
319 | 0 | const Slice& end_key) override { |
320 | 0 | Status s = UpdateTimestamp(cf, begin_key, true /* is_key */); |
321 | 0 | if (s.ok()) { |
322 | 0 | s = UpdateTimestamp(cf, end_key, false /* is_key */); |
323 | 0 | } |
324 | 0 | return s; |
325 | 0 | } |
326 | | |
327 | 0 | Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override { |
328 | 0 | return UpdateTimestamp(cf, key); |
329 | 0 | } |
330 | | |
331 | 0 | Status PutBlobIndexCF(uint32_t cf, const Slice& key, const Slice&) override { |
332 | 0 | return UpdateTimestamp(cf, key); |
333 | 0 | } |
334 | | |
335 | 0 | Status MarkBeginPrepare(bool) override { return Status::OK(); } |
336 | | |
337 | 0 | Status MarkEndPrepare(const Slice&) override { return Status::OK(); } |
338 | | |
339 | 0 | Status MarkCommit(const Slice&) override { return Status::OK(); } |
340 | | |
341 | 0 | Status MarkCommitWithTimestamp(const Slice&, const Slice&) override { |
342 | 0 | return Status::OK(); |
343 | 0 | } |
344 | | |
345 | 0 | Status MarkRollback(const Slice&) override { return Status::OK(); } |
346 | | |
347 | 0 | Status MarkNoop(bool /*empty_batch*/) override { return Status::OK(); } |
348 | | |
349 | | private: |
350 | | // @param is_key specifies whether the update is for key or value. |
351 | 0 | Status UpdateTimestamp(uint32_t cf, const Slice& buf, bool is_key = true) { |
352 | 0 | Status s = UpdateTimestampImpl(cf, buf, idx_, is_key); |
353 | 0 | ++idx_; |
354 | 0 | return s; |
355 | 0 | } |
356 | | |
357 | | Status UpdateTimestampImpl(uint32_t cf, const Slice& buf, size_t /*idx*/, |
358 | 0 | bool is_key) { |
359 | 0 | if (timestamp_.empty()) { |
360 | 0 | return Status::InvalidArgument("Timestamp is empty"); |
361 | 0 | } |
362 | 0 | size_t cf_ts_sz = ts_sz_func_(cf); |
363 | 0 | if (0 == cf_ts_sz) { |
364 | | // Skip this column family. |
365 | 0 | return Status::OK(); |
366 | 0 | } else if (std::numeric_limits<size_t>::max() == cf_ts_sz) { |
367 | | // Column family timestamp info not found. |
368 | 0 | return Status::NotFound(); |
369 | 0 | } else if (cf_ts_sz != timestamp_.size()) { |
370 | 0 | return Status::InvalidArgument("timestamp size mismatch"); |
371 | 0 | } |
372 | 0 | UpdateProtectionInformationIfNeeded(buf, timestamp_, is_key); |
373 | |
|
374 | 0 | char* ptr = const_cast<char*>(buf.data() + buf.size() - cf_ts_sz); |
375 | 0 | assert(ptr); |
376 | 0 | memcpy(ptr, timestamp_.data(), timestamp_.size()); |
377 | 0 | return Status::OK(); |
378 | 0 | } |
379 | | |
380 | | void UpdateProtectionInformationIfNeeded(const Slice& buf, const Slice& ts, |
381 | 0 | bool is_key) { |
382 | 0 | if (prot_info_ != nullptr) { |
383 | 0 | const size_t ts_sz = ts.size(); |
384 | 0 | SliceParts old(&buf, 1); |
385 | 0 | Slice old_no_ts(buf.data(), buf.size() - ts_sz); |
386 | 0 | std::array<Slice, 2> new_key_cmpts{{old_no_ts, ts}}; |
387 | 0 | SliceParts new_parts(new_key_cmpts.data(), 2); |
388 | 0 | if (is_key) { |
389 | 0 | prot_info_->entries_[idx_].UpdateK(old, new_parts); |
390 | 0 | } else { |
391 | 0 | prot_info_->entries_[idx_].UpdateV(old, new_parts); |
392 | 0 | } |
393 | 0 | } |
394 | 0 | } |
395 | | |
396 | | // No copy or move. |
397 | | TimestampUpdater(const TimestampUpdater&) = delete; |
398 | | TimestampUpdater(TimestampUpdater&&) = delete; |
399 | | TimestampUpdater& operator=(const TimestampUpdater&) = delete; |
400 | | TimestampUpdater& operator=(TimestampUpdater&&) = delete; |
401 | | |
402 | | WriteBatch::ProtectionInfo* const prot_info_ = nullptr; |
403 | | const TimestampSizeFuncType ts_sz_func_{}; |
404 | | const Slice timestamp_; |
405 | | size_t idx_ = 0; |
406 | | }; |
407 | | |
408 | | } // namespace ROCKSDB_NAMESPACE |