Coverage Report

Created: 2026-02-14 06:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/utilities/transactions/transaction_base.h
Line
Count
Source
1
// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
// This source code is licensed under both the GPLv2 (found in the
3
// COPYING file in the root directory) and Apache 2.0 License
4
// (found in the LICENSE.Apache file in the root directory).
5
6
#pragma once
7
8
#include <stack>
9
#include <string>
10
#include <vector>
11
12
#include "db/write_batch_internal.h"
13
#include "rocksdb/db.h"
14
#include "rocksdb/slice.h"
15
#include "rocksdb/snapshot.h"
16
#include "rocksdb/status.h"
17
#include "rocksdb/types.h"
18
#include "rocksdb/utilities/transaction.h"
19
#include "rocksdb/utilities/transaction_db.h"
20
#include "rocksdb/utilities/write_batch_with_index.h"
21
#include "util/autovector.h"
22
#include "utilities/transactions/lock/lock_tracker.h"
23
#include "utilities/transactions/transaction_util.h"
24
25
namespace ROCKSDB_NAMESPACE {
26
27
class TransactionBaseImpl : public Transaction {
28
 public:
29
  TransactionBaseImpl(DB* db, const WriteOptions& write_options,
30
                      const LockTrackerFactory& lock_tracker_factory);
31
32
  ~TransactionBaseImpl() override;
33
34
  // Remove pending operations queued in this transaction.
35
  virtual void Clear();
36
37
  void Reinitialize(DB* db, const WriteOptions& write_options);
38
39
  // Called before executing Put, PutEntity, Merge, Delete, and GetForUpdate. If
40
  // TryLock returns non-OK, the Put/PutEntity/Merge/Delete/GetForUpdate will be
41
  // failed. do_validate will be false if called from PutUntracked,
42
  // PutEntityUntracked, DeleteUntracked, MergeUntracked, or
43
  // GetForUpdate(do_validate=false)
44
  virtual Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
45
                         bool read_only, bool exclusive,
46
                         const bool do_validate = true,
47
                         const bool assume_tracked = false) = 0;
48
49
  void SetSavePoint() override;
50
51
  Status RollbackToSavePoint() override;
52
53
  Status PopSavePoint() override;
54
55
  using Transaction::Get;
56
  Status Get(const ReadOptions& _read_options,
57
             ColumnFamilyHandle* column_family, const Slice& key,
58
             std::string* value) override;
59
60
  Status Get(const ReadOptions& _read_options,
61
             ColumnFamilyHandle* column_family, const Slice& key,
62
             PinnableSlice* value) override;
63
64
  Status Get(const ReadOptions& options, const Slice& key,
65
0
             std::string* value) override {
66
0
    return Get(options, db_->DefaultColumnFamily(), key, value);
67
0
  }
68
69
  Status GetEntity(const ReadOptions& options,
70
                   ColumnFamilyHandle* column_family, const Slice& key,
71
                   PinnableWideColumns* columns) override;
72
73
  using Transaction::GetForUpdate;
74
  Status GetForUpdate(const ReadOptions& options,
75
                      ColumnFamilyHandle* column_family, const Slice& key,
76
                      std::string* value, bool exclusive,
77
                      const bool do_validate) override;
78
79
  Status GetForUpdate(const ReadOptions& options,
80
                      ColumnFamilyHandle* column_family, const Slice& key,
81
                      PinnableSlice* pinnable_val, bool exclusive,
82
                      const bool do_validate) override;
83
84
  Status GetForUpdate(const ReadOptions& options, const Slice& key,
85
                      std::string* value, bool exclusive,
86
0
                      const bool do_validate) override {
87
0
    return GetForUpdate(options, db_->DefaultColumnFamily(), key, value,
88
0
                        exclusive, do_validate);
89
0
  }
90
91
  Status GetForUpdate(const ReadOptions& options, const Slice& key,
92
                      PinnableSlice* pinnable_val, bool exclusive,
93
0
                      const bool do_validate) override {
94
0
    return GetForUpdate(options, db_->DefaultColumnFamily(), key, pinnable_val,
95
0
                        exclusive, do_validate);
96
0
  }
97
98
  Status GetEntityForUpdate(const ReadOptions& read_options,
99
                            ColumnFamilyHandle* column_family, const Slice& key,
100
                            PinnableWideColumns* columns, bool exclusive = true,
101
                            bool do_validate = true) override;
102
103
  using Transaction::MultiGet;
104
  std::vector<Status> MultiGet(
105
      const ReadOptions& _read_options,
106
      const std::vector<ColumnFamilyHandle*>& column_family,
107
      const std::vector<Slice>& keys,
108
      std::vector<std::string>* values) override;
109
110
  std::vector<Status> MultiGet(const ReadOptions& options,
111
                               const std::vector<Slice>& keys,
112
0
                               std::vector<std::string>* values) override {
113
0
    return MultiGet(options,
114
0
                    std::vector<ColumnFamilyHandle*>(
115
0
                        keys.size(), db_->DefaultColumnFamily()),
116
0
                    keys, values);
117
0
  }
118
119
  void MultiGet(const ReadOptions& _read_options,
120
                ColumnFamilyHandle* column_family, const size_t num_keys,
121
                const Slice* keys, PinnableSlice* values, Status* statuses,
122
                const bool sorted_input = false) override;
123
124
  void MultiGetEntity(const ReadOptions& options,
125
                      ColumnFamilyHandle* column_family, size_t num_keys,
126
                      const Slice* keys, PinnableWideColumns* results,
127
                      Status* statuses, bool sorted_input = false) override;
128
129
  using Transaction::MultiGetForUpdate;
130
  std::vector<Status> MultiGetForUpdate(
131
      const ReadOptions& options,
132
      const std::vector<ColumnFamilyHandle*>& column_family,
133
      const std::vector<Slice>& keys,
134
      std::vector<std::string>* values) override;
135
136
  std::vector<Status> MultiGetForUpdate(
137
      const ReadOptions& options, const std::vector<Slice>& keys,
138
0
      std::vector<std::string>* values) override {
139
0
    return MultiGetForUpdate(options,
140
0
                             std::vector<ColumnFamilyHandle*>(
141
0
                                 keys.size(), db_->DefaultColumnFamily()),
142
0
                             keys, values);
143
0
  }
144
145
  Iterator* GetIterator(const ReadOptions& read_options) override;
146
  Iterator* GetIterator(const ReadOptions& read_options,
147
                        ColumnFamilyHandle* column_family) override;
148
149
  std::unique_ptr<Iterator> GetCoalescingIterator(
150
      const ReadOptions& read_options,
151
      const std::vector<ColumnFamilyHandle*>& column_families) override;
152
153
  std::unique_ptr<AttributeGroupIterator> GetAttributeGroupIterator(
154
      const ReadOptions& read_options,
155
      const std::vector<ColumnFamilyHandle*>& column_families) override;
156
157
  Status Put(ColumnFamilyHandle* column_family, const Slice& key,
158
             const Slice& value, const bool assume_tracked = false) override;
159
0
  Status Put(const Slice& key, const Slice& value) override {
160
0
    return Put(nullptr, key, value);
161
0
  }
162
163
  Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
164
             const SliceParts& value,
165
             const bool assume_tracked = false) override;
