/src/rocksdb/utilities/transactions/pessimistic_transaction.h
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | #pragma once |
7 | | |
8 | | #include <algorithm> |
9 | | #include <atomic> |
10 | | #include <mutex> |
11 | | #include <stack> |
12 | | #include <string> |
13 | | #include <unordered_map> |
14 | | #include <vector> |
15 | | |
16 | | #include "db/write_callback.h" |
17 | | #include "rocksdb/db.h" |
18 | | #include "rocksdb/slice.h" |
19 | | #include "rocksdb/snapshot.h" |
20 | | #include "rocksdb/status.h" |
21 | | #include "rocksdb/types.h" |
22 | | #include "rocksdb/utilities/transaction.h" |
23 | | #include "rocksdb/utilities/transaction_db.h" |
24 | | #include "rocksdb/utilities/write_batch_with_index.h" |
25 | | #include "util/autovector.h" |
26 | | #include "utilities/transactions/transaction_base.h" |
27 | | #include "utilities/transactions/transaction_util.h" |
28 | | |
29 | | namespace ROCKSDB_NAMESPACE { |
30 | | |
31 | | class PessimisticTransactionDB; |
32 | | |
33 | | // A transaction under pessimistic concurrency control. This class implements |
34 | | // the locking API and interfaces with the lock manager as well as the |
35 | | // pessimistic transactional db. |
36 | | class PessimisticTransaction : public TransactionBaseImpl { |
37 | | public: |
38 | | PessimisticTransaction(TransactionDB* db, const WriteOptions& write_options, |
39 | | const TransactionOptions& txn_options, |
40 | | const bool init = true); |
41 | | // No copying allowed |
42 | | PessimisticTransaction(const PessimisticTransaction&) = delete; |
43 | | void operator=(const PessimisticTransaction&) = delete; |
44 | | |
45 | | ~PessimisticTransaction() override; |
46 | | |
47 | | void Reinitialize(TransactionDB* txn_db, const WriteOptions& write_options, |
48 | | const TransactionOptions& txn_options); |
49 | | |
50 | | Status Prepare() override; |
51 | | |
52 | | Status Commit() override; |
53 | | |
54 | | // It is basically Commit without going through the Prepare phase. The write |
55 | | // batch is also provided directly instead of expecting the txn to gradually |
56 | | // accumulate the transaction's writes in an internal write batch. |
57 | | Status CommitBatch(WriteBatch* batch); |
58 | | |
59 | | Status Rollback() override; |
60 | | |
61 | | Status RollbackToSavePoint() override; |
62 | | |
63 | | Status SetName(const TransactionName& name) override; |
64 | | |
65 | | // Generate a new unique transaction identifier |
66 | | static TransactionID GenTxnID(); |
67 | | |
68 | 0 | TransactionID GetID() const override { return txn_id_; } |
69 | | |
70 | | std::vector<TransactionID> GetWaitingTxns(uint32_t* column_family_id, |
71 | 0 | std::string* key) const override { |
72 | 0 | std::lock_guard<std::mutex> lock(wait_mutex_); |
73 | 0 | std::vector<TransactionID> ids(waiting_txn_ids_.size()); |
74 | 0 | if (timed_out_key_.has_value()) { |
75 | 0 | if (key) *key = timed_out_key_.value(); |
76 | 0 | } else { |
77 | 0 | if (key) *key = waiting_key_ ? *waiting_key_ : ""; |
78 | 0 | } |
79 | 0 | if (column_family_id) *column_family_id = waiting_cf_id_; |
80 | 0 | std::copy(waiting_txn_ids_.begin(), waiting_txn_ids_.end(), ids.begin()); |
81 | 0 | return ids; |
82 | 0 | } |
83 | | |
84 | | void SetWaitingTxn(autovector<TransactionID>& ids, uint32_t column_family_id, |
85 | 0 | const std::string* key, bool is_timed_out = false) { |
86 | 0 | std::lock_guard<std::mutex> lock(wait_mutex_); |
87 | 0 | waiting_txn_ids_ = ids; |
88 | 0 | waiting_cf_id_ = column_family_id; |
89 | 0 | if (is_timed_out) { |
90 | 0 | timed_out_key_ = key ? *key : ""; |
91 | 0 | } else { |
92 | 0 | waiting_key_ = key; |
93 | 0 | } |
94 | 0 | } |
95 | | |
96 | 0 | void ClearWaitingTxn() { |
97 | 0 | std::lock_guard<std::mutex> lock(wait_mutex_); |
98 | 0 | waiting_txn_ids_.clear(); |
99 | 0 | waiting_cf_id_ = 0; |
100 | 0 | waiting_key_ = nullptr; |
101 | 0 | } |
102 | | |
103 | | // Returns the time (in microseconds according to Env->NowMicros()) |
104 | | // at which this transaction expires. Returns 0 if this transaction does |
105 | | // not expire. |
106 | 0 | uint64_t GetExpirationTime() const { return expiration_time_; } |
107 | | |
108 | | // Returns true if this transaction has an expiration_time and has expired. |
109 | | bool IsExpired() const; |
110 | | |
111 | | // Returns the number of microseconds a transaction can wait on acquiring a |
112 | | // lock or -1 if there is no timeout. |
113 | 0 | int64_t GetLockTimeout() const { return lock_timeout_; } |
114 | 0 | void SetLockTimeout(int64_t timeout) override { |
115 | 0 | lock_timeout_ = timeout * 1000; |
116 | 0 | } |
117 | 0 | int64_t GetDeadlockTimeout() const { return deadlock_timeout_us_; } |
118 | 0 | void SetDeadlockTimeout(int64_t timeout_ms) override { |
119 | 0 | deadlock_timeout_us_ = timeout_ms * 1000; |
120 | 0 | } |
121 | | |
122 | | // Returns true if locks were stolen successfully, false otherwise. |
123 | | bool TryStealingLocks(); |
124 | | |
125 | 0 | bool IsDeadlockDetect() const override { return deadlock_detect_; } |
126 | | |
127 | 0 | int64_t GetDeadlockDetectDepth() const { return deadlock_detect_depth_; } |
128 | | |
129 | | Status GetRangeLock(ColumnFamilyHandle* column_family, |
130 | | const Endpoint& start_key, |
131 | | const Endpoint& end_key) override; |
132 | | |
133 | | Status CollapseKey(const ReadOptions& options, const Slice& key, |
134 | | ColumnFamilyHandle* column_family = nullptr) override; |
135 | | |
136 | | protected: |
137 | | virtual Status PrepareInternal() = 0; |
138 | | |
139 | | virtual Status CommitWithoutPrepareInternal() = 0; |
140 | | |
141 | | // batch_cnt if non-zero is the number of sub-batches. A sub-batch is a batch |
142 | | // with no duplicate keys. If zero, then the number of sub-batches is unknown. |
143 | | virtual Status CommitBatchInternal(WriteBatch* batch, |
144 | | size_t batch_cnt = 0) = 0; |
145 | | |
146 | | virtual Status CommitInternal() = 0; |
147 | | |
148 | | virtual Status RollbackInternal() = 0; |
149 | | |
150 | | virtual void Initialize(const TransactionOptions& txn_options); |
151 | | |
152 | | Status LockBatch(WriteBatch* batch, LockTracker* keys_to_unlock); |
153 | | |
154 | | Status TryLock(ColumnFamilyHandle* column_family, const Slice& key, |
155 | | bool read_only, bool exclusive, const bool do_validate = true, |
156 | | const bool assume_tracked = false) override; |
157 | | |
158 | | void Clear() override; |
159 | | |
160 | | PessimisticTransactionDB* txn_db_impl_; |
161 | | DBImpl* db_impl_; |
162 | | |
163 | | // If non-zero, this transaction should not be committed after this time (in |
164 | | // microseconds according to Env->NowMicros()) |
165 | | uint64_t expiration_time_; |
166 | | |
167 | | // Timestamp used by the transaction to perform all GetForUpdate. |
168 | | // Use this timestamp for conflict checking. |
169 | | // read_timestamp_ == kMaxTxnTimestamp means this transaction has not |
170 | | // performed any GetForUpdate. It is possible that the transaction has |
171 | | // performed blind writes or Get, though. |
172 | | TxnTimestamp read_timestamp_{kMaxTxnTimestamp}; |
173 | | TxnTimestamp commit_timestamp_{kMaxTxnTimestamp}; |
174 | | |
175 | | // Refer to |
176 | | // TransactionOptions::use_only_the_last_commit_time_batch_for_recovery |
177 | | bool use_only_the_last_commit_time_batch_for_recovery_ = false; |
178 | | // Refer to |
179 | | // TransactionOptions::skip_prepare |
180 | | bool skip_prepare_ = false; |
181 | | // Refer to TransactionOptions::commit_bypass_memtable |
182 | | uint32_t commit_bypass_memtable_threshold_ = |
183 | | std::numeric_limits<uint32_t>::max(); |
184 | | uint64_t commit_bypass_memtable_byte_threshold_ = |
185 | | std::numeric_limits<uint64_t>::max(); |
186 | | |
187 | | private: |
188 | | friend class TransactionTest_ValidateSnapshotTest_Test; |
189 | | // Used to create unique ids for transactions. |
190 | | static std::atomic<TransactionID> txn_id_counter_; |
191 | | |
192 | | // Unique ID for this transaction |
193 | | TransactionID txn_id_; |
194 | | |
195 | | // IDs for the transactions that are blocking the current transaction. |
196 | | // |
197 | | // Empty if the current transaction is not waiting or has timed out. |
198 | | autovector<TransactionID> waiting_txn_ids_; |
199 | | |
200 | | // The following two fields represent the (cf, key) that a transaction is |
201 | | // waiting on. |
202 | | // |
203 | | // If waiting_key_ is not null, then the pointer should always point to |
204 | | // a valid string object. The reason is that it is only non-null when the |
205 | | // transaction is blocked in the PointLockManager::AcquireWithTimeout |
206 | | // function. At that point, the key string object is one of the function |
207 | | // parameters. |
208 | | uint32_t waiting_cf_id_; |
209 | | const std::string* waiting_key_; |
210 | | |
211 | | // Copy of the waiting key owned by the txn so it can be read after a timeout |
212 | | std::optional<std::string> timed_out_key_; |
213 | | |
214 | | // Protects waiting_txn_ids_, waiting_cf_id_, waiting_key_, timed_out_key_. |
215 | | mutable std::mutex wait_mutex_; |
216 | | |
217 | | // Timeout in microseconds when locking a key or -1 if there is no timeout. |
218 | | int64_t lock_timeout_; |
219 | | |
220 | | // Timeout in microseconds before performing deadlock detection. |
221 | | // If 0, deadlock detection will be performed immediately. |
222 | | int64_t deadlock_timeout_us_; |
223 | | |
224 | | // Whether to perform deadlock detection or not. |
225 | | bool deadlock_detect_; |
226 | | |
227 | | // Deadlock detection depth; refer to TransactionOptions::deadlock_detect_depth |
228 | | int64_t deadlock_detect_depth_; |
229 | | |
230 | | // Refer to TransactionOptions::skip_concurrency_control |
231 | | bool skip_concurrency_control_; |
232 | | |
233 | | virtual Status ValidateSnapshot(ColumnFamilyHandle* column_family, |
234 | | const Slice& key, |
235 | | SequenceNumber* tracked_at_seq); |
236 | | |
237 | | void UnlockGetForUpdate(ColumnFamilyHandle* column_family, |
238 | | const Slice& key) override; |
239 | | }; |
240 | | |
241 | | class WriteCommittedTxn : public PessimisticTransaction { |
242 | | public: |
243 | | WriteCommittedTxn(TransactionDB* db, const WriteOptions& write_options, |
244 | | const TransactionOptions& txn_options); |
245 | | // No copying allowed |
246 | | WriteCommittedTxn(const WriteCommittedTxn&) = delete; |
247 | | void operator=(const WriteCommittedTxn&) = delete; |
248 | | |
249 | 0 | ~WriteCommittedTxn() override {} |
250 | | |
251 | | using TransactionBaseImpl::GetForUpdate; |
252 | | Status GetForUpdate(const ReadOptions& read_options, |
253 | | ColumnFamilyHandle* column_family, const Slice& key, |
254 | | std::string* value, bool exclusive, |
255 | | const bool do_validate) override; |
256 | | Status GetForUpdate(const ReadOptions& read_options, |
257 | | ColumnFamilyHandle* column_family, const Slice& key, |
258 | | PinnableSlice* pinnable_val, bool exclusive, |
259 | | const bool do_validate) override; |
260 | | |
261 | | Status GetEntityForUpdate(const ReadOptions& read_options, |
262 | | ColumnFamilyHandle* column_family, const Slice& key, |
263 | | PinnableWideColumns* columns, bool exclusive, |
264 | | bool do_validate) override; |
265 | | |
266 | | using TransactionBaseImpl::Put; |
267 | | // `key` does NOT include timestamp even when it's enabled. |
268 | | Status Put(ColumnFamilyHandle* column_family, const Slice& key, |
269 | | const Slice& value, const bool assume_tracked = false) override; |
270 | | Status Put(ColumnFamilyHandle* column_family, const SliceParts& key, |
271 | | const SliceParts& value, |
272 | | const bool assume_tracked = false) override; |
273 | | |
274 | | using TransactionBaseImpl::PutUntracked; |
275 | | Status PutUntracked(ColumnFamilyHandle* column_family, const Slice& key, |
276 | | const Slice& value) override; |
277 | | Status PutUntracked(ColumnFamilyHandle* column_family, const SliceParts& key, |
278 | | const SliceParts& value) override; |
279 | | |
280 | | // `key` does NOT include timestamp even when it's enabled. |
281 | | Status PutEntity(ColumnFamilyHandle* column_family, const Slice& key, |
282 | | const WideColumns& columns, |
283 | 0 | bool assume_tracked = false) override { |
284 | 0 | const bool do_validate = !assume_tracked; |
285 | |
|
286 | 0 | return PutEntityImpl(column_family, key, columns, do_validate, |
287 | 0 | assume_tracked); |
288 | 0 | } |
289 | | |
290 | | Status PutEntityUntracked(ColumnFamilyHandle* column_family, const Slice& key, |
291 | 0 | const WideColumns& columns) override { |
292 | 0 | constexpr bool do_validate = false; |
293 | 0 | constexpr bool assume_tracked = false; |
294 | |
|
295 | 0 | return PutEntityImpl(column_family, key, columns, do_validate, |
296 | 0 | assume_tracked); |
297 | 0 | } |
298 | | |
299 | | using TransactionBaseImpl::Delete; |
300 | | // `key` does NOT include timestamp even when it's enabled. |
301 | | Status Delete(ColumnFamilyHandle* column_family, const Slice& key, |
302 | | const bool assume_tracked = false) override; |
303 | | Status Delete(ColumnFamilyHandle* column_family, const SliceParts& key, |
304 | | const bool assume_tracked = false) override; |
305 | | |
306 | | using TransactionBaseImpl::DeleteUntracked; |
307 | | Status DeleteUntracked(ColumnFamilyHandle* column_family, |
308 | | const Slice& key) override; |
309 | | Status DeleteUntracked(ColumnFamilyHandle* column_family, |
310 | | const SliceParts& key) override; |
311 | | |
312 | | using TransactionBaseImpl::SingleDelete; |
313 | | // `key` does NOT include timestamp even when it's enabled. |
314 | | Status SingleDelete(ColumnFamilyHandle* column_family, const Slice& key, |
315 | | const bool assume_tracked = false) override; |
316 | | Status SingleDelete(ColumnFamilyHandle* column_family, const SliceParts& key, |
317 | | const bool assume_tracked = false) override; |
318 | | |
319 | | using TransactionBaseImpl::SingleDeleteUntracked; |
320 | | Status SingleDeleteUntracked(ColumnFamilyHandle* column_family, |
321 | | const Slice& key) override; |
322 | | |
323 | | using TransactionBaseImpl::Merge; |
324 | | Status Merge(ColumnFamilyHandle* column_family, const Slice& key, |
325 | | const Slice& value, const bool assume_tracked = false) override; |
326 | | |
327 | | Status SetReadTimestampForValidation(TxnTimestamp ts) override; |
328 | | Status SetCommitTimestamp(TxnTimestamp ts) override; |
329 | 0 | TxnTimestamp GetCommitTimestamp() const override { return commit_timestamp_; } |
330 | | |
331 | | private: |
332 | | template <typename TValue> |
333 | | Status GetForUpdateImpl(const ReadOptions& read_options, |
334 | | ColumnFamilyHandle* column_family, const Slice& key, |
335 | | TValue* value, bool exclusive, |
336 | | const bool do_validate); |
337 | | |
338 | | Status PutEntityImpl(ColumnFamilyHandle* column_family, const Slice& key, |
339 | | const WideColumns& columns, bool do_validate, |
340 | | bool assume_tracked); |
341 | | |
342 | | template <typename TKey, typename TOperation> |
343 | | Status Operate(ColumnFamilyHandle* column_family, const TKey& key, |
344 | | const bool do_validate, const bool assume_tracked, |
345 | | TOperation&& operation); |
346 | | |
347 | | Status PrepareInternal() override; |
348 | | |
349 | | Status CommitWithoutPrepareInternal() override; |
350 | | |
351 | | Status CommitBatchInternal(WriteBatch* batch, size_t batch_cnt) override; |
352 | | |
353 | | Status CommitInternal() override; |
354 | | |
355 | | Status RollbackInternal() override; |
356 | | |
357 | | // Checks whether the combination of `do_validate`, the read timestamp set in |
358 | | // `read_timestamp_` and the `enable_udt_validation` flag in |
359 | | // TransactionDBOptions makes sense. |
360 | | Status SanityCheckReadTimestamp(bool do_validate); |
361 | | |
362 | | // Column families that enable timestamps and whose data are written when |
363 | | // indexing_enabled_ is false. If a key is written when indexing_enabled_ is |
364 | | // true, then the corresponding column family is not added to this set |
365 | | // even if it enables timestamps. |
366 | | std::unordered_set<uint32_t> cfs_with_ts_tracked_when_indexing_disabled_; |
367 | | }; |
368 | | |
369 | | } // namespace ROCKSDB_NAMESPACE |