Coverage Report

Created: 2026-03-31 07:51

next uncovered line (L), next uncovered region (R), next uncovered branch (B)
/src/rocksdb/utilities/transactions/write_prepared_txn.cc
Line
Count
Source
1
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
2
//  This source code is licensed under both the GPLv2 (found in the
3
//  COPYING file in the root directory) and Apache 2.0 License
4
//  (found in the LICENSE.Apache file in the root directory).
5
6
#include "utilities/transactions/write_prepared_txn.h"
7
8
#include <cinttypes>
9
#include <map>
10
#include <set>
11
12
#include "db/attribute_group_iterator_impl.h"
13
#include "db/column_family.h"
14
#include "db/db_impl/db_impl.h"
15
#include "rocksdb/db.h"
16
#include "rocksdb/status.h"
17
#include "rocksdb/utilities/transaction_db.h"
18
#include "util/cast_util.h"
19
#include "utilities/transactions/pessimistic_transaction.h"
20
#include "utilities/transactions/write_prepared_txn_db.h"
21
22
namespace ROCKSDB_NAMESPACE {
23
24
struct WriteOptions;
25
26
WritePreparedTxn::WritePreparedTxn(WritePreparedTxnDB* txn_db,
27
                                   const WriteOptions& write_options,
28
                                   const TransactionOptions& txn_options)
29
0
    : PessimisticTransaction(txn_db, write_options, txn_options, false),
30
0
      wpt_db_(txn_db) {
31
  // Call Initialize outside PessimisticTransaction constructor otherwise it
32
  // would skip overridden functions in WritePreparedTxn since they are not
33
  // defined yet in the constructor of PessimisticTransaction
34
0
  Initialize(txn_options);
35
0
}
36
37
0
void WritePreparedTxn::Initialize(const TransactionOptions& txn_options) {
38
0
  PessimisticTransaction::Initialize(txn_options);
39
0
  prepare_batch_cnt_ = 0;
40
0
}
41
42
void WritePreparedTxn::MultiGet(const ReadOptions& _read_options,
43
                                ColumnFamilyHandle* column_family,
44
                                const size_t num_keys, const Slice* keys,
45
                                PinnableSlice* values, Status* statuses,
46
0
                                const bool sorted_input) {
47
0
  if (_read_options.io_activity != Env::IOActivity::kUnknown &&
48
0
      _read_options.io_activity != Env::IOActivity::kMultiGet) {
49
0
    Status s = Status::InvalidArgument(
50
0
        "Can only call MultiGet with `ReadOptions::io_activity` is "
51
0
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kMultiGet`");
52
53
0
    for (size_t i = 0; i < num_keys; ++i) {
54
0
      if (statuses[i].ok()) {
55
0
        statuses[i] = s;
56
0
      }
57
0
    }
58
0
    return;
59
0
  }
60
0
  ReadOptions read_options(_read_options);
61
0
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
62
0
    read_options.io_activity = Env::IOActivity::kMultiGet;
63
0
  }
64
65
0
  SequenceNumber min_uncommitted, snap_seq;
66
0
  const SnapshotBackup backed_by_snapshot = wpt_db_->AssignMinMaxSeqs(
67
0
      read_options.snapshot, &min_uncommitted, &snap_seq);
68
0
  WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted,
69
0
                                        backed_by_snapshot);
70
0
  write_batch_.MultiGetFromBatchAndDB(db_, read_options, column_family,
71
0
                                      num_keys, keys, values, statuses,
72
0
                                      sorted_input, &callback);
73
0
  if (UNLIKELY(!callback.valid() ||
74
0
               !wpt_db_->ValidateSnapshot(snap_seq, backed_by_snapshot))) {
75
0
    wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
76
0
    for (size_t i = 0; i < num_keys; i++) {
77
0
      statuses[i] = Status::TryAgain();
78
0
    }
79
0
  }
80
0
}
81
82
Status WritePreparedTxn::Get(const ReadOptions& _read_options,
83
                             ColumnFamilyHandle* column_family,
84
0
                             const Slice& key, PinnableSlice* pinnable_val) {
85
0
  if (_read_options.io_activity != Env::IOActivity::kUnknown &&
86
0
      _read_options.io_activity != Env::IOActivity::kGet) {
87
0
    return Status::InvalidArgument(
88
0
        "Can only call Get with `ReadOptions::io_activity` is "
89
0
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kGet`");
90
0
  }
91
0
  ReadOptions read_options(_read_options);
92
0
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
93
0
    read_options.io_activity = Env::IOActivity::kGet;
94
0
  }
95
96
0
  return GetImpl(read_options, column_family, key, pinnable_val);
97
0
}
98
99
Status WritePreparedTxn::GetImpl(const ReadOptions& options,
100
                                 ColumnFamilyHandle* column_family,
101
                                 const Slice& key,
102
0
                                 PinnableSlice* pinnable_val) {
103
0
  SequenceNumber min_uncommitted, snap_seq;
104
0
  const SnapshotBackup backed_by_snapshot =
105
0
      wpt_db_->AssignMinMaxSeqs(options.snapshot, &min_uncommitted, &snap_seq);
106
0
  WritePreparedTxnReadCallback callback(wpt_db_, snap_seq, min_uncommitted,
107
0
                                        backed_by_snapshot);
108
0
  Status res = write_batch_.GetFromBatchAndDB(db_, options, column_family, key,
109
0
                                              pinnable_val, &callback);
110
0
  const bool callback_valid =
111
0
      callback.valid();  // NOTE: validity of callback must always be checked
112
                         // before it is destructed
113
0
  if (res.ok()) {
114
0
    if (!LIKELY(callback_valid &&
115
0
                wpt_db_->ValidateSnapshot(callback.max_visible_seq(),
116
0
                                          backed_by_snapshot))) {
117
0
      wpt_db_->WPRecordTick(TXN_GET_TRY_AGAIN);
118
0
      res = Status::TryAgain();
119
0
    }
120
0
  }
121
122
0
  return res;
123
0
}
124
125
0
Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options) {
126
0
  return GetIterator(options, wpt_db_->DefaultColumnFamily());
127
0
}
128
129
Iterator* WritePreparedTxn::GetIterator(const ReadOptions& options,
130
0
                                        ColumnFamilyHandle* column_family) {
131
  // Make sure to get the iterator from WritePreparedTxnDB, not the root db.
132
0
  Iterator* db_iter = wpt_db_->NewIterator(options, column_family);
133
0
  assert(db_iter);
134
135
0
  return write_batch_.NewIteratorWithBase(column_family, db_iter, &options);
136
0
}
137
138
std::unique_ptr<Iterator> WritePreparedTxn::GetCoalescingIterator(
139
    const ReadOptions& /* read_options */,
140
0
    const std::vector<ColumnFamilyHandle*>& /* column_families */) {
141
0
  return std::unique_ptr<Iterator>(NewErrorIterator(
142
0
      Status::NotSupported("GetCoalescingIterator not supported for "
143
0
                           "write-prepared/write-unprepared transactions")));
144
0
}
145
146
std::unique_ptr<AttributeGroupIterator>
147
WritePreparedTxn::GetAttributeGroupIterator(
148
    const ReadOptions& /* read_options */,
149
0
    const std::vector<ColumnFamilyHandle*>& /* column_families */) {
150
0
  return NewAttributeGroupErrorIterator(
151
0
      Status::NotSupported("GetAttributeGroupIterator not supported for "
152
0
                           "write-prepared/write-unprepared transactions"));
153
0
}
154
155
0
Status WritePreparedTxn::PrepareInternal() {
156
0
  WriteOptions write_options = write_options_;
157
0
  write_options.disableWAL = false;
158
0
  const bool WRITE_AFTER_COMMIT = true;
159
0
  const bool kFirstPrepareBatch = true;
160
0
  auto s = WriteBatchInternal::MarkEndPrepare(GetWriteBatch()->GetWriteBatch(),
161
0
                                              name_, !WRITE_AFTER_COMMIT);
162
0
  assert(s.ok());
163
  // For each duplicate key we account for a new sub-batch
164
0
  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
165
  // Having AddPrepared in the PreReleaseCallback allows in-order addition of
166
  // prepared entries to PreparedHeap and hence enables an optimization. Refer
167
  // to SmallestUnCommittedSeq for more details.
168
0
  AddPreparedCallback add_prepared_callback(
169
0
      wpt_db_, db_impl_, prepare_batch_cnt_,
170
0
      db_impl_->immutable_db_options().two_write_queues, kFirstPrepareBatch);
171
0
  const bool DISABLE_MEMTABLE = true;
172
0
  uint64_t seq_used = kMaxSequenceNumber;
173
0
  s = db_impl_->WriteImpl(write_options, GetWriteBatch()->GetWriteBatch(),
174
0
                          /*callback*/ nullptr, /*user_write_cb=*/nullptr,
175
0
                          &log_number_, /*log ref*/ 0, !DISABLE_MEMTABLE,
176
0
                          &seq_used, prepare_batch_cnt_,
177
0
                          &add_prepared_callback);
178
0
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
179
0
  auto prepare_seq = seq_used;
180
0
  SetId(prepare_seq);
181
0
  return s;
182
0
}
183
184
0
Status WritePreparedTxn::CommitWithoutPrepareInternal() {
185
  // For each duplicate key we account for a new sub-batch
186
0
  const size_t batch_cnt = GetWriteBatch()->SubBatchCnt();
187
0
  return CommitBatchInternal(GetWriteBatch()->GetWriteBatch(), batch_cnt);
188
0
}
189
190
Status WritePreparedTxn::CommitBatchInternal(WriteBatch* batch,
191
0
                                             size_t batch_cnt) {
192
0
  return wpt_db_->WriteInternal(write_options_, batch, batch_cnt, this);
193
0
}
194
195
0
Status WritePreparedTxn::CommitInternal() {
196
0
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
197
0
                    "CommitInternal prepare_seq: %" PRIu64, GetID());