166
0
  Status Put(const SliceParts& key, const SliceParts& value) override {
167
0
    return Put(nullptr, key, value);
168
0
  }
169
170
  Status PutEntity(ColumnFamilyHandle* column_family, const Slice& key,
171
                   const WideColumns& columns,
172
0
                   bool assume_tracked = false) override {
173
0
    const bool do_validate = !assume_tracked;
174
175
0
    return PutEntityImpl(column_family, key, columns, do_validate,
176
0
                         assume_tracked);
177
0
  }
178
179
  Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
180
               const Slice& value, const bool assume_tracked = false) override;
181
0
  Status Merge(const Slice& key, const Slice& value) override {
182
0
    return Merge(nullptr, key, value);
183
0
  }
184
185
  Status Delete(ColumnFamilyHandle* column_family, const Slice& key,
186
                const bool assume_tracked = false) override;
187
0
  Status Delete(const Slice& key) override { return Delete(nullptr, key); }
188
  Status Delete(ColumnFamilyHandle* column_family, const SliceParts& key,
189
                const bool assume_tracked = false) override;
190
0
  Status Delete(const SliceParts& key) override { return Delete(nullptr, key); }
191
192
  Status SingleDelete(ColumnFamilyHandle* column_family, const Slice& key,
193
                      const bool assume_tracked = false) override;
