Coverage Report

Created: 2024-09-08 07:17

/src/rocksdb/utilities/transactions/pessimistic_transaction_db.cc
Line
Count
Source (jump to first uncovered line)
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#include "utilities/transactions/pessimistic_transaction_db.h"
7
8
#include <cinttypes>
9
#include <memory>
10
#include <sstream>
11
#include <string>
12
#include <unordered_set>
13
#include <vector>
14
15
#include "db/db_impl/db_impl.h"
16
#include "logging/logging.h"
17
#include "rocksdb/db.h"
18
#include "rocksdb/options.h"
19
#include "rocksdb/utilities/transaction_db.h"
20
#include "test_util/sync_point.h"
21
#include "util/cast_util.h"
22
#include "util/mutexlock.h"
23
#include "utilities/transactions/pessimistic_transaction.h"
24
#include "utilities/transactions/transaction_db_mutex_impl.h"
25
#include "utilities/transactions/write_prepared_txn_db.h"
26
#include "utilities/transactions/write_unprepared_txn_db.h"
27
28
namespace ROCKSDB_NAMESPACE {
29
30
// Builds a pessimistic transaction DB directly on top of `db`, which is
// downcast to DBImpl via static_cast_with_check. The lock manager is created
// from `txn_db_options`, and info_log_ is taken from the DB's own options.
PessimisticTransactionDB::PessimisticTransactionDB(
    DB* db, const TransactionDBOptions& txn_db_options)
    : TransactionDB(db),
      db_impl_(static_cast_with_check<DBImpl>(db)),
      txn_db_options_(txn_db_options),
      lock_manager_(NewLockManager(this, txn_db_options)) {
  assert(db_impl_);
  const auto& db_options = db_impl_->GetDBOptions();
  info_log_ = db_options.info_log;
}
39
40
// Support initializing PessimisticTransactionDB from a stackable db
41
//
42
//    PessimisticTransactionDB
43
//     ^        ^
44
//     |        |
45
//     |        +
46
//     |   StackableDB
47
//     |   ^
48
//     |   |
49
//     +   +
50
//     DBImpl
51
//       ^
52
//       |(inherit)
53
//       +
54
//       DB
55
//
56
// Builds a pessimistic transaction DB on top of a StackableDB. The underlying
// DBImpl is reached through `db->GetRootDB()`; the lock manager is created
// from `txn_db_options`.
PessimisticTransactionDB::PessimisticTransactionDB(
    StackableDB* db, const TransactionDBOptions& txn_db_options)
    : TransactionDB(db),
      db_impl_(static_cast_with_check<DBImpl>(db->GetRootDB())),
      txn_db_options_(txn_db_options),
      lock_manager_(NewLockManager(this, txn_db_options)) {
  assert(db_impl_ != nullptr);
  // Match the DB*-based constructor so info_log_ is populated no matter how
  // this TransactionDB was constructed.
  info_log_ = db_impl_->GetDBOptions().info_log;
}
64
65
0
// Destroys every transaction still registered by name in transactions_.
//
// This file does not show whether deleting a transaction removes its own
// entry from transactions_. The entry is therefore erased by key after each
// delete. That is a no-op if the delete already removed it. It also
// guarantees the loop makes progress, instead of spinning forever or deleting
// the same object twice.
PessimisticTransactionDB::~PessimisticTransactionDB() {
  while (!transactions_.empty()) {
    auto it = transactions_.begin();
    // Copy the key first: `it` may be invalidated by the delete below.
    const auto name = it->first;
    delete it->second;
    transactions_.erase(name);
  }
}
72
73
// Checks that a column family's comparator is usable with transactions.
// Column families without user-defined timestamps are always accepted. Those
// with timestamps must use exactly sizeof(TxnTimestamp) bytes
// (InvalidArgument otherwise) and are only supported by the WRITE_COMMITTED
// write policy (NotSupported otherwise).
Status PessimisticTransactionDB::VerifyCFOptions(
    const ColumnFamilyOptions& cf_options) {
  const Comparator* const user_comparator = cf_options.comparator;
  assert(user_comparator);
  const size_t timestamp_bytes = user_comparator->timestamp_size();
  if (timestamp_bytes == 0) {
    return Status::OK();
  }

  if (timestamp_bytes != sizeof(TxnTimestamp)) {
    std::ostringstream message;
    message << "Timestamp of transaction must have " << sizeof(TxnTimestamp)
            << " bytes. CF comparator "
            << std::string(user_comparator->Name()) << " timestamp size is "
            << timestamp_bytes << " bytes";
    return Status::InvalidArgument(message.str());
  }

  const bool write_committed =
      txn_db_options_.write_policy == WRITE_COMMITTED;
  return write_committed
             ? Status::OK()
             : Status::NotSupported("Only WriteCommittedTxn supports timestamp");
}
93
94
// Finishes setting up the transaction DB once the underlying DB is open:
//  1. registers every column family in `handles` with the lock manager and
//     rejects any whose options fail VerifyCFOptions;
//  2. re-enables auto compaction for the column families whose positions in
//     `handles` are listed in `compaction_enabled_cf_indices`;
//  3. converts each transaction recovered by DBImpl into a real, named
//     transaction in the PREPARED state.
// The recovered shell transactions are deleted from DBImpl only if every step
// succeeds. Returns the first non-OK status encountered.
Status PessimisticTransactionDB::Initialize(
    const std::vector<size_t>& compaction_enabled_cf_indices,
    const std::vector<ColumnFamilyHandle*>& handles) {
  for (auto cf_ptr : handles) {
    AddColumnFamily(cf_ptr);
  }
  // Verify cf options
  for (auto handle : handles) {
    ColumnFamilyDescriptor cfd;
    Status s = handle->GetDescriptor(&cfd);
    if (!s.ok()) {
      return s;
    }
    s = VerifyCFOptions(cfd.options);
    if (!s.ok()) {
      return s;
    }
  }

  // Re-enable compaction for the column families that initially had
  // compaction enabled.
  std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
  compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size());
  for (auto index : compaction_enabled_cf_indices) {
    assert(index < handles.size());
    compaction_enabled_cf_handles.push_back(handles[index]);
  }

