Coverage Report

Created: 2024-07-27 06:53

/src/rocksdb/utilities/transactions/pessimistic_transaction.h
Line
Count
Source (jump to first uncovered line)
1
// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#pragma once
7
8
#include <algorithm>
9
#include <atomic>
10
#include <mutex>
11
#include <stack>
12
#include <string>
13
#include <unordered_map>
14
#include <vector>
15
16
#include "db/write_callback.h"
17
#include "rocksdb/db.h"
18
#include "rocksdb/slice.h"
19
#include "rocksdb/snapshot.h"
20
#include "rocksdb/status.h"
21
#include "rocksdb/types.h"
22
#include "rocksdb/utilities/transaction.h"
23
#include "rocksdb/utilities/transaction_db.h"
24
#include "rocksdb/utilities/write_batch_with_index.h"
25
#include "util/autovector.h"
26
#include "utilities/transactions/transaction_base.h"
27
#include "utilities/transactions/transaction_util.h"
28
29
namespace ROCKSDB_NAMESPACE {
30
31
class PessimisticTransactionDB;
32
33
// A transaction under pessimistic concurrency control. This class implements
34
// the locking API and interfaces with the lock manager as well as the
35
// pessimistic transactional db.
36
class PessimisticTransaction : public TransactionBaseImpl {
37
 public:
38
  PessimisticTransaction(TransactionDB* db, const WriteOptions& write_options,
39
                         const TransactionOptions& txn_options,
40
                         const bool init = true);
41
  // No copying allowed
42
  PessimisticTransaction(const PessimisticTransaction&) = delete;
43
  void operator=(const PessimisticTransaction&) = delete;
44
45
  ~PessimisticTransaction() override;
46
47
  void Reinitialize(TransactionDB* txn_db, const WriteOptions& write_options,
48
                    const TransactionOptions& txn_options);
49
50
  Status Prepare() override;
51
52
  Status Commit() override;
53
54
  // It is basically Commit without going through Prepare phase. The write batch
55
  // is also directly provided instead of expecting txn to gradually batch the
56
  // transaction's writes to an internal write batch.
57
  Status CommitBatch(WriteBatch* batch);
58
59
  Status Rollback() override;
60
61
  Status RollbackToSavePoint() override;
62
63
  Status SetName(const TransactionName& name) override;
64
65
  // Generate a new unique transaction identifier
66
  static TransactionID GenTxnID();
67
68
0
  TransactionID GetID() const override { return txn_id_; }
69
70
  std::vector<TransactionID> GetWaitingTxns(uint32_t* column_family_id,
71
0
                                            std::string* key) const override {
72
0
    std::lock_guard<std::mutex> lock(wait_mutex_);
73
0
    std::vector<TransactionID> ids(waiting_txn_ids_.size());
74
0
    if (key) *key = waiting_key_ ? *waiting_key_ : "";
75
0
    if (column_family_id) *column_family_id = waiting_cf_id_;
76
0
    std::copy(waiting_txn_ids_.begin(), waiting_txn_ids_.end(), ids.begin());
77
0
    return ids;
78
0
  }
79
80
  void SetWaitingTxn(autovector<TransactionID> ids, uint32_t column_family_id,
81
0
                     const std::string* key) {
82
0
    std::lock_guard<std::mutex> lock(wait_mutex_);
83
0
    waiting_txn_ids_ = ids;
84
0
    waiting_cf_id_ = column_family_id;
85
0
    waiting_key_ = key;
86
0
  }
87
88
0
  void ClearWaitingTxn() {
89
0
    std::lock_guard<std::mutex> lock(wait_mutex_);
90
0
    waiting_txn_ids_.clear();
91
0
    waiting_cf_id_ = 0;
92
0
    waiting_key_ = nullptr;
93
0
  }
94
95
  // Returns the time (in microseconds according to Env->NowMicros())
96
  // that this transaction will be expired.  Returns 0 if this transaction does
97
  // not expire.
98
0
  uint64_t GetExpirationTime() const { return expiration_time_; }
99
100
  // Returns true if this transaction has an expiration time and has expired.
101
  bool IsExpired() const;
102
103
  // Returns the number of microseconds a transaction can wait on acquiring a
104
  // lock or -1 if there is no timeout.
105
0
  int64_t GetLockTimeout() const { return lock_timeout_; }
106
0
  void SetLockTimeout(int64_t timeout) override {
107
0
    lock_timeout_ = timeout * 1000;
108
0
  }
109
110
  // Returns true if locks were stolen successfully, false otherwise.
111
  bool TryStealingLocks();
112
113
0
  bool IsDeadlockDetect() const override { return deadlock_detect_; }
114
115
0
  int64_t GetDeadlockDetectDepth() const { return deadlock_detect_depth_; }
116
117
  Status GetRangeLock(ColumnFamilyHandle* column_family,
118
                      const Endpoint& start_key,
119
                      const Endpoint& end_key) override;
120
121
  Status CollapseKey(const ReadOptions& options, const Slice& key,
122
                     ColumnFamilyHandle* column_family = nullptr) override;
123
124
 protected:
125
  // Refer to
126
  // TransactionOptions::use_only_the_last_commit_time_batch_for_recovery
127
  bool use_only_the_last_commit_time_batch_for_recovery_ = false;
128
  // Refer to
129
  // TransactionOptions::skip_prepare
130
  bool skip_prepare_ = false;
131
132
  virtual Status PrepareInternal() = 0;
133
134
  virtual Status CommitWithoutPrepareInternal() = 0;
135
136
  // batch_cnt if non-zero is the number of sub-batches. A sub-batch is a batch
137
  // with no duplicate keys. If zero, then the number of sub-batches is unknown.
138
  virtual Status CommitBatchInternal(WriteBatch* batch,
139
                                     size_t batch_cnt = 0) = 0;
140
141
  virtual Status CommitInternal() = 0;
142
143
  virtual Status RollbackInternal() = 0;
144
145
  virtual void Initialize(const TransactionOptions& txn_options);
146
147
  Status LockBatch(WriteBatch* batch, LockTracker* keys_to_unlock);
148
149
  Status TryLock(ColumnFamilyHandle* column_family, const Slice& key,
150
                 bool read_only, bool exclusive, const bool do_validate = true,
151
                 const bool assume_tracked = false) override;
152
153
  void Clear() override;
154
155
  PessimisticTransactionDB* txn_db_impl_;
156
  DBImpl* db_impl_;
157
158
  // If non-zero, this transaction should not be committed after this time (in
159
  // microseconds according to Env->NowMicros())
160
  uint64_t expiration_time_;
161
162
  // Timestamp used by the transaction to perform all GetForUpdate.
163
  // Use this timestamp for conflict checking.
164
  // read_timestamp_ == kMaxTxnTimestamp means this transaction has not
165
  // performed any GetForUpdate. It is possible that the transaction has
166
  // performed blind writes or Get, though.
167
  TxnTimestamp read_timestamp_{kMaxTxnTimestamp};
168
  TxnTimestamp commit_timestamp_{kMaxTxnTimestamp};
169
170
 private:
171
  friend class TransactionTest_ValidateSnapshotTest_Test;
172
  // Used to create unique ids for transactions.
173
  static std::atomic<TransactionID> txn_id_counter_;
174
175
  // Unique ID for this transaction
176
  TransactionID txn_id_;
177
178
  // IDs for the transactions that are blocking the current transaction.
179
  //
180
  // Empty if the current transaction is not waiting.
181
  autovector<TransactionID> waiting_txn_ids_;
182
183
  // The following two fields represent the (cf, key) that a transaction is
183
  // waiting on.
185
  //
186
  // If waiting_key_ is not null, then the pointer should always point to
187
  // a valid string object. The reason is that it is only non-null when the
188
  // transaction is blocked in the PointLockManager::AcquireWithTimeout
189
  // function. At that point, the key string object is one of the function
190
  // parameters.
191
  uint32_t waiting_cf_id_;
192
  const std::string* waiting_key_;
193
194
  // Mutex protecting waiting_txn_ids_, waiting_cf_id_ and waiting_key_.
195
  mutable std::mutex wait_mutex_;
196
197
  // Timeout in microseconds when locking a key or -1 if there is no timeout.
198
  int64_t lock_timeout_;
199
200
  // Whether to perform deadlock detection or not.
201
  bool deadlock_detect_;
202
203
  // Number of traversals to make during deadlock detection.
204
  int64_t deadlock_detect_depth_;
205
206
  // Refer to TransactionOptions::skip_concurrency_control
207
  bool skip_concurrency_control_;
208
209
  virtual Status ValidateSnapshot(ColumnFamilyHandle* column_family,
210
                                  const Slice& key,
211
                                  SequenceNumber* tracked_at_seq);
212
213
  void UnlockGetForUpdate(ColumnFamilyHandle* column_family,
214
                          const Slice& key) override;
215
};
216
217
class WriteCommittedTxn : public PessimisticTransaction {
218
 public:
219
  WriteCommittedTxn(TransactionDB* db, const WriteOptions& write_options,
220
                    const TransactionOptions& txn_options);
221
  // No copying allowed
222
  WriteCommittedTxn(const WriteCommittedTxn&) = delete;
223
  void operator=(const WriteCommittedTxn&) = delete;
224
225
0
  ~WriteCommittedTxn() override {}
226
227
  using TransactionBaseImpl::GetForUpdate;
228
  Status GetForUpdate(const ReadOptions& read_options,
229
                      ColumnFamilyHandle* column_family, const Slice& key,
230
                      std::string* value, bool exclusive,
231
                      const bool do_validate) override;
232
  Status GetForUpdate(const ReadOptions& read_options,
233
                      ColumnFamilyHandle* column_family, const Slice& key,
234
                      PinnableSlice* pinnable_val, bool exclusive,
235
                      const bool do_validate) override;
236
237
  Status GetEntityForUpdate(const ReadOptions& read_options,
238
                            ColumnFamilyHandle* column_family, const Slice& key,
239
                            PinnableWideColumns* columns, bool exclusive,
240
                            bool do_validate) override;
241
242
  using TransactionBaseImpl::Put;
243
  // `key` does NOT include timestamp even when it's enabled.
244
  Status Put(ColumnFamilyHandle* column_family, const Slice& key,
245
             const Slice& value, const bool assume_tracked = false) override;
246
  Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
247
             const SliceParts& value,
248
             const bool assume_tracked = false) override;
249
250
  using TransactionBaseImpl::PutUntracked;
251
  Status PutUntracked(ColumnFamilyHandle* column_family, const Slice& key,
252
                      const Slice& value) override;
253
  Status PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key,
254
                      const SliceParts& value) override;
