Coverage Report

Created: 2026-05-31 07:45

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/utilities/transactions/pessimistic_transaction_db.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#include "utilities/transactions/pessimistic_transaction_db.h"
7
8
#include <cinttypes>
9
#include <memory>
10
#include <sstream>
11
#include <string>
12
#include <unordered_set>
13
#include <vector>
14
15
#include "db/db_impl/db_impl.h"
16
#include "logging/logging.h"
17
#include "rocksdb/db.h"
18
#include "rocksdb/options.h"
19
#include "rocksdb/utilities/transaction_db.h"
20
#include "test_util/sync_point.h"
21
#include "util/cast_util.h"
22
#include "util/mutexlock.h"
23
#include "utilities/secondary_index/secondary_index_mixin.h"
24
#include "utilities/transactions/pessimistic_transaction.h"
25
#include "utilities/transactions/transaction_db_mutex_impl.h"
26
#include "utilities/transactions/write_prepared_txn_db.h"
27
#include "utilities/transactions/write_unprepared_txn_db.h"
28
29
namespace ROCKSDB_NAMESPACE {
30
31
// Builds the pessimistic transaction layer directly on top of `db`, which is
// converted to DBImpl with static_cast_with_check. A lock manager is created
// from `txn_db_options`, and the DB's info logger is reused for this layer.
PessimisticTransactionDB::PessimisticTransactionDB(
    DB* db, const TransactionDBOptions& txn_db_options)
    : TransactionDB(db),
      db_impl_(static_cast_with_check<DBImpl>(db)),
      txn_db_options_(txn_db_options),
      lock_manager_(NewLockManager(this, txn_db_options)) {
  assert(db_impl_ != nullptr);
  const DBOptions& base_db_options = db_impl_->GetDBOptions();
  info_log_ = base_db_options.info_log;
}
40
41
// Support initiliazing PessimisticTransactionDB from a stackable db
42
//
43
//    PessimisticTransactionDB
44
//     ^        ^
45
//     |        |
46
//     |        +
47
//     |   StackableDB
48
//     |   ^
49
//     |   |
50
//     +   +
51
//     DBImpl
52
//       ^
53
//       |(inherit)
54
//       +
55
//       DB
56
//
57
PessimisticTransactionDB::PessimisticTransactionDB(
58
    StackableDB* db, const TransactionDBOptions& txn_db_options)
59
0
    : TransactionDB(db),
60
0
      db_impl_(static_cast_with_check<DBImpl>(db->GetRootDB())),
61
0
      txn_db_options_(txn_db_options),
62
0
      lock_manager_(NewLockManager(this, txn_db_options)) {
63
0
  assert(db_impl_ != nullptr);
64
0
}
65
66
0
// Deletes every transaction that is still registered by name in
// transactions_.
//
// Each transaction's destructor is expected to remove its own entry
// (NOTE(review): presumably via UnregisterTransaction; confirm in
// PessimisticTransaction). If it did not, re-reading begin() would delete the
// same pointer twice. So the key is copied before the delete and erased
// explicitly afterwards. The erase is a no-op when the destructor already
// removed the entry, and otherwise guarantees forward progress.
PessimisticTransactionDB::~PessimisticTransactionDB() {
  while (!transactions_.empty()) {
    // Copy the key: the iterator may be invalidated by the delete below.
    const auto name = transactions_.begin()->first;
    delete transactions_.begin()->second;
    transactions_.erase(name);
  }
}
73
74
// Checks that a column family's options can be used with this transaction DB.
// Returns OK for column families without user-defined timestamps.
// Returns InvalidArgument when the comparator's timestamp size differs from
// sizeof(TxnTimestamp).
// Returns NotSupported when timestamps are enabled but the write policy is not
// WRITE_COMMITTED.
Status PessimisticTransactionDB::VerifyCFOptions(
    const ColumnFamilyOptions& cf_options) {
  const Comparator* const ucmp = cf_options.comparator;
  assert(ucmp);
  const size_t ts_sz = ucmp->timestamp_size();

  if (ts_sz == 0) {
    return Status::OK();
  }

  if (ts_sz == sizeof(TxnTimestamp)) {
    // Correctly sized timestamps are only accepted under WRITE_COMMITTED.
    if (txn_db_options_.write_policy == WRITE_COMMITTED) {
      return Status::OK();
    }
    return Status::NotSupported("Only WriteCommittedTxn supports timestamp");
  }

  std::ostringstream msg;
  msg << "Timestamp of transaction must have " << sizeof(TxnTimestamp)
      << " bytes. CF comparator " << std::string(ucmp->Name())
      << " timestamp size is " << ts_sz << " bytes";
  return Status::InvalidArgument(msg.str());
}
94
95
// Finishes setting up the transaction DB after the underlying DB is open:
//  1. registers every column family with the lock manager,
//  2. validates each column family's options (VerifyCFOptions),
//  3. re-enables auto compaction for the column families listed in
//     `compaction_enabled_cf_indices` (indices into `handles`),
//  4. turns recovered (prepared) shell transactions into real PREPARED
//     transactions and, on success, drops DBImpl's recovered copies.
// Returns the first non-OK status encountered.
Status PessimisticTransactionDB::Initialize(
    const std::vector<size_t>& compaction_enabled_cf_indices,
    const std::vector<ColumnFamilyHandle*>& handles) {
  for (auto cf_ptr : handles) {
    AddColumnFamily(cf_ptr);
  }
  // Verify cf options
  for (auto handle : handles) {
    ColumnFamilyDescriptor cfd;
    Status s = handle->GetDescriptor(&cfd);
    if (!s.ok()) {
      return s;
    }
    s = VerifyCFOptions(cfd.options);
    if (!s.ok()) {
      return s;
    }
  }

  // Re-enable compaction for the column families that initially had
  // compaction enabled.
  std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
  compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size());
  for (auto index : compaction_enabled_cf_indices) {
    assert(index < handles.size());
    compaction_enabled_cf_handles.push_back(handles[index]);
  }

