Coverage Report

Created: 2026-02-14 06:58

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/utilities/transactions/pessimistic_transaction.h
Line
Count
Source
1
// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#pragma once
7
8
#include <algorithm>
9
#include <atomic>
10
#include <mutex>
11
#include <stack>
12
#include <string>
13
#include <unordered_map>
14
#include <vector>
15
16
#include "db/write_callback.h"
17
#include "rocksdb/db.h"
18
#include "rocksdb/slice.h"
19
#include "rocksdb/snapshot.h"
20
#include "rocksdb/status.h"
21
#include "rocksdb/types.h"
22
#include "rocksdb/utilities/transaction.h"
23
#include "rocksdb/utilities/transaction_db.h"
24
#include "rocksdb/utilities/write_batch_with_index.h"
25
#include "util/autovector.h"
26
#include "utilities/transactions/transaction_base.h"
27
#include "utilities/transactions/transaction_util.h"
28
29
namespace ROCKSDB_NAMESPACE {
30
31
class PessimisticTransactionDB;
32
33
// A transaction under pessimistic concurrency control. This class implements
34
// the locking API and interfaces with the lock manager as well as the
35
// pessimistic transactional db.
36
class PessimisticTransaction : public TransactionBaseImpl {
37
 public:
38
  PessimisticTransaction(TransactionDB* db, const WriteOptions& write_options,
39
                         const TransactionOptions& txn_options,
40
                         const bool init = true);
41
  // No copying allowed
42
  PessimisticTransaction(const PessimisticTransaction&) = delete;
43
  void operator=(const PessimisticTransaction&) = delete;
44
45
  ~PessimisticTransaction() override;
46
47
  void Reinitialize(TransactionDB* txn_db, const WriteOptions& write_options,
48
                    const TransactionOptions& txn_options);
49
50
  Status Prepare() override;
51
52
  Status Commit() override;
53
54
  // It is basically Commit without going through Prepare phase. The write batch
55
  // is also directly provided instead of expecting txn to gradually batch the
56
  // transaction's writes to an internal write batch.
57
  Status CommitBatch(WriteBatch* batch);
58
59
  Status Rollback() override;
60
61
  Status RollbackToSavePoint() override;
62
63
  Status SetName(const TransactionName& name) override;
64
65
  // Generate a new unique transaction identifier
66
  static TransactionID GenTxnID();
67
68
0
  // Returns the unique identifier held in txn_id_ for this transaction.
  TransactionID GetID() const override {
    return txn_id_;
  }
69
70
  std::vector<TransactionID> GetWaitingTxns(uint32_t* column_family_id,
71
0
                                            std::string* key) const override {
72
0
    std::lock_guard<std::mutex> lock(wait_mutex_);
73
0
    std::vector<TransactionID> ids(waiting_txn_ids_.size());
74
0
    if (timed_out_key_.has_value()) {
75
0
      if (key) *key = timed_out_key_.value();
76
0
    } else {
77
0
      if (key) *key = waiting_key_ ? *waiting_key_ : "";
78
0
    }
79
0
    if (column_family_id) *column_family_id = waiting_cf_id_;
80
0
    std::copy(waiting_txn_ids_.begin(), waiting_txn_ids_.end(), ids.begin());
81
0
    return ids;
82
0
  }
83
84
  void SetWaitingTxn(autovector<TransactionID>& ids, uint32_t column_family_id,
85
0
                     const std::string* key, bool is_timed_out = false) {
86
0
    std::lock_guard<std::mutex> lock(wait_mutex_);
87
0
    waiting_txn_ids_ = ids;
88
0
    waiting_cf_id_ = column_family_id;
89
0
    if (is_timed_out) {
90
0
      timed_out_key_ = key ? *key : "";
91
0
    } else {
92
0
      waiting_key_ = key;
93
0
    }
94
0
  }
95
96
0
  void ClearWaitingTxn() {
97
0
    std::lock_guard<std::mutex> lock(wait_mutex_);
98
0
    waiting_txn_ids_.clear();
99
0
    waiting_cf_id_ = 0;
100
0
    waiting_key_ = nullptr;
101
0
  }
102
103
  // Returns the time (in microseconds according to Env->GetMicros())
104
  // that this transaction will be expired.  Returns 0 if this transaction does
105
  // not expire.
106
0
  uint64_t GetExpirationTime() const { return expiration_time_; }
107
108
  // Returns true if this transaction has an expiration_time and has expired.
109
  bool IsExpired() const;
110
111
  // Returns the number of microseconds a transaction can wait on acquiring a
112
  // lock or -1 if there is no timeout.
113
0
  int64_t GetLockTimeout() const { return lock_timeout_; }
114
0
  void SetLockTimeout(int64_t timeout) override {
115
0
    lock_timeout_ = timeout * 1000;
116
0
  }
117
0
  // Returns deadlock_timeout_us_: the delay, in microseconds, before deadlock
  // detection is performed.
  int64_t GetDeadlockTimeout() const {
    return deadlock_timeout_us_;
  }
118
0
  void SetDeadlockTimeout(int64_t timeout_ms) override {
119
0
    deadlock_timeout_us_ = timeout_ms * 1000;
120
0
  }
121
122
  // Returns true if locks were stolen successfully, false otherwise.
123
  bool TryStealingLocks();
124
125
0
  // Returns the deadlock_detect_ flag (whether deadlock detection is
  // performed for this transaction).
  bool IsDeadlockDetect() const override {
    return deadlock_detect_;
  }
126
127
0
  // Returns the value of deadlock_detect_depth_.
  int64_t GetDeadlockDetectDepth() const {
    return deadlock_detect_depth_;
  }
128
129
  Status GetRangeLock(ColumnFamilyHandle* column_family,
130
                      const Endpoint& start_key,
131
                      const Endpoint& end_key) override;
132
133
  Status CollapseKey(const ReadOptions& options, const Slice& key,
134
                     ColumnFamilyHandle* column_family = nullptr) override;
135
136
 protected:
137
  virtual Status PrepareInternal() = 0;
138
139
  virtual Status CommitWithoutPrepareInternal() = 0;
140
141
  // batch_cnt if non-zero is the number of sub-batches. A sub-batch is a batch
142
  // with no duplicate keys. If zero, then the number of sub-batches is unknown.
143
  virtual Status CommitBatchInternal(WriteBatch* batch,
144
                                     size_t batch_cnt = 0) = 0;
145
146
  virtual Status CommitInternal() = 0;
147
148
  virtual Status RollbackInternal() = 0;
149
150
  virtual void Initialize(const TransactionOptions& txn_options);
151
152
  Status LockBatch(WriteBatch* batch, LockTracker* keys_to_unlock);
153
154
  Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
155
                 bool read_only, bool exclusive, const bool do_validate = true,
156
                 const bool assume_tracked = false) override;
157
158
  void Clear() override;
159
160
  PessimisticTransactionDB* txn_db_impl_;
161
  DBImpl* db_impl_;
162
163
  // If non-zero, this transaction should not be committed after this time (in
164
  // microseconds according to Env->NowMicros())
165
  uint64_t expiration_time_;
166
167
  // Timestamp used by the transaction to perform all GetForUpdate.
168
  // Use this timestamp for conflict checking.
169
  // read_timestamp_ == kMaxTxnTimestamp means this transaction has not
170
  // performed any GetForUpdate. It is possible that the transaction has
171
  // performed blind writes or Get, though.
172
  TxnTimestamp read_timestamp_{kMaxTxnTimestamp};
173
  TxnTimestamp commit_timestamp_{kMaxTxnTimestamp};
174
175
  // Refer to
176
  // TransactionOptions::use_only_the_last_commit_time_batch_for_recovery
177
  bool use_only_the_last_commit_time_batch_for_recovery_ = false;
178
  // Refer to
179
  // TransactionOptions::skip_prepare
180
  bool skip_prepare_ = false;
181
  // Refer to TransactionOptions::commit_bypass_memtable
182
  uint32_t commit_bypass_memtable_threshold_ =
183
      std::numeric_limits<uint32_t>::max();
184
  uint64_t commit_bypass_memtable_byte_threshold_ =
185
      std::numeric_limits<uint64_t>::max();
186
187
 private:
188
  friend class TransactionTest_ValidateSnapshotTest_Test;
189
  // Used to create unique ids for transactions.
190
  static std::atomic<TransactionID> txn_id_counter_;
191
192
  // Unique ID for this transaction
193
  TransactionID txn_id_;
194
195
  // IDs for the transactions that are blocking the current transaction.
196
  //
197
  // empty if current transaction is not waiting or has timed out
198
  autovector<TransactionID> waiting_txn_ids_;
199
200
  // The following two represent the (cf, key) that a transaction is waiting
201
  // on.
202
  //
203
  // If waiting_key_ is not null, then the pointer should always point to
204
  // a valid string object. The reason is that it is only non-null when the
205
  // transaction is blocked in the PointLockManager::AcquireWithTimeout
206
  // function. At that point, the key string object is one of the function
207
  // parameters.
208
  uint32_t waiting_cf_id_;
209
  const std::string* waiting_key_;
210
211
  // Waiting key with lifetime of the txn so it can be accessed after timeouts
212
  std::optional<std::string> timed_out_key_;
213
214
  // Mutex protecting waiting_txn_ids_, waiting_cf_id_, waiting_key_ and
  // timed_out_key_.
215
  mutable std::mutex wait_mutex_;
216
217
  // Timeout in microseconds when locking a key or -1 if there is no timeout.
218
  int64_t lock_timeout_;
219
220
  // Timeout in microseconds before performing deadlock detection.
221
  // If 0, deadlock detection will be performed immediately.
222
  int64_t deadlock_timeout_us_;
223
224
  // Whether to perform deadlock detection or not.
225
  bool deadlock_detect_;
226
227
  // Number of traversals to make during deadlock detection. Refer to
  // TransactionOptions::deadlock_detect_depth.
228
  int64_t deadlock_detect_depth_;
229
230
  // Refer to TransactionOptions::skip_concurrency_control
231
  bool skip_concurrency_control_;
232
233
  virtual Status ValidateSnapshot(ColumnFamilyHandle* column_family,
234
                                  const Slice& key,
235
                                  SequenceNumber* tracked_at_seq);
236
237
  void UnlockGetForUpdate(ColumnFamilyHandle* column_family,
238
                          const Slice& key) override;
239
};
240
241
class WriteCommittedTxn : public PessimisticTransaction {
242
 public:
243
  WriteCommittedTxn(TransactionDB* db, const WriteOptions& write_options,
244
                    const TransactionOptions& txn_options);
245
  // No copying allowed
246
  WriteCommittedTxn(const WriteCommittedTxn&) = delete;
247
  void operator=(const WriteCommittedTxn&) = delete;
248
249
0
  // Nothing to release beyond what the members and base class handle.
  ~WriteCommittedTxn() override = default;
250
251
  using TransactionBaseImpl::GetForUpdate;
252
  Status GetForUpdate(const ReadOptions& read_options,
253
                      ColumnFamilyHandle* column_family, const Slice& key,
254
                      std::string* value, bool exclusive,
255
                      const bool do_validate) override;
256
  Status GetForUpdate(const ReadOptions& read_options,
257
                      ColumnFamilyHandle* column_family, const Slice& key,
258
                      PinnableSlice* pinnable_val, bool exclusive,
259
                      const bool do_validate) override;
260
261
  Status GetEntityForUpdate(const ReadOptions& read_options,
262
                            ColumnFamilyHandle* column_family, const Slice& key,
263
                            PinnableWideColumns* columns, bool exclusive,
264
                            bool do_validate) override;
265
266
  using TransactionBaseImpl::Put;
267
  // `key` does NOT include timestamp even when it's enabled.
268
  Status Put(ColumnFamilyHandle* column_family, const Slice& key,
269
             const Slice& value, const bool assume_tracked = false) override;
270
  Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
271
             const SliceParts& value,
272
             const bool assume_tracked = false) override;
