Coverage Report

Created: 2024-09-08 07:17

/src/rocksdb/utilities/transactions/write_unprepared_txn_db.cc
Source (every executable line in this file has an execution count of 0)
//  Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#include "utilities/transactions/write_unprepared_txn_db.h"

#include "db/arena_wrapped_db_iter.h"
#include "rocksdb/utilities/transaction_db.h"
#include "util/cast_util.h"

namespace ROCKSDB_NAMESPACE {

// Instead of reconstructing a Transaction object, and calling rollback on it,
// we can be more efficient with RollbackRecoveredTransaction by skipping
// unnecessary steps (e.g. updating CommitMap, reconstructing keyset)
Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction(
    const DBImpl::RecoveredTransaction* rtxn) {
  // TODO(lth): Reduce duplicate code with WritePrepared rollback logic.
  assert(rtxn->unprepared_);
  auto cf_map_shared_ptr = WritePreparedTxnDB::GetCFHandleMap();
  auto cf_comp_map_shared_ptr = WritePreparedTxnDB::GetCFComparatorMap();
  // In theory we could write with disableWAL = true during recovery, and
  // assume that if we crash again during recovery, we can just replay from
  // the very beginning. Unfortunately, the XIDs from the application may not
  // necessarily be unique across restarts, potentially leading to situations
  // like this:
  //
  // BEGIN_PREPARE(unprepared) Put(a) END_PREPARE(xid = 1)
  // -- crash and recover with Put(a) rolled back as it was not prepared
  // BEGIN_PREPARE(prepared) Put(b) END_PREPARE(xid = 1)
  // COMMIT(xid = 1)
  // -- crash and recover with both a, b
  //
  // We could just write the rollback marker, but then we would have to extend
  // MemTableInserter during recovery to actually do writes into the DB
  // instead of just dropping the in-memory write batch.
  //
  // TODO: plumb Env::IOActivity, Env::IOPriority
  WriteOptions w_options;

  class InvalidSnapshotReadCallback : public ReadCallback {
   public:
    InvalidSnapshotReadCallback(SequenceNumber snapshot)
        : ReadCallback(snapshot) {}

    inline bool IsVisibleFullCheck(SequenceNumber) override {
      // The seq provided as snapshot is the seq right before we have locked
      // and written to it, so whatever is there, it is committed.
      return true;
    }

    // Ignore the refresh request since we are confident that our snapshot seq
    // is not going to be affected by concurrent compactions (not enabled yet.)
    void Refresh(SequenceNumber) override {}
  };

  // Iterate starting with largest sequence number.
  for (auto it = rtxn->batches_.rbegin(); it != rtxn->batches_.rend(); ++it) {
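    // The seq just below this batch's first seq: everything at or below it
    // predates the recovered batch and is treated as committed state to
    // restore.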
    auto last_visible_txn = it->first - 1;
    const auto& batch = it->second.batch_;
    WriteBatch rollback_batch(0 /* reserved_bytes */, 0 /* max_bytes */,
                              w_options.protection_bytes_per_key,
                              0 /* default_cf_ts_sz */);

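    // Handler that, for every key touched by the recovered batch, appends to
    // rollback_batch the value the key had before the batch (or a Delete if
    // it had none).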
    struct RollbackWriteBatchBuilder : public WriteBatch::Handler {
      DBImpl* db_;
      ReadOptions roptions;
      InvalidSnapshotReadCallback callback;
      WriteBatch* rollback_batch_;
      std::map<uint32_t, const Comparator*>& comparators_;
      std::map<uint32_t, ColumnFamilyHandle*>& handles_;
      using CFKeys = std::set<Slice, SetComparator>;
      std::map<uint32_t, CFKeys> keys_;
      bool rollback_merge_operands_;
      RollbackWriteBatchBuilder(
          DBImpl* db, SequenceNumber snap_seq, WriteBatch* dst_batch,
          std::map<uint32_t, const Comparator*>& comparators,
          std::map<uint32_t, ColumnFamilyHandle*>& handles,
          bool rollback_merge_operands)
          : db_(db),
            callback(snap_seq),
            // disable min_uncommitted optimization
            rollback_batch_(dst_batch),
            comparators_(comparators),
            handles_(handles),
            rollback_merge_operands_(rollback_merge_operands) {}

      Status Rollback(uint32_t cf, const Slice& key) {
        Status s;
        CFKeys& cf_keys = keys_[cf];
        if (cf_keys.size() == 0) {  // just inserted
          auto cmp = comparators_[cf];
          keys_[cf] = CFKeys(SetComparator(cmp));
        }
        auto res = cf_keys.insert(key);
        if (res.second ==
            false) {  // second is false if an element already existed.
          return s;
        }

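        // Look up the value this key had prior to the recovered transaction's
        // write and re-write it (or delete it if it did not exist) to undo
        // that write.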
        PinnableSlice pinnable_val;
        bool not_used;
        auto cf_handle = handles_[cf];
        DBImpl::GetImplOptions get_impl_options;
        get_impl_options.column_family = cf_handle;
        get_impl_options.value = &pinnable_val;
        get_impl_options.value_found = &not_used;
        get_impl_options.callback = &callback;
        s = db_->GetImpl(roptions, key, get_impl_options);
        assert(s.ok() || s.IsNotFound());
        if (s.ok()) {
          s = rollback_batch_->Put(cf_handle, key, pinnable_val);
          assert(s.ok());
        } else if (s.IsNotFound()) {
          // There has been no readable value before txn. By adding a delete we
          // make sure that there will be none afterwards either.
          s = rollback_batch_->Delete(cf_handle, key);
          assert(s.ok());
        } else {
          // Unexpected status. Return it to the user.
        }
        return s;
      }

      Status PutCF(uint32_t cf, const Slice& key,
                   const Slice& /*val*/) override {
        return Rollback(cf, key);
      }

      Status DeleteCF(uint32_t cf, const Slice& key) override {
        return Rollback(cf, key);
      }

      Status SingleDeleteCF(uint32_t cf, const Slice& key) override {
        return Rollback(cf, key);
      }

      Status MergeCF(uint32_t cf, const Slice& key,
                     const Slice& /*val*/) override {
        if (rollback_merge_operands_) {
          return Rollback(cf, key);
        } else {
          return Status::OK();
        }
      }

      // Recovered batches do not contain 2PC markers.
      Status MarkNoop(bool) override { return Status::InvalidArgument(); }
      Status MarkBeginPrepare(bool) override {
        return Status::InvalidArgument();
      }
      Status MarkEndPrepare(const Slice&) override {
        return Status::InvalidArgument();
      }
      Status MarkCommit(const Slice&) override {
        return Status::InvalidArgument();
      }
      Status MarkRollback(const Slice&) override {
        return Status::InvalidArgument();
      }
    } rollback_handler(db_impl_, last_visible_txn, &rollback_batch,
                       *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(),
                       txn_db_options_.rollback_merge_operands);

    auto s = batch->Iterate(&rollback_handler);
    if (!s.ok()) {
      return s;
    }

    // The Rollback marker will be used as a batch separator
    s = WriteBatchInternal::MarkRollback(&rollback_batch, rtxn->name_);
    if (!s.ok()) {
      return s;
    }

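    // Write the rollback batch through the regular write path as a single
    // sub-batch. Passing !kDisableMemtable keeps memtable insertion enabled,
    // so the restored values take effect immediately.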
    const uint64_t kNoLogRef = 0;
    const bool kDisableMemtable = true;
    const size_t kOneBatch = 1;
    uint64_t seq_used = kMaxSequenceNumber;
    s = db_impl_->WriteImpl(w_options, &rollback_batch, nullptr, nullptr,
                            nullptr, kNoLogRef, !kDisableMemtable, &seq_used,
                            kOneBatch);
    if (!s.ok()) {
      return s;
    }

    // If two_write_queues, we must manually release the sequence number to
    // readers.
    if (db_impl_->immutable_db_options().two_write_queues) {
      db_impl_->SetLastPublishedSequence(seq_used);
    }
  }

  return Status::OK();
}

Status WriteUnpreparedTxnDB::Initialize(
    const std::vector<size_t>& compaction_enabled_cf_indices,
    const std::vector<ColumnFamilyHandle*>& handles) {
  // TODO(lth): Reduce code duplication in this function.
  auto dbimpl = static_cast_with_check<DBImpl>(GetRootDB());
  assert(dbimpl != nullptr);

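  // Install a snapshot checker so that internal reads (e.g. during flush and
  // compaction) can decide whether a sequence number is committed with
  // respect to a snapshot, using WritePrepared's commit tracking.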
  db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this));
  // A callback to commit a single sub-batch
  class CommitSubBatchPreReleaseCallback : public PreReleaseCallback {
   public:
    explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db)
        : db_(db) {}
    Status Callback(SequenceNumber commit_seq,
                    bool is_mem_disabled __attribute__((__unused__)), uint64_t,
                    size_t /*index*/, size_t /*total*/) override {
      assert(!is_mem_disabled);
      db_->AddCommitted(commit_seq, commit_seq);
      return Status::OK();
    }

   private:
    WritePreparedTxnDB* db_;
  };
  db_impl_->SetRecoverableStatePreReleaseCallback(
      new CommitSubBatchPreReleaseCallback(this));

  // PessimisticTransactionDB::Initialize
  for (auto cf_ptr : handles) {
    AddColumnFamily(cf_ptr);
  }
  // Verify cf options
  for (auto handle : handles) {
    ColumnFamilyDescriptor cfd;
    Status s = handle->GetDescriptor(&cfd);
    if (!s.ok()) {
      return s;
    }
    s = VerifyCFOptions(cfd.options);
    if (!s.ok()) {
      return s;
    }
  }

  // Re-enable compaction for the column families that initially had
  // compaction enabled.
  std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles;
  compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size());
  for (auto index : compaction_enabled_cf_indices) {
    compaction_enabled_cf_handles.push_back(handles[index]);
  }

  // create 'real' transactions from recovered shell transactions
  auto rtxns = dbimpl->recovered_transactions();
  std::map<SequenceNumber, SequenceNumber> ordered_seq_cnt;
  for (const auto& rtxn : rtxns) {
    auto recovered_trx = rtxn.second;
    assert(recovered_trx);
    assert(recovered_trx->batches_.size() >= 1);
    assert(recovered_trx->name_.length());

    // We can only rollback transactions after AdvanceMaxEvictedSeq is called,
    // but AddPrepared must occur before AdvanceMaxEvictedSeq, which is why
    // two iterations are required.
    if (recovered_trx->unprepared_) {
      continue;
    }

    // TODO: plumb Env::IOActivity, Env::IOPriority
    WriteOptions w_options;
    w_options.sync = true;
    TransactionOptions t_options;

    auto first_log_number = recovered_trx->batches_.begin()->second.log_number_;
    auto first_seq = recovered_trx->batches_.begin()->first;
    auto last_prepare_batch_cnt =
        recovered_trx->batches_.begin()->second.batch_cnt_;

    Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr);
    assert(real_trx);
    auto wupt = static_cast_with_check<WriteUnpreparedTxn>(real_trx);
    wupt->recovered_txn_ = true;

    real_trx->SetLogNumber(first_log_number);
    real_trx->SetId(first_seq);
    Status s = real_trx->SetName(recovered_trx->name_);
    if (!s.ok()) {
      return s;
    }
    wupt->prepare_batch_cnt_ = last_prepare_batch_cnt;

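    // Record each recovered batch's starting sequence number and sub-batch
    // count, both globally (so AddPrepared can later be called in seq order)
    // and on the transaction itself, then rebuild the transaction's tracked
    // state from the batch contents.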
    for (auto batch : recovered_trx->batches_) {
      const auto& seq = batch.first;
      const auto& batch_info = batch.second;
      auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1;
      assert(batch_info.log_number_);

      ordered_seq_cnt[seq] = cnt;
      assert(wupt->unprep_seqs_.count(seq) == 0);
      wupt->unprep_seqs_[seq] = cnt;

      s = wupt->RebuildFromWriteBatch(batch_info.batch_);
      assert(s.ok());
      if (!s.ok()) {
        return s;
      }
    }

    const bool kClear = true;
    wupt->InitWriteBatch(kClear);

    real_trx->SetState(Transaction::PREPARED);
    if (!s.ok()) {
      return s;
    }
  }
  // AddPrepared must be called in order
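  // Each sub-batch occupies its own sequence number, so a batch starting at
  // seq with cnt sub-batches registers seq .. seq + cnt - 1 as prepared.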
  for (auto seq_cnt : ordered_seq_cnt) {
    auto seq = seq_cnt.first;
    auto cnt = seq_cnt.second;
    for (size_t i = 0; i < cnt; i++) {
      AddPrepared(seq + i);
    }
  }

  SequenceNumber prev_max = max_evicted_seq_;
  SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber();
  AdvanceMaxEvictedSeq(prev_max, last_seq);
  // Create a gap between max and the next snapshot. This simplifies the logic
  // in IsInSnapshot by not having to consider the special case of max ==
  // snapshot after recovery. This is tested in IsInSnapshotEmptyMapTest.
  if (last_seq) {
    db_impl_->versions_->SetLastAllocatedSequence(last_seq + 1);
    db_impl_->versions_->SetLastSequence(last_seq + 1);
    db_impl_->versions_->SetLastPublishedSequence(last_seq + 1);
  }

  Status s;
  // Rollback unprepared transactions.
  for (const auto& rtxn : rtxns) {
    auto recovered_trx = rtxn.second;
    if (recovered_trx->unprepared_) {
      s = RollbackRecoveredTransaction(recovered_trx);
      if (!s.ok()) {
        return s;
      }
      continue;
    }
  }

  if (s.ok()) {
    dbimpl->DeleteAllRecoveredTransactions();

    // Compaction should start only after max_evicted_seq_ is set AND recovered
    // transactions are either added to PrepareHeap or rolled back.
    s = EnableAutoCompaction(compaction_enabled_cf_handles);
  }

  return s;
}

Transaction* WriteUnpreparedTxnDB::BeginTransaction(
    const WriteOptions& write_options, const TransactionOptions& txn_options,
    Transaction* old_txn) {
  if (old_txn != nullptr) {
    ReinitializeTransaction(old_txn, write_options, txn_options);
    return old_txn;
  } else {
    return new WriteUnpreparedTxn(this, write_options, txn_options);
  }
}

// Struct to hold ownership of snapshot and read callback for iterator cleanup.
struct WriteUnpreparedTxnDB::IteratorState {
  IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence,
                std::shared_ptr<ManagedSnapshot> s,
                SequenceNumber min_uncommitted, WriteUnpreparedTxn* txn)
      : callback(txn_db, sequence, min_uncommitted, txn->unprep_seqs_,
                 kBackedByDBSnapshot),
        snapshot(s) {}
  SequenceNumber MaxVisibleSeq() { return callback.max_visible_seq(); }

  WriteUnpreparedTxnReadCallback callback;
  std::shared_ptr<ManagedSnapshot> snapshot;
};

namespace {
static void CleanupWriteUnpreparedTxnDBIterator(void* arg1, void* /*arg2*/) {
  delete static_cast<WriteUnpreparedTxnDB::IteratorState*>(arg1);
}
}  // anonymous namespace

Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& _read_options,
                                            ColumnFamilyHandle* column_family,
                                            WriteUnpreparedTxn* txn) {
  if (_read_options.io_activity != Env::IOActivity::kUnknown &&
      _read_options.io_activity != Env::IOActivity::kDBIterator) {
    return NewErrorIterator(Status::InvalidArgument(
        "Can only call NewIterator with `ReadOptions::io_activity` set to "
        "`Env::IOActivity::kUnknown` or `Env::IOActivity::kDBIterator`"));
  }

  ReadOptions read_options(_read_options);
  if (read_options.io_activity == Env::IOActivity::kUnknown) {
    read_options.io_activity = Env::IOActivity::kDBIterator;
  }
  // TODO(lth): Refactor so that this logic is shared with WritePrepared.
  constexpr bool expose_blob_index = false;
  constexpr bool allow_refresh = false;
  std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr;
  SequenceNumber snapshot_seq = kMaxSequenceNumber;
  SequenceNumber min_uncommitted = 0;

  // Currently, the Prev() iterator logic does not work well without snapshot
  // validation. The logic simply iterates through values of a key in
  // ascending seqno order, stopping at the first non-visible value and
  // returning the last visible value.
  //
  // For example, if snapshot sequence is 3, and we have the following keys:
  // foo: v1 1
  // foo: v2 2
  // foo: v3 3
  // foo: v4 4
  // foo: v5 5
  //
  // Then 1, 2, 3 will be visible, but 4 will be non-visible, so we return v3,
  // which is the last visible value.
  //
  // For unprepared transactions, if we have snap_seq = 3, but the current
  // transaction has unprep_seq 5, then returning the first non-visible value
  // would be incorrect, as we should return v5, and not v3. The problem is
  // that there are committed values at snapshot_seq < commit_seq < unprep_seq.
  //
  // Snapshot validation can prevent this problem by ensuring that no committed
  // values exist at snapshot_seq < commit_seq, and thus any value with a
  // sequence number greater than snapshot_seq must be an unprepared value. For
  // example, if the transaction had a snapshot at 3, then snapshot validation
  // would be performed during the Put(v5) call. It would find v4, and the Put
  // would fail with snapshot validation failure.
  //
  // TODO(lth): Improve Prev() logic to continue iterating until
  // max_visible_seq, and then return the last visible value, so that this
  // restriction can be lifted.
  const Snapshot* snapshot = nullptr;
  if (read_options.snapshot == nullptr) {
    snapshot = GetSnapshot();
    own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot);
  } else {
    snapshot = read_options.snapshot;
  }

  snapshot_seq = snapshot->GetSequenceNumber();
  assert(snapshot_seq != kMaxSequenceNumber);
  // Iteration is safe as long as largest_validated_seq <= snapshot_seq. We are
  // guaranteed that for keys that were modified by this transaction (and thus
  // might have unprepared values), no committed values exist at
  // largest_validated_seq < commit_seq (or the contrapositive: any committed
  // value must exist at commit_seq <= largest_validated_seq). This implies
  // that commit_seq <= largest_validated_seq <= snapshot_seq or commit_seq <=
  // snapshot_seq. As explained above, the problem with Prev() only happens
  // when snapshot_seq < commit_seq.
  //
  // For keys that were not modified by this transaction,
  // largest_validated_seq_ is meaningless, and Prev() should just work with
  // the existing visibility logic.
  if (txn->largest_validated_seq_ > snapshot->GetSequenceNumber() &&
      !txn->unprep_seqs_.empty()) {
    ROCKS_LOG_ERROR(info_log_,
                    "WriteUnprepared iterator creation failed since the "
                    "transaction has performed unvalidated writes");
    return nullptr;
  }
  min_uncommitted =
      static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_;

  auto* cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family);
  auto* cfd = cfh->cfd();
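  // The IteratorState below owns the snapshot and the read callback; the
  // iterator deletes it through the registered cleanup function when it is
  // destroyed.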
  auto* state =
      new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted, txn);
  SuperVersion* super_version = cfd->GetReferencedSuperVersion(db_impl_);
  auto* db_iter = db_impl_->NewIteratorImpl(
      read_options, cfh, super_version, state->MaxVisibleSeq(),
      &state->callback, expose_blob_index, allow_refresh);
  db_iter->RegisterCleanup(CleanupWriteUnpreparedTxnDBIterator, state, nullptr);
  return db_iter;
}

}  // namespace ROCKSDB_NAMESPACE