Coverage Report

Created: 2024-09-08 07:17

/src/rocksdb/db/write_batch_internal.h
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
//
6
// Copyright (c) 2011 The LevelDB Authors. All rights reserved.
7
// Use of this source code is governed by a BSD-style license that can be
8
// found in the LICENSE file. See the AUTHORS file for names of contributors.
9
10
#pragma once
11
#include <array>
#include <cassert>
#include <cstring>
#include <limits>
#include <tuple>
#include <utility>
#include <vector>

#include "db/flush_scheduler.h"
#include "db/kv_checksum.h"
#include "db/trim_history_scheduler.h"
#include "db/write_thread.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/types.h"
#include "rocksdb/write_batch.h"
#include "util/autovector.h"
#include "util/cast_util.h"
25
namespace ROCKSDB_NAMESPACE {
26
27
class MemTable;
28
class FlushScheduler;
29
class ColumnFamilyData;
30
31
class ColumnFamilyMemTables {
32
 public:
33
11.0k
  virtual ~ColumnFamilyMemTables() {}
34
  virtual bool Seek(uint32_t column_family_id) = 0;
35
  // returns true if the update to memtable should be ignored
36
  // (useful when recovering from log whose updates have already
37
  // been processed)
38
  virtual uint64_t GetLogNumber() const = 0;
39
  virtual MemTable* GetMemTable() const = 0;
40
  virtual ColumnFamilyHandle* GetColumnFamilyHandle() = 0;
41
0
  virtual ColumnFamilyData* current() { return nullptr; }
42
};
43
44
class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables {
45
 public:
46
  explicit ColumnFamilyMemTablesDefault(MemTable* mem)
47
0
      : ok_(false), mem_(mem) {}
48
49
0
  bool Seek(uint32_t column_family_id) override {
50
0
    ok_ = (column_family_id == 0);
51
0
    return ok_;
52
0
  }
53
54
0
  uint64_t GetLogNumber() const override { return 0; }
55
56
0
  MemTable* GetMemTable() const override {
57
0
    assert(ok_);
58
0
    return mem_;
59
0
  }
60
61
0
  ColumnFamilyHandle* GetColumnFamilyHandle() override { return nullptr; }
62
63
 private:
64
  bool ok_;
65
  MemTable* mem_;
66
};
67
68
struct WriteBatch::ProtectionInfo {
  // Per-entry protection information, indexed by the entry's position in the
  // batch (LocalSavePoint truncates it back to a saved entry count).
  autovector<ProtectionInfoKVOC64> entries_;