198
  // We take the commit-time batch and append the Commit marker.
199
  // The Memtable will ignore the Commit marker in non-recovery mode
200
0
  WriteBatch* working_batch = GetCommitTimeWriteBatch();
201
0
  const bool empty = working_batch->Count() == 0;
202
0
  auto s = WriteBatchInternal::MarkCommit(working_batch, name_);
203
0
  assert(s.ok());
204
205
0
  const bool for_recovery = use_only_the_last_commit_time_batch_for_recovery_;
206
0
  if (!empty) {
207
    // When not writing to memtable, we can still cache the latest write batch.
208
    // The cached batch will be written to memtable in WriteRecoverableState
209
    // during FlushMemTable
210
0
    if (for_recovery) {
211
0
      WriteBatchInternal::SetAsLatestPersistentState(working_batch);
212
0
    } else {
213
0
      return Status::InvalidArgument(
214
0
          "Commit-time-batch can only be used if "
215
0
          "use_only_the_last_commit_time_batch_for_recovery is true");
216
0
    }
217
0
  }
218
219
0
  auto prepare_seq = GetId();
220
0
  const bool includes_data = !empty && !for_recovery;
221
0
  assert(prepare_batch_cnt_);
222
0
  size_t commit_batch_cnt = 0;
223
0
  if (UNLIKELY(includes_data)) {
224
0
    ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
225
0
                   "Duplicate key overhead");
