Coverage Report

Created: 2025-10-26 07:13

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/utilities/transactions/pessimistic_transaction_db.h
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#pragma once
7
8
#include <memory>
#include <mutex>
#include <queue>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include "db/db_iter.h"
#include "db/read_callback.h"
#include "db/snapshot_checker.h"
#include "rocksdb/db.h"
#include "rocksdb/options.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"
#include "utilities/transactions/lock/lock_manager.h"
#include "utilities/transactions/lock/range/range_lock_manager.h"
#include "utilities/transactions/pessimistic_transaction.h"
#include "utilities/transactions/write_prepared_txn.h"
26
27
namespace ROCKSDB_NAMESPACE {
28
29
class PessimisticTransactionDB : public TransactionDB {
30
 public:
31
  explicit PessimisticTransactionDB(DB* db,
32
                                    const TransactionDBOptions& txn_db_options);
33
34
  explicit PessimisticTransactionDB(StackableDB* db,
35
                                    const TransactionDBOptions& txn_db_options);
36
37
  virtual ~PessimisticTransactionDB();
38
39
0
  const Snapshot* GetSnapshot() override { return db_->GetSnapshot(); }
40
41
  virtual Status Initialize(
42
      const std::vector<size_t>& compaction_enabled_cf_indices,
43
      const std::vector<ColumnFamilyHandle*>& handles);
44
45
  Transaction* BeginTransaction(const WriteOptions& write_options,
46
                                const TransactionOptions& txn_options,
47
                                Transaction* old_txn) override = 0;
48
49
  using StackableDB::Put;
50
  Status Put(const WriteOptions& options, ColumnFamilyHandle* column_family,
51
             const Slice& key, const Slice& val) override;
52
53
  Status PutEntity(const WriteOptions& options,
54
                   ColumnFamilyHandle* column_family, const Slice& key,
55
                   const WideColumns& columns) override;
56
  Status PutEntity(const WriteOptions& /* options */, const Slice& /* key */,
57
0
                   const AttributeGroups& attribute_groups) override {
58
0
    if (attribute_groups.empty()) {
59
0
      return Status::InvalidArgument(
60
0
          "Cannot call this method without attribute groups");
61
0
    }
62
0
    return Status::NotSupported(
63
0
        "PutEntity with AttributeGroups not supported by "
64
0
        "PessimisticTransactionDB");
65
0
  }
66
67
  using StackableDB::Delete;
68
  Status Delete(const WriteOptions& wopts, ColumnFamilyHandle* column_family,
69
                const Slice& key) override;
70
71
  using StackableDB::SingleDelete;
72
  Status SingleDelete(const WriteOptions& wopts,
73
                      ColumnFamilyHandle* column_family,
74
                      const Slice& key) override;
75
76
  using StackableDB::Merge;
77
  Status Merge(const WriteOptions& options, ColumnFamilyHandle* column_family,
78
               const Slice& key, const Slice& value) override;
79
80
  using TransactionDB::Write;
81
  Status Write(const WriteOptions& opts, WriteBatch* updates) override;
82
  inline Status WriteWithConcurrencyControl(const WriteOptions& opts,
83
0
                                            WriteBatch* updates) {
84
0
    Status s;
85
0
    if (opts.protection_bytes_per_key > 0) {
86
0
      s = WriteBatchInternal::UpdateProtectionInfo(
87
0
          updates, opts.protection_bytes_per_key);
88
0
    }
89
0
    if (s.ok()) {
90
      // Need to lock all keys in this batch to prevent write conflicts with
91
      // concurrent transactions.
92
0
      Transaction* txn = BeginInternalTransaction(opts);
93
0
      txn->DisableIndexing();
94
95
0
      auto txn_impl = static_cast_with_check<PessimisticTransaction>(txn);
96
97
      // Since commitBatch sorts the keys before locking, concurrent Write()
98
      // operations will not cause a deadlock.
99
      // In order to avoid a deadlock with a concurrent Transaction,
100
      // Transactions should use a lock timeout.
101
0
      s = txn_impl->CommitBatch(updates);
102
103
0
      delete txn;
104
0
    }
105
106
0
    return s;
107
0
  }
108
109
  using StackableDB::CreateColumnFamily;
110
  Status CreateColumnFamily(const ColumnFamilyOptions& options,
111
                            const std::string& column_family_name,
112
                            ColumnFamilyHandle** handle) override;
113
114
  Status CreateColumnFamilies(
115
      const ColumnFamilyOptions& options,
116
      const std::vector<std::string>& column_family_names,
117
      std::vector<ColumnFamilyHandle*>* handles) override;
118
119
  Status CreateColumnFamilies(
120
      const std::vector<ColumnFamilyDescriptor>& column_families,
121
      std::vector<ColumnFamilyHandle*>* handles) override;
122
123
  using StackableDB::CreateColumnFamilyWithImport;
124
  Status CreateColumnFamilyWithImport(
125
      const ColumnFamilyOptions& options, const std::string& column_family_name,
126
      const ImportColumnFamilyOptions& import_options,
127
      const ExportImportFilesMetaData& metadata,
128
0
      ColumnFamilyHandle** handle) override {
129
0
    const std::vector<const ExportImportFilesMetaData*>& metadatas{&metadata};
130
0
    return CreateColumnFamilyWithImport(options, column_family_name,
131
0
                                        import_options, metadatas, handle);
132
0
  }
133
134
  Status CreateColumnFamilyWithImport(
135
      const ColumnFamilyOptions& options, const std::string& column_family_name,
136
      const ImportColumnFamilyOptions& import_options,
137
      const std::vector<const ExportImportFilesMetaData*>& metadatas,
138
      ColumnFamilyHandle** handle) override;
139
140
  using StackableDB::DropColumnFamily;
141
  Status DropColumnFamily(ColumnFamilyHandle* column_family) override;
142
143
  Status DropColumnFamilies(
144
      const std::vector<ColumnFamilyHandle*>& column_families) override;
145
146
  Status TryLock(PessimisticTransaction* txn, uint32_t cfh_id,
147
                 const std::string& key, bool exclusive);
148
  Status TryRangeLock(PessimisticTransaction* txn, uint32_t cfh_id,
149
                      const Endpoint& start_endp, const Endpoint& end_endp);
150
151
  void UnLock(PessimisticTransaction* txn, const LockTracker& keys);
152
  void UnLock(PessimisticTransaction* txn, uint32_t cfh_id,
153
              const std::string& key);
154
155
  void AddColumnFamily(const ColumnFamilyHandle* handle);
156
157
  static TransactionDBOptions ValidateTxnDBOptions(
158
      const TransactionDBOptions& txn_db_options);
159
160
0
  const TransactionDBOptions& GetTxnDBOptions() const {
161
0
    return txn_db_options_;
162
0
  }
163
164
  void InsertExpirableTransaction(TransactionID tx_id,
165
                                  PessimisticTransaction* tx);
166
  void RemoveExpirableTransaction(TransactionID tx_id);
167
168
  // If transaction is no longer available, locks can be stolen
169
  // If transaction is available, try stealing locks directly from transaction
170
  // It is the caller's responsibility to ensure that the referred transaction
171
  // is expirable (GetExpirationTime() > 0) and that it is expired.
172
  bool TryStealingExpiredTransactionLocks(TransactionID tx_id);
173
174
  Transaction* GetTransactionByName(const TransactionName& name) override;
175
176
  Status RegisterTransaction(Transaction* txn);
177
  void UnregisterTransaction(Transaction* txn);
178
179
  // not thread safe. current use case is during recovery (single thread)
180
  void GetAllPreparedTransactions(std::vector<Transaction*>* trans) override;
181
182
  LockManager::PointLockStatus GetLockStatusData() override;
183
184
  std::vector<DeadlockPath> GetDeadlockInfoBuffer() override;
185
  void SetDeadlockInfoBufferSize(uint32_t target_size) override;
186
187
  // The default implementation does nothing. The actual implementation is moved
188
  // to the child classes that actually need this information. This was due to
189
  // an odd performance drop we observed when the added std::atomic member to
190
  // the base class even when the subclass do not read it in the fast path.
191
0
  virtual void UpdateCFComparatorMap(const std::vector<ColumnFamilyHandle*>&) {}
192
0
  virtual void UpdateCFComparatorMap(ColumnFamilyHandle*) {}
193
194
  // Use the returned factory to create LockTrackers in transactions.
195
0
  const LockTrackerFactory& GetLockTrackerFactory() const {
196
0
    return lock_manager_->GetLockTrackerFactory();
197
0
  }
198
199
  std::pair<Status, std::shared_ptr<const Snapshot>> CreateTimestampedSnapshot(
200
      TxnTimestamp ts) override;
201
202
  std::shared_ptr<const Snapshot> GetTimestampedSnapshot(
203
      TxnTimestamp ts) const override;
204
205
  void ReleaseTimestampedSnapshotsOlderThan(TxnTimestamp ts) override;
206
207
  Status GetTimestampedSnapshots(TxnTimestamp ts_lb, TxnTimestamp ts_ub,
208
                                 std::vector<std::shared_ptr<const Snapshot>>&
209
                                     timestamped_snapshots) const override;
210
211
 protected:
212
  DBImpl* db_impl_;
213
  std::shared_ptr<Logger> info_log_;
214
  const TransactionDBOptions txn_db_options_;
215
216
  static Status FailIfBatchHasTs(const WriteBatch* wb);
217
218
  static Status FailIfCfEnablesTs(const DB* db,
219
                                  const ColumnFamilyHandle* column_family);
220
221
  void ReinitializeTransaction(
222
      Transaction* txn, const WriteOptions& write_options,
223
      const TransactionOptions& txn_options = TransactionOptions());
224
225
  virtual Status VerifyCFOptions(const ColumnFamilyOptions& cf_options);
226
227
 private:
228
  friend class WritePreparedTxnDB;
229
  friend class WritePreparedTxnDBMock;
230
  friend class WriteUnpreparedTxn;
231
  friend class TransactionTest_DoubleCrashInRecovery_Test;
232
  friend class TransactionTest_DoubleEmptyWrite_Test;
233
  friend class TransactionTest_DuplicateKeys_Test;
234
  friend class TransactionTest_PersistentTwoPhaseTransactionTest_Test;
235
  friend class TransactionTest_TwoPhaseDoubleRecoveryTest_Test;
236
  friend class TransactionTest_TwoPhaseOutOfOrderDelete_Test;
237
  friend class TransactionStressTest_TwoPhaseLongPrepareTest_Test;
238
  friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
239
  friend class WriteUnpreparedTransactionTest_MarkLogWithPrepSection_Test;
240
241
  Transaction* BeginInternalTransaction(const WriteOptions& options);
242
  Transaction* GetTransactionByNameLocked(const TransactionName& name);
243
244
  std::shared_ptr<LockManager> lock_manager_;
245
246
  // Must be held when adding/dropping column families.
247
  InstrumentedMutex column_family_mutex_;
248
249
  // Used to ensure that no locks are stolen from an expirable transaction
250
  // that has started a commit. Only transactions with an expiration time
251
  // should be in this map.
252
  std::mutex map_mutex_;
253
  std::unordered_map<TransactionID, PessimisticTransaction*>
254
      expirable_transactions_map_;
255
256
  // map from name to two phase transaction instance
257
  std::mutex name_map_mutex_;
258
  std::unordered_map<TransactionName, Transaction*> transactions_;
259
260
  // Signal that we are testing a crash scenario. Some asserts could be relaxed
261
  // in such cases.
262
0
  virtual void TEST_Crash() {}
263
};
264
265
// A PessimisticTransactionDB that writes the data to the DB after the commit.
266
// In this way the DB only contains the committed data.
267
class WriteCommittedTxnDB : public PessimisticTransactionDB {
268
 public:
269
  explicit WriteCommittedTxnDB(DB* db,
270
                               const TransactionDBOptions& txn_db_options)
271
0
      : PessimisticTransactionDB(db, txn_db_options) {}
272
273
  explicit WriteCommittedTxnDB(StackableDB* db,
274
                               const TransactionDBOptions& txn_db_options)
275
0
      : PessimisticTransactionDB(db, txn_db_options) {}
276
277
0
  virtual ~WriteCommittedTxnDB() {}
278
279
  Transaction* BeginTransaction(const WriteOptions& write_options,
280
                                const TransactionOptions& txn_options,
281
                                Transaction* old_txn) override;
282
283
  // Optimized version of ::Write that makes use of skip_concurrency_control
284
  // hint
285
  using TransactionDB::Write;
286
  Status Write(const WriteOptions& opts,
287
               const TransactionDBWriteOptimizations& optimizations,
288
               WriteBatch* updates) override;
289
  Status Write(const WriteOptions& opts, WriteBatch* updates) override;
290
};
291
292
// Rejects a batch containing any key that carries a user timestamp: such
// writes must go through the Transaction API. A null batch is accepted.
inline Status PessimisticTransactionDB::FailIfBatchHasTs(
    const WriteBatch* batch) {
  const bool batch_has_ts =
      batch != nullptr && WriteBatchInternal::HasKeyWithTimestamp(*batch);
  if (!batch_has_ts) {
    return Status::OK();
  }
  return Status::NotSupported(
      "Writes with timestamp must go through transaction API instead of "
      "TransactionDB.");
}
301
302
// Rejects writes against a column family whose comparator has a non-zero
// timestamp size; such writes must go through the Transaction API.
// A null `column_family` means the DB's default column family.
inline Status PessimisticTransactionDB::FailIfCfEnablesTs(
    const DB* db, const ColumnFamilyHandle* column_family) {
  assert(db);
  const ColumnFamilyHandle* const target_cf =
      (column_family != nullptr) ? column_family : db->DefaultColumnFamily();
  assert(target_cf);
  const Comparator* const ucmp = target_cf->GetComparator();
  assert(ucmp);
  const bool cf_has_ts = ucmp->timestamp_size() > 0;
  if (!cf_has_ts) {
    return Status::OK();
  }
  return Status::NotSupported(
      "Write operation with user timestamp must go through the transaction "
      "API instead of TransactionDB.");
}
316
317
class SnapshotCreationCallback : public PostMemTableCallback {
318
 public:
319
  explicit SnapshotCreationCallback(
320
      DBImpl* dbi, TxnTimestamp commit_ts,
321
      const std::shared_ptr<TransactionNotifier>& notifier,
322
      std::shared_ptr<const Snapshot>& snapshot)
323
0
      : db_impl_(dbi),
324
0
        commit_ts_(commit_ts),
325
0
        snapshot_notifier_(notifier),
326
0
        snapshot_(snapshot) {
327
0
    assert(db_impl_);
328
0
  }
329
330
0
  ~SnapshotCreationCallback() override {
331
0
    snapshot_creation_status_.PermitUncheckedError();
332
0
  }
333
334
  Status operator()(SequenceNumber seq, bool disable_memtable) override;
335
336
 private:
337
  DBImpl* const db_impl_;
338
  const TxnTimestamp commit_ts_;
339
  std::shared_ptr<TransactionNotifier> snapshot_notifier_;
340
  std::shared_ptr<const Snapshot>& snapshot_;
341
342
  Status snapshot_creation_status_;
343
};
344
345
}  // namespace ROCKSDB_NAMESPACE