273
274
  using TransactionBaseImpl::PutUntracked;
275
  Status PutUntracked(ColumnFamilyHandle* column_family, const Slice& key,
276
                      const Slice& value) override;
277
  Status PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key,
278
                      const SliceParts& value) override;
279
280
  // `key` does NOT include timestamp even when it's enabled.
281
  Status PutEntity(ColumnFamilyHandle* column_family, const Slice& key,
282
                   const WideColumns& columns,
283
0
                   bool assume_tracked = false) override {
284
0
    const bool do_validate = !assume_tracked;
285
286
0
    return PutEntityImpl(column_family, key, columns, do_validate,
287
0
                         assume_tracked);
288
0
  }
289
290
  // Untracked variant of PutEntity: forwards to PutEntityImpl with both
  // validation and the assume-tracked flag turned off.
  Status PutEntityUntracked(ColumnFamilyHandle* column_family, const Slice& key,
                            const WideColumns& columns) override {
    return PutEntityImpl(column_family, key, columns, /*do_validate=*/false,
                         /*assume_tracked=*/false);
  }
298
299
  using TransactionBaseImpl::Delete;
300
  // `key` does NOT include timestamp even when it's enabled.
301
  Status Delete(ColumnFamilyHandle* column_family, const Slice& key,
302
                const bool assume_tracked = false) override;