226
0
    SubBatchCounter counter(*wpt_db_->GetCFComparatorMap());
227
0
    s = working_batch->Iterate(&counter);
228
0
    assert(s.ok());
229
0
    commit_batch_cnt = counter.BatchCount();
230
0
  }
231
0
  const bool disable_memtable = !includes_data;
232
0
  const bool do_one_write =
233
0
      !db_impl_->immutable_db_options().two_write_queues || disable_memtable;
234
0
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
235
0
      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, commit_batch_cnt);
236
  // This is to call AddPrepared on CommitTimeWriteBatch
237
0
  const bool kFirstPrepareBatch = true;
238
0
  AddPreparedCallback add_prepared_callback(
239
0
      wpt_db_, db_impl_, commit_batch_cnt,
240
0
      db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
241
0
  PreReleaseCallback* pre_release_callback;
242
0
  if (do_one_write) {
243
0
    pre_release_callback = &update_commit_map;
244
0
  } else {
245
0
    pre_release_callback = &add_prepared_callback;
246
0
  }
247
0
  uint64_t seq_used = kMaxSequenceNumber;
248
  // Since the prepared batch is directly written to memtable, there is already
249
  // a connection between the memtable and its WAL, so there is no need to
250
  // redundantly reference the log that contains the prepared data.
251
0
  const uint64_t zero_log_number = 0ull;
252
0
  size_t batch_cnt = UNLIKELY(commit_batch_cnt) ? commit_batch_cnt : 1;
253
  // If `two_write_queues && includes_data`, then `do_one_write` is false. The
254
  // following `WriteImpl` will insert the data of the commit-time-batch into
255
  // the database before updating the commit cache. Therefore, the data of the
256
  // commit-time-batch is considered uncommitted. Furthermore, since data of
257
  // the commit-time-batch are not locked, it is possible for two uncommitted
258
  // versions of the same key to co-exist for a (short) period of time until
259
  // the commit cache is updated by the second write. If the two uncommitted
260
  // keys are compacted to the bottommost level in the meantime, it is possible
261
  // that compaction iterator will zero out the sequence numbers of both, thus
262
  // violating the invariant that an SST does not have two identical internal
263
  // keys. To prevent this situation, we should allow the usage of
264
  // commit-time-batch only if the user sets
265
  // TransactionOptions::use_only_the_last_commit_time_batch_for_recovery to
266
  // true. See the comments about GetCommitTimeWriteBatch() in
267
  // include/rocksdb/utilities/transaction.h.
268
0
  s = db_impl_->WriteImpl(write_options_, working_batch, nullptr,
269
0
                          /*user_write_cb=*/nullptr, nullptr, zero_log_number,
270
0
                          disable_memtable, &seq_used, batch_cnt,
271
0
                          pre_release_callback);
272
0
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
273
0
  const SequenceNumber commit_batch_seq = seq_used;
274
0
  if (LIKELY(do_one_write || !s.ok())) {
275
0
    if (UNLIKELY(!db_impl_->immutable_db_options().two_write_queues &&
276
0
                 s.ok())) {
277
      // Note: RemovePrepared should be called after WriteImpl that published
278
      // the seq. Otherwise SmallestUnCommittedSeq optimization breaks.
279
0
      wpt_db_->RemovePrepared(prepare_seq, prepare_batch_cnt_);
280
0
    }  // else RemovePrepared is called from within PreReleaseCallback
281
0
    if (UNLIKELY(!do_one_write)) {
282
0
      assert(!s.ok());
283
      // Cleanup the prepared entry we added with add_prepared_callback
284
0
      wpt_db_->RemovePrepared(commit_batch_seq, commit_batch_cnt);
285
0
    }
286
0
    return s;
287
0
  }  // else do the 2nd write to publish seq
288
  // Note: the 2nd write comes with a performance penalty. So if we have too
289
  // many commits accompanied with CommitTimeWriteBatch and yet we cannot
290
  // enable use_only_the_last_commit_time_batch_for_recovery_ optimization,
291
  // two_write_queues should be disabled to avoid many additional writes here.
292
0
  const size_t kZeroData = 0;
293
  // Update commit map only from the 2nd queue
294
0
  WritePreparedCommitEntryPreReleaseCallback update_commit_map_with_aux_batch(
295
0
      wpt_db_, db_impl_, prepare_seq, prepare_batch_cnt_, kZeroData,
296
0
      commit_batch_seq, commit_batch_cnt);
297
0
  WriteBatch empty_batch;
298
0
  s = empty_batch.PutLogData(Slice());
299
0
  assert(s.ok());
300
  // In the absence of Prepare markers, use Noop as a batch separator
301
0
  s = WriteBatchInternal::InsertNoop(&empty_batch);
302
0
  assert(s.ok());
303
0
  const bool DISABLE_MEMTABLE = true;
304
0
  const size_t ONE_BATCH = 1;
305
0
  const uint64_t NO_REF_LOG = 0;
306
0
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr,
307
0
                          /*user_write_cb=*/nullptr, nullptr, NO_REF_LOG,
308
0
                          DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
309
0
                          &update_commit_map_with_aux_batch);
