/src/rocksdb/utilities/transactions/pessimistic_transaction_db.h
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | #pragma once |
7 | | |
8 | | #include <mutex> |
9 | | #include <queue> |
10 | | #include <set> |
11 | | #include <string> |
12 | | #include <unordered_map> |
13 | | #include <vector> |
14 | | |
15 | | #include "db/db_iter.h" |
16 | | #include "db/read_callback.h" |
17 | | #include "db/snapshot_checker.h" |
18 | | #include "rocksdb/db.h" |
19 | | #include "rocksdb/options.h" |
20 | | #include "rocksdb/utilities/transaction_db.h" |
21 | | #include "util/cast_util.h" |
22 | | #include "utilities/transactions/lock/lock_manager.h" |
23 | | #include "utilities/transactions/lock/range/range_lock_manager.h" |
24 | | #include "utilities/transactions/pessimistic_transaction.h" |
25 | | #include "utilities/transactions/write_prepared_txn.h" |
26 | | |
27 | | namespace ROCKSDB_NAMESPACE { |
28 | | |
29 | | class PessimisticTransactionDB : public TransactionDB { |
30 | | public: |
31 | | explicit PessimisticTransactionDB(DB* db, |
32 | | const TransactionDBOptions& txn_db_options); |
33 | | |
34 | | explicit PessimisticTransactionDB(StackableDB* db, |
35 | | const TransactionDBOptions& txn_db_options); |
36 | | |
37 | | virtual ~PessimisticTransactionDB(); |
38 | | |
39 | 0 | const Snapshot* GetSnapshot() override { return db_->GetSnapshot(); } |
40 | | |
41 | | virtual Status Initialize( |
42 | | const std::vector<size_t>& compaction_enabled_cf_indices, |
43 | | const std::vector<ColumnFamilyHandle*>& handles); |
44 | | |
45 | | Transaction* BeginTransaction(const WriteOptions& write_options, |
46 | | const TransactionOptions& txn_options, |
47 | | Transaction* old_txn) override = 0; |
48 | | |
49 | | using StackableDB::Put; |
50 | | Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family, |
51 | | const Slice& key, const Slice& val) override; |
52 | | |
53 | | Status PutEntity(const WriteOptions& options, |
54 | | ColumnFamilyHandle* column_family, const Slice& key, |
55 | | const WideColumns& columns) override; |
56 | | Status PutEntity(const WriteOptions& /* options */, const Slice& /* key */, |
57 | 0 | const AttributeGroups& attribute_groups) override { |
58 | 0 | if (attribute_groups.empty()) { |
59 | 0 | return Status::InvalidArgument( |
60 | 0 | "Cannot call this method without attribute groups"); |
61 | 0 | } |
62 | 0 | return Status::NotSupported( |
63 | 0 | "PutEntity with AttributeGroups not supported by " |
64 | 0 | "PessimisticTransactionDB"); |
65 | 0 | } |
66 | | |
67 | | using StackableDB::Delete; |
68 | | Status Delete(const WriteOptions& wopts, ColumnFamilyHandle* column_family, |
69 | | const Slice& key) override; |
70 | | |
71 | | using StackableDB::SingleDelete; |
72 | | Status SingleDelete(const WriteOptions& wopts, |
73 | | ColumnFamilyHandle* column_family, |
74 | | const Slice& key) override; |
75 | | |
76 | | using StackableDB::Merge; |
77 | | Status Merge(const WriteOptions& options, ColumnFamilyHandle* column_family, |
78 | | const Slice& key, const Slice& value) override; |
79 | | |
80 | | using TransactionDB::Write; |
81 | | Status Write(const WriteOptions& opts, WriteBatch* updates) override; |
82 | | inline Status WriteWithConcurrencyControl(const WriteOptions& opts, |
83 | 0 | WriteBatch* updates) { |
84 | 0 | Status s; |
85 | 0 | if (opts.protection_bytes_per_key > 0) { |
86 | 0 | s = WriteBatchInternal::UpdateProtectionInfo( |
87 | 0 | updates, opts.protection_bytes_per_key); |
88 | 0 | } |
89 | 0 | if (s.ok()) { |
90 | | // Need to lock all keys in this batch to prevent write conflicts with |
91 | | // concurrent transactions. |
92 | 0 | Transaction* txn = BeginInternalTransaction(opts); |
93 | 0 | txn->DisableIndexing(); |
94 | |
|
95 | 0 | auto txn_impl = static_cast_with_check<PessimisticTransaction>(txn); |
96 | | |
97 | | // Since commitBatch sorts the keys before locking, concurrent Write() |
98 | | // operations will not cause a deadlock. |
99 | | // In order to avoid a deadlock with a concurrent Transaction, |
100 | | // Transactions should use a lock timeout. |
101 | 0 | s = txn_impl->CommitBatch(updates); |
102 | |
|
103 | 0 | delete txn; |
104 | 0 | } |
105 | |
|
106 | 0 | return s; |
107 | 0 | } |
108 | | |
109 | | using StackableDB::CreateColumnFamily; |
110 | | Status CreateColumnFamily(const ColumnFamilyOptions& options, |
111 | | const std::string& column_family_name, |
112 | | ColumnFamilyHandle** handle) override; |
113 | | |
114 | | Status CreateColumnFamilies( |
115 | | const ColumnFamilyOptions& options, |
116 | | const std::vector<std::string>& column_family_names, |
117 | | std::vector<ColumnFamilyHandle*>* handles) override; |
118 | | |
119 | | Status CreateColumnFamilies( |
120 | | const std::vector<ColumnFamilyDescriptor>& column_families, |
121 | | std::vector<ColumnFamilyHandle*>* handles) override; |
122 | | |
123 | | using StackableDB::CreateColumnFamilyWithImport; |
124 | | Status CreateColumnFamilyWithImport( |
125 | | const ColumnFamilyOptions& options, const std::string& column_family_name, |
126 | | const ImportColumnFamilyOptions& import_options, |
127 | | const ExportImportFilesMetaData& metadata, |
128 | 0 | ColumnFamilyHandle** handle) override { |
129 | 0 | const std::vector<const ExportImportFilesMetaData*>& metadatas{&metadata}; |
130 | 0 | return CreateColumnFamilyWithImport(options, column_family_name, |
131 | 0 | import_options, metadatas, handle); |
132 | 0 | } |
133 | | |
134 | | Status CreateColumnFamilyWithImport( |
135 | | const ColumnFamilyOptions& options, const std::string& column_family_name, |
136 | | const ImportColumnFamilyOptions& import_options, |
137 | | const std::vector<const ExportImportFilesMetaData*>& metadatas, |
138 | | ColumnFamilyHandle** handle) override; |
139 | | |
140 | | using StackableDB::DropColumnFamily; |
141 | | Status DropColumnFamily(ColumnFamilyHandle* column_family) override; |
142 | | |
143 | | Status DropColumnFamilies( |
144 | | const std::vector<ColumnFamilyHandle*>& column_families) override; |
145 | | |
146 | | Status TryLock(PessimisticTransaction* txn, uint32_t cfh_id, |
147 | | const std::string& key, bool exclusive); |
148 | | Status TryRangeLock(PessimisticTransaction* txn, uint32_t cfh_id, |
149 | | const Endpoint& start_endp, const Endpoint& end_endp); |
150 | | |
151 | | void UnLock(PessimisticTransaction* txn, const LockTracker& keys); |
152 | | void UnLock(PessimisticTransaction* txn, uint32_t cfh_id, |
153 | | const std::string& key); |
154 | | |
155 | | void AddColumnFamily(const ColumnFamilyHandle* handle); |
156 | | |
157 | | static TransactionDBOptions ValidateTxnDBOptions( |
158 | | const TransactionDBOptions& txn_db_options); |
159 | | |
160 | 0 | const TransactionDBOptions& GetTxnDBOptions() const { |
161 | 0 | return txn_db_options_; |
162 | 0 | } |
163 | | |
164 | | void InsertExpirableTransaction(TransactionID tx_id, |
165 | | PessimisticTransaction* tx); |
166 | | void RemoveExpirableTransaction(TransactionID tx_id); |
167 | | |
168 | | // If transaction is no longer available, locks can be stolen |
169 | | // If transaction is available, try stealing locks directly from transaction |
170 | | // It is the caller's responsibility to ensure that the referred transaction |
171 | | // is expirable (GetExpirationTime() > 0) and that it is expired. |
172 | | bool TryStealingExpiredTransactionLocks(TransactionID tx_id); |
173 | | |
174 | | Transaction* GetTransactionByName(const TransactionName& name) override; |
175 | | |
176 | | Status RegisterTransaction(Transaction* txn); |
177 | | void UnregisterTransaction(Transaction* txn); |
178 | | |
179 | | // not thread safe. current use case is during recovery (single thread) |
180 | | void GetAllPreparedTransactions(std::vector<Transaction*>* trans) override; |
181 | | |
182 | | LockManager::PointLockStatus GetLockStatusData() override; |
183 | | |
184 | | std::vector<DeadlockPath> GetDeadlockInfoBuffer() override; |
185 | | void SetDeadlockInfoBufferSize(uint32_t target_size) override; |
186 | | |
187 | | // The default implementation does nothing. The actual implementation is moved |
188 | | // to the child classes that actually need this information. This was due to |
189 | | // an odd performance drop we observed when the added std::atomic member to |
190 | | // the base class even when the subclass do not read it in the fast path. |
191 | 0 | virtual void UpdateCFComparatorMap(const std::vector<ColumnFamilyHandle*>&) {} |
192 | 0 | virtual void UpdateCFComparatorMap(ColumnFamilyHandle*) {} |
193 | | |
194 | | // Use the returned factory to create LockTrackers in transactions. |
195 | 0 | const LockTrackerFactory& GetLockTrackerFactory() const { |
196 | 0 | return lock_manager_->GetLockTrackerFactory(); |
197 | 0 | } |
198 | | |
199 | | std::pair<Status, std::shared_ptr<const Snapshot>> CreateTimestampedSnapshot( |
200 | | TxnTimestamp ts) override; |
201 | | |
202 | | std::shared_ptr<const Snapshot> GetTimestampedSnapshot( |
203 | | TxnTimestamp ts) const override; |
204 | | |
205 | | void ReleaseTimestampedSnapshotsOlderThan(TxnTimestamp ts) override; |
206 | | |
207 | | Status GetTimestampedSnapshots(TxnTimestamp ts_lb, TxnTimestamp ts_ub, |
208 | | std::vector<std::shared_ptr<const Snapshot>>& |
209 | | timestamped_snapshots) const override; |
210 | | |
211 | | protected: |
212 | | DBImpl* db_impl_; |
213 | | std::shared_ptr<Logger> info_log_; |
214 | | const TransactionDBOptions txn_db_options_; |
215 | | |
216 | | static Status FailIfBatchHasTs(const WriteBatch* wb); |
217 | | |
218 | | static Status FailIfCfEnablesTs(const DB* db, |
219 | | const ColumnFamilyHandle* column_family); |
220 | | |
221 | | void ReinitializeTransaction( |
222 | | Transaction* txn, const WriteOptions& write_options, |
223 | | const TransactionOptions& txn_options = TransactionOptions()); |
224 | | |
225 | | virtual Status VerifyCFOptions(const ColumnFamilyOptions& cf_options); |
226 | | |
227 | | private: |
228 | | friend class WritePreparedTxnDB; |
229 | | friend class WritePreparedTxnDBMock; |
230 | | friend class WriteUnpreparedTxn; |
231 | | friend class TransactionTest_DoubleCrashInRecovery_Test; |
232 | | friend class TransactionTest_DoubleEmptyWrite_Test; |
233 | | friend class TransactionTest_DuplicateKeys_Test; |
234 | | friend class TransactionTest_PersistentTwoPhaseTransactionTest_Test; |
235 | | friend class TransactionTest_TwoPhaseDoubleRecoveryTest_Test; |
236 | | friend class TransactionTest_TwoPhaseOutOfOrderDelete_Test; |
237 | | friend class TransactionStressTest_TwoPhaseLongPrepareTest_Test; |
238 | | friend class WriteUnpreparedTransactionTest_RecoveryTest_Test; |
239 | | friend class WriteUnpreparedTransactionTest_MarkLogWithPrepSection_Test; |
240 | | |
241 | | Transaction* BeginInternalTransaction(const WriteOptions& options); |
242 | | Transaction* GetTransactionByNameLocked(const TransactionName& name); |
243 | | |
244 | | std::shared_ptr<LockManager> lock_manager_; |
245 | | |
246 | | // Must be held when adding/dropping column families. |
247 | | InstrumentedMutex column_family_mutex_; |
248 | | |
249 | | // Used to ensure that no locks are stolen from an expirable transaction |
250 | | // that has started a commit. Only transactions with an expiration time |
251 | | // should be in this map. |
252 | | std::mutex map_mutex_; |
253 | | std::unordered_map<TransactionID, PessimisticTransaction*> |
254 | | expirable_transactions_map_; |
255 | | |
256 | | // map from name to two phase transaction instance |
257 | | std::mutex name_map_mutex_; |
258 | | std::unordered_map<TransactionName, Transaction*> transactions_; |
259 | | |
260 | | // Signal that we are testing a crash scenario. Some asserts could be relaxed |
261 | | // in such cases. |
262 | 0 | virtual void TEST_Crash() {} |
263 | | }; |
264 | | |
265 | | // A PessimisticTransactionDB that writes the data to the DB after the commit. |
266 | | // In this way the DB only contains the committed data. |
267 | | class WriteCommittedTxnDB : public PessimisticTransactionDB { |
268 | | public: |
269 | | explicit WriteCommittedTxnDB(DB* db, |
270 | | const TransactionDBOptions& txn_db_options) |
271 | 0 | : PessimisticTransactionDB(db, txn_db_options) {} |
272 | | |
273 | | explicit WriteCommittedTxnDB(StackableDB* db, |
274 | | const TransactionDBOptions& txn_db_options) |
275 | 0 | : PessimisticTransactionDB(db, txn_db_options) {} |
276 | | |
277 | 0 | virtual ~WriteCommittedTxnDB() {} |
278 | | |
279 | | Transaction* BeginTransaction(const WriteOptions& write_options, |
280 | | const TransactionOptions& txn_options, |
281 | | Transaction* old_txn) override; |
282 | | |
283 | | // Optimized version of ::Write that makes use of skip_concurrency_control |
284 | | // hint |
285 | | using TransactionDB::Write; |
286 | | Status Write(const WriteOptions& opts, |
287 | | const TransactionDBWriteOptimizations& optimizations, |
288 | | WriteBatch* updates) override; |
289 | | Status Write(const WriteOptions& opts, WriteBatch* updates) override; |
290 | | }; |
291 | | |
292 | | inline Status PessimisticTransactionDB::FailIfBatchHasTs( |
293 | 0 | const WriteBatch* batch) { |
294 | 0 | if (batch != nullptr && WriteBatchInternal::HasKeyWithTimestamp(*batch)) { |
295 | 0 | return Status::NotSupported( |
296 | 0 | "Writes with timestamp must go through transaction API instead of " |
297 | 0 | "TransactionDB."); |
298 | 0 | } |
299 | 0 | return Status::OK(); |
300 | 0 | } |
301 | | |
302 | | inline Status PessimisticTransactionDB::FailIfCfEnablesTs( |
303 | 0 | const DB* db, const ColumnFamilyHandle* column_family) { |
304 | 0 | assert(db); |
305 | 0 | column_family = column_family ? column_family : db->DefaultColumnFamily(); |
306 | 0 | assert(column_family); |
307 | 0 | const Comparator* const ucmp = column_family->GetComparator(); |
308 | 0 | assert(ucmp); |
309 | 0 | if (ucmp->timestamp_size() > 0) { |
310 | 0 | return Status::NotSupported( |
311 | 0 | "Write operation with user timestamp must go through the transaction " |
312 | 0 | "API instead of TransactionDB."); |
313 | 0 | } |
314 | 0 | return Status::OK(); |
315 | 0 | } |
316 | | |
317 | | class SnapshotCreationCallback : public PostMemTableCallback { |
318 | | public: |
319 | | explicit SnapshotCreationCallback( |
320 | | DBImpl* dbi, TxnTimestamp commit_ts, |
321 | | const std::shared_ptr<TransactionNotifier>& notifier, |
322 | | std::shared_ptr<const Snapshot>& snapshot) |
323 | 0 | : db_impl_(dbi), |
324 | 0 | commit_ts_(commit_ts), |
325 | 0 | snapshot_notifier_(notifier), |
326 | 0 | snapshot_(snapshot) { |
327 | 0 | assert(db_impl_); |
328 | 0 | } |
329 | | |
330 | 0 | ~SnapshotCreationCallback() override { |
331 | 0 | snapshot_creation_status_.PermitUncheckedError(); |
332 | 0 | } |
333 | | |
334 | | Status operator()(SequenceNumber seq, bool disable_memtable) override; |
335 | | |
336 | | private: |
337 | | DBImpl* const db_impl_; |
338 | | const TxnTimestamp commit_ts_; |
339 | | std::shared_ptr<TransactionNotifier> snapshot_notifier_; |
340 | | std::shared_ptr<const Snapshot>& snapshot_; |
341 | | |
342 | | Status snapshot_creation_status_; |
343 | | }; |
344 | | |
345 | | } // namespace ROCKSDB_NAMESPACE |