255
256
  // `key` does NOT include timestamp even when it's enabled.
257
  Status PutEntity(ColumnFamilyHandle* column_family, const Slice& key,
258
                   const WideColumns& columns,
259
0
                   bool assume_tracked = false) override {
260
0
    const bool do_validate = !assume_tracked;
261
262
0
    return PutEntityImpl(column_family, key, columns, do_validate,
263
0
                         assume_tracked);
264
0
  }
265
266
  Status PutEntityUntracked(ColumnFamilyHandle* column_family, const Slice& key,
267
0
                            const WideColumns& columns) override {
268
0
    constexpr bool do_validate = false;
269
0
    constexpr bool assume_tracked = false;
270
271
0
    return PutEntityImpl(column_family, key, columns, do_validate,
272
0
                         assume_tracked);
273
0
  }
274
275
  using TransactionBaseImpl::Delete;
276
  // `key` does NOT include timestamp even when it's enabled.
277
  Status Delete(ColumnFamilyHandle* column_family, const Slice& key,
278
                const bool assume_tracked = false) override;
279
  Status Delete(ColumnFamilyHandle* column_family, const SliceParts& key,
280
                const bool assume_tracked = false) override;
281
282
  using TransactionBaseImpl::DeleteUntracked;
283
  Status DeleteUntracked(ColumnFamilyHandle* column_family,
284
                         const Slice& key) override;