303
  Status Delete(ColumnFamilyHandle* column_family, const SliceParts& key,
304
                const bool assume_tracked = false) override;
305
306
  using TransactionBaseImpl::DeleteUntracked;
307
  Status DeleteUntracked(ColumnFamilyHandle* column_family,
308
                         const Slice& key) override;
309
  Status DeleteUntracked(ColumnFamilyHandle* column_family,
310
                         const SliceParts& key) override;
311
312
  using TransactionBaseImpl::SingleDelete;
313
  // `key` does NOT include timestamp even when it's enabled.
314
  Status SingleDelete(ColumnFamilyHandle* column_family, const Slice& key,
315
                      const bool assume_tracked = false) override;
316
  Status SingleDelete(ColumnFamilyHandle* column_family, const SliceParts& key,
317
                      const bool assume_tracked = false) override;
318
319
  using TransactionBaseImpl::SingleDeleteUntracked;
320
  Status SingleDeleteUntracked(ColumnFamilyHandle* column_family,
321
                               const Slice& key) override;
322
323
  using TransactionBaseImpl::Merge;
324
  Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
325
               const Slice& value, const bool assume_tracked = false) override;
326
327
  Status SetReadTimestampForValidation(TxnTimestamp ts) override;
328
  Status SetCommitTimestamp(TxnTimestamp ts) override;