194
0
  Status SingleDelete(const Slice& key) override {
195
0
    return SingleDelete(nullptr, key);
196
0
  }
197
  Status SingleDelete(ColumnFamilyHandle* column_family, const SliceParts& key,
198
                      const bool assume_tracked = false) override;
199
0
  Status SingleDelete(const SliceParts& key) override {
200
0
    return SingleDelete(nullptr, key);
201
0
  }
202
203
  Status PutUntracked(ColumnFamilyHandle* column_family, const Slice& key,
204
                      const Slice& value) override;
205
0
  Status PutUntracked(const Slice& key, const Slice& value) override {
206
0
    return PutUntracked(nullptr, key, value);
207
0
  }
208
209
  Status PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key,
210
                      const SliceParts& value) override;
211
0
  Status PutUntracked(const SliceParts& key, const SliceParts& value) override {
212
0
    return PutUntracked(nullptr, key, value);
213
0
  }
214
215
  Status PutEntityUntracked(ColumnFamilyHandle* column_family, const Slice& key,
216
0
                            const WideColumns& columns) override {
217
0
    constexpr bool do_validate = false;
218
0
    constexpr bool assume_tracked = false;
219
220
0
    return PutEntityImpl(column_family, key, columns, do_validate,
221
0
                         assume_tracked);
222
0
  }
223
224
  Status MergeUntracked(ColumnFamilyHandle* column_family, const Slice& key,
225
                        const Slice& value) override;
226
0
  Status MergeUntracked(const Slice& key, const Slice& value) override {
227
0
    return MergeUntracked(nullptr, key, value);
228
0
  }
229
230
  Status DeleteUntracked(ColumnFamilyHandle* column_family,
231
                         const Slice& key) override;
232
0
  Status DeleteUntracked(const Slice& key) override {
233
0
    return DeleteUntracked(nullptr, key);
234
0
  }
235
  Status DeleteUntracked(ColumnFamilyHandle* column_family,
236
                         const SliceParts& key) override;
237
0
  Status DeleteUntracked(const SliceParts& key) override {
238
0
    return DeleteUntracked(nullptr, key);
239
0
  }
240
241
  Status SingleDeleteUntracked(ColumnFamilyHandle* column_family,
242
                               const Slice& key) override;
243
0
  Status SingleDeleteUntracked(const Slice& key) override {
244
0
    return SingleDeleteUntracked(nullptr, key);
245
0
  }
246
247
  void PutLogData(const Slice& blob) override;
248
249
  WriteBatchWithIndex* GetWriteBatch() override;
250
251
0
  void SetLockTimeout(int64_t /*timeout*/) override { /* Do nothing */ }
252
253
0
  void SetDeadlockTimeout(int64_t /*timeout*/) override { /* Do nothing */ }
254
255
0
  const Snapshot* GetSnapshot() const override {
256
    // will return nullptr when there is no snapshot
257
0
    return snapshot_.get();
258
0
  }
259
260
0
  std::shared_ptr<const Snapshot> GetTimestampedSnapshot() const override {
261
0
    return snapshot_;
262
0
  }
263
264
  void SetSnapshot() override;
265
  void SetSnapshotOnNextOperation(
266
      std::shared_ptr<TransactionNotifier> notifier = nullptr) override;
267
268
0
  void ClearSnapshot() override {
269
0
    snapshot_.reset();
270
0
    snapshot_needed_ = false;
271
0
    snapshot_notifier_ = nullptr;
272
0
  }
273
274
0
  void DisableIndexing() override { indexing_enabled_ = false; }
275
276
0
  void EnableIndexing() override { indexing_enabled_ = true; }
277
278
0
  bool IndexingEnabled() const { return indexing_enabled_; }
279
280
  uint64_t GetElapsedTime() const override;
281
282
  uint64_t GetNumPuts() const override;
283
284
  uint64_t GetNumPutEntities() const override;
285
286
  uint64_t GetNumDeletes() const override;