310
0
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
311
0
  return s;
312
0
}
313
314
0
Status WritePreparedTxn::RollbackInternal() {
315
0
  ROCKS_LOG_WARN(db_impl_->immutable_db_options().info_log,
316
0
                 "RollbackInternal prepare_seq: %" PRIu64, GetId());
317
318
0
  assert(db_impl_);
319
0
  assert(wpt_db_);
320
321
0
  WriteBatch rollback_batch(0 /* reserved_bytes */, 0 /* max_bytes */,
322
0
                            write_options_.protection_bytes_per_key,
323
0
                            0 /* default_cf_ts_sz */);
324
0
  assert(GetId() != kMaxSequenceNumber);
325
0
  assert(GetId() > 0);
326
0
  auto cf_map_shared_ptr = wpt_db_->GetCFHandleMap();
327
0
  auto cf_comp_map_shared_ptr = wpt_db_->GetCFComparatorMap();
328
0
  auto read_at_seq = kMaxSequenceNumber;
329
  // TODO: plumb Env::IOActivity, Env::IOPriority
330
0
  ReadOptions roptions;
331
  // to prevent the callback's seq from being overridden inside DBImpl::Get
332
0
  roptions.snapshot = wpt_db_->GetMaxSnapshot();
333
0
  struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
334
0
    DBImpl* const db_;
335
0
    WritePreparedTxnDB* const wpt_db_;
336
0
    WritePreparedTxnReadCallback callback_;
337
0
    WriteBatch* rollback_batch_;
338
0
    std::map<uint32_t, const Comparator*>& comparators_;
339
0
    std::map<uint32_t, ColumnFamilyHandle*>& handles_;
340
0
    using CFKeys = std::set<Slice, SetComparator>;
341
0
    std::map<uint32_t, CFKeys> keys_;
342
0
    bool rollback_merge_operands_;
343
0
    ReadOptions roptions_;
344
345
0
    RollbackWriteBatchBuilder(
346
0
        DBImpl* db, WritePreparedTxnDB* wpt_db, SequenceNumber snap_seq,
347
0
        WriteBatch* dst_batch,
348
0
        std::map<uint32_t, const Comparator*>& comparators,
349
0
        std::map<uint32_t, ColumnFamilyHandle*>& handles,
350
0
        bool rollback_merge_operands, const ReadOptions& _roptions)
351
0
        : db_(db),
352
0
          wpt_db_(wpt_db),
353
0
          callback_(wpt_db, snap_seq),  // disable min_uncommitted optimization
354
0
          rollback_batch_(dst_batch),
355
0
          comparators_(comparators),
356
0
          handles_(handles),
357
0
          rollback_merge_operands_(rollback_merge_operands),
358
0
          roptions_(_roptions) {}
359
360
0
    Status Rollback(uint32_t cf, const Slice& key) {
361
0
      Status s;
362
0
      CFKeys& cf_keys = keys_[cf];
363
0
      if (cf_keys.size() == 0) {  // just inserted
364
0
        auto cmp = comparators_[cf];
365
0
        keys_[cf] = CFKeys(SetComparator(cmp));
366
0
      }
367
0
      auto it = cf_keys.insert(key);
368
      // second is false if an element already existed.
369
0
      if (it.second == false) {
370
0
        return s;
371
0
      }
372
373
0
      PinnableSlice pinnable_val;
374
0
      bool not_used;
375
0
      auto cf_handle = handles_[cf];
376
0
      DBImpl::GetImplOptions get_impl_options;
377
0
      get_impl_options.column_family = cf_handle;
378
0
      get_impl_options.value = &pinnable_val;
379
0
      get_impl_options.value_found = &not_used;
380
0
      get_impl_options.callback = &callback_;
381
0
      s = db_->GetImpl(roptions_, key, get_impl_options);
382
0
      assert(s.ok() || s.IsNotFound());
383
0
      if (s.ok()) {
384
0
        s = rollback_batch_->Put(cf_handle, key, pinnable_val);
385
0
        assert(s.ok());
386
0
      } else if (s.IsNotFound()) {
387
        // There has been no readable value before txn. By adding a delete we
388
        // make sure that there will be none afterwards either.
389
0
        if (wpt_db_->ShouldRollbackWithSingleDelete(cf_handle, key)) {
390
0
          s = rollback_batch_->SingleDelete(cf_handle, key);
391
0
        } else {
392
0
          s = rollback_batch_->Delete(cf_handle, key);
393
0
        }
394
0
        assert(s.ok());
395
0
      } else {
396
        // Unexpected status. Return it to the user.
397
0
      }
398
0
      return s;
399
0
    }
400
401
0
    Status PutCF(uint32_t cf, const Slice& key, const Slice& /*val*/) override {
402
0
      return Rollback(cf, key);
403
0
    }
404
405
0
    Status DeleteCF(uint32_t cf, const Slice& key) override {
406
0
      return Rollback(cf, key);
407
0
    }
408
409
0
    Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
410
0
      return Rollback(cf, key);
411
0
    }