329
0
  // Returns commit_timestamp_, which stays at kMaxTxnTimestamp until it is
  // assigned a different value.
  TxnTimestamp GetCommitTimestamp() const override {
    return commit_timestamp_;
  }
330
331
 private:
332
  template <typename TValue>
333
  Status GetForUpdateImpl(const ReadOptions& read_options,
334
                          ColumnFamilyHandle* column_family, const Slice& key,
335
                          TValue* value, bool exclusive,
336
                          const bool do_validate);
337
338
  Status PutEntityImpl(ColumnFamilyHandle* column_family, const Slice& key,
339
                       const WideColumns& columns, bool do_validate,
340
                       bool assume_tracked);
341
342
  template <typename TKey, typename TOperation>
343
  Status Operate(ColumnFamilyHandle* column_family, const TKey& key,
344
                 const bool do_validate, const bool assume_tracked,
345
                 TOperation&& operation);
346
347
  Status PrepareInternal() override;
348
349
  Status CommitWithoutPrepareInternal() override;
350
351
  Status CommitBatchInternal(WriteBatch* batch, size_t batch_cnt) override;
352
353
  Status CommitInternal() override;
354
355
  Status RollbackInternal() override;
356
357
  // Checks if the combination of `do_validate`, the read timestamp set in
358
  // `read_timestamp_` and the `enable_udt_validation` flag in
359
  // TransactionDBOptions make sense together.
360
  Status SanityCheckReadTimestamp(bool do_validate);
361
362
  // Column families that enable timestamps and whose data are written when
363
  // indexing_enabled_ is false. If a key is written when indexing_enabled_ is
364
  // true, then the corresponding column family is not added to cfs_with_ts
365
  // even if it enables timestamp.
366
  std::unordered_set<uint32_t> cfs_with_ts_tracked_when_indexing_disabled_;
367
};
368
369
}  // namespace ROCKSDB_NAMESPACE