287
288
  uint64_t GetNumMerges() const override;
289
290
  uint64_t GetNumKeys() const override;
291
292
  void UndoGetForUpdate(ColumnFamilyHandle* column_family,
293
                        const Slice& key) override;
294
0
  void UndoGetForUpdate(const Slice& key) override {
295
0
    return UndoGetForUpdate(nullptr, key);
296
0
  }
297
298
0
  WriteOptions* GetWriteOptions() override { return &write_options_; }
299
300
0
  void SetWriteOptions(const WriteOptions& write_options) override {
301
0
    write_options_ = write_options;
302
0
  }
303
304
  // Used for memory management for snapshot_
305
  void ReleaseSnapshot(const Snapshot* snapshot, DB* db);
306
307
  // iterates over the given batch and makes the appropriate inserts.
308
  // used for rebuilding prepared transactions after recovery.
309
  Status RebuildFromWriteBatch(WriteBatch* src_batch) override;
310
311
  WriteBatch* GetCommitTimeWriteBatch() override;
312
313
0
  LockTracker& GetTrackedLocks() { return *tracked_locks_; }
314
315
 protected:
316
0
  ColumnFamilyHandle* DefaultColumnFamily() const {
317
0
    assert(db_);
318
0
    return db_->DefaultColumnFamily();
319
0
  }
320
321
  template <typename IterType, typename ImplType,
322
            typename ErrorIteratorFuncType>
323
  std::unique_ptr<IterType> NewMultiCfIterator(
324
      const ReadOptions& read_options,
325
      const std::vector<ColumnFamilyHandle*>& column_families,
326
      ErrorIteratorFuncType error_iterator_func);
327
328
  Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
329
                 const Slice& key, std::string* value) override;
330
331
  Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
332
                 const Slice& key, PinnableSlice* value) override;
333
334
  Status GetEntityImpl(const ReadOptions& options,
335
                       ColumnFamilyHandle* column_family, const Slice& key,
336
0
                       PinnableWideColumns* columns) {
337
0
    return write_batch_.GetEntityFromBatchAndDB(db_, options, column_family,
338
0
                                                key, columns);
339
0
  }
340
341
  void MultiGetEntityImpl(const ReadOptions& options,
342
                          ColumnFamilyHandle* column_family, size_t num_keys,
343
                          const Slice* keys, PinnableWideColumns* results,
344
0
                          Status* statuses, bool sorted_input) {
345
0
    write_batch_.MultiGetEntityFromBatchAndDB(db_, options, column_family,
346
0
                                              num_keys, keys, results, statuses,
347
0
                                              sorted_input);
348
0
  }
349
350
  Status PutEntityImpl(ColumnFamilyHandle* column_family, const Slice& key,
351
                       const WideColumns& columns, bool do_validate,
352
                       bool assume_tracked);
353
354
  // Add a key to the list of tracked keys.
355
  //
356
  // seqno is the earliest seqno this key was involved with this transaction.
357
  // readonly should be set to true if no data was written for this key
358
  void TrackKey(uint32_t cfh_id, const std::string& key, SequenceNumber seqno,
359
                bool readonly, bool exclusive);
360
361
  // Called when UndoGetForUpdate determines that this key can be unlocked.
362
  virtual void UnlockGetForUpdate(ColumnFamilyHandle* column_family,
363
                                  const Slice& key) = 0;
364
365
  // Sets a snapshot if SetSnapshotOnNextOperation() has been called.
366
  void SetSnapshotIfNeeded();
367
368
  // Initialize write_batch_ for 2PC by inserting Noop.
369
0
  inline void InitWriteBatch(bool clear = false) {
370
0
    if (clear) {
371
0
      write_batch_.Clear();
372
0
    }
373
0
    assert(write_batch_.GetDataSize() == WriteBatchInternal::kHeader);
374
0
    auto s = WriteBatchInternal::InsertNoop(write_batch_.GetWriteBatch());
375
0
    assert(s.ok());
376
0
  }
377
378
  WriteBatchBase* GetBatchForWrite();
379
380
  DB* db_;
381
  DBImpl* dbimpl_;
382
383
  WriteOptions write_options_;
384
385
  const Comparator* cmp_;