  Status s = EnableAutoCompaction(compaction_enabled_cf_handles);
  if (!s.ok()) {
    // Report the failure. Otherwise the recovery loop below would overwrite
    // `s` whenever recovered transactions exist, hiding the error.
    return s;
  }

  // create 'real' transactions from recovered shell transactions
  auto dbimpl = static_cast_with_check<DBImpl>(GetRootDB());
  assert(dbimpl != nullptr);
  auto rtrxs = dbimpl->recovered_transactions();

  if (txn_db_options_.write_policy == WRITE_COMMITTED) {
    // Protect commit_bypass_memtable's install-before-publish window:
    // WBWI keys land in immutable memtables at seqnos above LastSequence()
    // before SetLastSequence() catches up, so a flush starting in that
    // window may drop older versions still visible at the published
    // boundary. WP/WU handle this via their own SnapshotChecker.
    db_impl_->EnableTrackPublishedSeqInSnapshotContext();
  }

  for (auto it = rtrxs.begin(); it != rtrxs.end(); ++it) {
    auto recovered_trx = it->second;
    assert(recovered_trx);
    assert(recovered_trx->batches_.size() == 1);
    const auto& seq = recovered_trx->batches_.begin()->first;
    const auto& batch_info = recovered_trx->batches_.begin()->second;
    assert(batch_info.log_number_);
    assert(recovered_trx->name_.length());

    // TODO: plumb Env::IOActivity, Env::IOPriority
    WriteOptions w_options;
    w_options.sync = true;
    TransactionOptions t_options;
    // This would help avoiding deadlock for keys that although exist in the WAL
    // did not go through concurrency control. This includes the merge that
    // MyRocks uses for auto-inc columns. It is safe to do so, since (i) if
    // there is a conflict between the keys of two transactions that must be
    // avoided, it is already avoided by the application, MyRocks, before the
    // restart (ii) application, MyRocks, guarantees to rollback/commit the
    // recovered transactions before new transactions start.
    t_options.skip_concurrency_control = true;

    Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);
    assert(real_trx);
    real_trx->SetLogNumber(batch_info.log_number_);
    assert(seq != kMaxSequenceNumber);
    if (GetTxnDBOptions().write_policy != WRITE_COMMITTED) {
      real_trx->SetId(seq);
    }

    s = real_trx->SetName(recovered_trx->name_);
    if (!s.ok()) {
      break;
    }