412
413
0
    Status MergeCF(uint32_t cf, const Slice& key,
414
0
                   const Slice& /*val*/) override {
415
0
      if (rollback_merge_operands_) {
416
0
        return Rollback(cf, key);
417
0
      } else {
418
0
        return Status::OK();
419
0
      }
420
0
    }
421
422
0
    Status MarkNoop(bool) override { return Status::OK(); }
423
0
    Status MarkBeginPrepare(bool) override { return Status::OK(); }
424
0
    Status MarkEndPrepare(const Slice&) override { return Status::OK(); }
425
0
    Status MarkCommit(const Slice&) override { return Status::OK(); }
426
0
    Status MarkRollback(const Slice&) override {
427
0
      return Status::InvalidArgument();
428
0
    }
429
430
0
   protected:
431
0
    Handler::OptionState WriteAfterCommit() const override {
432
0
      return Handler::OptionState::kDisabled;
433
0
    }
434
0
  } rollback_handler(db_impl_, wpt_db_, read_at_seq, &rollback_batch,
435
0
                     *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
436
0
                     wpt_db_->txn_db_options_.rollback_merge_operands,
437
0
                     roptions);
438
0
  auto s = GetWriteBatch()->GetWriteBatch()->Iterate(&rollback_handler);
439
0
  if (!s.ok()) {
440
0
    return s;
441
0
  }