285
  Status DeleteUntracked(ColumnFamilyHandle* column_family,
286
                         const SliceParts& key) override;
287
288
  using TransactionBaseImpl::SingleDelete;
289
  // `key` does NOT include timestamp even when it's enabled.
290
  Status SingleDelete(ColumnFamilyHandle* column_family, const Slice& key,
291
                      const bool assume_tracked = false) override;
292
  Status SingleDelete(ColumnFamilyHandle* column_family, const SliceParts& key,
293
                      const bool assume_tracked = false) override;
294
295
  using TransactionBaseImpl::SingleDeleteUntracked;
296
  Status SingleDeleteUntracked(ColumnFamilyHandle* column_family,
297
                               const Slice& key) override;
298
299
  using TransactionBaseImpl::Merge;
300
  Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
301
               const Slice& value, const bool assume_tracked = false) override;
302
303
  Status SetReadTimestampForValidation(TxnTimestamp ts) override;
304
  Status SetCommitTimestamp(TxnTimestamp ts) override;
305
0
  TxnTimestamp GetCommitTimestamp() const override { return commit_timestamp_; }
306
307
 private:
308
  template <typename TValue>
309
  Status GetForUpdateImpl(const ReadOptions& read_options,
310
                          ColumnFamilyHandle* column_family, const Slice& key,
311
                          TValue* value, bool exclusive,
312
                          const bool do_validate);
313
314
  Status PutEntityImpl(ColumnFamilyHandle* column_family, const Slice& key,
315
                       const WideColumns& columns, bool do_validate,
316
                       bool assume_tracked);
317
318
  template <typename TKey, typename TOperation>
319
  Status Operate(ColumnFamilyHandle* column_family, const TKey& key,
320
                 const bool do_validate, const bool assume_tracked,
321
                 TOperation&& operation);
322
323
  Status PrepareInternal() override;
324
325
  Status CommitWithoutPrepareInternal() override;
326
327
  Status CommitBatchInternal(WriteBatch* batch, size_t batch_cnt) override;
328
329
  Status CommitInternal() override;
330
331
  Status RollbackInternal() override;
332
333
  // Column families that enable timestamps and whose data are written when
334
  // indexing_enabled_ is false. If a key is written when indexing_enabled_ is
335
  // true, then the corresponding column family is not added to this set
336
  // even if it enables timestamp.
337
  std::unordered_set<uint32_t> cfs_with_ts_tracked_when_indexing_disabled_;
338
};
339
340
}  // namespace ROCKSDB_NAMESPACE