/src/rocksdb/utilities/transactions/write_unprepared_txn.h
// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).

#pragma once

#include <algorithm>
#include <cstdint>
#include <functional>
#include <map>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>
#include <vector>

#include "utilities/transactions/write_prepared_txn.h"
#include "utilities/transactions/write_unprepared_txn_db.h"

namespace ROCKSDB_NAMESPACE {

class WriteUnpreparedTxnDB;
class WriteUnpreparedTxn;
// WriteUnprepared transactions need to be able to read their own uncommitted
// writes, and supporting this requires some careful consideration. Because
// writes in the current transaction may have already been flushed to the DB,
// we cannot rely on the contents of WriteBatchWithIndex to determine whether
// a key should be visible or not, so we have to remember to check the DB for
// any uncommitted keys that should be visible to us. First, we will need to
// change the seek-to-snapshot logic to seek to
// max_visible_seq = max(snap_seq, max_unprep_seq) instead. Any key with a
// sequence number greater than max_visible_seq should not be visible, because
// it cannot have been unprepared by the current transaction and it is not in
// the transaction's snapshot.
//
// When we seek to max_visible_seq, one of these cases will happen:
// 1. We hit an unprepared key from the current transaction.
// 2. We hit an unprepared key from another transaction.
// 3. We hit a committed key with snap_seq < seq < max_unprep_seq.
// 4. We hit a committed key with seq <= snap_seq.
//
// IsVisibleFullCheck handles all of these cases correctly, as sketched below.
//
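// As an illustrative sketch only (the actual implementation lives in
// write_unprepared_txn.cc), the check first asks whether seq falls inside
// one of this transaction's own unprepared batches (case 1), and otherwise
// falls back to the usual write-prepared snapshot check (cases 2-4):
//
//   bool WriteUnpreparedTxnReadCallback::IsVisibleFullCheck(
//       SequenceNumber seq) {
//     // unprep_seqs_ maps unprep_seq => prepare_batch_cnt, so seq belongs
//     // to one of our own batches iff it lies in some range
//     // [unprep_seq, unprep_seq + prepare_batch_cnt).
//     auto it = unprep_seqs_.upper_bound(seq);
//     if (it != unprep_seqs_.begin()) {
//       --it;
//       if (seq < it->first + it->second) {
//         return true;  // case 1: our own unprepared write
//       }
//     }
//     // Cases 2-4: visible iff the key is committed and in the snapshot.
//     bool snap_released = false;
//     bool visible = db_->IsInSnapshot(seq, wup_snapshot_, min_uncommitted_,
//                                      &snap_released);
//     snap_released_ |= snap_released;
//     return visible;
//   }
//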
// Other notes:
// Note that max_visible_seq is only calculated once at iterator construction
// time, meaning if the same transaction is adding more unprep seqs through
// writes during iteration, these newer writes may not be visible. This is not
// a problem for MySQL though, because it avoids modifying the index as it is
// scanning through it to avoid the Halloween Problem. Instead, it scans the
// index once up front, and modifies based on a temporary copy.
//
// In DBIter, there is a "reseek" optimization if the iterator skips over too
// many keys. However, this assumes that the reseek seeks exactly to the
// required key. In write unprepared, even after seeking directly to
// max_visible_seq, some iteration may be required before hitting a visible
// key, and special precautions must be taken to avoid performing another
// reseek, leading to an infinite loop.
//
class WriteUnpreparedTxnReadCallback : public ReadCallback {
 public:
  WriteUnpreparedTxnReadCallback(
      WritePreparedTxnDB* db, SequenceNumber snapshot,
      SequenceNumber min_uncommitted,
      const std::map<SequenceNumber, size_t>& unprep_seqs,
      SnapshotBackup backed_by_snapshot)
      // Pass our last uncommitted seq as the snapshot to the parent class so
      // that the parent does not prematurely filter out our own writes. We
      // will do the exact comparison against snapshots in the
      // IsVisibleFullCheck override.
      : ReadCallback(CalcMaxVisibleSeq(unprep_seqs, snapshot), min_uncommitted),
        db_(db),
        unprep_seqs_(unprep_seqs),
        wup_snapshot_(snapshot),
        backed_by_snapshot_(backed_by_snapshot) {
    (void)backed_by_snapshot_;  // to silence unused private field warning
  }

  virtual ~WriteUnpreparedTxnReadCallback() {
    // If it is not backed by a snapshot, the caller must check validity.
    assert(valid_checked_ || backed_by_snapshot_ == kBackedByDBSnapshot);
  }

  bool IsVisibleFullCheck(SequenceNumber seq) override;

  inline bool valid() {
    valid_checked_ = true;
    return snap_released_ == false;
  }

  void Refresh(SequenceNumber seq) override {
    max_visible_seq_ = std::max(max_visible_seq_, seq);
    wup_snapshot_ = seq;
  }

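  // For example (illustrative values): with unprep_seqs = {{10, 3}}, i.e.
  // one unprepared batch spanning seqs 10..12, and snapshot_seq = 15, the
  // largest unprepared seq is 10 + 3 - 1 = 12, so CalcMaxVisibleSeq returns
  // max(12, 15) = 15.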
  static SequenceNumber CalcMaxVisibleSeq(
      const std::map<SequenceNumber, size_t>& unprep_seqs,
      SequenceNumber snapshot_seq) {
    SequenceNumber max_unprepared = 0;
    if (unprep_seqs.size()) {
      max_unprepared =
          unprep_seqs.rbegin()->first + unprep_seqs.rbegin()->second - 1;
    }
    return std::max(max_unprepared, snapshot_seq);
  }

 private:
  WritePreparedTxnDB* db_;
  const std::map<SequenceNumber, size_t>& unprep_seqs_;
  SequenceNumber wup_snapshot_;
  // Whether max_visible_seq_ is backed by a snapshot
  const SnapshotBackup backed_by_snapshot_;
  bool snap_released_ = false;
  // Safety check to ensure that the caller has checked validity via valid().
  bool valid_checked_ = false;
};

class WriteUnpreparedTxn : public WritePreparedTxn {
 public:
  WriteUnpreparedTxn(WriteUnpreparedTxnDB* db,
                     const WriteOptions& write_options,
                     const TransactionOptions& txn_options);

  virtual ~WriteUnpreparedTxn();

  using TransactionBaseImpl::Put;
  Status Put(ColumnFamilyHandle* column_family, const Slice& key,
             const Slice& value, const bool assume_tracked = false) override;
  Status Put(ColumnFamilyHandle* column_family, const SliceParts& key,
             const SliceParts& value,
             const bool assume_tracked = false) override;

  using TransactionBaseImpl::Merge;
  Status Merge(ColumnFamilyHandle* column_family, const Slice& key,
               const Slice& value, const bool assume_tracked = false) override;

  using TransactionBaseImpl::Delete;
  Status Delete(ColumnFamilyHandle* column_family, const Slice& key,
                const bool assume_tracked = false) override;
  Status Delete(ColumnFamilyHandle* column_family, const SliceParts& key,
                const bool assume_tracked = false) override;

  using TransactionBaseImpl::SingleDelete;
  Status SingleDelete(ColumnFamilyHandle* column_family, const Slice& key,
                      const bool assume_tracked = false) override;
  Status SingleDelete(ColumnFamilyHandle* column_family, const SliceParts& key,
                      const bool assume_tracked = false) override;

  // In WriteUnprepared, untracked writes will break snapshot validation logic.
  // Snapshot validation only checks the largest sequence number of a key to
  // see whether it was committed or not. However, an untracked unprepared
  // write will hide smaller committed sequence numbers, as illustrated below.
  //
  // TODO(lth): Investigate whether it is worth having snapshot validation
  // validate all values larger than snap_seq. Otherwise, we should return
  // Status::NotSupported for untracked writes.
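  //
  // For example (illustrative sequence numbers): suppose we take a snapshot
  // at seq 80, another transaction commits key k at seq 90, and we then do an
  // untracked, unprepared write of k at seq 100. A later snapshot validation
  // of k only inspects the latest version, seq 100, which is our own write,
  // so the conflicting commit at seq 90 > 80 goes undetected.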

  Status RebuildFromWriteBatch(WriteBatch*) override;

  uint64_t GetLastLogNumber() const override { return last_log_number_; }

  void RemoveActiveIterator(Iterator* iter) {
    active_iterators_.erase(
        std::remove(active_iterators_.begin(), active_iterators_.end(), iter),
        active_iterators_.end());
  }

 protected:
  void Initialize(const TransactionOptions& txn_options) override;

  Status PrepareInternal() override;

  Status CommitWithoutPrepareInternal() override;
  Status CommitInternal() override;

  Status RollbackInternal() override;

  void Clear() override;

  void SetSavePoint() override;
  Status RollbackToSavePoint() override;
  Status PopSavePoint() override;

  // Get and GetIterator need to be overridden so that a ReadCallback that
  // handles read-your-own-writes is used.
  using Transaction::Get;
  Status Get(const ReadOptions& _read_options,
             ColumnFamilyHandle* column_family, const Slice& key,
             PinnableSlice* value) override;

  using Transaction::MultiGet;
  void MultiGet(const ReadOptions& _read_options,
                ColumnFamilyHandle* column_family, const size_t num_keys,
                const Slice* keys, PinnableSlice* values, Status* statuses,
                const bool sorted_input = false) override;

  using Transaction::GetIterator;
  Iterator* GetIterator(const ReadOptions& options) override;
  Iterator* GetIterator(const ReadOptions& options,
                        ColumnFamilyHandle* column_family) override;

  Status ValidateSnapshot(ColumnFamilyHandle* column_family, const Slice& key,
                          SequenceNumber* tracked_at_seq) override;

 private:
  friend class WriteUnpreparedTransactionTest_ReadYourOwnWrite_Test;
  friend class WriteUnpreparedTransactionTest_RecoveryTest_Test;
  friend class WriteUnpreparedTransactionTest_UnpreparedBatch_Test;
  friend class WriteUnpreparedTxnDB;

  const std::map<SequenceNumber, size_t>& GetUnpreparedSequenceNumbers();
  using Transaction::GetImpl;
  Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
                 const Slice& key, PinnableSlice* value) override;

  Status WriteRollbackKeys(const LockTracker& tracked_keys,
                           WriteBatchWithIndex* rollback_batch,
                           ReadCallback* callback, const ReadOptions& roptions);

  Status MaybeFlushWriteBatchToDB();
  Status FlushWriteBatchToDB(bool prepared);
  Status FlushWriteBatchToDBInternal(bool prepared);
  Status FlushWriteBatchWithSavePointToDB();
  Status RollbackToSavePointInternal();
  Status HandleWrite(std::function<Status()> do_write);

  // For write unprepared, we check on every write batch append to see if
  // write_batch_flush_threshold_ has been exceeded, and then call
  // FlushWriteBatchToDB if so. This logic is encapsulated in
  // MaybeFlushWriteBatchToDB, sketched below.
  int64_t write_batch_flush_threshold_;
  WriteUnpreparedTxnDB* wupt_db_;

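  // A minimal, illustrative sketch of that check (the real implementation
  // lives in write_unprepared_txn.cc; we assume here that the threshold is
  // compared against the write batch's data size):
  //
  //   Status WriteUnpreparedTxn::MaybeFlushWriteBatchToDB() {
  //     const bool kPrepared = true;
  //     Status s;
  //     if (write_batch_flush_threshold_ > 0 &&
  //         write_batch_.GetWriteBatch()->Count() > 0 &&
  //         write_batch_.GetWriteBatch()->GetDataSize() >
  //             static_cast<size_t>(write_batch_flush_threshold_)) {
  //       // Flush as an unprepared batch; a prepared flush happens only
  //       // during Prepare() itself.
  //       s = FlushWriteBatchToDB(!kPrepared);
  //     }
  //     return s;
  //   }
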
  // Ordered list of unprep_seq sequence numbers that we have already written
  // to DB.
  //
  // This maps unprep_seq => prepare_batch_cnt for each unprepared batch
  // written by this transaction.
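  // For example, an entry {10 => 3} means that this transaction wrote one
  // unprepared batch occupying sequence numbers 10, 11, and 12.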
  //
  // Note that this contains both prepared and unprepared batches, since they
  // are treated similarly in the prepare heap/commit map, so it simplifies
  // the commit callbacks.
  std::map<SequenceNumber, size_t> unprep_seqs_;

  uint64_t last_log_number_;

  // Recovered transactions have tracked_keys_ populated, but are not actually
  // locked for efficiency reasons. For recovered transactions, skip unlocking
  // keys when the transaction ends.
  bool recovered_txn_;

  // Track the largest sequence number at which we performed snapshot
  // validation. If snapshot validation was skipped because no snapshot was
  // set, then this is set to GetLastPublishedSequence. This value is useful
  // because it means that for keys that have unprepared seqnos, we can
  // guarantee that no keys committed by other transactions can exist between
  // largest_validated_seq_ and max_unprep_seq. See
  // WriteUnpreparedTxnDB::NewIterator for an explanation of why this is
  // necessary for iterator Prev().
  //
  // Currently this value only increases during the lifetime of a transaction,
  // but in some cases, we should be able to restore the previously largest
  // value when calling RollbackToSavePoint.
  SequenceNumber largest_validated_seq_;

  struct SavePoint {
    // Record of unprep_seqs_ at this savepoint. The set of unprep_seqs is
    // used during RollbackToSavePoint to determine visibility when restoring
    // old values.
    //
    // TODO(lth): Since all unprep_seqs_ sets further down the stack must be
    // subsets, this can potentially be deduplicated by just storing the set
    // difference. Investigate if this is worth it.
    std::map<SequenceNumber, size_t> unprep_seqs_;

    // This snapshot will be used to read keys at this savepoint if we call
    // RollbackToSavePoint.
    std::unique_ptr<ManagedSnapshot> snapshot_;

    SavePoint(const std::map<SequenceNumber, size_t>& seqs,
              ManagedSnapshot* snapshot)
        : unprep_seqs_(seqs), snapshot_(snapshot) {}
  };

  // We have 3 data structures holding savepoint information:
  // 1. TransactionBaseImpl::save_points_
  // 2. WriteUnpreparedTxn::flushed_save_points_
  // 3. WriteUnpreparedTxn::unflushed_save_points_
  //
  // TransactionBaseImpl::save_points_ holds information about all write
  // batches, including the current in-memory write_batch_, as well as
  // unprepared batches that have been written out. Its responsibility is just
  // to track which keys have been modified in every savepoint.
  //
  // WriteUnpreparedTxn::flushed_save_points_ holds information about
  // savepoints set on unprepared batches that have already been flushed. It
  // holds the snapshot and unprep_seqs at that savepoint, so that the
  // rollback process can determine which keys were visible at that point in
  // time.
  //
  // WriteUnpreparedTxn::unflushed_save_points_ holds information about
  // savepoints on the current in-memory write_batch_. It simply records the
  // size of the write batch at every savepoint.
  //
  // TODO(lth): Remove the redundancy between save_point_boundaries_ and
  // write_batch_.save_points_.
  //
  // Based on this information, here are some invariants:
  // size(unflushed_save_points_) = size(write_batch_.save_points_)
  // size(flushed_save_points_) + size(unflushed_save_points_)
  //   = size(save_points_)
  //
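  // For example (an illustrative scenario): if a transaction sets a
  // savepoint, flushes its write batch, and then sets a second savepoint,
  // then flushed_save_points_ holds one entry (for the first savepoint),
  // unflushed_save_points_ holds one entry (for the second), and
  // TransactionBaseImpl::save_points_ holds both, so
  // 1 + 1 = size(save_points_) = 2.
  //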
  std::unique_ptr<autovector<WriteUnpreparedTxn::SavePoint>>
      flushed_save_points_;
  std::unique_ptr<autovector<size_t>> unflushed_save_points_;

  // It is currently unsafe to flush a write batch if there are active
  // iterators created from this transaction. This is because we use
  // WriteBatchWithIndex to do merging reads from the DB and the write batch.
  // If we flush the write batch, it is possible that the delta iterator
  // underlying a transaction iterator will point to invalid memory.
  std::vector<Iterator*> active_iterators_;

  // Untracked keys that we have to rollback.
  //
  // TODO(lth): Currently we do not record untracked keys per-savepoint. This
  // means that when rolling back to savepoints, we have to check all keys in
  // the current transaction for rollback. Note that this is merely
  // inefficient, not incorrect, because we take a snapshot at every
  // savepoint, and we will use that snapshot to construct the rollback batch.
  // The rollback batch will then contain a reissue of the same marker.
  //
  // A more optimal solution would be to only check keys changed since the
  // last savepoint. Also, it may make sense to merge this into tracked_keys_
  // and differentiate between tracked but not locked keys, to avoid having
  // two very similar data structures.
  using KeySet = std::unordered_map<uint32_t, std::vector<std::string>>;
  KeySet untracked_keys_;
};

}  // namespace ROCKSDB_NAMESPACE