/src/rocksdb/utilities/transactions/pessimistic_transaction.h
Line | Count | Source (jump to first uncovered line) |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | #pragma once |
7 | | |
8 | | #include <algorithm> |
9 | | #include <atomic> |
10 | | #include <mutex> |
11 | | #include <stack> |
12 | | #include <string> |
13 | | #include <unordered_map> |
14 | | #include <vector> |
15 | | |
16 | | #include "db/write_callback.h" |
17 | | #include "rocksdb/db.h" |
18 | | #include "rocksdb/slice.h" |
19 | | #include "rocksdb/snapshot.h" |
20 | | #include "rocksdb/status.h" |
21 | | #include "rocksdb/types.h" |
22 | | #include "rocksdb/utilities/transaction.h" |
23 | | #include "rocksdb/utilities/transaction_db.h" |
24 | | #include "rocksdb/utilities/write_batch_with_index.h" |
25 | | #include "util/autovector.h" |
26 | | #include "utilities/transactions/transaction_base.h" |
27 | | #include "utilities/transactions/transaction_util.h" |
28 | | |
29 | | namespace ROCKSDB_NAMESPACE { |
30 | | |
31 | | class PessimisticTransactionDB; |
32 | | |
33 | | // A transaction under pessimistic concurrency control. This class implements |
34 | | // the locking API and interfaces with the lock manager as well as the |
35 | | // pessimistic transactional db. |
36 | | class PessimisticTransaction : public TransactionBaseImpl { |
37 | | public: |
38 | | PessimisticTransaction(TransactionDB* db, const WriteOptions& write_options, |
39 | | const TransactionOptions& txn_options, |
40 | | const bool init = true); |
41 | | // No copying allowed |
42 | | PessimisticTransaction(const PessimisticTransaction&) = delete; |
43 | | void operator=(const PessimisticTransaction&) = delete; |
44 | | |
45 | | ~PessimisticTransaction() override; |
46 | | |
47 | | void Reinitialize(TransactionDB* txn_db, const WriteOptions& write_options, |
48 | | const TransactionOptions& txn_options); |
49 | | |
50 | | Status Prepare() override; |
51 | | |
52 | | Status Commit() override; |
53 | | |
54 | | // It is basically Commit without going through the Prepare phase. The write |
55 | | // batch is also provided directly instead of expecting the txn to gradually |
56 | | // accumulate the transaction's writes into an internal write batch. |
57 | | Status CommitBatch(WriteBatch* batch); |
58 | | |
59 | | Status Rollback() override; |
60 | | |
61 | | Status RollbackToSavePoint() override; |
62 | | |
63 | | Status SetName(const TransactionName& name) override; |
64 | | |
65 | | // Generate a new unique transaction identifier |
66 | | static TransactionID GenTxnID(); |
67 | | |
68 | 0 | TransactionID GetID() const override { return txn_id_; } |
69 | | |
70 | | std::vector<TransactionID> GetWaitingTxns(uint32_t* column_family_id, |
71 | 0 | std::string* key) const override { |
72 | 0 | std::lock_guard<std::mutex> lock(wait_mutex_); |
73 | 0 | std::vector<TransactionID> ids(waiting_txn_ids_.size()); |
74 | 0 | if (key) *key = waiting_key_ ? *waiting_key_ : ""; |
75 | 0 | if (column_family_id) *column_family_id = waiting_cf_id_; |
76 | 0 | std::copy(waiting_txn_ids_.begin(), waiting_txn_ids_.end(), ids.begin()); |
77 | 0 | return ids; |
78 | 0 | } |
79 | | |
80 | | void SetWaitingTxn(autovector<TransactionID> ids, uint32_t column_family_id, |
81 | 0 | const std::string* key) { |
82 | 0 | std::lock_guard<std::mutex> lock(wait_mutex_); |
83 | 0 | waiting_txn_ids_ = ids; |
84 | 0 | waiting_cf_id_ = column_family_id; |
85 | 0 | waiting_key_ = key; |
86 | 0 | } |
87 | | |
88 | 0 | void ClearWaitingTxn() { |
89 | 0 | std::lock_guard<std::mutex> lock(wait_mutex_); |
90 | 0 | waiting_txn_ids_.clear(); |
91 | 0 | waiting_cf_id_ = 0; |
92 | 0 | waiting_key_ = nullptr; |
93 | 0 | } |
94 | | |
95 | | // Returns the time (in microseconds according to Env->NowMicros()) |
96 | | // at which this transaction will expire. Returns 0 if this transaction does |
97 | | // not expire. |
98 | 0 | uint64_t GetExpirationTime() const { return expiration_time_; } |
99 | | |
100 | | // Returns true if this transaction has an expiration time and has expired. |
101 | | bool IsExpired() const; |
102 | | |
103 | | // Returns the number of microseconds a transaction can wait on acquiring a |
104 | | // lock, or -1 if there is no timeout. SetLockTimeout() takes milliseconds. |
105 | 0 | int64_t GetLockTimeout() const { return lock_timeout_; } |
106 | 0 | void SetLockTimeout(int64_t timeout) override { |
107 | 0 | lock_timeout_ = timeout * 1000; |
108 | 0 | } |
109 | | |
110 | | // Returns true if locks were stolen successfully, false otherwise. |
111 | | bool TryStealingLocks(); |
112 | | |
113 | 0 | bool IsDeadlockDetect() const override { return deadlock_detect_; } |
114 | | |
115 | 0 | int64_t GetDeadlockDetectDepth() const { return deadlock_detect_depth_; } |
116 | | |
117 | | Status GetRangeLock(ColumnFamilyHandle* column_family, |
118 | | const Endpoint& start_key, |
119 | | const Endpoint& end_key) override; |
120 | | |
121 | | Status CollapseKey(const ReadOptions& options, const Slice& key, |
122 | | ColumnFamilyHandle* column_family = nullptr) override; |
123 | | |
124 | | protected: |
125 | | // Refer to |
126 | | // TransactionOptions::use_only_the_last_commit_time_batch_for_recovery |
127 | | bool use_only_the_last_commit_time_batch_for_recovery_ = false; |
128 | | // Refer to |
129 | | // TransactionOptions::skip_prepare |
130 | | bool skip_prepare_ = false; |
131 | | |
132 | | virtual Status PrepareInternal() = 0; |
133 | | |
134 | | virtual Status CommitWithoutPrepareInternal() = 0; |
135 | | |
136 | | // batch_cnt if non-zero is the number of sub-batches. A sub-batch is a batch |
137 | | // with no duplicate keys. If zero, then the number of sub-batches is unknown. |
138 | | virtual Status CommitBatchInternal(WriteBatch* batch, |
139 | | size_t batch_cnt = 0) = 0; |
140 | | |
141 | | virtual Status CommitInternal() = 0; |
142 | | |
143 | | virtual Status RollbackInternal() = 0; |
144 | | |
145 | | virtual void Initialize(const TransactionOptions& txn_options); |
146 | | |
147 | | Status LockBatch(WriteBatch* batch, LockTracker* keys_to_unlock); |
148 | | |
149 | | Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, |
150 | | bool read_only, bool exclusive, const bool do_validate = true, |
151 | | const bool assume_tracked = false) override; |
152 | | |
153 | | void Clear() override; |
154 | | |
155 | | PessimisticTransactionDB* txn_db_impl_; |
156 | | DBImpl* db_impl_; |
157 | | |
158 | | // If non-zero, this transaction should not be committed after this time (in |
159 | | // microseconds according to Env->NowMicros()) |
160 | | uint64_t expiration_time_; |
161 | | |
162 | | // Timestamp used by the transaction to perform all GetForUpdate. |
163 | | // Use this timestamp for conflict checking. |
164 | | // read_timestamp_ == kMaxTxnTimestamp means this transaction has not |
165 | | // performed any GetForUpdate. It is possible that the transaction has |
166 | | // performed blind writes or Get, though. |
167 | | TxnTimestamp read_timestamp_{kMaxTxnTimestamp}; |
168 | | TxnTimestamp commit_timestamp_{kMaxTxnTimestamp}; |
169 | | |
170 | | private: |
171 | | friend class TransactionTest_ValidateSnapshotTest_Test; |
172 | | // Used to create unique ids for transactions. |
173 | | static std::atomic<TransactionID> txn_id_counter_; |
174 | | |
175 | | // Unique ID for this transaction |
176 | | TransactionID txn_id_; |
177 | | |
178 | | // IDs for the transactions that are blocking the current transaction. |
179 | | // |
180 | | // Empty if the current transaction is not waiting. |
181 | | autovector<TransactionID> waiting_txn_ids_; |
182 | | |
183 | | // The following two fields represent the (cf, key) that a transaction is |
184 | | // waiting on. |
185 | | // |
186 | | // If waiting_key_ is not null, then the pointer should always point to |
187 | | // a valid string object. The reason is that it is only non-null when the |
188 | | // transaction is blocked in the PointLockManager::AcquireWithTimeout |
189 | | // function. At that point, the key string object is one of the function |
190 | | // parameters. |
191 | | uint32_t waiting_cf_id_; |
192 | | const std::string* waiting_key_; |
193 | | |
194 | | // Mutex protecting waiting_txn_ids_, waiting_cf_id_ and waiting_key_. |
195 | | mutable std::mutex wait_mutex_; |
196 | | |
197 | | // Timeout in microseconds when locking a key or -1 if there is no timeout. |
198 | | int64_t lock_timeout_; |
199 | | |
200 | | // Whether to perform deadlock detection or not. |
201 | | bool deadlock_detect_; |
202 | | |
203 | | // Refer to TransactionOptions::deadlock_detect_depth |
204 | | int64_t deadlock_detect_depth_; |
205 | | |
206 | | // Refer to TransactionOptions::skip_concurrency_control |
207 | | bool skip_concurrency_control_; |
208 | | |
209 | | virtual Status ValidateSnapshot(ColumnFamilyHandle* column_family, |
210 | | const Slice& key, |
211 | | SequenceNumber* tracked_at_seq); |
212 | | |
213 | | void UnlockGetForUpdate(ColumnFamilyHandle* column_family, |
214 | | const Slice& key) override; |
215 | | }; |
216 | | |
217 | | class WriteCommittedTxn : public PessimisticTransaction { |
218 | | public: |
219 | | WriteCommittedTxn(TransactionDB* db, const WriteOptions& write_options, |
220 | | const TransactionOptions& txn_options); |
221 | | // No copying allowed |
222 | | WriteCommittedTxn(const WriteCommittedTxn&) = delete; |
223 | | void operator=(const WriteCommittedTxn&) = delete; |
224 | | |
225 | 0 | ~WriteCommittedTxn() override {} |
226 | | |
227 | | using TransactionBaseImpl::GetForUpdate; |
228 | | Status GetForUpdate(const ReadOptions& read_options, |
229 | | ColumnFamilyHandle* column_family, const Slice& key, |
230 | | std::string* value, bool exclusive, |
231 | | const bool do_validate) override; |
232 | | Status GetForUpdate(const ReadOptions& read_options, |
233 | | ColumnFamilyHandle* column_family, const Slice& key, |
234 | | PinnableSlice* pinnable_val, bool exclusive, |
235 | | const bool do_validate) override; |
236 | | |
237 | | Status GetEntityForUpdate(const ReadOptions& read_options, |
238 | | ColumnFamilyHandle* column_family, const Slice& key, |
239 | | PinnableWideColumns* columns, bool exclusive, |
240 | | bool do_validate) override; |
241 | | |
242 | | using TransactionBaseImpl::Put; |
243 | | // `key` does NOT include timestamp even when it's enabled. |
244 | | Status Put(ColumnFamilyHandle* column_family, const Slice& key, |
245 | | const Slice& value, const bool assume_tracked = false) override; |
246 | | Status Put(ColumnFamilyHandle* column_family, const SliceParts& key, |
247 | | const SliceParts& value, |
248 | | const bool assume_tracked = false) override; |
249 | | |
250 | | using TransactionBaseImpl::PutUntracked; |
251 | | Status PutUntracked(ColumnFamilyHandle* column_family, const Slice& key, |
252 | | const Slice& value) override; |
253 | | Status PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key, |
254 | | const SliceParts& value) override; |
255 | | |
256 | | // `key` does NOT include timestamp even when it's enabled. |
257 | | Status PutEntity(ColumnFamilyHandle* column_family, const Slice& key, |
258 | | const WideColumns& columns, |
259 | 0 | bool assume_tracked = false) override { |
260 | 0 | const bool do_validate = !assume_tracked; |
261 | |
|
262 | 0 | return PutEntityImpl(column_family, key, columns, do_validate, |
263 | 0 | assume_tracked); |
264 | 0 | } |
265 | | |
266 | | Status PutEntityUntracked(ColumnFamilyHandle* column_family, const Slice& key, |
267 | 0 | const WideColumns& columns) override { |
268 | 0 | constexpr bool do_validate = false; |
269 | 0 | constexpr bool assume_tracked = false; |
270 | |
|
271 | 0 | return PutEntityImpl(column_family, key, columns, do_validate, |
272 | 0 | assume_tracked); |
273 | 0 | } |
274 | | |
275 | | using TransactionBaseImpl::Delete; |
276 | | // `key` does NOT include timestamp even when it's enabled. |
277 | | Status Delete(ColumnFamilyHandle* column_family, const Slice& key, |
278 | | const bool assume_tracked = false) override; |
279 | | Status Delete(ColumnFamilyHandle* column_family, const SliceParts& key, |
280 | | const bool assume_tracked = false) override; |
281 | | |
282 | | using TransactionBaseImpl::DeleteUntracked; |
283 | | Status DeleteUntracked(ColumnFamilyHandle* column_family, |
284 | | const Slice& key) override; |
285 | | Status DeleteUntracked(ColumnFamilyHandle* column_family, |
286 | | const SliceParts& key) override; |
287 | | |
288 | | using TransactionBaseImpl::SingleDelete; |
289 | | // `key` does NOT include timestamp even when it's enabled. |
290 | | Status SingleDelete(ColumnFamilyHandle* column_family, const Slice& key, |
291 | | const bool assume_tracked = false) override; |
292 | | Status SingleDelete(ColumnFamilyHandle* column_family, const SliceParts& key, |
293 | | const bool assume_tracked = false) override; |
294 | | |
295 | | using TransactionBaseImpl::SingleDeleteUntracked; |
296 | | Status SingleDeleteUntracked(ColumnFamilyHandle* column_family, |
297 | | const Slice& key) override; |
298 | | |
299 | | using TransactionBaseImpl::Merge; |
300 | | Status Merge(ColumnFamilyHandle* column_family, const Slice& key, |
301 | | const Slice& value, const bool assume_tracked = false) override; |
302 | | |
303 | | Status SetReadTimestampForValidation(TxnTimestamp ts) override; |
304 | | Status SetCommitTimestamp(TxnTimestamp ts) override; |
305 | 0 | TxnTimestamp GetCommitTimestamp() const override { return commit_timestamp_; } |
306 | | |
307 | | private: |
308 | | template <typename TValue> |
309 | | Status GetForUpdateImpl(const ReadOptions& read_options, |
310 | | ColumnFamilyHandle* column_family, const Slice& key, |
311 | | TValue* value, bool exclusive, |
312 | | const bool do_validate); |
313 | | |
314 | | Status PutEntityImpl(ColumnFamilyHandle* column_family, const Slice& key, |
315 | | const WideColumns& columns, bool do_validate, |
316 | | bool assume_tracked); |
317 | | |
318 | | template <typename TKey, typename TOperation> |
319 | | Status Operate(ColumnFamilyHandle* column_family, const TKey& key, |
320 | | const bool do_validate, const bool assume_tracked, |
321 | | TOperation&& operation); |
322 | | |
323 | | Status PrepareInternal() override; |
324 | | |
325 | | Status CommitWithoutPrepareInternal() override; |
326 | | |
327 | | Status CommitBatchInternal(WriteBatch* batch, size_t batch_cnt) override; |
328 | | |
329 | | Status CommitInternal() override; |
330 | | |
331 | | Status RollbackInternal() override; |
332 | | |
333 | | // Column families that enable timestamps and whose data are written when |
334 | | // indexing_enabled_ is false. If a key is written when indexing_enabled_ is |
335 | | // true, then the corresponding column family is not added to this set even |
336 | | // if it enables timestamps. |
337 | | std::unordered_set<uint32_t> cfs_with_ts_tracked_when_indexing_disabled_; |
338 | | }; |
339 | | |
340 | | } // namespace ROCKSDB_NAMESPACE |