  // The protection width is fixed rather than configurable: a WriteBatch
  // rarely holds a huge number of keys, so eight bytes per key is expected to
  // be good enough.
  size_t GetBytesPerKey() const {
    constexpr size_t kFixedBytesPerKey = 8;
    return kFixedBytesPerKey;
  }
};
75
76
// WriteBatchInternal provides static methods for manipulating a
77
// WriteBatch that we don't want in the public WriteBatch interface.
78
class WriteBatchInternal {
79
 public:
80
  // WriteBatch header has an 8-byte sequence number followed by a 4-byte count.
81
  static constexpr size_t kHeader = 12;
82
83
  // WriteBatch methods with column_family_id instead of ColumnFamilyHandle*
84
  static Status Put(WriteBatch* batch, uint32_t column_family_id,
85
                    const Slice& key, const Slice& value);
86
87
  static Status Put(WriteBatch* batch, uint32_t column_family_id,
88
                    const SliceParts& key, const SliceParts& value);
89
90
  static Status TimedPut(WriteBatch* batch, uint32_t column_family_id,
91
                         const Slice& key, const Slice& value,
92
                         uint64_t unix_write_time);
93
94
  static Status PutEntity(WriteBatch* batch, uint32_t column_family_id,
95
                          const Slice& key, const WideColumns& columns);
96
97
  static Status Delete(WriteBatch* batch, uint32_t column_family_id,
98
                       const SliceParts& key);
99
100
  static Status Delete(WriteBatch* batch, uint32_t column_family_id,
101
                       const Slice& key);
102
103
  static Status SingleDelete(WriteBatch* batch, uint32_t column_family_id,
104
                             const SliceParts& key);
105
106
  static Status SingleDelete(WriteBatch* batch, uint32_t column_family_id,
107
                             const Slice& key);
108
109
  static Status DeleteRange(WriteBatch* b, uint32_t column_family_id,
110
                            const Slice& begin_key, const Slice& end_key);
111
112
  static Status DeleteRange(WriteBatch* b, uint32_t column_family_id,
113
                            const SliceParts& begin_key,
114
                            const SliceParts& end_key);
115
116
  static Status Merge(WriteBatch* batch, uint32_t column_family_id,
117
                      const Slice& key, const Slice& value);
118
119
  static Status Merge(WriteBatch* batch, uint32_t column_family_id,
120
                      const SliceParts& key, const SliceParts& value);
121
122
  static Status PutBlobIndex(WriteBatch* batch, uint32_t column_family_id,
123
                             const Slice& key, const Slice& value);
124
125
  static Status MarkEndPrepare(WriteBatch* batch, const Slice& xid,
126
                               const bool write_after_commit = true,
127
                               const bool unprepared_batch = false);
128
129
  static Status MarkRollback(WriteBatch* batch, const Slice& xid);
130
131
  static Status MarkCommit(WriteBatch* batch, const Slice& xid);
132
133
  static Status MarkCommitWithTimestamp(WriteBatch* batch, const Slice& xid,
134
                                        const Slice& commit_ts);
135
136
  static Status InsertNoop(WriteBatch* batch);
137
138
  // Return the number of entries in the batch.
139
  static uint32_t Count(const WriteBatch* batch);
140
141
  // Set the count for the number of entries in the batch.
142
  static void SetCount(WriteBatch* batch, uint32_t n);
143
144
  // Return the sequence number for the start of this batch.
145
  static SequenceNumber Sequence(const WriteBatch* batch);
146
147
  // Store the specified number as the sequence number for the start of
148
  // this batch.
149
  static void SetSequence(WriteBatch* batch, SequenceNumber seq);
150
151
  // Returns the offset of the first entry in the batch.
152
  // This offset is only valid if the batch is not empty.
153
  static size_t GetFirstOffset(WriteBatch* batch);
154
155
1.16M
  static Slice Contents(const WriteBatch* batch) { return Slice(batch->rep_); }
156
157
2.32M
  static size_t ByteSize(const WriteBatch* batch) { return batch->rep_.size(); }
158
159
  static Status SetContents(WriteBatch* batch, const Slice& contents);
160
161
  static Status CheckSlicePartsLength(const SliceParts& key,
162
                                      const SliceParts& value);
163
164
  // Inserts batches[i] into memtable, for i in 0..num_batches-1 inclusive.
165
  //
166
  // If ignore_missing_column_families == true. WriteBatch
167
  // referencing non-existing column family will be ignored.
168
  // If ignore_missing_column_families == false, processing of the
169
  // batches will be stopped if a reference is found to a non-existing
170
  // column family and InvalidArgument() will be returned.  The writes
171
  // in batches may be only partially applied at that point.
172
  //
173
  // If log_number is non-zero, the memtable will be updated only if
174
  // memtables->GetLogNumber() >= log_number.
175
  //
176
  // If flush_scheduler is non-null, it will be invoked if the memtable
177
  // should be flushed.
178
  //
179
  // Under concurrent use, the caller is responsible for making sure that
180
  // the memtables object itself is thread-local.
181
  static Status InsertInto(
182
      WriteThread::WriteGroup& write_group, SequenceNumber sequence,
183
      ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler,
184
      TrimHistoryScheduler* trim_history_scheduler,
185
      bool ignore_missing_column_families = false, uint64_t log_number = 0,
186
      DB* db = nullptr, bool concurrent_memtable_writes = false,
187
      bool seq_per_batch = false, bool batch_per_txn = true);
188
189
  // Convenience form of InsertInto when you have only one batch
190
  // next_seq returns the seq after last sequence number used in MemTable insert
191
  static Status InsertInto(
192
      const WriteBatch* batch, ColumnFamilyMemTables* memtables,
193
      FlushScheduler* flush_scheduler,
194
      TrimHistoryScheduler* trim_history_scheduler,
195
      bool ignore_missing_column_families = false, uint64_t log_number = 0,
196
      DB* db = nullptr, bool concurrent_memtable_writes = false,
197
      SequenceNumber* next_seq = nullptr, bool* has_valid_writes = nullptr,
198
      bool seq_per_batch = false, bool batch_per_txn = true);
199
200
  static Status InsertInto(WriteThread::Writer* writer, SequenceNumber sequence,
201
                           ColumnFamilyMemTables* memtables,
202
                           FlushScheduler* flush_scheduler,
203
                           TrimHistoryScheduler* trim_history_scheduler,
204
                           bool ignore_missing_column_families = false,
205
                           uint64_t log_number = 0, DB* db = nullptr,
206
                           bool concurrent_memtable_writes = false,
207
                           bool seq_per_batch = false, size_t batch_cnt = 0,
208
                           bool batch_per_txn = true,
209
                           bool hint_per_batch = false);
210
211
  // Appends src write batch to dst write batch and updates count in dst
212
  // write batch. Returns OK if the append is successful. Checks number of
213
  // checksum against count in dst and src write batches, and returns Corruption
214
  // if the count is inconsistent.
215
  static Status Append(WriteBatch* dst, const WriteBatch* src,
216
                       const bool WAL_only = false);
217
218
  // Returns the byte size of appending a WriteBatch with ByteSize
219
  // leftByteSize and a WriteBatch with ByteSize rightByteSize
220
  static size_t AppendedByteSize(size_t leftByteSize, size_t rightByteSize);
221
222
  // Iterate over [begin, end) range of a write batch
223
  static Status Iterate(const WriteBatch* wb, WriteBatch::Handler* handler,
224
                        size_t begin, size_t end);
225
226
  // This write batch includes the latest state that should be persisted. Such
227
  // state meant to be used only during recovery.
228
  static void SetAsLatestPersistentState(WriteBatch* b);
229
  static bool IsLatestPersistentState(const WriteBatch* b);
230
231
  static void SetDefaultColumnFamilyTimestampSize(WriteBatch* wb,
232
                                                  size_t default_cf_ts_sz);
233
234
  static std::tuple<Status, uint32_t, size_t> GetColumnFamilyIdAndTimestampSize(
235
      WriteBatch* b, ColumnFamilyHandle* column_family);
236
237
1.16M
  static bool TimestampsUpdateNeeded(const WriteBatch& wb) {
238
1.16M
    return wb.needs_in_place_update_ts_;
239
1.16M
  }
240
241
0
  static bool HasKeyWithTimestamp(const WriteBatch& wb) {
242
0
    return wb.has_key_with_ts_;
243
0
  }
244
245
  // Update per-key value protection information on this write batch.
246
  // If checksum is provided, the batch content is verfied against the checksum.
247
  static Status UpdateProtectionInfo(WriteBatch* wb, size_t bytes_per_key,
248
                                     uint64_t* checksum = nullptr);
249
};
250
251
// LocalSavePoint is similar to a scope guard
252
class LocalSavePoint {
253
 public:
254
  explicit LocalSavePoint(WriteBatch* batch)
255
      : batch_(batch),
256
        savepoint_(batch->GetDataSize(), batch->Count(),
257
                   batch->content_flags_.load(std::memory_order_relaxed))
258
#ifndef NDEBUG
259
        ,
260
        committed_(false)
261
#endif
262
1.16M
  {
263
1.16M
  }
264
265
#ifndef NDEBUG
266
  ~LocalSavePoint() { assert(committed_); }
267
#endif
268
1.16M
  Status commit() {
269
#ifndef NDEBUG
270
    committed_ = true;
271
#endif
272
1.16M
    if (batch_->max_bytes_ && batch_->rep_.size() > batch_->max_bytes_) {
273
0
      batch_->rep_.resize(savepoint_.size);
274
0
      WriteBatchInternal::SetCount(batch_, savepoint_.count);
275
0
      if (batch_->prot_info_ != nullptr) {
276
0
        batch_->prot_info_->entries_.resize(savepoint_.count);
277
0
      }
278
0
      batch_->content_flags_.store(savepoint_.content_flags,
279
0
                                   std::memory_order_relaxed);
280
0
      return Status::MemoryLimit();
281
0
    }
282
1.16M
    return Status::OK();
283
1.16M
  }
284
285
 private:
286
  WriteBatch* batch_;
287
  SavePoint savepoint_;
288
#ifndef NDEBUG
289
  bool committed_;
290
#endif
291
};
292
293
template <typename TimestampSizeFuncType>
294
class TimestampUpdater : public WriteBatch::Handler {
295
 public:
296
  explicit TimestampUpdater(WriteBatch::ProtectionInfo* prot_info,
297
                            TimestampSizeFuncType&& ts_sz_func, const Slice& ts)
298
      : prot_info_(prot_info),
299
        ts_sz_func_(std::move(ts_sz_func)),
300
0
        timestamp_(ts) {
301
0
    assert(!timestamp_.empty());
302
0
  }
303
304
0
  ~TimestampUpdater() override {}
305
306
0
  Status PutCF(uint32_t cf, const Slice& key, const Slice&) override {
307
0
    return UpdateTimestamp(cf, key);
308
0
  }
309
310
0
  Status DeleteCF(uint32_t cf, const Slice& key) override {
311
0
    return UpdateTimestamp(cf, key);
312
0
  }
313
314
0
  Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
315
0
    return UpdateTimestamp(cf, key);
316
0
  }
317
318
  Status DeleteRangeCF(uint32_t cf, const Slice& begin_key,
319
0
                       const Slice& end_key) override {
320
0
    Status s = UpdateTimestamp(cf, begin_key, true /* is_key */);
321
0
    if (s.ok()) {
322
0
      s = UpdateTimestamp(cf, end_key, false /* is_key */);
323
0
    }
324
0
    return s;
325
0
  }
326
327
0
  Status MergeCF(uint32_t cf, const Slice& key, const Slice&) override {
328
0
    return UpdateTimestamp(cf, key);
329
0
  }
330
331
0
  Status PutBlobIndexCF(uint32_t cf, const Slice& key, const Slice&) override {
332
0
    return UpdateTimestamp(cf, key);
333
0
  }
334
335
0
  Status MarkBeginPrepare(bool) override { return Status::OK(); }
336
337
0
  Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
338
339
0
  Status MarkCommit(const Slice&) override { return Status::OK(); }
340
341
0
  Status MarkCommitWithTimestamp(const Slice&, const Slice&) override {
342
0
    return Status::OK();
343
0
  }
344
345
0
  Status MarkRollback(const Slice&) override { return Status::OK(); }
346
347
0
  Status MarkNoop(bool /*empty_batch*/) override { return Status::OK(); }
348
349
 private:
350
  // @param is_key specifies whether the update is for key or value.
351
0
  Status UpdateTimestamp(uint32_t cf, const Slice& buf, bool is_key = true) {
352
0
    Status s = UpdateTimestampImpl(cf, buf, idx_, is_key);
353
0
    ++idx_;
354
0
    return s;
355
0
  }
356
357
  Status UpdateTimestampImpl(uint32_t cf, const Slice& buf, size_t /*idx*/,
358
0
                             bool is_key) {
359
0
    if (timestamp_.empty()) {
360
0
      return Status::InvalidArgument("Timestamp is empty");
361
0
    }
362
0
    size_t cf_ts_sz = ts_sz_func_(cf);
363
0
    if (0 == cf_ts_sz) {
364
      // Skip this column family.
365
0
      return Status::OK();
366
0
    } else if (std::numeric_limits<size_t>::max() == cf_ts_sz) {
367
      // Column family timestamp info not found.
368
0
      return Status::NotFound();
369
0
    } else if (cf_ts_sz != timestamp_.size()) {
370
0
      return Status::InvalidArgument("timestamp size mismatch");
371
0
    }
372
0
    UpdateProtectionInformationIfNeeded(buf, timestamp_, is_key);
373
374
0
    char* ptr = const_cast<char*>(buf.data() + buf.size() - cf_ts_sz);
375
0
    assert(ptr);
376
0
    memcpy(ptr, timestamp_.data(), timestamp_.size());
377
0
    return Status::OK();
378
0
  }
379
380
  void UpdateProtectionInformationIfNeeded(const Slice& buf, const Slice& ts,
381
0
                                           bool is_key) {
382
0
    if (prot_info_ != nullptr) {
383
0
      const size_t ts_sz = ts.size();
384
0
      SliceParts old(&buf, 1);
385
0
      Slice old_no_ts(buf.data(), buf.size() - ts_sz);
386
0
      std::array<Slice, 2> new_key_cmpts{{old_no_ts, ts}};
387
0
      SliceParts new_parts(new_key_cmpts.data(), 2);
388
0
      if (is_key) {
389
0
        prot_info_->entries_[idx_].UpdateK(old, new_parts);
390
0
      } else {
391
0
        prot_info_->entries_[idx_].UpdateV(old, new_parts);
392
0
      }
393
0
    }
394
0
  }
395
396
  // No copy or move.
397
  TimestampUpdater(const TimestampUpdater&) = delete;
398
  TimestampUpdater(TimestampUpdater&&) = delete;
399
  TimestampUpdater& operator=(const TimestampUpdater&) = delete;
400
  TimestampUpdater& operator=(TimestampUpdater&&) = delete;
401
402
  WriteBatch::ProtectionInfo* const prot_info_ = nullptr;
403
  const TimestampSizeFuncType ts_sz_func_{};
404
  const Slice timestamp_;
405
  size_t idx_ = 0;
406
};
407
408
}  // namespace ROCKSDB_NAMESPACE