  Status s = EnableAutoCompaction(compaction_enabled_cf_handles);
  if (!s.ok()) {
    // Return now: the recovery loop below reassigns `s` and would otherwise
    // overwrite, and hide, this error.
    return s;
  }

  // create 'real' transactions from recovered shell transactions
  auto dbimpl = static_cast_with_check<DBImpl>(GetRootDB());
  assert(dbimpl != nullptr);
  auto rtrxs = dbimpl->recovered_transactions();

  for (const auto& entry : rtrxs) {
    auto recovered_trx = entry.second;
    assert(recovered_trx);
    assert(recovered_trx->batches_.size() == 1);
    const auto& seq = recovered_trx->batches_.begin()->first;
    const auto& batch_info = recovered_trx->batches_.begin()->second;
    assert(batch_info.log_number_);
    assert(recovered_trx->name_.length());

    // TODO: plumb Env::IOActivity, Env::IOPriority
    WriteOptions w_options;
    w_options.sync = true;
    TransactionOptions t_options;
    // This would help avoiding deadlock for keys that although exist in the WAL
    // did not go through concurrency control. This includes the merge that
    // MyRocks uses for auto-inc columns. It is safe to do so, since (i) if
    // there is a conflict between the keys of two transactions that must be
    // avoided, it is already avoided by the application, MyRocks, before the
    // restart (ii) application, MyRocks, guarantees to rollback/commit the
    // recovered transactions before new transactions start.
    t_options.skip_concurrency_control = true;

    Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);
    assert(real_trx);
    real_trx->SetLogNumber(batch_info.log_number_);
    assert(seq != kMaxSequenceNumber);
    if (GetTxnDBOptions().write_policy != WRITE_COMMITTED) {
      real_trx->SetId(seq);
    }

    s = real_trx->SetName(recovered_trx->name_);
    if (!s.ok()) {
      // NOTE(review): real_trx is not freed on this path. If a failed SetName
      // leaves it unregistered in transactions_, nothing else reclaims it.
      // Confirm against PessimisticTransaction::SetName before adding a
      // delete here.
      break;
    }