    s = real_trx->RebuildFromWriteBatch(batch_info.batch_);
    // WriteCommitted sets batch_cnt_ to 0 to disable this check, which is
    // specific to WritePrepared txns.
    assert(batch_info.batch_cnt_ == 0 ||
           real_trx->GetWriteBatch()->SubBatchCnt() == batch_info.batch_cnt_);
    real_trx->SetState(Transaction::PREPARED);
    if (!s.ok()) {
      break;
    }
  }
  if (s.ok()) {
    dbimpl->DeleteAllRecoveredTransactions();
  }
  return s;
}
188
189
// Returns a WRITE_COMMITTED transaction.
// When `old_txn` is non-null, it is reinitialized with the given options and
// returned, so no allocation happens.
// Otherwise a new heap-allocated transaction is returned. It is wrapped in
// SecondaryIndexMixin when secondary indices are configured in the DB options.
Transaction* WriteCommittedTxnDB::BeginTransaction(
    const WriteOptions& write_options, const TransactionOptions& txn_options,
    Transaction* old_txn) {
  if (old_txn) {
    ReinitializeTransaction(old_txn, write_options, txn_options);
    return old_txn;
  }

  if (txn_db_options_.secondary_indices.empty()) {
    return new WriteCommittedTxn(this, write_options, txn_options);
  }

  return new SecondaryIndexMixin<WriteCommittedTxn>(
      &txn_db_options_.secondary_indices, this, write_options, txn_options);
}
204
205
// Returns a copy of `txn_db_options` with invalid fields patched.
// Currently a num_stripes of 0 is raised to 1.
TransactionDBOptions PessimisticTransactionDB::ValidateTxnDBOptions(
    const TransactionDBOptions& txn_db_options) {
  TransactionDBOptions sanitized(txn_db_options);
  if (sanitized.num_stripes == 0) {
    sanitized.num_stripes = 1;
  }
  return sanitized;
}
215
216
// Opens a TransactionDB that has only the default column family.
// `options` is split into DB-wide and default-CF options, and the work is
// delegated to the column-family overload of Open.
Status TransactionDB::Open(const Options& options,
                           const TransactionDBOptions& txn_db_options,
                           const std::string& dbname, TransactionDB** dbptr) {
  const DBOptions db_options(options);
  std::vector<ColumnFamilyDescriptor> column_families;
  column_families.emplace_back(kDefaultColumnFamilyName,
                               ColumnFamilyOptions(options));

  std::vector<ColumnFamilyHandle*> handles;
  Status s = TransactionDB::Open(db_options, txn_db_options, dbname,
                                 column_families, &handles, dbptr);
  if (!s.ok()) {
    return s;
  }

  assert(handles.size() == 1);
  // The handle can be deleted here because DBImpl always holds its own
  // reference to the default column family.
  delete handles[0];
  return s;
}
235
236
// Opens a TransactionDB with an explicit list of column families.
// Write-policy / unordered_write combinations that are not supported are
// rejected with NotSupported before anything is opened.
// The DB is opened on copies of the options adjusted by PrepareWrap and then
// wrapped via WrapDB. On success, *dbptr receives the new TransactionDB and
// *handles the column family handles.
Status TransactionDB::Open(
    const DBOptions& db_options, const TransactionDBOptions& txn_db_options,
    const std::string& dbname,
    const std::vector<ColumnFamilyDescriptor>& column_families,
    std::vector<ColumnFamilyHandle*>* handles, TransactionDB** dbptr) {
  Status s;
  std::unique_ptr<DB> db;
  if (txn_db_options.write_policy == WRITE_COMMITTED &&
      db_options.unordered_write) {
    return Status::NotSupported(
        "WRITE_COMMITTED is incompatible with unordered_writes");
  }
  if (txn_db_options.write_policy == WRITE_UNPREPARED &&
      db_options.unordered_write) {
    // TODO(lth): support it
    return Status::NotSupported(
        "WRITE_UNPREPARED is currently incompatible with unordered_writes");
  }
  if (txn_db_options.write_policy == WRITE_PREPARED &&
      db_options.unordered_write && !db_options.two_write_queues) {
    return Status::NotSupported(
        "WRITE_PREPARED is incompatible with unordered_writes if "
        "two_write_queues is not enabled.");
  }

  // PrepareWrap enables memtable history, temporarily disables auto
  // compaction (recording which CFs had it on) and turns on 2PC.
  std::vector<ColumnFamilyDescriptor> column_families_copy = column_families;
  std::vector<size_t> compaction_enabled_cf_indices;
  DBOptions db_options_2pc = db_options;
  PrepareWrap(&db_options_2pc, &column_families_copy,
              &compaction_enabled_cf_indices);
  const bool use_seq_per_batch =
      txn_db_options.write_policy == WRITE_PREPARED ||
      txn_db_options.write_policy == WRITE_UNPREPARED;
  const bool use_batch_per_txn =
      txn_db_options.write_policy == WRITE_COMMITTED ||
      txn_db_options.write_policy == WRITE_PREPARED;
  s = DBImpl::Open(db_options_2pc, dbname, column_families_copy, handles, &db,
                   use_seq_per_batch, use_batch_per_txn,
                   /*is_retry=*/false, /*can_retry=*/nullptr);
  if (s.ok()) {
    // Cast to int32_t so the argument type matches the PRId32 conversion
    // exactly (passing a plain int is only correct where int32_t is int).
    ROCKS_LOG_WARN(db->GetDBOptions().info_log,
                   "Transaction write_policy is %" PRId32,
                   static_cast<int32_t>(txn_db_options.write_policy));
    // if WrapDB return non-ok, db will be deleted in WrapDB() via
    // ~StackableDB().
    s = WrapDB(db.release(), txn_db_options, compaction_enabled_cf_indices,
               *handles, dbptr);
  }
  return s;
}
286
287
void TransactionDB::PrepareWrap(
288
    DBOptions* db_options, std::vector<ColumnFamilyDescriptor>* column_families,
289
0
    std::vector<size_t>* compaction_enabled_cf_indices) {
290
0
  compaction_enabled_cf_indices->clear();
291
292
  // Enable MemTable History if not already enabled
293
0
  for (size_t i = 0; i < column_families->size(); i++) {
294
0
    ColumnFamilyOptions* cf_options = &(*column_families)[i].options;
295
296
0
    if (cf_options->max_write_buffer_size_to_maintain == 0) {
297
      // Setting to -1 will set the History size to
298
      // max_write_buffer_number * write_buffer_size.
299
0
      cf_options->max_write_buffer_size_to_maintain = -1;
300
0
    }
301
0
    if (!cf_options->disable_auto_compactions) {
302
      // Disable compactions momentarily to prevent race with DB::Open
303
0
      cf_options->disable_auto_compactions = true;
304
0
      compaction_enabled_cf_indices->push_back(i);
305
0
    }
306
0
  }
307
0
  db_options->allow_2pc = true;
308
0
}
309
310
namespace {
311
template <typename DBType>
312
Status WrapAnotherDBInternal(
313
    DBType* db, const TransactionDBOptions& txn_db_options,
314
    const std::vector<size_t>& compaction_enabled_cf_indices,
315
0
    const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
316
0
  assert(db != nullptr);
317
0
  assert(dbptr != nullptr);
318
0
  *dbptr = nullptr;
319
0
  std::unique_ptr<PessimisticTransactionDB> txn_db;
320
  // txn_db owns object pointed to by the raw db pointer.
321
0
  switch (txn_db_options.write_policy) {
322
0
    case WRITE_UNPREPARED:
323
0
      txn_db.reset(new WriteUnpreparedTxnDB(
324
0
          db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
325
0
      break;
326
0
    case WRITE_PREPARED:
327
0
      txn_db.reset(new WritePreparedTxnDB(
328
0
          db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
329
0
      break;
330
0
    case WRITE_COMMITTED:
331
0
    default:
332
0
      txn_db.reset(new WriteCommittedTxnDB(
333
0
          db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
334
0
  }
335
0
  txn_db->UpdateCFComparatorMap(handles);
336
0
  Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
337
  // In case of a failure at this point, db is deleted via the txn_db destructor
338
  // and set to nullptr.
339
0
  if (s.ok()) {
340
0
    *dbptr = txn_db.release();
341
0
  } else {
342
0
    for (auto* h : handles) {
343
0
      delete h;
344
0
    }
345
    // txn_db still owns db, and ~StackableDB() will be called when txn_db goes
346
    // out of scope, deleting the input db pointer.
347
0
    ROCKS_LOG_FATAL(db->GetDBOptions().info_log,
348
0
                    "Failed to initialize txn_db: %s", s.ToString().c_str());
349
0
  }
350
0
  return s;
351
0
}
Unexecuted instantiation: pessimistic_transaction_db.cc:rocksdb::Status rocksdb::(anonymous namespace)::WrapAnotherDBInternal<rocksdb::DB>(rocksdb::DB*, rocksdb::TransactionDBOptions const&, std::__1::vector<unsigned long, std::__1::allocator<unsigned long> > const&, std::__1::vector<rocksdb::ColumnFamilyHandle*, std::__1::allocator<rocksdb::ColumnFamilyHandle*> > const&, rocksdb::TransactionDB**)
Unexecuted instantiation: pessimistic_transaction_db.cc:rocksdb::Status rocksdb::(anonymous namespace)::WrapAnotherDBInternal<rocksdb::StackableDB>(rocksdb::StackableDB*, rocksdb::TransactionDBOptions const&, std::__1::vector<unsigned long, std::__1::allocator<unsigned long> > const&, std::__1::vector<rocksdb::ColumnFamilyHandle*, std::__1::allocator<rocksdb::ColumnFamilyHandle*> > const&, rocksdb::TransactionDB**)
352
}  // namespace
353
354
// Wraps an already-open DB in a TransactionDB.
// The caller must have opened `db` with memtable history enabled, auto
// compaction disabled and two-phase commit enabled (the settings PrepareWrap
// applies).
Status TransactionDB::WrapDB(
    DB* db, const TransactionDBOptions& txn_db_options,
    const std::vector<size_t>& compaction_enabled_cf_indices,
    const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
  return WrapAnotherDBInternal<DB>(db, txn_db_options,
                                   compaction_enabled_cf_indices, handles,
                                   dbptr);
}

// Same as WrapDB, but for a StackableDB.
// The same preconditions apply: memtable history enabled, auto compaction
// disabled and two-phase commit enabled.
Status TransactionDB::WrapStackableDB(
    StackableDB* db, const TransactionDBOptions& txn_db_options,
    const std::vector<size_t>& compaction_enabled_cf_indices,
    const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
  return WrapAnotherDBInternal<StackableDB>(db, txn_db_options,
                                            compaction_enabled_cf_indices,
                                            handles, dbptr);
}
373
374
// Let LockManager know that this column family exists so it can
375
// allocate a LockMap for it.
376
void PessimisticTransactionDB::AddColumnFamily(
377
0
    const ColumnFamilyHandle* handle) {
378
0
  lock_manager_->AddColumnFamily(handle);
379
0
}
380
381
// Creates a column family after validating its options with VerifyCFOptions.
// On success, the new family is registered with the lock manager and the
// comparator map.
// Runs under column_family_mutex_.
Status PessimisticTransactionDB::CreateColumnFamily(
    const ColumnFamilyOptions& options, const std::string& column_family_name,
    ColumnFamilyHandle** handle) {
  InstrumentedMutexLock l(&column_family_mutex_);
  Status s = VerifyCFOptions(options);
  if (s.ok()) {
    s = db_->CreateColumnFamily(options, column_family_name, handle);
    if (s.ok()) {
      lock_manager_->AddColumnFamily(*handle);
      UpdateCFComparatorMap(*handle);
    }
  }
  return s;
}
398
399
// Creates one column family per name in `column_family_names`, all with
// `options`, after validating the options with VerifyCFOptions.
// Runs under column_family_mutex_.
//
// DB::CreateColumnFamilies may succeed partially: on error, `handles` still
// contains the column families that were actually created. Every returned
// handle is therefore registered with the lock manager and the comparator map
// regardless of the status. Otherwise those live column families would have
// no LockMap and could not be used in transactions.
Status PessimisticTransactionDB::CreateColumnFamilies(
    const ColumnFamilyOptions& options,
    const std::vector<std::string>& column_family_names,
    std::vector<ColumnFamilyHandle*>* handles) {
  InstrumentedMutexLock l(&column_family_mutex_);

  Status s = VerifyCFOptions(options);
  if (!s.ok()) {
    return s;
  }

  s = db_->CreateColumnFamilies(options, column_family_names, handles);
  if (handles != nullptr) {
    for (auto* handle : *handles) {
      if (handle == nullptr) {
        continue;
      }
      lock_manager_->AddColumnFamily(handle);
      UpdateCFComparatorMap(handle);
    }
  }

  return s;
}
420
421
// Creates the column families described by `column_families` after
// validating every descriptor's options with VerifyCFOptions.
// Runs under column_family_mutex_.
//
// DB::CreateColumnFamilies may succeed partially: on error, `handles` still
// contains the column families that were actually created. Every returned
// handle is therefore registered with the lock manager and the comparator map
// regardless of the status, so no live column family is left without a
// LockMap.
Status PessimisticTransactionDB::CreateColumnFamilies(
    const std::vector<ColumnFamilyDescriptor>& column_families,
    std::vector<ColumnFamilyHandle*>* handles) {
  InstrumentedMutexLock l(&column_family_mutex_);

  for (auto& cf_desc : column_families) {
    Status s = VerifyCFOptions(cf_desc.options);
    if (!s.ok()) {
      return s;
    }
  }

  Status s = db_->CreateColumnFamilies(column_families, handles);
  if (handles != nullptr) {
    for (auto* handle : *handles) {
      if (handle == nullptr) {
        continue;
      }
      lock_manager_->AddColumnFamily(handle);
      UpdateCFComparatorMap(handle);
    }
  }

  return s;
}
443
444
// Creates a column family from imported files after validating its options
// with VerifyCFOptions.
// On success, the new family is registered with the lock manager and the
// comparator map.
// Runs under column_family_mutex_.
Status PessimisticTransactionDB::CreateColumnFamilyWithImport(
    const ColumnFamilyOptions& options, const std::string& column_family_name,
    const ImportColumnFamilyOptions& import_options,
    const std::vector<const ExportImportFilesMetaData*>& metadatas,
    ColumnFamilyHandle** handle) {
  InstrumentedMutexLock l(&column_family_mutex_);
  Status s = VerifyCFOptions(options);
  if (s.ok()) {
    s = db_->CreateColumnFamilyWithImport(options, column_family_name,
                                          import_options, metadatas, handle);
    if (s.ok()) {
      lock_manager_->AddColumnFamily(*handle);
      UpdateCFComparatorMap(*handle);
    }
  }
  return s;
}
464
465
// Let LockManager know that it can deallocate the LockMap for this
466
// column family.
467
Status PessimisticTransactionDB::DropColumnFamily(
468
0
    ColumnFamilyHandle* column_family) {
469
0
  InstrumentedMutexLock l(&column_family_mutex_);
470
471
0
  Status s = db_->DropColumnFamily(column_family);
472
0
  if (s.ok()) {
473
0
    lock_manager_->RemoveColumnFamily(column_family);
474
0
  }
475
476
0
  return s;
477
0
}
478
479
// Drops every column family in `column_families`. Lock maps are released only
// when the whole batch succeeds.
// NOTE(review): on a partially successful batch, the dropped families keep
// their lock maps, since there is no way to tell which ones were dropped.
// Runs under column_family_mutex_.
Status PessimisticTransactionDB::DropColumnFamilies(
    const std::vector<ColumnFamilyHandle*>& column_families) {
  InstrumentedMutexLock l(&column_family_mutex_);
  Status s = db_->DropColumnFamilies(column_families);
  if (!s.ok()) {
    return s;
  }
  for (ColumnFamilyHandle* dropped : column_families) {
    lock_manager_->RemoveColumnFamily(dropped);
  }
  return s;
}
492
493
Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn,
494
                                         uint32_t cfh_id,
495
                                         const std::string& key,
496
0
                                         bool exclusive) {
497
0
  return lock_manager_->TryLock(txn, cfh_id, key, GetEnv(), exclusive);
498
0
}
499
500
Status PessimisticTransactionDB::TryRangeLock(PessimisticTransaction* txn,
501
                                              uint32_t cfh_id,
502
                                              const Endpoint& start_endp,
503
0
                                              const Endpoint& end_endp) {
504
0
  return lock_manager_->TryLock(txn, cfh_id, start_endp, end_endp, GetEnv(),
505
0
                                /*exclusive=*/true);
506
0
}
507
508
void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
509
0
                                      const LockTracker& keys) {
510
0
  lock_manager_->UnLock(txn, keys, GetEnv());
511
0
}
512
513
void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
514
0
                                      uint32_t cfh_id, const std::string& key) {
515
0
  lock_manager_->UnLock(txn, cfh_id, key, GetEnv());
516
0
}
517
518
// Used when wrapping DB write operations in a transaction
519
Transaction* PessimisticTransactionDB::BeginInternalTransaction(
520
0
    const WriteOptions& options) {
521
0
  TransactionOptions txn_options;
522
0
  Transaction* txn = BeginTransaction(options, txn_options, nullptr);
523
524
  // Use default timeout for non-transactional writes
525
0
  txn->SetLockTimeout(txn_db_options_.default_lock_timeout);
526
0
  return txn;
527
0
}
528
529
// All user Put, PutEntity, Merge, Delete, and Write requests must be
530
// intercepted to make sure that they lock all keys that they are writing to
531
// avoid causing conflicts with any concurrent transactions. The easiest way to
532
// do this is to wrap all write operations in a transaction.
533
//
534
// Put(), PutEntity(), Merge(), and Delete() only lock a single key per call.
535
// Write() will sort its keys before locking them.  This guarantees that
536
// TransactionDB write methods cannot deadlock with each other (but still could
537
// deadlock with a Transaction).
538
Status PessimisticTransactionDB::Put(const WriteOptions& options,
539
                                     ColumnFamilyHandle* column_family,
540
0
                                     const Slice& key, const Slice& val) {
541
0
  Status s = FailIfCfEnablesTs(this, column_family);
542
0
  if (!s.ok()) {
543
0
    return s;
544
0
  }
545
546
0
  Transaction* txn = BeginInternalTransaction(options);
547
0
  txn->DisableIndexing();
548
549
  // Since the client didn't create a transaction, they don't care about
550
  // conflict checking for this write.  So we just need to do PutUntracked().
551
0
  s = txn->PutUntracked(column_family, key, val);
552
553
0
  if (s.ok()) {
554
0
    s = txn->Commit();
555
0
  }
556
557
0
  delete txn;
558
559
0
  return s;
560
0
}
561
562
// Writes a wide-column entity through a short-lived internal transaction.
// Column families that enable user-defined timestamps are rejected.
// Returns the first failing status, or Status::OK() once the commit succeeds.
Status PessimisticTransactionDB::PutEntity(const WriteOptions& options,
                                           ColumnFamilyHandle* column_family,
                                           const Slice& key,
                                           const WideColumns& columns) {
  Status s = FailIfCfEnablesTs(this, column_family);
  if (!s.ok()) {
    return s;
  }

  std::unique_ptr<Transaction> txn(BeginInternalTransaction(options));
  txn->DisableIndexing();

  // Since the client didn't create a transaction, they don't care about
  // conflict checking for this write.  So we just need to do
  // PutEntityUntracked().
  s = txn->PutEntityUntracked(column_family, key, columns);
  if (s.ok()) {
    s = txn->Commit();
  }

  return s.ok() ? Status::OK() : s;
}
597
598
// Deletes `key` through a short-lived internal transaction that is committed
// immediately.
// Column families that enable user-defined timestamps are rejected with the
// status from FailIfCfEnablesTs.
Status PessimisticTransactionDB::Delete(const WriteOptions& wopts,
                                        ColumnFamilyHandle* column_family,
                                        const Slice& key) {
  Status s = FailIfCfEnablesTs(this, column_family);
  if (!s.ok()) {
    return s;
  }

  // Owned by unique_ptr so the internal transaction is released on every
  // exit path.
  std::unique_ptr<Transaction> txn(BeginInternalTransaction(wopts));
  txn->DisableIndexing();

  // Since the client didn't create a transaction, they don't care about
  // conflict checking for this write.  So we just need to do
  // DeleteUntracked().
  s = txn->DeleteUntracked(column_family, key);

  if (s.ok()) {
    s = txn->Commit();
  }

  return s;
}
622
623
// Single-deletes `key` through a short-lived internal transaction that is
// committed immediately.
// Column families that enable user-defined timestamps are rejected with the
// status from FailIfCfEnablesTs.
Status PessimisticTransactionDB::SingleDelete(const WriteOptions& wopts,
                                              ColumnFamilyHandle* column_family,
                                              const Slice& key) {
  Status s = FailIfCfEnablesTs(this, column_family);
  if (!s.ok()) {
    return s;
  }

  // Owned by unique_ptr so the internal transaction is released on every
  // exit path.
  std::unique_ptr<Transaction> txn(BeginInternalTransaction(wopts));
  txn->DisableIndexing();

  // Since the client didn't create a transaction, they don't care about
  // conflict checking for this write.  So we just need to do
  // SingleDeleteUntracked().
  s = txn->SingleDeleteUntracked(column_family, key);

  if (s.ok()) {
    s = txn->Commit();
  }

  return s;
}
647
648
// Applies a merge operand for `key` through a short-lived internal
// transaction that is committed immediately.
// Column families that enable user-defined timestamps are rejected with the
// status from FailIfCfEnablesTs.
Status PessimisticTransactionDB::Merge(const WriteOptions& options,
                                       ColumnFamilyHandle* column_family,
                                       const Slice& key, const Slice& value) {
  Status s = FailIfCfEnablesTs(this, column_family);
  if (!s.ok()) {
    return s;
  }

  // Owned by unique_ptr so the internal transaction is released on every
  // exit path.
  std::unique_ptr<Transaction> txn(BeginInternalTransaction(options));
  txn->DisableIndexing();

  // Since the client didn't create a transaction, they don't care about
  // conflict checking for this write.  So we just need to do
  // MergeUntracked().
  s = txn->MergeUntracked(column_family, key, value);

  if (s.ok()) {
    s = txn->Commit();
  }

  return s;
}
672
673
// Default batch write: always goes through the key-locking path.
Status PessimisticTransactionDB::Write(const WriteOptions& opts,
                                       WriteBatch* updates) {
  return WriteWithConcurrencyControl(opts, updates);
}

// WRITE_COMMITTED batch write.
// Batches that carry timestamps are rejected via FailIfBatchHasTs.
// When the DB-wide skip_concurrency_control option is set, the batch goes
// straight to DBImpl without taking key locks.
Status WriteCommittedTxnDB::Write(const WriteOptions& opts,
                                  WriteBatch* updates) {
  Status s = FailIfBatchHasTs(updates);
  if (!s.ok()) {
    return s;
  }
  return txn_db_options_.skip_concurrency_control
             ? db_impl_->Write(opts, updates)
             : WriteWithConcurrencyControl(opts, updates);
}

// Same as above, except that skipping concurrency control is decided per call
// by `optimizations` instead of by the DB options.
Status WriteCommittedTxnDB::Write(
    const WriteOptions& opts,
    const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
  Status s = FailIfBatchHasTs(updates);
  if (!s.ok()) {
    return s;
  }
  return optimizations.skip_concurrency_control
             ? db_impl_->Write(opts, updates)
             : WriteWithConcurrencyControl(opts, updates);
}
704
705
void PessimisticTransactionDB::InsertExpirableTransaction(
706
0
    TransactionID tx_id, PessimisticTransaction* tx) {
707
0
  assert(tx->GetExpirationTime() > 0);
708
0
  std::lock_guard<std::mutex> lock(map_mutex_);
709
0
  expirable_transactions_map_.insert({tx_id, tx});
710
0
}
711
712
0
void PessimisticTransactionDB::RemoveExpirableTransaction(TransactionID tx_id) {
713
0
  std::lock_guard<std::mutex> lock(map_mutex_);
714
0
  expirable_transactions_map_.erase(tx_id);
715
0
}
716
717
bool PessimisticTransactionDB::TryStealingExpiredTransactionLocks(
718
0
    TransactionID tx_id) {
719
0
  std::lock_guard<std::mutex> lock(map_mutex_);
720
721
0
  auto tx_it = expirable_transactions_map_.find(tx_id);
722
0
  if (tx_it == expirable_transactions_map_.end()) {
723
0
    return true;
724
0
  }
725
0
  PessimisticTransaction& tx = *(tx_it->second);
726
0
  return tx.TryStealingLocks();
727
0
}
728
729
void PessimisticTransactionDB::ReinitializeTransaction(
730
    Transaction* txn, const WriteOptions& write_options,
731
0
    const TransactionOptions& txn_options) {
732
0
  auto txn_impl = static_cast_with_check<PessimisticTransaction>(txn);
733
734
0
  txn_impl->Reinitialize(this, write_options, txn_options);
735
0
}
736
737
// Looks up a registered transaction by name while holding name_map_mutex_.
// Returns nullptr if no transaction has that name.
Transaction* PessimisticTransactionDB::GetTransactionByName(
    const TransactionName& name) {
  std::lock_guard<std::mutex> guard(name_map_mutex_);
  return GetTransactionByNameLocked(name);
}

// Same lookup without taking name_map_mutex_. Callers are expected to hold it,
// as GetTransactionByName does.
// Returns nullptr if no transaction has that name.
Transaction* PessimisticTransactionDB::GetTransactionByNameLocked(
    const TransactionName& name) {
  const auto found = transactions_.find(name);
  return found == transactions_.end() ? nullptr : found->second;
}
752
753
void PessimisticTransactionDB::GetAllPreparedTransactions(
754
0
    std::vector<Transaction*>* transv) {
755
0
  assert(transv);
756
0
  transv->clear();
757
0
  std::lock_guard<std::mutex> lock(name_map_mutex_);
758
0
  for (auto it = transactions_.begin(); it != transactions_.end(); ++it) {
759
0
    if (it->second->GetState() == Transaction::PREPARED) {
760
0
      transv->push_back(it->second);
761
0
    }
762
0
  }
763
0
}
764
765
0
LockManager::PointLockStatus PessimisticTransactionDB::GetLockStatusData() {
766
0
  return lock_manager_->GetPointLockStatus();
767
0
}
768
769
0
std::vector<DeadlockPath> PessimisticTransactionDB::GetDeadlockInfoBuffer() {
770
0
  return lock_manager_->GetDeadlockInfoBuffer();
771
0
}
772
773
0
void PessimisticTransactionDB::SetDeadlockInfoBufferSize(uint32_t target_size) {
774
0
  lock_manager_->Resize(target_size);
775
0
}
776
777
0
// Adds a named, STARTED transaction to the name registry under
// name_map_mutex_.
// Returns InvalidArgument if another transaction already uses the same name.
Status PessimisticTransactionDB::RegisterTransaction(Transaction* txn) {
  assert(txn);
  assert(txn->GetName().length() > 0);
  assert(txn->GetState() == Transaction::STARTED);
  std::lock_guard<std::mutex> guard(name_map_mutex_);
  const bool inserted = transactions_.emplace(txn->GetName(), txn).second;
  if (inserted) {
    return Status::OK();
  }
  return Status::InvalidArgument("Duplicate txn name " + txn->GetName());
}
787
788
0
// Removes `txn` from the name registry under name_map_mutex_.
// A missing entry trips the debug assert. In release builds (where the assert
// is compiled out) it is treated as a no-op instead of calling erase(end()),
// which is undefined behavior.
void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) {
  assert(txn);
  std::lock_guard<std::mutex> lock(name_map_mutex_);
  auto it = transactions_.find(txn->GetName());
  assert(it != transactions_.end());
  if (it != transactions_.end()) {
    transactions_.erase(it);
  }
}
795
796
std::pair<Status, std::shared_ptr<const Snapshot>>
797
0
PessimisticTransactionDB::CreateTimestampedSnapshot(TxnTimestamp ts) {
798
0
  if (kMaxTxnTimestamp == ts) {
799
0
    return std::make_pair(Status::InvalidArgument("invalid ts"), nullptr);
800
0
  }
801
0
  assert(db_impl_);
802
0
  return db_impl_->CreateTimestampedSnapshot(kMaxSequenceNumber, ts);
803
0
}
804
805
std::shared_ptr<const Snapshot>
806
0
PessimisticTransactionDB::GetTimestampedSnapshot(TxnTimestamp ts) const {
807
0
  assert(db_impl_);
808
0
  return db_impl_->GetTimestampedSnapshot(ts);
809
0
}
810
811
void PessimisticTransactionDB::ReleaseTimestampedSnapshotsOlderThan(
812
0
    TxnTimestamp ts) {
813
0
  assert(db_impl_);
814
0
  db_impl_->ReleaseTimestampedSnapshotsOlderThan(ts);
815
0
}
816
817
Status PessimisticTransactionDB::GetTimestampedSnapshots(
818
    TxnTimestamp ts_lb, TxnTimestamp ts_ub,
819
0
    std::vector<std::shared_ptr<const Snapshot>>& timestamped_snapshots) const {
820
0
  assert(db_impl_);
821
0
  return db_impl_->GetTimestampedSnapshots(ts_lb, ts_ub, timestamped_snapshots);
822
0
}
823
824
// Write callback that creates a timestamped snapshot at `seq`, labelled
// commit_ts_.
// The outcome is stored in snapshot_creation_status_ / snapshot_. When a
// snapshot was created and a notifier is set, the notifier is informed.
// The callback itself always returns Status::OK().
Status SnapshotCreationCallback::operator()(SequenceNumber seq,
                                            bool disable_memtable) {
  assert(db_impl_);
  assert(commit_ts_ != kMaxTxnTimestamp);

  // With two write queues, this callback must not see a memtable-skipping
  // write.
  const bool two_write_queues =
      db_impl_->immutable_db_options().two_write_queues;
  assert(!two_write_queues || !disable_memtable);
#ifdef NDEBUG
  (void)two_write_queues;
  (void)disable_memtable;
#endif

  // `seq` must not precede the last published sequence. With seq_per_batch
  // it must be strictly after it.
  const bool seq_per_batch = db_impl_->seq_per_batch();
  assert(seq_per_batch ? db_impl_->GetLastPublishedSequence() < seq
                       : db_impl_->GetLastPublishedSequence() <= seq);
  (void)seq_per_batch;

  // Create a snapshot which can also be used for write conflict checking.
  auto created = db_impl_->CreateTimestampedSnapshot(seq, commit_ts_);
  snapshot_creation_status_ = created.first;
  snapshot_ = created.second;

  // Success must produce a snapshot, and failure must not.
  const bool creation_ok = snapshot_creation_status_.ok();
  assert(creation_ok == (snapshot_ != nullptr));
  (void)creation_ok;

  if (snapshot_ && snapshot_notifier_) {
    snapshot_notifier_->SnapshotCreated(snapshot_.get());
  }
  return Status::OK();
}
858
859
}  // namespace ROCKSDB_NAMESPACE