Coverage Report

Created: 2026-02-14 06:58

/src/rocksdb/utilities/transactions/write_unprepared_txn_db.cc
Execution counts: every instrumented line in this file reports a count of 0 (the file is entirely uncovered in this run).

Source:
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#include "utilities/transactions/write_unprepared_txn_db.h"

#include "db/arena_wrapped_db_iter.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"

namespace ROCKSDB_NAMESPACE {

// Instead of reconstructing a Transaction object, and calling rollback on it,
// we can be more efficient with RollbackRecoveredTransaction by skipping
// unnecessary steps (e.g. updating CommitMap, reconstructing keyset)
Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction(
    const DBImpl::RecoveredTransaction* rtxn) {
  // TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
  assert(rtxn->unprepared_);
  auto cf_map_shared_ptr = WritePreparedTxnDB::GetCFHandleMap();
  auto cf_comp_map_shared_ptr = WritePreparedTxnDB::GetCFComparatorMap();
  // In theory we could write with disableWAL = true during recovery, and
  // assume that if we crash again during recovery, we can just replay from
  // the very beginning. Unfortunately, the XIDs from the application may not
  // necessarily be unique across restarts, potentially leading to situations
  // like this:
  //
  // BEGIN_PREPARE(unprepared) Put(a) END_PREPARE(xid = 1)
  // -- crash and recover with Put(a) rolled back as it was not prepared
  // BEGIN_PREPARE(prepared) Put(b) END_PREPARE(xid = 1)
  // COMMIT(xid = 1)
  // -- crash and recover with both a, b
  //
  // We could just write the rollback marker, but then we would have to extend
  // MemTableInserter during recovery to actually do writes into the DB
  // instead of just dropping the in-memory write batch.
  //
  // TODO: plumb Env::IOActivity, Env::IOPriority
  WriteOptions w_options;

  class InvalidSnapshotReadCallback : public ReadCallback {
   public:
    InvalidSnapshotReadCallback(SequenceNumber snapshot)
        : ReadCallback(snapshot) {}

    inline bool IsVisibleFullCheck(SequenceNumber) override {
      // The seq provided as snapshot is the seq right before we have locked
      // and written to it, so whatever is there, it is committed.
      return true;
    }

    // Ignore the refresh request since we are confident that our snapshot seq
    // is not going to be affected by concurrent compactions (not enabled yet.)
    void Refresh(SequenceNumber) override {}
  };

  // Iterate starting with largest sequence number.
  for (auto it = rtxn->batches_.rbegin(); it != rtxn->batches_.rend(); ++it) {
    auto last_visible_txn = it->first - 1;
    const auto& batch = it->second.batch_;
    WriteBatch rollback_batch(0 /* reserved_bytes */, 0 /* max_bytes */,
                              w_options.protection_bytes_per_key,
                              0 /* default_cf_ts_sz */);

    struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
      DBImpl* db_;
      ReadOptions roptions;
      InvalidSnapshotReadCallback callback;
      WriteBatch* rollback_batch_;
      std::map<uint32_t, const Comparator*>& comparators_;
      std::map<uint32_t, ColumnFamilyHandle*>& handles_;
      using CFKeys = std::set<Slice, SetComparator>;
      std::map<uint32_t, CFKeys> keys_;
      bool rollback_merge_operands_;
      RollbackWriteBatchBuilder(
          DBImpl* db, SequenceNumber snap_seq, WriteBatch* dst_batch,
          std::map<uint32_t, const Comparator*>& comparators,
          std::map<uint32_t, ColumnFamilyHandle*>& handles,
          bool rollback_merge_operands)
          : db_(db),
            callback(snap_seq),
            // disable min_uncommitted optimization
            rollback_batch_(dst_batch),
            comparators_(comparators),
            handles_(handles),
            rollback_merge_operands_(rollback_merge_operands) {}

      Status Rollback(uint32_t cf, const Slice& key) {
        Status s;
        CFKeys& cf_keys = keys_[cf];
        if (cf_keys.size() == 0) {  // just inserted
          auto cmp = comparators_[cf];
          keys_[cf] = CFKeys(SetComparator(cmp));
        }
        auto res = cf_keys.insert(key);
        if (res.second ==
            false) {  // second is false if an element already existed.
          return s;
        }

        PinnableSlice pinnable_val;
        bool not_used;
        auto cf_handle = handles_[cf];
        DBImpl::GetImplOptions get_impl_options;
        get_impl_options.column_family = cf_handle;
        get_impl_options.value = &pinnable_val;
        get_impl_options.value_found = &not_used;
        get_impl_options.callback = &callback;
        s = db_->GetImpl(roptions, key, get_impl_options);
        assert(s.ok() || s.IsNotFound());
        if (s.ok()) {
          s = rollback_batch_->Put(cf_handle, key, pinnable_val);
          assert(s.ok());
        } else if (s.IsNotFound()) {
          // There has been no readable value before txn. By adding a delete we
          // make sure that there will be none afterwards either.
          s = rollback_batch_->Delete(cf_handle, key);
          assert(s.ok());
        } else {
          // Unexpected status. Return it to the user.
        }
        return s;
      }

      Status PutCF(uint32_t cf, const Slice& key,
                   const Slice& /*val*/) override {
        return Rollback(cf, key);
      }

      Status DeleteCF(uint32_t cf, const Slice& key) override {
        return Rollback(cf, key);
      }

      Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
        return Rollback(cf, key);
      }

      Status MergeCF(uint32_t cf, const Slice& key,
                     const Slice& /*val*/) override {
        if (rollback_merge_operands_) {
          return Rollback(cf, key);
        } else {
          return Status::OK();
        }
      }

      // Recovered batches do not contain 2PC markers.
      Status MarkNoop(bool) override { return Status::InvalidArgument(); }
      Status MarkBeginPrepare(bool) override {
        return Status::InvalidArgument();
      }
      Status MarkEndPrepare(const Slice&) override {
        return Status::InvalidArgument();
      }
      Status MarkCommit(const Slice&) override {
        return Status::InvalidArgument();
      }
      Status MarkRollback(const Slice&) override {
        return Status::InvalidArgument();
      }
    } rollback_handler(db_impl_, last_visible_txn, &rollback_batch,
                       *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
                       txn_db_options_.rollback_merge_operands);

    auto s = batch->Iterate(&rollback_handler);
    if (!s.ok()) {
      return s;
    }

    // The Rollback marker will be used as a batch separator
    s = WriteBatchInternal::MarkRollback(&rollback_batch, rtxn->name_);
    if (!s.ok()) {
      return s;
    }

    const uint64_t kNoLogRef = 0;
    const bool kDisableMemtable = true;
    const size_t kOneBatch = 1;
    uint64_t seq_used = kMaxSequenceNumber;
    s = db_impl_->WriteImpl(w_options, &rollback_batch, nullptr, nullptr,
                            nullptr, kNoLogRef, !kDisableMemtable, &seq_used,
                            kOneBatch);
    if (!s.ok()) {
      return s;
    }

    // If two_write_queues, we must manually release the sequence number to
    // readers.
    if (db_impl_->immutable_db_options().two_write_queues) {
      db_impl_->SetLastPublishedSequence(seq_used);
    }
  }

  return Status::OK();
}

Status WriteUnpreparedTxnDB::Initialize(
    const std::vector<size_t>& compaction_enabled_cf_indices,
    const std::vector<ColumnFamilyHandle*>& handles) {
  // TODO(lth): Reduce code duplication in this function.
  auto dbimpl = static_cast_with_check<DBImpl>(GetRootDB());
  assert(dbimpl != nullptr);

  db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
  // A callback to commit a single sub-batch
  class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
   public:
    explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
        : db_(db) {}
    Status Callback(SequenceNumber commit_seq,
                    bool is_mem_disabled __attribute__((__unused__)), uint64_t,
                    size_t /*index*/, size_t /*total*/) override {
      assert(!is_mem_disabled);
      db_->AddCommitted(commit_seq, commit_seq);
      return Status::OK();
    }

   private:
    WritePreparedTxnDB* db_;
  };
  db_impl_->SetRecoverableStatePreReleaseCallback(
      new CommitSubBatchPreReleaseCallback(this));

  // PessimisticTransactionDB::Initialize
  for (auto cf_ptr : handles) {
    AddColumnFamily(cf_ptr);
  }
  // Verify cf options
  for (auto handle : handles) {
    ColumnFamilyDescriptor cfd;
    Status s = handle->GetDescriptor(&cfd);
    if (!s.ok()) {
      return s;
    }
    s = VerifyCFOptions(cfd.options);
    if (!s.ok()) {
      return s;
    }
  }

  // Re-enable compaction for the column families that initially had
  // compaction enabled.
  std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
  compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size());
  for (auto index : compaction_enabled_cf_indices) {
    compaction_enabled_cf_handles.push_back(handles[index]);
  }

  // create 'real' transactions from recovered shell transactions
  auto rtxns = dbimpl->recovered_transactions();
  std::map<SequenceNumber, SequenceNumber> ordered_seq_cnt;
  for (const auto& rtxn : rtxns) {
    auto recovered_trx = rtxn.second;
    assert(recovered_trx);
    assert(recovered_trx->batches_.size() >= 1);
    assert(recovered_trx->name_.length());

    // We can only rollback transactions after AdvanceMaxEvictedSeq is called,
    // but AddPrepared must occur before AdvanceMaxEvictedSeq, which is why
    // two iterations are required.
    if (recovered_trx->unprepared_) {
      continue;
    }

    // TODO: plumb Env::IOActivity, Env::IOPriority
    WriteOptions w_options;
    w_options.sync = true;
    TransactionOptions t_options;

    auto first_log_number = recovered_trx->batches_.begin()->second.log_number_;
    auto first_seq = recovered_trx->batches_.begin()->first;
    auto last_prepare_batch_cnt =
        recovered_trx->batches_.begin()->second.batch_cnt_;

    Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);
    assert(real_trx);
    auto wupt = static_cast_with_check<WriteUnpreparedTxn>(real_trx);
    wupt->recovered_txn_ = true;

    real_trx->SetLogNumber(first_log_number);
    real_trx->SetId(first_seq);
    Status s = real_trx->SetName(recovered_trx->name_);
    if (!s.ok()) {
      return s;
    }
    wupt->prepare_batch_cnt_ = last_prepare_batch_cnt;

    for (auto batch : recovered_trx->batches_) {
      const auto& seq = batch.first;
      const auto& batch_info = batch.second;
      auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
      assert(batch_info.log_number_);

      ordered_seq_cnt[seq] = cnt;
      assert(wupt->unprep_seqs_.count(seq) == 0);
      wupt->unprep_seqs_[seq] = cnt;

      s = wupt->RebuildFromWriteBatch(batch_info.batch_);
      assert(s.ok());
      if (!s.ok()) {
        return s;
      }
    }

    const bool kClear = true;
    wupt->InitWriteBatch(kClear);

    real_trx->SetState(Transaction::PREPARED);
    if (!s.ok()) {
      return s;
    }
  }
  // AddPrepared must be called in order
  for (auto seq_cnt : ordered_seq_cnt) {
    auto seq = seq_cnt.first;
    auto cnt = seq_cnt.second;
    for (size_t i = 0; i < cnt; i++) {
      AddPrepared(seq + i);
    }
  }

  SequenceNumber prev_max = max_evicted_seq_;
  SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
  AdvanceMaxEvictedSeq(prev_max, last_seq);
  // Create a gap between max and the next snapshot. This simplifies the logic
  // in IsInSnapshot by not having to consider the special case of max ==
  // snapshot after recovery. This is tested in IsInSnapshotEmptyMapTest.
  if (last_seq) {
    db_impl_->versions_->SetLastAllocatedSequence(last_seq + 1);
    db_impl_->versions_->SetLastSequence(last_seq + 1);
    db_impl_->versions_->SetLastPublishedSequence(last_seq + 1);
  }

  Status s;
  // Rollback unprepared transactions.
  for (const auto& rtxn : rtxns) {
    auto recovered_trx = rtxn.second;
    if (recovered_trx->unprepared_) {
      s = RollbackRecoveredTransaction(recovered_trx);
      if (!s.ok()) {
        return s;
      }
      continue;
    }
  }

  if (s.ok()) {
    dbimpl->DeleteAllRecoveredTransactions();

    // Compaction should start only after max_evicted_seq_ is set AND recovered
    // transactions are either added to PrepareHeap or rolled back.
    s = EnableAutoCompaction(compaction_enabled_cf_handles);
  }

  return s;
}

Transaction* WriteUnpreparedTxnDB::BeginTransaction(
    const WriteOptions& write_options, const TransactionOptions& txn_options,
    Transaction* old_txn) {
  if (old_txn != nullptr) {
    ReinitializeTransaction(old_txn, write_options, txn_options);
    return old_txn;
  } else {
    return new WriteUnpreparedTxn(this, write_options, txn_options);
  }
}

// Struct to hold ownership of snapshot and read callback for iterator cleanup.
struct WriteUnpreparedTxnDB::IteratorState {
  IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
                std::shared_ptr<ManagedSnapshot> s,
                SequenceNumber min_uncommitted, WriteUnpreparedTxn* txn)
      : callback(txn_db, sequence, min_uncommitted, txn->unprep_seqs_,
                 kBackedByDBSnapshot),
        snapshot(s) {}
  SequenceNumber MaxVisibleSeq() { return callback.max_visible_seq(); }

  WriteUnpreparedTxnReadCallback callback;
  std::shared_ptr<ManagedSnapshot> snapshot;
};

namespace {
static void CleanupWriteUnpreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
  delete static_cast<WriteUnpreparedTxnDB::IteratorState*>(arg1);
}
}  // anonymous namespace

Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& _read_options,
                                            ColumnFamilyHandle* column_family,
                                            WriteUnpreparedTxn* txn) {
  if (_read_options.io_activity != Env::IOActivity::kUnknown &&
      _read_options.io_activity != Env::IOActivity::kDBIterator) {
    return NewErrorIterator(Status::InvalidArgument(
        "Can only call NewIterator with `ReadOptions::io_activity` is "
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kDBIterator`"));
  }

  ReadOptions read_options(_read_options);
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
    read_options.io_activity = Env::IOActivity::kDBIterator;
  }
  // TODO(lth): Refactor so that this logic is shared with WritePrepared.
  constexpr bool expose_blob_index = false;
  constexpr bool allow_refresh = false;
  std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;

  // Currently, the Prev() iterator logic does not work well without snapshot
  // validation. The logic simply iterates through values of a key in
  // ascending seqno order, stopping at the first non-visible value and
  // returning the last visible value.
  //
  // For example, if snapshot sequence is 3, and we have the following keys:
  // foo: v1 1
  // foo: v2 2
  // foo: v3 3
  // foo: v4 4
  // foo: v5 5
  //
  // Then 1, 2, 3 will be visible, but 4 will be non-visible, so we return v3,
  // which is the last visible value.
  //
  // For unprepared transactions, if we have snap_seq = 3, but the current
  // transaction has unprep_seq 5, then returning the first non-visible value
  // would be incorrect, as we should return v5, and not v3. The problem is that
  // there are committed values at snapshot_seq < commit_seq < unprep_seq.
  //
  // Snapshot validation can prevent this problem by ensuring that no committed
  // values exist at snapshot_seq < commit_seq, and thus any value with a
  // sequence number greater than snapshot_seq must be unprepared values. For
  // example, if the transaction had a snapshot at 3, then snapshot validation
  // would be performed during the Put(v5) call. It would find v4, and the Put
  // would fail with snapshot validation failure.
  //
  // TODO(lth): Improve Prev() logic to continue iterating until
  // max_visible_seq, and then return the last visible value, so that this
  // restriction can be lifted.
  const Snapshot* snapshot = nullptr;
  if (read_options.snapshot == nullptr) {
    snapshot = GetSnapshot();
    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
  } else {
    snapshot = read_options.snapshot;
  }

  snapshot_seq = snapshot->GetSequenceNumber();
  assert(snapshot_seq != kMaxSequenceNumber);
  // Iteration is safe as long as largest_validated_seq <= snapshot_seq. We are
  // guaranteed that for keys that were modified by this transaction (and thus
  // might have unprepared values), no committed values exist at
  // largest_validated_seq < commit_seq (or the contrapositive: any committed
  // value must exist at commit_seq <= largest_validated_seq). This implies
  // that commit_seq <= largest_validated_seq <= snapshot_seq or commit_seq <=
  // snapshot_seq. As explained above, the problem with Prev() only happens when
  // snapshot_seq < commit_seq.
  //
  // For keys that were not modified by this transaction, largest_validated_seq_
  // is meaningless, and Prev() should just work with the existing visibility
  // logic.
  if (txn->largest_validated_seq_ > snapshot->GetSequenceNumber() &&
      !txn->unprep_seqs_.empty()) {
    ROCKS_LOG_ERROR(info_log_,
                    "WriteUnprepared iterator creation failed since the "
                    "transaction has performed unvalidated writes");
    return nullptr;
  }
  min_uncommitted =
      static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;

  auto* cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  auto* cfd = cfh->cfd();
  auto* state =
      new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted, txn);
  SuperVersion* super_version = cfd->GetReferencedSuperVersion(db_impl_);
  auto* db_iter = db_impl_->NewIteratorImpl(
      read_options, cfh, super_version, state->MaxVisibleSeq(),
      &state->callback, expose_blob_index, allow_refresh);
  db_iter->RegisterCleanup(CleanupWriteUnpreparedTxnDBIterator, state, nullptr);
  return db_iter;
}

}  // namespace ROCKSDB_NAMESPACE
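
Note: none of the paths listed above execute unless a database is opened with the write-unprepared policy. The following is a minimal, hedged sketch (not part of the file above) of how such a database might be opened and used so that WriteUnpreparedTxnDB::Initialize and BeginTransaction run. The database path and transaction name are arbitrary placeholders, and RollbackRecoveredTransaction additionally requires recovering a DB that crashed while holding unprepared data.

#include <cassert>

#include "rocksdb/utilities/transaction_db.h"

int main() {
  using namespace ROCKSDB_NAMESPACE;

  Options options;
  options.create_if_missing = true;
  TransactionDBOptions txn_db_options;
  // Select the write-unprepared policy so that TransactionDB::Open creates a
  // WriteUnpreparedTxnDB and runs its Initialize().
  txn_db_options.write_policy = TxnDBWritePolicy::WRITE_UNPREPARED;

  TransactionDB* db = nullptr;
  // "/tmp/write_unprepared_demo" is a placeholder path for this sketch.
  Status s = TransactionDB::Open(options, txn_db_options,
                                 "/tmp/write_unprepared_demo", &db);
  assert(s.ok());

  // Under this policy, BeginTransaction returns a WriteUnpreparedTxn.
  Transaction* txn = db->BeginTransaction(WriteOptions());
  s = txn->SetName("demo_xid");  // placeholder transaction name
  assert(s.ok());
  s = txn->Put("key", "value");
  assert(s.ok());
  s = txn->Prepare();
  assert(s.ok());
  s = txn->Commit();
  assert(s.ok());

  delete txn;
  delete db;
  return 0;
}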