Coverage Report

Created: 2026-04-10 07:52

/src/rocksdb/utilities/transactions/write_unprepared_txn.h
All execution counts recorded for this file are 0: every instrumented line in the source below is uncovered.

// Copyright (c) 2011-present, Facebook, Inc.  All rights reserved.
//  This source code is licensed under both the GPLv2 (found in the
//  COPYING file in the root directory) and Apache 2.0 License
//  (found in the LICENSE.Apache file in the root directory).

#pragma once

#include <set>

#include "utilities/transactions/write_prepared_txn.h"
#include "utilities/transactions/write_unprepared_txn_db.h"

namespace ROCKSDB_NAMESPACE {

class WriteUnpreparedTxnDB;
class WriteUnpreparedTxn;

// WriteUnprepared transactions need to be able to read their own uncommitted
// writes, and supporting this requires some careful consideration. Because
// writes in the current transaction may already have been flushed to the DB,
// we cannot rely on the contents of WriteBatchWithIndex to determine whether
// a key should be visible or not, so we have to remember to check the DB for
// any uncommitted keys that should be visible to us. First, we will need to
// change the seek-to-snapshot logic to seek to
// max_visible_seq = max(snap_seq, max_unprep_seq). Any key with a sequence
// number greater than max_visible_seq should not be visible, because it
// cannot be an unprepared key of the current transaction and it is not in
// the transaction's snapshot.
//
// When we seek to max_visible_seq, one of these cases will happen:
// 1. We hit an unprepared key from the current transaction.
// 2. We hit an unprepared key from another transaction.
// 3. We hit a committed key with snap_seq < seq < max_unprep_seq.
// 4. We hit a committed key with seq <= snap_seq.
//
// IsVisibleFullCheck handles all cases correctly.
//
// Other notes:
// Note that max_visible_seq is only calculated once, at iterator construction
// time, meaning that if the same transaction adds more unprepared seqs
// through writes during iteration, these newer writes may not be visible.
// This is not a problem for MySQL, though, because it avoids modifying the
// index while scanning through it, in order to avoid the Halloween Problem.
// Instead, it scans the index once up front and applies modifications based
// on a temporary copy.
//
// In DBIter, there is a "reseek" optimization if the iterator skips over too
// many keys. However, this assumes that the reseek seeks exactly to the
// required key. In write unprepared, even after seeking directly to
// max_visible_seq, some iteration may be required before hitting a visible
// key, and special precautions must be taken to avoid performing another
// reseek, which could otherwise lead to an infinite loop.
//
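// Editor's illustration (not part of the original header): suppose
// snap_seq = 100 and the transaction's own unprepared writes cover seqs
// 103..105, so max_visible_seq = max(100, 105) = 105. Seeking at 105 and
// stepping through the versions of a key, we may encounter:
//   seq 105, our own unprepared write       -> case 1, visible
//   seq 104, another txn's unprepared write -> case 2, not visible
//   seq 102, committed, 100 < 102 < 105     -> case 3, not visible
//   seq  99, committed, 99 <= 100           -> case 4, visible
//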
class WriteUnpreparedTxnReadCallback : public ReadCallback {
 public:
  WriteUnpreparedTxnReadCallback(
      WritePreparedTxnDB* db, SequenceNumber snapshot,
      SequenceNumber min_uncommitted,
      const std::map<SequenceNumber, size_t>& unprep_seqs,
      SnapshotBackup backed_by_snapshot)
      // Pass our last uncommitted seq as the snapshot to the parent class so
      // that the parent will not prematurely filter out our own writes. We
      // will do the exact comparison against snapshots in the
      // IsVisibleFullCheck override.
      : ReadCallback(CalcMaxVisibleSeq(unprep_seqs, snapshot), min_uncommitted),
        db_(db),
        unprep_seqs_(unprep_seqs),
        wup_snapshot_(snapshot),
        backed_by_snapshot_(backed_by_snapshot) {
    (void)backed_by_snapshot_;  // to silence unused private field warning
  }

  virtual ~WriteUnpreparedTxnReadCallback() {
    // If it is not backed by a snapshot, the caller must check validity.
    assert(valid_checked_ || backed_by_snapshot_ == kBackedByDBSnapshot);
  }

  bool IsVisibleFullCheck(SequenceNumber seq) override;

  inline bool valid() {
    valid_checked_ = true;
    return snap_released_ == false;
  }

  void Refresh(SequenceNumber seq) override {
    max_visible_seq_ = std::max(max_visible_seq_, seq);
    wup_snapshot_ = seq;
  }

  static SequenceNumber CalcMaxVisibleSeq(
      const std::map<SequenceNumber, size_t>& unprep_seqs,
      SequenceNumber snapshot_seq) {
    SequenceNumber max_unprepared = 0;
    if (unprep_seqs.size()) {
      max_unprepared =
          unprep_seqs.rbegin()->first + unprep_seqs.rbegin()->second - 1;
    }
    return std::max(max_unprepared, snapshot_seq);
  }
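  // Editor's note (illustration, not part of the original source): with
  // unprep_seqs = {{10, 3}, {20, 2}} (unprep_seq => prepare_batch_cnt), the
  // newest unprepared batch covers seqs 20..21, so max_unprepared is
  // 20 + 2 - 1 = 21. With snapshot_seq = 15, this returns max(21, 15) = 21,
  // letting reads seek far enough to find the transaction's own writes.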

 private:
  WritePreparedTxnDB* db_;
  const std::map<SequenceNumber, size_t>& unprep_seqs_;
  SequenceNumber wup_snapshot_;
  // Whether max_visible_seq_ is backed by a snapshot
  const SnapshotBackup backed_by_snapshot_;
  bool snap_released_ = false;
  // Safety check to ensure that the caller has checked invalid statuses
  bool valid_checked_ = false;
};

class WriteUnpreparedTxn : public WritePreparedTxn {
 public:
  WriteUnpreparedTxn(WriteUnpreparedTxnDB* db,
                     const WriteOptions& write_options,
                     const TransactionOptions& txn_options);

  virtual ~WriteUnpreparedTxn();

  using TransactionBaseImpl::Put;
  Status Put(ColumnFamilyHandle* column_family, const Slice& key,
             const Slice& value, const bool assume_tracked = false) override;
  Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
             const SliceParts& value,
             const bool assume_tracked = false) override;

  using TransactionBaseImpl::Merge;
  Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
               const Slice& value, const bool assume_tracked = false) override;

  using TransactionBaseImpl::Delete;
  Status Delete(ColumnFamilyHandle* column_family, const Slice& key,
                const bool assume_tracked = false) override;
  Status Delete(ColumnFamilyHandle* column_family, const SliceParts& key,
                const bool assume_tracked = false) override;

  using TransactionBaseImpl::SingleDelete;
  Status SingleDelete(ColumnFamilyHandle* column_family, const Slice& key,
                      const bool assume_tracked = false) override;
  Status SingleDelete(ColumnFamilyHandle* column_family, const SliceParts& key,
                      const bool assume_tracked = false) override;

  // In WriteUnprepared, untracked writes will break snapshot validation
  // logic. Snapshot validation only checks the largest sequence number of a
  // key to see whether it was committed or not. However, an untracked
  // unprepared write will hide smaller committed sequence numbers.
  //
  // TODO(lth): Investigate whether it is worth having snapshot validation
  // validate all values larger than snap_seq. Otherwise, we should return
  // Status::NotSupported for untracked writes.
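  // Editor's illustration (not part of the original source): take a snapshot
  // at seq 6, let another transaction commit key K at seq 9, and then make an
  // untracked unprepared write to K at seq 12. Snapshot validation inspects
  // only the newest version of K, seq 12, which is our own write, so the
  // conflicting committed write at seq 9 (> 6) goes unnoticed.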

  Status RebuildFromWriteBatch(WriteBatch*) override;

  uint64_t GetLastLogNumber() const override { return last_log_number_; }

  // Unregister iter from the list of active iterators, using the
  // erase-remove idiom.
  void RemoveActiveIterator(Iterator* iter) {
    active_iterators_.erase(
        std::remove(active_iterators_.begin(), active_iterators_.end(), iter),
        active_iterators_.end());
  }

 protected:
  void Initialize(const TransactionOptions& txn_options) override;

  Status PrepareInternal() override;

  Status CommitWithoutPrepareInternal() override;
  Status CommitInternal() override;

  Status RollbackInternal() override;

  void Clear() override;

  void SetSavePoint() override;
  Status RollbackToSavePoint() override;
  Status PopSavePoint() override;

  // Get and GetIterator need to be overridden so that a ReadCallback that
  // handles read-your-own-writes is used.
  using Transaction::Get;
  Status Get(const ReadOptions& _read_options,
             ColumnFamilyHandle* column_family, const Slice& key,
             PinnableSlice* value) override;

  using Transaction::MultiGet;
  void MultiGet(const ReadOptions& _read_options,
                ColumnFamilyHandle* column_family, const size_t num_keys,
                const Slice* keys, PinnableSlice* values, Status* statuses,
                const bool sorted_input = false) override;

  using Transaction::GetIterator;
  Iterator* GetIterator(const ReadOptions& options) override;
  Iterator* GetIterator(const ReadOptions& options,
                        ColumnFamilyHandle* column_family) override;

  Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& key,
                          SequenceNumber* tracked_at_seq) override;

 private:
  friend class WriteUnpreparedTransactionTest_ReadYourOwnWrite_Test;
  friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
  friend class WriteUnpreparedTransactionTest_UnpreparedBatch_Test;
  friend class WriteUnpreparedTxnDB;

  const std::map<SequenceNumber, size_t>& GetUnpreparedSequenceNumbers();
  using Transaction::GetImpl;
  Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
                 const Slice& key, PinnableSlice* value) override;

  Status WriteRollbackKeys(const LockTracker& tracked_keys,
                           WriteBatchWithIndex* rollback_batch,
                           ReadCallback* callback, const ReadOptions& roptions);

  Status MaybeFlushWriteBatchToDB();
  Status FlushWriteBatchToDB(bool prepared);
  Status FlushWriteBatchToDBInternal(bool prepared);
  Status FlushWriteBatchWithSavePointToDB();
  Status RollbackToSavePointInternal();
  Status HandleWrite(std::function<Status()> do_write);

  // For write unprepared, we check on every write batch append whether
  // write_batch_flush_threshold_ has been exceeded, and call
  // FlushWriteBatchToDB if so. This logic is encapsulated in
  // MaybeFlushWriteBatchToDB.
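  // Editor's note (illustration, not part of the original source, and
  // assuming the threshold is in bytes): with a 64 KB threshold, a
  // transaction that appends 100 KB of writes will have flushed at least one
  // unprepared batch to the DB before Prepare or Commit is ever called.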
  int64_t write_batch_flush_threshold_;
  WriteUnpreparedTxnDB* wupt_db_;

  // Ordered list of unprep_seq sequence numbers that we have already written
  // to the DB.
  //
  // This maps unprep_seq => prepare_batch_cnt for each unprepared batch
  // written by this transaction.
  //
  // Note that this contains both prepared and unprepared batches, since they
  // are treated similarly in the prepare heap/commit map, which simplifies
  // the commit callbacks.
  std::map<SequenceNumber, size_t> unprep_seqs_;

  uint64_t last_log_number_;

  // Recovered transactions have tracked_keys_ populated, but are not actually
  // locked, for efficiency reasons. For recovered transactions, skip
  // unlocking keys when the transaction ends.
  bool recovered_txn_;

  // Track the largest sequence number at which we performed snapshot
  // validation. If snapshot validation was skipped because no snapshot was
  // set, then this is set to GetLastPublishedSequence. This value is useful
  // because it means that for keys that have unprepared seqnos, we can
  // guarantee that no keys committed by other transactions can exist between
  // largest_validated_seq_ and max_unprep_seq. See
  // WriteUnpreparedTxnDB::NewIterator for an explanation of why this is
  // necessary for iterator Prev().
  //
  // Currently this value only increases during the lifetime of a transaction,
  // but in some cases, we should be able to restore the previously largest
  // value when calling RollbackToSavepoint.
  SequenceNumber largest_validated_seq_;
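  // Editor's note (illustration, not part of the original source): if key K
  // was snapshot-validated at seq 50 (= largest_validated_seq_) and this
  // transaction holds an unprepared write to K at seq 60, then, per the
  // guarantee above, no other transaction committed a version of K anywhere
  // in (50, 60).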

  struct SavePoint {
    // Record of unprep_seqs_ at this savepoint. The set of unprep_seqs is
    // used during RollbackToSavepoint to determine visibility when restoring
    // old values.
    //
    // TODO(lth): Since all unprep_seqs_ sets further down the stack must be
    // subsets, this can potentially be deduplicated by just storing the set
    // difference. Investigate if this is worth it.
    std::map<SequenceNumber, size_t> unprep_seqs_;

    // This snapshot will be used to read keys at this savepoint if we call
    // RollbackToSavePoint.
    std::unique_ptr<ManagedSnapshot> snapshot_;

    SavePoint(const std::map<SequenceNumber, size_t>& seqs,
              ManagedSnapshot* snapshot)
        : unprep_seqs_(seqs), snapshot_(snapshot) {}
  };

  // We have 3 data structures holding savepoint information:
  // 1. TransactionBaseImpl::save_points_
  // 2. WriteUnpreparedTxn::flushed_save_points_
  // 3. WriteUnpreparedTxn::unflushed_save_points_
  //
  // TransactionBaseImpl::save_points_ holds information about all write
  // batches, including the current in-memory write_batch_ as well as
  // unprepared batches that have already been written out. Its responsibility
  // is just to track which keys have been modified in every savepoint.
  //
  // WriteUnpreparedTxn::flushed_save_points_ holds information about
  // savepoints set on unprepared batches that have already been flushed. It
  // holds the snapshot and unprep_seqs at that savepoint, so that the
  // rollback process can determine which keys were visible at that point in
  // time.
  //
  // WriteUnpreparedTxn::unflushed_save_points_ holds information about
  // savepoints on the current in-memory write_batch_. It simply records the
  // size of the write batch at every savepoint.
  //
  // TODO(lth): Remove the redundancy between save_point_boundaries_ and
  // write_batch_.save_points_.
  //
  // Based on this information, here are some invariants:
  // size(unflushed_save_points_) = size(write_batch_.save_points_)
  // size(flushed_save_points_) + size(unflushed_save_points_)
  //   = size(save_points_)
  //
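  // Editor's illustration (not part of the original source): if two
  // savepoints are set, the write batch is then flushed, and a third
  // savepoint is set afterwards, then flushed_save_points_ holds 2 entries,
  // unflushed_save_points_ holds 1 (matching write_batch_.save_points_), and
  // save_points_ holds all 3.
  //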
  std::unique_ptr<autovector<WriteUnpreparedTxn::SavePoint>>
      flushed_save_points_;
  std::unique_ptr<autovector<size_t>> unflushed_save_points_;

  // It is currently unsafe to flush a write batch if there are active
  // iterators created from this transaction. This is because we use
  // WriteBatchWithIndex to do merging reads from the DB and the write batch.
  // If we flush the write batch, it is possible that the delta iterator on
  // the iterator will point to invalid memory.
  std::vector<Iterator*> active_iterators_;
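  // Editor's note (illustration, not part of the original source): e.g., if
  // a transaction opens an iterator and then appends enough writes to exceed
  // write_batch_flush_threshold_, the flush presumably must be deferred until
  // the iterator is released (see RemoveActiveIterator above); otherwise the
  // iterator's delta iterator could be left dangling.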

  // Untracked keys that we have to roll back.
  //
  // TODO(lth): Currently we do not record untracked keys per-savepoint. This
  // means that when rolling back to savepoints, we have to check all keys in
  // the current transaction for rollback. Note that this is merely
  // inefficient, but still correct, because we take a snapshot at every
  // savepoint, and we will use that snapshot to construct the rollback batch.
  // The rollback batch will then contain a reissue of the same marker.
  //
  // A more optimal solution would be to only check keys changed since the
  // last savepoint. Also, it may make sense to merge this into tracked_keys_
  // and differentiate tracked-but-not-locked keys there, to avoid having two
  // very similar data structures.
  using KeySet = std::unordered_map<uint32_t, std::vector<std::string>>;
  KeySet untracked_keys_;
};

}  // namespace ROCKSDB_NAMESPACE
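
To make the sequence-number arithmetic and the four visibility cases above concrete, here is a minimal standalone sketch (an editor's addition, not part of RocksDB). CalcMaxVisibleSeqSketch mirrors WriteUnpreparedTxnReadCallback::CalcMaxVisibleSeq; IsVisibleSketch is a hypothetical simplification of the case split that IsVisibleFullCheck is described as handling, ignoring the commit map and min_uncommitted details of the real implementation. SequenceNumber is assumed to be a 64-bit unsigned integer, as in RocksDB.

#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>

using SequenceNumber = uint64_t;

// Mirrors CalcMaxVisibleSeq: the newest unprepared batch starts at
// rbegin()->first and spans rbegin()->second sequence numbers.
SequenceNumber CalcMaxVisibleSeqSketch(
    const std::map<SequenceNumber, size_t>& unprep_seqs,
    SequenceNumber snapshot_seq) {
  SequenceNumber max_unprepared = 0;
  if (!unprep_seqs.empty()) {
    max_unprepared =
        unprep_seqs.rbegin()->first + unprep_seqs.rbegin()->second - 1;
  }
  return std::max(max_unprepared, snapshot_seq);
}

// Hypothetical case split: a version is visible iff it is one of our own
// unprepared writes (case 1) or a committed write inside the snapshot
// (case 4). Another transaction's unprepared write (case 2) and a committed
// write newer than the snapshot (case 3) are both invisible.
bool IsVisibleSketch(SequenceNumber seq, SequenceNumber snap_seq,
                     const std::map<SequenceNumber, size_t>& unprep_seqs,
                     bool committed) {
  for (const auto& batch : unprep_seqs) {
    if (seq >= batch.first && seq < batch.first + batch.second) {
      return true;  // case 1: our own unprepared write
    }
  }
  return committed && seq <= snap_seq;  // case 4; cases 2 and 3 return false
}

int main() {
  // Own unprepared batch covering seqs 103..105; snapshot at seq 100.
  std::map<SequenceNumber, size_t> unprep_seqs{{103, 3}};
  assert(CalcMaxVisibleSeqSketch(unprep_seqs, 100) == 105);

  assert(IsVisibleSketch(105, 100, unprep_seqs, false));  // case 1
  assert(!IsVisibleSketch(101, 100, {}, false));          // case 2
  assert(!IsVisibleSketch(102, 100, {}, true));           // case 3
  assert(IsVisibleSketch(99, 100, {}, true));             // case 4
  return 0;
}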