    s = real_trx->RebuildFromWriteBatch(batch_info.batch_);
    // A batch_cnt_ of 0 disables this check (WriteCommitted relies on that);
    // the check itself is specific to WritePrepared txns.
    assert(batch_info.batch_cnt_ == 0 ||
           real_trx->GetWriteBatch()->SubBatchCnt() == batch_info.batch_cnt_);
    real_trx->SetState(Transaction::PREPARED);
    if (!s.ok()) {
      break;
    }
  }
  if (s.ok()) {
    dbimpl->DeleteAllRecoveredTransactions();
  }
  return s;
}
178
179
// Hands out a WriteCommittedTxn. Without `old_txn`, a new transaction is
// allocated and the caller owns it. With `old_txn`, that object is
// reinitialized in place with the new options and returned.
Transaction* WriteCommittedTxnDB::BeginTransaction(
    const WriteOptions& write_options, const TransactionOptions& txn_options,
    Transaction* old_txn) {
  if (old_txn == nullptr) {
    return new WriteCommittedTxn(this, write_options, txn_options);
  }
  ReinitializeTransaction(old_txn, write_options, txn_options);
  return old_txn;
}
189
190
// Returns a sanitized copy of `txn_db_options`: a num_stripes of 0 is
// replaced with 1. All other fields are copied unchanged.
TransactionDBOptions PessimisticTransactionDB::ValidateTxnDBOptions(
    const TransactionDBOptions& txn_db_options) {
  TransactionDBOptions sanitized(txn_db_options);
  if (sanitized.num_stripes == 0) {
    sanitized.num_stripes = 1;
  }
  return sanitized;
}
200
201
// Convenience overload: opens a TransactionDB with only the default column
// family, configured from `options`. The default column family handle from
// the general overload is deleted here, because DBImpl keeps its own
// reference to the default column family.
Status TransactionDB::Open(const Options& options,
                           const TransactionDBOptions& txn_db_options,
                           const std::string& dbname, TransactionDB** dbptr) {
  std::vector<ColumnFamilyDescriptor> column_families;
  column_families.emplace_back(kDefaultColumnFamilyName,
                               ColumnFamilyOptions(options));
  std::vector<ColumnFamilyHandle*> handles;
  Status s = TransactionDB::Open(DBOptions(options), txn_db_options, dbname,
                                 column_families, &handles, dbptr);
  if (!s.ok()) {
    return s;
  }
  assert(handles.size() == 1);
  delete handles[0];
  return s;
}
220
221
// Opens a TransactionDB over the given column families.
//
// Flow:
//  - Combinations of write_policy and `unordered_write` that are not
//    supported are rejected up front with NotSupported.
//  - PrepareWrap adjusts copies of the options (enables 2PC, memtable history,
//    and temporarily disables auto compaction).
//  - DBImpl is opened with seq_per_batch / batch_per_txn derived from the
//    write policy.
//  - WrapDB wraps and initializes the DB; initialization re-enables auto
//    compaction.
//
// If WrapDB fails, it has already deleted the DB and every handle; `handles`
// is then cleared so the caller is not left holding dangling pointers.
Status TransactionDB::Open(
    const DBOptions& db_options, const TransactionDBOptions& txn_db_options,
    const std::string& dbname,
    const std::vector<ColumnFamilyDescriptor>& column_families,
    std::vector<ColumnFamilyHandle*>* handles, TransactionDB** dbptr) {
  Status s;
  DB* db = nullptr;
  if (txn_db_options.write_policy == WRITE_COMMITTED &&
      db_options.unordered_write) {
    return Status::NotSupported(
        "WRITE_COMMITTED is incompatible with unordered_writes");
  }
  if (txn_db_options.write_policy == WRITE_UNPREPARED &&
      db_options.unordered_write) {
    // TODO(lth): support it
    return Status::NotSupported(
        "WRITE_UNPREPARED is currently incompatible with unordered_writes");
  }
  if (txn_db_options.write_policy == WRITE_PREPARED &&
      db_options.unordered_write && !db_options.two_write_queues) {
    return Status::NotSupported(
        "WRITE_PREPARED is incompatible with unordered_writes if "
        "two_write_queues is not enabled.");
  }

  std::vector<ColumnFamilyDescriptor> column_families_copy = column_families;
  std::vector<size_t> compaction_enabled_cf_indices;
  DBOptions db_options_2pc = db_options;
  PrepareWrap(&db_options_2pc, &column_families_copy,
              &compaction_enabled_cf_indices);
  const bool use_seq_per_batch =
      txn_db_options.write_policy == WRITE_PREPARED ||
      txn_db_options.write_policy == WRITE_UNPREPARED;
  const bool use_batch_per_txn =
      txn_db_options.write_policy == WRITE_COMMITTED ||
      txn_db_options.write_policy == WRITE_PREPARED;
  s = DBImpl::Open(db_options_2pc, dbname, column_families_copy, handles, &db,
                   use_seq_per_batch, use_batch_per_txn,
                   /*is_retry=*/false, /*can_retry=*/nullptr);
  if (s.ok()) {
    // Cast to int32_t so the argument type matches the PRId32 specifier.
    ROCKS_LOG_WARN(db->GetDBOptions().info_log,
                   "Transaction write_policy is %" PRId32,
                   static_cast<int32_t>(txn_db_options.write_policy));
    // If WrapDB returns non-ok, db has already been deleted inside WrapDB()
    // via ~StackableDB().
    s = WrapDB(db, txn_db_options, compaction_enabled_cf_indices, *handles,
               dbptr);
    if (!s.ok()) {
      // WrapDB also deleted every handle; drop the now-dangling pointers.
      handles->clear();
    }
  }
  return s;
}
271
272
void TransactionDB::PrepareWrap(
273
    DBOptions* db_options, std::vector<ColumnFamilyDescriptor>* column_families,
274
0
    std::vector<size_t>* compaction_enabled_cf_indices) {
275
0
  compaction_enabled_cf_indices->clear();
276
277
  // Enable MemTable History if not already enabled
278
0
  for (size_t i = 0; i < column_families->size(); i++) {
279
0
    ColumnFamilyOptions* cf_options = &(*column_families)[i].options;
280
281
0
    if (cf_options->max_write_buffer_size_to_maintain == 0 &&
282
0
        cf_options->max_write_buffer_number_to_maintain == 0) {
283
      // Setting to -1 will set the History size to
284
      // max_write_buffer_number * write_buffer_size.
285
0
      cf_options->max_write_buffer_size_to_maintain = -1;
286
0
    }
287
0
    if (!cf_options->disable_auto_compactions) {
288
      // Disable compactions momentarily to prevent race with DB::Open
289
0
      cf_options->disable_auto_compactions = true;
290
0
      compaction_enabled_cf_indices->push_back(i);
291
0
    }
292
0
  }
293
0
  db_options->allow_2pc = true;
294
0
}
295
296
namespace {
297
template <typename DBType>
298
Status WrapAnotherDBInternal(
299
    DBType* db, const TransactionDBOptions& txn_db_options,
300
    const std::vector<size_t>& compaction_enabled_cf_indices,
301
0
    const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
302
0
  assert(db != nullptr);
303
0
  assert(dbptr != nullptr);
304
0
  *dbptr = nullptr;
305
0
  std::unique_ptr<PessimisticTransactionDB> txn_db;
306
  // txn_db owns object pointed to by the raw db pointer.
307
0
  switch (txn_db_options.write_policy) {
308
0
    case WRITE_UNPREPARED:
309
0
      txn_db.reset(new WriteUnpreparedTxnDB(
310
0
          db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
311
0
      break;
312
0
    case WRITE_PREPARED:
313
0
      txn_db.reset(new WritePreparedTxnDB(
314
0
          db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
315
0
      break;
316
0
    case WRITE_COMMITTED:
317
0
    default:
318
0
      txn_db.reset(new WriteCommittedTxnDB(
319
0
          db, PessimisticTransactionDB::ValidateTxnDBOptions(txn_db_options)));
320
0
  }
321
0
  txn_db->UpdateCFComparatorMap(handles);
322
0
  Status s = txn_db->Initialize(compaction_enabled_cf_indices, handles);
323
  // In case of a failure at this point, db is deleted via the txn_db destructor
324
  // and set to nullptr.
325
0
  if (s.ok()) {
326
0
    *dbptr = txn_db.release();
327
0
  } else {
328
0
    for (auto* h : handles) {
329
0
      delete h;
330
0
    }
331
    // txn_db still owns db, and ~StackableDB() will be called when txn_db goes
332
    // out of scope, deleting the input db pointer.
333
0
    ROCKS_LOG_FATAL(db->GetDBOptions().info_log,
334
0
                    "Failed to initialize txn_db: %s", s.ToString().c_str());
335
0
  }
336
0
  return s;
337
0
}
Unexecuted instantiation: pessimistic_transaction_db.cc:rocksdb::Status rocksdb::(anonymous namespace)::WrapAnotherDBInternal<rocksdb::DB>(rocksdb::DB*, rocksdb::TransactionDBOptions const&, std::__1::vector<unsigned long, std::__1::allocator<unsigned long> > const&, std::__1::vector<rocksdb::ColumnFamilyHandle*, std::__1::allocator<rocksdb::ColumnFamilyHandle*> > const&, rocksdb::TransactionDB**)
Unexecuted instantiation: pessimistic_transaction_db.cc:rocksdb::Status rocksdb::(anonymous namespace)::WrapAnotherDBInternal<rocksdb::StackableDB>(rocksdb::StackableDB*, rocksdb::TransactionDBOptions const&, std::__1::vector<unsigned long, std::__1::allocator<unsigned long> > const&, std::__1::vector<rocksdb::ColumnFamilyHandle*, std::__1::allocator<rocksdb::ColumnFamilyHandle*> > const&, rocksdb::TransactionDB**)
338
}  // namespace
339
340
// Wraps an already-opened DB as a TransactionDB. The DB must have been opened
// with memtable history enabled, auto compaction disabled, and two-phase
// commit enabled (the adjustments PrepareWrap makes).
Status TransactionDB::WrapDB(
    DB* db, const TransactionDBOptions& txn_db_options,
    const std::vector<size_t>& compaction_enabled_cf_indices,
    const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
  return WrapAnotherDBInternal<DB>(db, txn_db_options,
                                   compaction_enabled_cf_indices, handles,
                                   dbptr);
}
349
350
// Wraps an already-opened StackableDB as a TransactionDB. The stackable DB
// must have been opened with memtable history enabled, auto compaction
// disabled, and two-phase commit enabled (the adjustments PrepareWrap makes).
Status TransactionDB::WrapStackableDB(
    StackableDB* db, const TransactionDBOptions& txn_db_options,
    const std::vector<size_t>& compaction_enabled_cf_indices,
    const std::vector<ColumnFamilyHandle*>& handles, TransactionDB** dbptr) {
  return WrapAnotherDBInternal<StackableDB>(db, txn_db_options,
                                            compaction_enabled_cf_indices,
                                            handles, dbptr);
}
359
360
// Let LockManager know that this column family exists so it can
361
// allocate a LockMap for it.
362
void PessimisticTransactionDB::AddColumnFamily(
363
0
    const ColumnFamilyHandle* handle) {
364
0
  lock_manager_->AddColumnFamily(handle);
365
0
}
366
367
// Creates one column family after checking its options with VerifyCFOptions.
// On success the new column family is registered with the lock manager and
// the comparator map. Runs with column_family_mutex_ held.
Status PessimisticTransactionDB::CreateColumnFamily(
    const ColumnFamilyOptions& options, const std::string& column_family_name,
    ColumnFamilyHandle** handle) {
  InstrumentedMutexLock lock(&column_family_mutex_);

  Status s = VerifyCFOptions(options);
  if (s.ok()) {
    s = db_->CreateColumnFamily(options, column_family_name, handle);
  }
  if (!s.ok()) {
    return s;
  }

  lock_manager_->AddColumnFamily(*handle);
  UpdateCFComparatorMap(*handle);
  return s;
}
384
385
// Creates several column families that share `options`, after checking the
// options with VerifyCFOptions. Runs with column_family_mutex_ held.
//
// DB::CreateColumnFamilies may succeed partially: on error, `handles` can
// still hold the column families it did create. Every returned handle is
// registered with the lock manager and the comparator map regardless of the
// returned status, so those column families are usable by transactions.
Status PessimisticTransactionDB::CreateColumnFamilies(
    const ColumnFamilyOptions& options,
    const std::vector<std::string>& column_family_names,
    std::vector<ColumnFamilyHandle*>* handles) {
  assert(handles != nullptr);
  InstrumentedMutexLock l(&column_family_mutex_);

  Status s = VerifyCFOptions(options);
  if (!s.ok()) {
    return s;
  }

  // Start from an empty vector so only handles produced by this call are
  // registered below, even if the underlying call fails before filling it.
  handles->clear();
  s = db_->CreateColumnFamilies(options, column_family_names, handles);
  for (auto* handle : *handles) {
    lock_manager_->AddColumnFamily(handle);
    UpdateCFComparatorMap(handle);
  }

  return s;
}
406
407
// Creates the column families described by `column_families`, after checking
// each descriptor's options with VerifyCFOptions. Runs with
// column_family_mutex_ held.
//
// DB::CreateColumnFamilies may succeed partially: on error, `handles` can
// still hold the column families it did create. Every returned handle is
// registered with the lock manager and the comparator map regardless of the
// returned status, so those column families are usable by transactions.
Status PessimisticTransactionDB::CreateColumnFamilies(
    const std::vector<ColumnFamilyDescriptor>& column_families,
    std::vector<ColumnFamilyHandle*>* handles) {
  assert(handles != nullptr);
  InstrumentedMutexLock l(&column_family_mutex_);

  for (auto& cf_desc : column_families) {
    Status s = VerifyCFOptions(cf_desc.options);
    if (!s.ok()) {
      return s;
    }
  }

  // Start from an empty vector so only handles produced by this call are
  // registered below, even if the underlying call fails before filling it.
  handles->clear();
  Status s = db_->CreateColumnFamilies(column_families, handles);
  for (auto* handle : *handles) {
    lock_manager_->AddColumnFamily(handle);
    UpdateCFComparatorMap(handle);
  }

  return s;
}
429
430
// Creates a column family through DB::CreateColumnFamilyWithImport, after
// checking its options with VerifyCFOptions. On success the new column family
// is registered with the lock manager and the comparator map. Runs with
// column_family_mutex_ held.
Status PessimisticTransactionDB::CreateColumnFamilyWithImport(
    const ColumnFamilyOptions& options, const std::string& column_family_name,
    const ImportColumnFamilyOptions& import_options,
    const std::vector<const ExportImportFilesMetaData*>& metadatas,
    ColumnFamilyHandle** handle) {
  InstrumentedMutexLock lock(&column_family_mutex_);

  Status s = VerifyCFOptions(options);
  if (s.ok()) {
    s = db_->CreateColumnFamilyWithImport(options, column_family_name,
                                          import_options, metadatas, handle);
  }
  if (!s.ok()) {
    return s;
  }

  lock_manager_->AddColumnFamily(*handle);
  UpdateCFComparatorMap(*handle);
  return s;
}
450
451
// Let LockManager know that it can deallocate the LockMap for this
452
// column family.
453
// Drops `column_family`. On success, the lock manager is told to remove the
// column family so it can release the LockMap. Runs with
// column_family_mutex_ held.
Status PessimisticTransactionDB::DropColumnFamily(
    ColumnFamilyHandle* column_family) {
  InstrumentedMutexLock lock(&column_family_mutex_);

  Status s = db_->DropColumnFamily(column_family);
  if (!s.ok()) {
    return s;
  }
  lock_manager_->RemoveColumnFamily(column_family);
  return s;
}
464
465
// Drops every column family in `column_families`. Only when the whole call
// succeeds is each column family removed from the lock manager. Runs with
// column_family_mutex_ held.
Status PessimisticTransactionDB::DropColumnFamilies(
    const std::vector<ColumnFamilyHandle*>& column_families) {
  InstrumentedMutexLock lock(&column_family_mutex_);

  Status s = db_->DropColumnFamilies(column_families);
  if (!s.ok()) {
    return s;
  }
  for (ColumnFamilyHandle* cf : column_families) {
    lock_manager_->RemoveColumnFamily(cf);
  }
  return s;
}
478
479
// Asks the lock manager to lock `key` in column family `cfh_id` on behalf of
// `txn`, exclusively or shared according to `exclusive`.
Status PessimisticTransactionDB::TryLock(PessimisticTransaction* txn,
                                         uint32_t cfh_id,
                                         const std::string& key,
                                         bool exclusive) {
  auto* const env = GetEnv();
  return lock_manager_->TryLock(txn, cfh_id, key, env, exclusive);
}
485
486
// Asks the lock manager for an exclusive lock on the range
// [start_endp, end_endp] in column family `cfh_id` on behalf of `txn`.
// Endpoint inclusivity is defined by the Endpoint values themselves.
Status PessimisticTransactionDB::TryRangeLock(PessimisticTransaction* txn,
                                              uint32_t cfh_id,
                                              const Endpoint& start_endp,
                                              const Endpoint& end_endp) {
  auto* const env = GetEnv();
  constexpr bool kExclusive = true;
  return lock_manager_->TryLock(txn, cfh_id, start_endp, end_endp, env,
                                kExclusive);
}
493
494
void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
495
0
                                      const LockTracker& keys) {
496
0
  lock_manager_->UnLock(txn, keys, GetEnv());
497
0
}
498
499
void PessimisticTransactionDB::UnLock(PessimisticTransaction* txn,
500
0
                                      uint32_t cfh_id, const std::string& key) {
501
0
  lock_manager_->UnLock(txn, cfh_id, key, GetEnv());
502
0
}
503
504
// Used when wrapping DB write operations in a transaction
505
// Starts a transaction with default TransactionOptions for wrapping a plain
// (non-transactional) write. Its lock timeout is set to the DB-wide
// default_lock_timeout. The caller owns the returned transaction.
Transaction* PessimisticTransactionDB::BeginInternalTransaction(
    const WriteOptions& options) {
  Transaction* const txn =
      BeginTransaction(options, TransactionOptions(), nullptr);
  // Use default timeout for non-transactional writes
  txn->SetLockTimeout(txn_db_options_.default_lock_timeout);
  return txn;
}
514
515
// All user Put, PutEntity, Merge, Delete, SingleDelete, and Write requests
// must be
516
// intercepted to make sure that they lock all keys that they are writing to
517
// avoid causing conflicts with any concurrent transactions. The easiest way to
518
// do this is to wrap all write operations in a transaction.
519
//
520
// Put(), PutEntity(), Merge(), Delete(), and SingleDelete() only lock a single
// key per call.
521
// Write() will sort its keys before locking them.  This guarantees that
522
// TransactionDB write methods cannot deadlock with each other (but still could
523
// deadlock with a Transaction).
524
// Writes one key/value through an internal transaction, so the write goes
// through the transaction lock manager.
// Returns early with FailIfCfEnablesTs's status if it rejects `column_family`;
// presumably that rejects column families with user-defined timestamps.
// The transaction is held in a unique_ptr so it is released on every path.
Status PessimisticTransactionDB::Put(const WriteOptions& options,
                                     ColumnFamilyHandle* column_family,
                                     const Slice& key, const Slice& val) {
  Status s = FailIfCfEnablesTs(this, column_family);
  if (!s.ok()) {
    return s;
  }

  std::unique_ptr<Transaction> txn(BeginInternalTransaction(options));
  txn->DisableIndexing();

  // Since the client didn't create a transaction, they don't care about
  // conflict checking for this write.  So we just need to do PutUntracked().
  s = txn->PutUntracked(column_family, key, val);

  if (s.ok()) {
    s = txn->Commit();
  }

  return s;
}
547
548
// Writes one wide-column entity through an internal transaction, so the write
// goes through the transaction lock manager.
// Returns early with FailIfCfEnablesTs's status if it rejects `column_family`.
// Any failure from PutEntityUntracked or Commit is returned as-is; otherwise
// Status::OK().
Status PessimisticTransactionDB::PutEntity(const WriteOptions& options,
                                           ColumnFamilyHandle* column_family,
                                           const Slice& key,
                                           const WideColumns& columns) {
  Status s = FailIfCfEnablesTs(this, column_family);
  if (!s.ok()) {
    return s;
  }

  std::unique_ptr<Transaction> txn(BeginInternalTransaction(options));
  txn->DisableIndexing();

  // Since the client didn't create a transaction, they don't care about
  // conflict checking for this write.  So we just need to do
  // PutEntityUntracked().
  s = txn->PutEntityUntracked(column_family, key, columns);
  if (s.ok()) {
    s = txn->Commit();
  }
  if (!s.ok()) {
    return s;
  }

  return Status::OK();
}
583
584
// Deletes one key through an internal transaction, so the write goes through
// the transaction lock manager.
// Returns early with FailIfCfEnablesTs's status if it rejects `column_family`.
// The transaction is held in a unique_ptr so it is released on every path.
Status PessimisticTransactionDB::Delete(const WriteOptions& wopts,
                                        ColumnFamilyHandle* column_family,
                                        const Slice& key) {
  Status s = FailIfCfEnablesTs(this, column_family);
  if (!s.ok()) {
    return s;
  }

  std::unique_ptr<Transaction> txn(BeginInternalTransaction(wopts));
  txn->DisableIndexing();

  // Since the client didn't create a transaction, they don't care about
  // conflict checking for this write.  So we just need to do
  // DeleteUntracked().
  s = txn->DeleteUntracked(column_family, key);

  if (s.ok()) {
    s = txn->Commit();
  }

  return s;
}
608
609
// Single-deletes one key through an internal transaction, so the write goes
// through the transaction lock manager.
// Returns early with FailIfCfEnablesTs's status if it rejects `column_family`.
// The transaction is held in a unique_ptr so it is released on every path.
Status PessimisticTransactionDB::SingleDelete(const WriteOptions& wopts,
                                              ColumnFamilyHandle* column_family,
                                              const Slice& key) {
  Status s = FailIfCfEnablesTs(this, column_family);
  if (!s.ok()) {
    return s;
  }

  std::unique_ptr<Transaction> txn(BeginInternalTransaction(wopts));
  txn->DisableIndexing();

  // Since the client didn't create a transaction, they don't care about
  // conflict checking for this write.  So we just need to do
  // SingleDeleteUntracked().
  s = txn->SingleDeleteUntracked(column_family, key);

  if (s.ok()) {
    s = txn->Commit();
  }

  return s;
}
633
634
// Merges `value` into one key through an internal transaction, so the write
// goes through the transaction lock manager.
// Returns early with FailIfCfEnablesTs's status if it rejects `column_family`.
// The transaction is held in a unique_ptr so it is released on every path.
Status PessimisticTransactionDB::Merge(const WriteOptions& options,
                                       ColumnFamilyHandle* column_family,
                                       const Slice& key, const Slice& value) {
  Status s = FailIfCfEnablesTs(this, column_family);
  if (!s.ok()) {
    return s;
  }

  std::unique_ptr<Transaction> txn(BeginInternalTransaction(options));
  txn->DisableIndexing();

  // Since the client didn't create a transaction, they don't care about
  // conflict checking for this write.  So we just need to do
  // MergeUntracked().
  s = txn->MergeUntracked(column_family, key, value);

  if (s.ok()) {
    s = txn->Commit();
  }

  return s;
}
658
659
// Applies a user WriteBatch. Every batch goes through
// WriteWithConcurrencyControl so its keys are locked before writing.
Status PessimisticTransactionDB::Write(const WriteOptions& opts,
                                       WriteBatch* updates) {
  Status result = WriteWithConcurrencyControl(opts, updates);
  return result;
}
663
664
// Applies a user WriteBatch for the WRITE_COMMITTED policy.
// Returns early with FailIfBatchHasTs's status if it rejects the batch.
// Otherwise:
//  - if the DB was configured with skip_concurrency_control, the batch goes
//    straight to DBImpl;
//  - otherwise it goes through WriteWithConcurrencyControl.
Status WriteCommittedTxnDB::Write(const WriteOptions& opts,
                                  WriteBatch* updates) {
  Status s = FailIfBatchHasTs(updates);
  if (!s.ok()) {
    return s;
  }
  const bool bypass_locking = txn_db_options_.skip_concurrency_control;
  return bypass_locking ? db_impl_->Write(opts, updates)
                        : WriteWithConcurrencyControl(opts, updates);
}
676
677
// Applies a user WriteBatch with per-call optimizations.
// Returns early with FailIfBatchHasTs's status if it rejects the batch.
// Otherwise:
//  - if `optimizations` requests skip_concurrency_control, the batch goes
//    straight to DBImpl;
//  - otherwise it goes through WriteWithConcurrencyControl.
Status WriteCommittedTxnDB::Write(
    const WriteOptions& opts,
    const TransactionDBWriteOptimizations& optimizations, WriteBatch* updates) {
  Status s = FailIfBatchHasTs(updates);
  if (!s.ok()) {
    return s;
  }
  const bool bypass_locking = optimizations.skip_concurrency_control;
  return bypass_locking ? db_impl_->Write(opts, updates)
                        : WriteWithConcurrencyControl(opts, updates);
}
690
691
void PessimisticTransactionDB::InsertExpirableTransaction(
692
0
    TransactionID tx_id, PessimisticTransaction* tx) {
693
0
  assert(tx->GetExpirationTime() > 0);
694
0
  std::lock_guard<std::mutex> lock(map_mutex_);
695
0
  expirable_transactions_map_.insert({tx_id, tx});
696
0
}
697
698
0
// Stops tracking the expirable transaction registered under `tx_id`.
// Guarded by map_mutex_.
void PessimisticTransactionDB::RemoveExpirableTransaction(TransactionID tx_id) {
  std::lock_guard<std::mutex> map_guard(map_mutex_);
  expirable_transactions_map_.erase(tx_id);
}
702
703
bool PessimisticTransactionDB::TryStealingExpiredTransactionLocks(
704
0
    TransactionID tx_id) {
705
0
  std::lock_guard<std::mutex> lock(map_mutex_);
706
707
0
  auto tx_it = expirable_transactions_map_.find(tx_id);
708
0
  if (tx_it == expirable_transactions_map_.end()) {
709
0
    return true;
710
0
  }
711
0
  PessimisticTransaction& tx = *(tx_it->second);
712
0
  return tx.TryStealingLocks();
713
0
}
714
715
void PessimisticTransactionDB::ReinitializeTransaction(
716
    Transaction* txn, const WriteOptions& write_options,
717
0
    const TransactionOptions& txn_options) {
718
0
  auto txn_impl = static_cast_with_check<PessimisticTransaction>(txn);
719
720
0
  txn_impl->Reinitialize(this, write_options, txn_options);
721
0
}
722
723
// Looks up a named transaction while holding name_map_mutex_. Returns nullptr
// when no transaction is registered under `name`.
Transaction* PessimisticTransactionDB::GetTransactionByName(
    const TransactionName& name) {
  std::lock_guard<std::mutex> guard(name_map_mutex_);
  Transaction* const found = GetTransactionByNameLocked(name);
  return found;
}
728
729
// Looks up `name` in transactions_ without taking a lock. The caller is
// expected to hold name_map_mutex_, as GetTransactionByName does. Returns
// nullptr when no transaction is registered under `name`.
Transaction* PessimisticTransactionDB::GetTransactionByNameLocked(
    const TransactionName& name) {
  const auto found = transactions_.find(name);
  return found != transactions_.end() ? found->second : nullptr;
}
738
739
void PessimisticTransactionDB::GetAllPreparedTransactions(
740
0
    std::vector<Transaction*>* transv) {
741
0
  assert(transv);
742
0
  transv->clear();
743
0
  std::lock_guard<std::mutex> lock(name_map_mutex_);
744
0
  for (auto it = transactions_.begin(); it != transactions_.end(); ++it) {
745
0
    if (it->second->GetState() == Transaction::PREPARED) {
746
0
      transv->push_back(it->second);
747
0
    }
748
0
  }
749
0
}
750
751
0
// Returns the point-lock status reported by the lock manager.
LockManager::PointLockStatus PessimisticTransactionDB::GetLockStatusData() {
  LockManager::PointLockStatus status = lock_manager_->GetPointLockStatus();
  return status;
}
754
755
0
std::vector<DeadlockPath> PessimisticTransactionDB::GetDeadlockInfoBuffer() {
756
0
  return lock_manager_->GetDeadlockInfoBuffer();
757
0
}
758
759
0
void PessimisticTransactionDB::SetDeadlockInfoBufferSize(uint32_t target_size) {
760
0
  lock_manager_->Resize(target_size);
761
0
}
762
763
0
// Adds `txn` to the name registry under txn->GetName().
// The transaction must be named and in the STARTED state (asserted).
// Returns InvalidArgument if the name is already taken. Guarded by
// name_map_mutex_.
Status PessimisticTransactionDB::RegisterTransaction(Transaction* txn) {
  assert(txn);
  assert(txn->GetName().length() > 0);
  assert(txn->GetState() == Transaction::STARTED);
  std::lock_guard<std::mutex> guard(name_map_mutex_);
  const bool inserted = transactions_.insert({txn->GetName(), txn}).second;
  if (inserted) {
    return Status::OK();
  }
  return Status::InvalidArgument("Duplicate txn name " + txn->GetName());
}
773
774
0
// Removes `txn` from the name registry. Guarded by name_map_mutex_.
// The transaction is expected to be registered (asserted in debug builds).
// In release builds, where the assert is compiled out, an unregistered
// transaction is ignored rather than passing end() to erase(), which would be
// undefined behavior.
void PessimisticTransactionDB::UnregisterTransaction(Transaction* txn) {
  assert(txn);
  std::lock_guard<std::mutex> lock(name_map_mutex_);
  auto it = transactions_.find(txn->GetName());
  assert(it != transactions_.end());
  if (it != transactions_.end()) {
    transactions_.erase(it);
  }
}
781
782
// Creates a snapshot tagged with timestamp `ts` by calling DBImpl with
// kMaxSequenceNumber as the sequence. kMaxTxnTimestamp is rejected with
// InvalidArgument and a null snapshot.
std::pair<Status, std::shared_ptr<const Snapshot>>
PessimisticTransactionDB::CreateTimestampedSnapshot(TxnTimestamp ts) {
  if (ts != kMaxTxnTimestamp) {
    assert(db_impl_);
    return db_impl_->CreateTimestampedSnapshot(kMaxSequenceNumber, ts);
  }
  return std::make_pair(Status::InvalidArgument("invalid ts"), nullptr);
}
790
791
std::shared_ptr<const Snapshot>
792
0
PessimisticTransactionDB::GetTimestampedSnapshot(TxnTimestamp ts) const {
793
0
  assert(db_impl_);
794
0
  return db_impl_->GetTimestampedSnapshot(ts);
795
0
}
796
797
void PessimisticTransactionDB::ReleaseTimestampedSnapshotsOlderThan(
798
0
    TxnTimestamp ts) {
799
0
  assert(db_impl_);
800
0
  db_impl_->ReleaseTimestampedSnapshotsOlderThan(ts);
801
0
}
802
803
// Collects into `timestamped_snapshots` the snapshots DBImpl reports for
// timestamps between `ts_lb` and `ts_ub`. Bound inclusivity is defined by
// DBImpl::GetTimestampedSnapshots.
Status PessimisticTransactionDB::GetTimestampedSnapshots(
    TxnTimestamp ts_lb, TxnTimestamp ts_ub,
    std::vector<std::shared_ptr<const Snapshot>>& timestamped_snapshots) const {
  assert(db_impl_ != nullptr);
  auto* const impl = db_impl_;
  return impl->GetTimestampedSnapshots(ts_lb, ts_ub, timestamped_snapshots);
}
809
810
// Called with the sequence number `seq` of a commit. It:
//  - creates a timestamped snapshot at `seq`, tagged with commit_ts_;
//  - stores the outcome in snapshot_creation_status_ and snapshot_;
//  - notifies snapshot_notifier_, if set, when a snapshot was created.
// Snapshot-creation failures are recorded rather than returned; the callback
// itself always returns OK.
Status SnapshotCreationCallback::operator()(SequenceNumber seq,
                                            bool disable_memtable) {
  assert(db_impl_);
  assert(commit_ts_ != kMaxTxnTimestamp);

  const bool two_write_queues =
      db_impl_->immutable_db_options().two_write_queues;
  // With two write queues this callback is not expected for writes that skip
  // the memtable.
  assert(!two_write_queues || !disable_memtable);
#ifdef NDEBUG
  // Only referenced by the assert above; silence unused-variable warnings.
  (void)two_write_queues;
  (void)disable_memtable;
#endif

  // `seq` must not be behind the last published sequence. With seq_per_batch
  // it must be strictly ahead of it.
  if (db_impl_->seq_per_batch()) {
    assert(db_impl_->GetLastPublishedSequence() < seq);
  } else {
    assert(db_impl_->GetLastPublishedSequence() <= seq);
  }

  // Create a snapshot which can also be used for write conflict checking.
  auto created = db_impl_->CreateTimestampedSnapshot(seq, commit_ts_);
  snapshot_creation_status_ = created.first;
  snapshot_ = created.second;
  // A snapshot is present exactly when creation succeeded.
  if (!snapshot_creation_status_.ok()) {
    assert(!snapshot_);
  } else {
    assert(snapshot_);
  }

  if (snapshot_ != nullptr && snapshot_notifier_ != nullptr) {
    snapshot_notifier_->SnapshotCreated(snapshot_.get());
  }
  return Status::OK();
}
844
845
}  // namespace ROCKSDB_NAMESPACE