442
  // The Rollback marker will be used as a batch separator
443
0
  s = WriteBatchInternal::MarkRollback(&rollback_batch, name_);
444
0
  assert(s.ok());
445
0
  bool do_one_write = !db_impl_->immutable_db_options().two_write_queues;
446
0
  const bool DISABLE_MEMTABLE = true;
447
0
  const uint64_t NO_REF_LOG = 0;
448
0
  uint64_t seq_used = kMaxSequenceNumber;
449
0
  const size_t ONE_BATCH = 1;
450
0
  const bool kFirstPrepareBatch = true;
451
  // We commit the rolled back prepared batches. Although this is
452
  // counter-intuitive, i) it is safe to do so, since the prepared batches are
453
  // already canceled out by the rollback batch, ii) adding the commit entry to
454
  // CommitCache will allow us to benefit from the existing mechanism in
455
  // CommitCache that keeps an entry evicted due to max advance and yet overlaps
456
  // with a live snapshot around so that the live snapshot properly skips the
457
  // entry even if its prepare seq is lower than max_evicted_seq_.
458
0
  AddPreparedCallback add_prepared_callback(
459
0
      wpt_db_, db_impl_, ONE_BATCH,
460
0
      db_impl_->immutable_db_options().two_write_queues, !kFirstPrepareBatch);
461
0
  WritePreparedCommitEntryPreReleaseCallback update_commit_map(
462
0
      wpt_db_, db_impl_, GetId(), prepare_batch_cnt_, ONE_BATCH);
463
0
  PreReleaseCallback* pre_release_callback;
464
0
  if (do_one_write) {
465
0
    pre_release_callback = &update_commit_map;
466
0
  } else {
467
0
    pre_release_callback = &add_prepared_callback;
468
0
  }
469
  // Note: the rollback batch does not need AddPrepared since it is written to
470
  // DB in one shot. min_uncommitted still works since it requires capturing
471
  // data that is written to DB but not yet committed, while
472
  // the rollback batch commits with PreReleaseCallback.
473
0
  s = db_impl_->WriteImpl(write_options_, &rollback_batch, nullptr,
474
0
                          /*user_write_cb=*/nullptr, nullptr, NO_REF_LOG,
475
0
                          !DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
476
0
                          pre_release_callback);
477
0
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
478
0
  if (!s.ok()) {
479
0
    return s;
480
0
  }
481
0
  if (do_one_write) {
482
0
    assert(!db_impl_->immutable_db_options().two_write_queues);
483
0
    wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
484
0
    return s;
485
0
  }  // else do the 2nd write for commit
486
0
  uint64_t rollback_seq = seq_used;
487
0
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
488
0
                    "RollbackInternal 2nd write rollback_seq: %" PRIu64,
489
0
                    rollback_seq);