386
387
  const LockTrackerFactory& lock_tracker_factory_;
388
389
  // Stores that time the txn was constructed, in microseconds.
390
  uint64_t start_time_;
391
392
  // Stores the current snapshot that was set by SetSnapshot or null if
393
  // no snapshot is currently set.
394
  std::shared_ptr<const Snapshot> snapshot_;
395
396
  // Count of various operations pending in this transaction
397
  uint64_t num_puts_ = 0;
398
  uint64_t num_put_entities_ = 0;
399
  uint64_t num_deletes_ = 0;
400
  uint64_t num_merges_ = 0;
401
402
  struct SavePoint {
403
    std::shared_ptr<const Snapshot> snapshot_;
404
    bool snapshot_needed_ = false;
405
    std::shared_ptr<TransactionNotifier> snapshot_notifier_;
406
    uint64_t num_puts_ = 0;
407
    uint64_t num_put_entities_ = 0;
408
    uint64_t num_deletes_ = 0;
409
    uint64_t num_merges_ = 0;
410
411
    // Record all locks tracked since the last savepoint
412
    std::shared_ptr<LockTracker> new_locks_;
413
414
    SavePoint(std::shared_ptr<const Snapshot> snapshot, bool snapshot_needed,
415
              std::shared_ptr<TransactionNotifier> snapshot_notifier,
416
              uint64_t num_puts, uint64_t num_put_entities,
417
              uint64_t num_deletes, uint64_t num_merges,
418
              const LockTrackerFactory& lock_tracker_factory)
419
0
        : snapshot_(snapshot),
420
0
          snapshot_needed_(snapshot_needed),
421
0
          snapshot_notifier_(snapshot_notifier),
422
0
          num_puts_(num_puts),
423
0
          num_put_entities_(num_put_entities),
424
0
          num_deletes_(num_deletes),
425
0
          num_merges_(num_merges),
426
0
          new_locks_(lock_tracker_factory.Create()) {}
427
428
    explicit SavePoint(const LockTrackerFactory& lock_tracker_factory)
429
0
        : new_locks_(lock_tracker_factory.Create()) {}
430
  };
431
432
  // Records writes pending in this transaction
433
  WriteBatchWithIndex write_batch_;
434
435
  // For Pessimistic Transactions this is the set of acquired locks.
436
  // Optimistic Transactions will keep note the requested locks (not actually
437
  // locked), and do conflict checking until commit time based on the tracked
438
  // lock requests.
439
  std::unique_ptr<LockTracker> tracked_locks_;
440
441
  // Stack of the Snapshot saved at each save point. Saved snapshots may be
442
  // nullptr if there was no snapshot at the time SetSavePoint() was called.
443
  std::unique_ptr<std::stack<TransactionBaseImpl::SavePoint,
444
                             autovector<TransactionBaseImpl::SavePoint>>>
445
      save_points_;
446
447
 private:
448
  friend class WriteCommittedTxn;
449
  friend class WritePreparedTxn;
450
451
  // Extra data to be persisted with the commit. Note this is only used when
452
  // prepare phase is not skipped.
453
  WriteBatch commit_time_batch_;
454
455
  // If true, future Put/PutEntity/Merge/Delete operations will be indexed in
456
  // the WriteBatchWithIndex. If false, future Put/PutEntity/Merge/Delete
457
  // operations will be inserted directly into the underlying WriteBatch and not
458
  // indexed in the WriteBatchWithIndex.
459
  bool indexing_enabled_;
460
461
  // SetSnapshotOnNextOperation() has been called and the snapshot has not yet
462
  // been reset.
463
  bool snapshot_needed_ = false;
464
465
  // SetSnapshotOnNextOperation() has been called and the caller would like
466
  // a notification through the TransactionNotifier interface
467
  std::shared_ptr<TransactionNotifier> snapshot_notifier_ = nullptr;
468
469
  Status TryLock(ColumnFamilyHandle* column_family, const SliceParts& key,
470
                 bool read_only, bool exclusive, const bool do_validate = true,
471
                 const bool assume_tracked = false);
472
473
  void SetSnapshotInternal(const Snapshot* snapshot);
474
};
475
476
}  // namespace ROCKSDB_NAMESPACE