490
  // Commit the batch by writing an empty batch to the queue that will release
491
  // the commit sequence number to readers.
492
0
  WritePreparedRollbackPreReleaseCallback update_commit_map_with_prepare(
493
0
      wpt_db_, db_impl_, GetId(), rollback_seq, prepare_batch_cnt_);
494
0
  WriteBatch empty_batch;
495
0
  s = empty_batch.PutLogData(Slice());
496
0
  assert(s.ok());
497
  // In the absence of Prepare markers, use Noop as a batch separator
498
0
  s = WriteBatchInternal::InsertNoop(&empty_batch);
499
0
  assert(s.ok());
500
0
  s = db_impl_->WriteImpl(write_options_, &empty_batch, nullptr,
501
0
                          /*user_write_cb=*/nullptr, nullptr, NO_REF_LOG,
502
0
                          DISABLE_MEMTABLE, &seq_used, ONE_BATCH,
503
0
                          &update_commit_map_with_prepare);
504
0
  assert(!s.ok() || seq_used != kMaxSequenceNumber);
505
0
  ROCKS_LOG_DETAILS(db_impl_->immutable_db_options().info_log,
506
0
                    "RollbackInternal (status=%s) commit: %" PRIu64,
507
0
                    s.ToString().c_str(), GetId());
508
  // TODO(lth): For WriteUnPrepared that rollback is called frequently,
509
  // RemovePrepared could be moved to the callback to reduce lock contention.
510
0
  if (s.ok()) {
511
0
    wpt_db_->RemovePrepared(GetId(), prepare_batch_cnt_);
512
0
  }
513
  // Note: RemovePrepared for prepared batch is called from within
514
  // PreReleaseCallback
515
0
  wpt_db_->RemovePrepared(rollback_seq, ONE_BATCH);
516
517
0
  return s;
518
0
}
519
520
Status WritePreparedTxn::ValidateSnapshot(ColumnFamilyHandle* column_family,
521
                                          const Slice& key,
522
0
                                          SequenceNumber* tracked_at_seq) {
523
0
  assert(snapshot_);
524
525
0
  SequenceNumber min_uncommitted =
526
0
      static_cast_with_check<const SnapshotImpl>(snapshot_.get())
527
0
          ->min_uncommitted_;
528
0
  SequenceNumber snap_seq = snapshot_->GetSequenceNumber();
529
  // tracked_at_seq is either max or the last snapshot with which this key was
530
  // tracked so there is no need to apply the IsInSnapshot to this comparison
531
  // here as tracked_at_seq is not a prepare seq.
532
0
  if (*tracked_at_seq <= snap_seq) {
533
    // If the key has been previously validated at a sequence number earlier
534
    // than the current snapshot's sequence number, we already know it has not
535
    // been modified.
536
0
    return Status::OK();
537
0
  }
538
539
0
  *tracked_at_seq = snap_seq;
540
541
0
  ColumnFamilyHandle* cfh =
542
0
      column_family ? column_family : db_impl_->DefaultColumnFamily();
543
544
0
  WritePreparedTxnReadCallback snap_checker(wpt_db_, snap_seq, min_uncommitted,
545
0
                                            kBackedByDBSnapshot);
546
  // TODO(yanqin): support user-defined timestamp
547
0
  return TransactionUtil::CheckKeyForConflicts(
548
0
      db_impl_, cfh, key.ToString(), snap_seq, /*ts=*/nullptr,
549
0
      false /* cache_only */, &snap_checker, min_uncommitted,
550
0
      txn_db_impl_->GetTxnDBOptions().enable_udt_validation);
551
0
}
552
553
0
void WritePreparedTxn::SetSnapshot() {
554
0
  const bool kForWWConflictCheck = true;
555
0
  SnapshotImpl* snapshot = wpt_db_->GetSnapshotInternal(kForWWConflictCheck);
556
0
  SetSnapshotInternal(snapshot);
557
0
}
558
559
0
Status WritePreparedTxn::RebuildFromWriteBatch(WriteBatch* src_batch) {
560
0
  auto ret = PessimisticTransaction::RebuildFromWriteBatch(src_batch);
561
0
  prepare_batch_cnt_ = GetWriteBatch()->SubBatchCnt();
562
0
  return ret;
563
0
}
564
565
}  // namespace ROCKSDB_NAMESPACE