/src/rocksdb/utilities/transactions/write_unprepared_txn_db.cc
Line | Count | Source |
1 | | // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
2 | | // This source code is licensed under both the GPLv2 (found in the |
3 | | // COPYING file in the root directory) and Apache 2.0 License |
4 | | // (found in the LICENSE.Apache file in the root directory). |
5 | | |
6 | | |
7 | | #include "utilities/transactions/write_unprepared_txn_db.h" |
8 | | |
9 | | #include "db/arena_wrapped_db_iter.h" |
10 | | #include "rocksdb/utilities/transaction_db.h" |
11 | | #include "util/cast_util.h" |
12 | | |
13 | | namespace ROCKSDB_NAMESPACE { |
14 | | |
15 | | // Instead of reconstructing a Transaction object and calling rollback on it, |
16 | | // we can be more efficient with RollbackRecoveredTransaction by skipping |
17 | | // unnecessary steps (e.g. updating CommitMap, reconstructing the keyset). |
18 | | Status WriteUnpreparedTxnDB::RollbackRecoveredTransaction( |
19 | 0 | const DBImpl::RecoveredTransaction* rtxn) { |
20 | | // TODO(lth): Reduce duplicate code with WritePrepared rollback logic. |
21 | 0 | assert(rtxn->unprepared_); |
22 | 0 | auto cf_map_shared_ptr = WritePreparedTxnDB::GetCFHandleMap(); |
23 | 0 | auto cf_comp_map_shared_ptr = WritePreparedTxnDB::GetCFComparatorMap(); |
24 | | // In theory we could write with disableWAL = true during recovery, and |
25 | | // assume that if we crash again during recovery, we can just replay from |
26 | | // the very beginning. Unfortunately, the XIDs from the application may not |
27 | | // necessarily be unique across restarts, potentially leading to situations |
28 | | // like this: |
29 | | // |
30 | | // BEGIN_PREPARE(unprepared) Put(a) END_PREPARE(xid = 1) |
31 | | // -- crash and recover with Put(a) rolled back as it was not prepared |
32 | | // BEGIN_PREPARE(prepared) Put(b) END_PREPARE(xid = 1) |
33 | | // COMMIT(xid = 1) |
34 | | // -- crash and recover with both a, b |
35 | | // |
36 | | // We could just write the rollback marker, but then we would have to extend |
37 | | // MemTableInserter during recovery to actually do writes into the DB |
38 | | // instead of just dropping the in-memory write batch. |
39 | | // |
40 | | // TODO: plumb Env::IOActivity, Env::IOPriority |
41 | 0 | WriteOptions w_options; |
42 | |
43 | 0 | class InvalidSnapshotReadCallback : public ReadCallback { |
44 | 0 | public: |
45 | 0 | InvalidSnapshotReadCallback(SequenceNumber snapshot) |
46 | 0 | : ReadCallback(snapshot) {} |
47 | |
48 | 0 | inline bool IsVisibleFullCheck(SequenceNumber) override { |
49 | | // The seq provided as the snapshot is the seq right before we locked and |
50 | | // wrote to the key, so whatever is there is committed. |
51 | 0 | return true; |
52 | 0 | } |
53 | | |
54 | | // Ignore the refresh request since we are confident that our snapshot seq |
55 | | // is not going to be affected by concurrent compactions (not enabled yet). |
56 | 0 | void Refresh(SequenceNumber) override {} |
57 | 0 | }; |
58 | | |
59 | | // Iterate starting with the largest sequence number. |
60 | 0 | for (auto it = rtxn->batches_.rbegin(); it != rtxn->batches_.rend(); ++it) { |
61 | 0 | auto last_visible_txn = it->first - 1; |
62 | 0 | const auto& batch = it->second.batch_; |
63 | 0 | WriteBatch rollback_batch(0 /* reserved_bytes */, 0 /* max_bytes */, |
64 | 0 | w_options.protection_bytes_per_key, |
65 | 0 | 0 /* default_cf_ts_sz */); |
66 | |
67 | 0 | struct RollbackWriteBatchBuilder : public WriteBatch::Handler { |
68 | 0 | DBImpl* db_; |
69 | 0 | ReadOptions roptions; |
70 | 0 | InvalidSnapshotReadCallback callback; |
71 | 0 | WriteBatch* rollback_batch_; |
72 | 0 | std::map<uint32_t, const Comparator*>& comparators_; |
73 | 0 | std::map<uint32_t, ColumnFamilyHandle*>& handles_; |
74 | 0 | using CFKeys = std::set<Slice, SetComparator>; |
75 | 0 | std::map<uint32_t, CFKeys> keys_; |
76 | 0 | bool rollback_merge_operands_; |
77 | 0 | RollbackWriteBatchBuilder( |
78 | 0 | DBImpl* db, SequenceNumber snap_seq, WriteBatch* dst_batch, |
79 | 0 | std::map<uint32_t, const Comparator*>& comparators, |
80 | 0 | std::map<uint32_t, ColumnFamilyHandle*>& handles, |
81 | 0 | bool rollback_merge_operands) |
82 | 0 | : db_(db), |
83 | 0 | callback(snap_seq), |
84 | | // disable min_uncommitted optimization |
85 | 0 | rollback_batch_(dst_batch), |
86 | 0 | comparators_(comparators), |
87 | 0 | handles_(handles), |
88 | 0 | rollback_merge_operands_(rollback_merge_operands) {} |
89 | |
90 | 0 | Status Rollback(uint32_t cf, const Slice& key) { |
91 | 0 | Status s; |
92 | 0 | CFKeys& cf_keys = keys_[cf]; |
93 | 0 | if (cf_keys.size() == 0) { // just inserted |
94 | 0 | auto cmp = comparators_[cf]; |
95 | 0 | keys_[cf] = CFKeys(SetComparator(cmp)); |
96 | 0 | } |
97 | 0 | auto res = cf_keys.insert(key); |
98 | 0 | if (res.second == |
99 | 0 | false) { // second is false if an element already existed. |
100 | 0 | return s; |
101 | 0 | } |
102 | | |
103 | 0 | PinnableSlice pinnable_val; |
104 | 0 | bool not_used; |
105 | 0 | auto cf_handle = handles_[cf]; |
106 | 0 | DBImpl::GetImplOptions get_impl_options; |
107 | 0 | get_impl_options.column_family = cf_handle; |
108 | 0 | get_impl_options.value = &pinnable_val; |
109 | 0 | get_impl_options.value_found = ¬_used; |
110 | 0 | get_impl_options.callback = &callback; |
111 | 0 | s = db_->GetImpl(roptions, key, get_impl_options); |
112 | 0 | assert(s.ok() || s.IsNotFound()); |
113 | 0 | if (s.ok()) { |
114 | 0 | s = rollback_batch_->Put(cf_handle, key, pinnable_val); |
115 | 0 | assert(s.ok()); |
116 | 0 | } else if (s.IsNotFound()) { |
117 | | // There was no readable value before the txn. By adding a delete we |
118 | | // make sure that there will be none afterwards either. |
119 | 0 | s = rollback_batch_->Delete(cf_handle, key); |
120 | 0 | assert(s.ok()); |
121 | 0 | } else { |
122 | | // Unexpected status. Return it to the user. |
123 | 0 | } |
124 | 0 | return s; |
125 | 0 | } |
126 | |
127 | 0 | Status PutCF(uint32_t cf, const Slice& key, |
128 | 0 | const Slice& /*val*/) override { |
129 | 0 | return Rollback(cf, key); |
130 | 0 | } |
131 | |
132 | 0 | Status DeleteCF(uint32_t cf, const Slice& key) override { |
133 | 0 | return Rollback(cf, key); |
134 | 0 | } |
135 | |
136 | 0 | Status SingleDeleteCF(uint32_t cf, const Slice& key) override { |
137 | 0 | return Rollback(cf, key); |
138 | 0 | } |
139 | |
140 | 0 | Status MergeCF(uint32_t cf, const Slice& key, |
141 | 0 | const Slice& /*val*/) override { |
142 | 0 | if (rollback_merge_operands_) { |
143 | 0 | return Rollback(cf, key); |
144 | 0 | } else { |
145 | 0 | return Status::OK(); |
146 | 0 | } |
147 | 0 | } |
148 | | |
149 | | // Recovered batches do not contain 2PC markers. |
150 | 0 | Status MarkNoop(bool) override { return Status::InvalidArgument(); } |
151 | 0 | Status MarkBeginPrepare(bool) override { |
152 | 0 | return Status::InvalidArgument(); |
153 | 0 | } |
154 | 0 | Status MarkEndPrepare(const Slice&) override { |
155 | 0 | return Status::InvalidArgument(); |
156 | 0 | } |
157 | 0 | Status MarkCommit(const Slice&) override { |
158 | 0 | return Status::InvalidArgument(); |
159 | 0 | } |
160 | 0 | Status MarkRollback(const Slice&) override { |
161 | 0 | return Status::InvalidArgument(); |
162 | 0 | } |
163 | 0 | } rollback_handler(db_impl_, last_visible_txn, &rollback_batch, |
164 | 0 | *cf_comp_map_shared_ptr.get(), *cf_map_shared_ptr.get(), |
165 | 0 | txn_db_options_.rollback_merge_operands); |
166 | |
167 | 0 | auto s = batch->Iterate(&rollback_handler); |
168 | 0 | if (!s.ok()) { |
169 | 0 | return s; |
170 | 0 | } |
171 | | |
172 | | // The Rollback marker will be used as a batch separator. |
173 | 0 | s = WriteBatchInternal::MarkRollback(&rollback_batch, rtxn->name_); |
174 | 0 | if (!s.ok()) { |
175 | 0 | return s; |
176 | 0 | } |
177 | | |
178 | 0 | const uint64_t kNoLogRef = 0; |
179 | 0 | const bool kDisableMemtable = true; |
180 | 0 | const size_t kOneBatch = 1; |
181 | 0 | uint64_t seq_used = kMaxSequenceNumber; |
182 | 0 | s = db_impl_->WriteImpl(w_options, &rollback_batch, nullptr, nullptr, |
183 | 0 | nullptr, kNoLogRef, !kDisableMemtable, &seq_used, |
184 | 0 | kOneBatch); |
185 | 0 | if (!s.ok()) { |
186 | 0 | return s; |
187 | 0 | } |
188 | | |
189 | | // If two_write_queues is enabled, we must manually release the sequence |
190 | | // number to readers. |
191 | 0 | if (db_impl_->immutable_db_options().two_write_queues) { |
192 | 0 | db_impl_->SetLastPublishedSequence(seq_used); |
193 | 0 | } |
194 | 0 | } |
195 | | |
196 | 0 | return Status::OK(); |
197 | 0 | } |
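
A note on the handler above: RollbackWriteBatchBuilder undoes a recovered transaction by restoring each touched key to its prior image, reading the last committed value with a callback that treats everything already in the DB as visible, then writing that value back (or a Delete if nothing was there). Below is a minimal sketch of the same prior-image idea against the public RocksDB API; BuildRollbackBatch is a hypothetical helper name, and the sketch omits the sequence-number and ReadCallback plumbing the real handler needs.

    // Hedged sketch (not part of this file): build a rollback batch by restoring
    // each key's pre-transaction value, mirroring RollbackWriteBatchBuilder.
    #include <string>
    #include <vector>

    #include "rocksdb/db.h"
    #include "rocksdb/write_batch.h"

    rocksdb::Status BuildRollbackBatch(rocksdb::DB* db,
                                       rocksdb::ColumnFamilyHandle* cf,
                                       const std::vector<std::string>& keys,
                                       rocksdb::WriteBatch* rollback_batch) {
      for (const auto& key : keys) {
        std::string prior_value;
        // Read the value that existed before the transaction's writes.
        rocksdb::Status s =
            db->Get(rocksdb::ReadOptions(), cf, key, &prior_value);
        if (s.ok()) {
          // A committed value existed: restore it.
          s = rollback_batch->Put(cf, key, prior_value);
        } else if (s.IsNotFound()) {
          // Nothing existed before the txn: ensure nothing exists afterwards.
          s = rollback_batch->Delete(cf, key);
        } else {
          return s;  // Unexpected status; surface it to the caller.
        }
        if (!s.ok()) {
          return s;
        }
      }
      return rocksdb::Status::OK();
    }
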
198 | | |
199 | | Status WriteUnpreparedTxnDB::Initialize( |
200 | | const std::vector<size_t>& compaction_enabled_cf_indices, |
201 | 0 | const std::vector<ColumnFamilyHandle*>& handles) { |
202 | | // TODO(lth): Reduce code duplication in this function. |
203 | 0 | auto dbimpl = static_cast_with_check<DBImpl>(GetRootDB()); |
204 | 0 | assert(dbimpl != nullptr); |
205 | |
206 | 0 | db_impl_->SetSnapshotChecker(new WritePreparedSnapshotChecker(this)); |
207 | | // A callback to commit a single sub-batch |
208 | 0 | class CommitSubBatchPreReleaseCallback : public PreReleaseCallback { |
209 | 0 | public: |
210 | 0 | explicit CommitSubBatchPreReleaseCallback(WritePreparedTxnDB* db) |
211 | 0 | : db_(db) {} |
212 | 0 | Status Callback(SequenceNumber commit_seq, |
213 | 0 | bool is_mem_disabled __attribute__((__unused__)), uint64_t, |
214 | 0 | size_t /*index*/, size_t /*total*/) override { |
215 | 0 | assert(!is_mem_disabled); |
216 | 0 | db_->AddCommitted(commit_seq, commit_seq); |
217 | 0 | return Status::OK(); |
218 | 0 | } |
219 | |
220 | 0 | private: |
221 | 0 | WritePreparedTxnDB* db_; |
222 | 0 | }; |
223 | 0 | db_impl_->SetRecoverableStatePreReleaseCallback( |
224 | 0 | new CommitSubBatchPreReleaseCallback(this)); |
225 | | |
226 | | // PessimisticTransactionDB::Initialize |
227 | 0 | for (auto cf_ptr : handles) { |
228 | 0 | AddColumnFamily(cf_ptr); |
229 | 0 | } |
230 | | // Verify cf options |
231 | 0 | for (auto handle : handles) { |
232 | 0 | ColumnFamilyDescriptor cfd; |
233 | 0 | Status s = handle->GetDescriptor(&cfd); |
234 | 0 | if (!s.ok()) { |
235 | 0 | return s; |
236 | 0 | } |
237 | 0 | s = VerifyCFOptions(cfd.options); |
238 | 0 | if (!s.ok()) { |
239 | 0 | return s; |
240 | 0 | } |
241 | 0 | } |
242 | | |
243 | | // Re-enable compaction for the column families that initially had |
244 | | // compaction enabled. |
245 | 0 | std::vector<ColumnFamilyHandle*> compaction_enabled_cf_handles; |
246 | 0 | compaction_enabled_cf_handles.reserve(compaction_enabled_cf_indices.size()); |
247 | 0 | for (auto index : compaction_enabled_cf_indices) { |
248 | 0 | compaction_enabled_cf_handles.push_back(handles[index]); |
249 | 0 | } |
250 | | |
251 | | // create 'real' transactions from recovered shell transactions |
252 | 0 | auto rtxns = dbimpl->recovered_transactions(); |
253 | 0 | std::map<SequenceNumber, SequenceNumber> ordered_seq_cnt; |
254 | 0 | for (const auto& rtxn : rtxns) { |
255 | 0 | auto recovered_trx = rtxn.second; |
256 | 0 | assert(recovered_trx); |
257 | 0 | assert(recovered_trx->batches_.size() >= 1); |
258 | 0 | assert(recovered_trx->name_.length()); |
259 | | |
260 | | // We can only roll back transactions after AdvanceMaxEvictedSeq is called, |
261 | | // but AddPrepared must occur before AdvanceMaxEvictedSeq, which is why |
262 | | // two iterations are required. |
263 | 0 | if (recovered_trx->unprepared_) { |
264 | 0 | continue; |
265 | 0 | } |
266 | | |
267 | | // TODO: plumb Env::IOActivity, Env::IOPriority |
268 | 0 | WriteOptions w_options; |
269 | 0 | w_options.sync = true; |
270 | 0 | TransactionOptions t_options; |
271 | |
272 | 0 | auto first_log_number = recovered_trx->batches_.begin()->second.log_number_; |
273 | 0 | auto first_seq = recovered_trx->batches_.begin()->first; |
274 | 0 | auto last_prepare_batch_cnt = |
275 | 0 | recovered_trx->batches_.begin()->second.batch_cnt_; |
276 | |
277 | 0 | Transaction* real_trx = BeginTransaction(w_options, t_options, nullptr); |
278 | 0 | assert(real_trx); |
279 | 0 | auto wupt = static_cast_with_check<WriteUnpreparedTxn>(real_trx); |
280 | 0 | wupt->recovered_txn_ = true; |
281 | |
282 | 0 | real_trx->SetLogNumber(first_log_number); |
283 | 0 | real_trx->SetId(first_seq); |
284 | 0 | Status s = real_trx->SetName(recovered_trx->name_); |
285 | 0 | if (!s.ok()) { |
286 | 0 | return s; |
287 | 0 | } |
288 | 0 | wupt->prepare_batch_cnt_ = last_prepare_batch_cnt; |
289 | |
290 | 0 | for (auto batch : recovered_trx->batches_) { |
291 | 0 | const auto& seq = batch.first; |
292 | 0 | const auto& batch_info = batch.second; |
293 | 0 | auto cnt = batch_info.batch_cnt_ ? batch_info.batch_cnt_ : 1; |
294 | 0 | assert(batch_info.log_number_); |
295 | |
296 | 0 | ordered_seq_cnt[seq] = cnt; |
297 | 0 | assert(wupt->unprep_seqs_.count(seq) == 0); |
298 | 0 | wupt->unprep_seqs_[seq] = cnt; |
299 | |
300 | 0 | s = wupt->RebuildFromWriteBatch(batch_info.batch_); |
301 | 0 | assert(s.ok()); |
302 | 0 | if (!s.ok()) { |
303 | 0 | return s; |
304 | 0 | } |
305 | 0 | } |
306 | | |
307 | 0 | const bool kClear = true; |
308 | 0 | wupt->InitWriteBatch(kClear); |
309 | |
310 | 0 | real_trx->SetState(Transaction::PREPARED); |
311 | 0 | if (!s.ok()) { |
312 | 0 | return s; |
313 | 0 | } |
314 | 0 | } |
315 | | // AddPrepared must be called in order |
316 | 0 | for (auto seq_cnt : ordered_seq_cnt) { |
317 | 0 | auto seq = seq_cnt.first; |
318 | 0 | auto cnt = seq_cnt.second; |
319 | 0 | for (size_t i = 0; i < cnt; i++) { |
320 | 0 | AddPrepared(seq + i); |
321 | 0 | } |
322 | 0 | } |
323 | |
324 | 0 | SequenceNumber prev_max = max_evicted_seq_; |
325 | 0 | SequenceNumber last_seq = db_impl_->GetLatestSequenceNumber(); |
326 | 0 | AdvanceMaxEvictedSeq(prev_max, last_seq); |
327 | | // Create a gap between max and the next snapshot. This simplifies the logic |
328 | | // in IsInSnapshot by not having to consider the special case of max == |
329 | | // snapshot after recovery. This is tested in IsInSnapshotEmptyMapTest. |
330 | 0 | if (last_seq) { |
331 | 0 | db_impl_->versions_->SetLastAllocatedSequence(last_seq + 1); |
332 | 0 | db_impl_->versions_->SetLastSequence(last_seq + 1); |
333 | 0 | db_impl_->versions_->SetLastPublishedSequence(last_seq + 1); |
334 | 0 | } |
335 | |
336 | 0 | Status s; |
337 | | // Roll back unprepared transactions. |
338 | 0 | for (const auto& rtxn : rtxns) { |
339 | 0 | auto recovered_trx = rtxn.second; |
340 | 0 | if (recovered_trx->unprepared_) { |
341 | 0 | s = RollbackRecoveredTransaction(recovered_trx); |
342 | 0 | if (!s.ok()) { |
343 | 0 | return s; |
344 | 0 | } |
345 | 0 | continue; |
346 | 0 | } |
347 | 0 | } |
348 | | |
349 | 0 | if (s.ok()) { |
350 | 0 | dbimpl->DeleteAllRecoveredTransactions(); |
351 | | |
352 | | // Compaction should start only after max_evicted_seq_ is set AND recovered |
353 | | // transactions are either added to PrepareHeap or rolled back. |
354 | 0 | s = EnableAutoCompaction(compaction_enabled_cf_handles); |
355 | 0 | } |
356 | |
357 | 0 | return s; |
358 | 0 | } |
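
One detail worth calling out from Initialize: the recovered (start sequence, sub-batch count) pairs are first collected into ordered_seq_cnt, a std::map keyed by sequence number, precisely so that AddPrepared can then be called in ascending sequence order before AdvanceMaxEvictedSeq runs. A minimal sketch of that expansion follows, assuming a stand-in add_prepared callable rather than the real WritePreparedTxnDB::AddPrepared.

    // Hedged sketch: expand (start_seq, sub_batch_count) pairs into in-order
    // AddPrepared-style calls. std::map iteration is ordered by key, which is
    // what provides the "AddPrepared must be called in order" guarantee.
    #include <cstdint>
    #include <functional>
    #include <map>

    using SequenceNumber = uint64_t;

    void AddPreparedInOrder(
        const std::map<SequenceNumber, SequenceNumber>& ordered_seq_cnt,
        const std::function<void(SequenceNumber)>& add_prepared /* stand-in */) {
      for (const auto& [seq, cnt] : ordered_seq_cnt) {
        // Each recovered transaction occupies `cnt` consecutive sequence numbers,
        // one per unprepared sub-batch, starting at `seq`.
        for (SequenceNumber i = 0; i < cnt; i++) {
          add_prepared(seq + i);
        }
      }
    }
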
359 | | |
360 | | Transaction* WriteUnpreparedTxnDB::BeginTransaction( |
361 | | const WriteOptions& write_options, const TransactionOptions& txn_options, |
362 | 0 | Transaction* old_txn) { |
363 | 0 | if (old_txn != nullptr) { |
364 | 0 | ReinitializeTransaction(old_txn, write_options, txn_options); |
365 | 0 | return old_txn; |
366 | 0 | } else { |
367 | 0 | return new WriteUnpreparedTxn(this, write_options, txn_options); |
368 | 0 | } |
369 | 0 | } |
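
For context, the BeginTransaction override above is reached through the public TransactionDB API whenever the database was opened with the write-unprepared policy. A hedged usage sketch follows; the path is illustrative, and enabling two_write_queues is an assumption commonly paired with this policy rather than something this file requires.

    // Hedged usage sketch: open a TransactionDB with the write-unprepared policy
    // and begin a transaction, which routes through
    // WriteUnpreparedTxnDB::BeginTransaction. "/tmp/wu_txn_db" is illustrative.
    #include <cassert>

    #include "rocksdb/utilities/transaction_db.h"

    int main() {
      rocksdb::Options options;
      options.create_if_missing = true;
      options.two_write_queues = true;  // assumption: commonly used with this policy

      rocksdb::TransactionDBOptions txn_db_options;
      txn_db_options.write_policy = rocksdb::TxnDBWritePolicy::WRITE_UNPREPARED;

      rocksdb::TransactionDB* txn_db = nullptr;
      rocksdb::Status s = rocksdb::TransactionDB::Open(options, txn_db_options,
                                                       "/tmp/wu_txn_db", &txn_db);
      assert(s.ok());

      rocksdb::Transaction* txn = txn_db->BeginTransaction(rocksdb::WriteOptions());
      s = txn->Put("key", "value");
      s = s.ok() ? txn->Commit() : txn->Rollback();
      assert(s.ok());

      delete txn;
      delete txn_db;
      return 0;
    }
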
370 | | |
371 | | // Struct to hold ownership of snapshot and read callback for iterator cleanup. |
372 | | struct WriteUnpreparedTxnDB::IteratorState { |
373 | | IteratorState(WritePreparedTxnDB* txn_db, SequenceNumber sequence, |
374 | | std::shared_ptr<ManagedSnapshot> s, |
375 | | SequenceNumber min_uncommitted, WriteUnpreparedTxn* txn) |
376 | | : callback(txn_db, sequence, min_uncommitted, txn->unprep_seqs_, |
377 | | kBackedByDBSnapshot), |
378 | 0 | snapshot(s) {} |
379 | 0 | SequenceNumber MaxVisibleSeq() { return callback.max_visible_seq(); } |
380 | | |
381 | | WriteUnpreparedTxnReadCallback callback; |
382 | | std::shared_ptr<ManagedSnapshot> snapshot; |
383 | | }; |
384 | | |
385 | | namespace { |
386 | 0 | static void CleanupWriteUnpreparedTxnDBIterator(void* arg1, void* /*arg2*/) { |
387 | 0 | delete static_cast<WriteUnpreparedTxnDB::IteratorState*>(arg1); |
388 | 0 | } |
389 | | } // anonymous namespace |
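
The IteratorState/cleanup pair above is an instance of a general RocksDB pattern: heap-allocate per-iterator state and hand ownership to the iterator via Cleanable::RegisterCleanup, so the snapshot and read callback live exactly as long as the iterator that uses them. A minimal sketch of the pattern, with illustrative PerIteratorState and AttachState names:

    // Hedged sketch of the cleanup-registration pattern: tie a heap-allocated
    // state object's lifetime to an Iterator via Cleanable::RegisterCleanup.
    // PerIteratorState and AttachState are illustrative names.
    #include "rocksdb/iterator.h"

    struct PerIteratorState {
      // In the real code this holds the owned snapshot and the read callback.
    };

    static void CleanupPerIteratorState(void* arg1, void* /*arg2*/) {
      delete static_cast<PerIteratorState*>(arg1);
    }

    rocksdb::Iterator* AttachState(rocksdb::Iterator* iter) {
      auto* state = new PerIteratorState();
      // The iterator now owns `state`; the cleanup runs when the iterator is
      // destroyed, regardless of how the caller disposes of it.
      iter->RegisterCleanup(CleanupPerIteratorState, state, nullptr);
      return iter;
    }
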
390 | | |
391 | | Iterator* WriteUnpreparedTxnDB::NewIterator(const ReadOptions& _read_options, |
392 | | ColumnFamilyHandle* column_family, |
393 | 0 | WriteUnpreparedTxn* txn) { |
394 | 0 | if (_read_options.io_activity != Env::IOActivity::kUnknown && |
395 | 0 | _read_options.io_activity != Env::IOActivity::kDBIterator) { |
396 | 0 | return NewErrorIterator(Status::InvalidArgument( |
397 | 0 | "Can only call NewIterator with `ReadOptions::io_activity` is " |
398 | 0 | "`Env::IOActivity::kUnknown` or `Env::IOActivity::kDBIterator`")); |
399 | 0 | } |
400 | | |
401 | 0 | ReadOptions read_options(_read_options); |
402 | 0 | if (read_options.io_activity == Env::IOActivity::kUnknown) { |
403 | 0 | read_options.io_activity = Env::IOActivity::kDBIterator; |
404 | 0 | } |
405 | | // TODO(lth): Refactor so that this logic is shared with WritePrepared. |
406 | 0 | constexpr bool expose_blob_index = false; |
407 | 0 | constexpr bool allow_refresh = false; |
408 | 0 | std::shared_ptr<ManagedSnapshot> own_snapshot = nullptr; |
409 | 0 | SequenceNumber snapshot_seq = kMaxSequenceNumber; |
410 | 0 | SequenceNumber min_uncommitted = 0; |
411 | | |
412 | | // Currently, the Prev() iterator logic does not work well without snapshot |
413 | | // validation. The logic simply iterates through values of a key in |
414 | | // ascending seqno order, stopping at the first non-visible value and |
415 | | // returning the last visible value. |
416 | | // |
417 | | // For example, if snapshot sequence is 3, and we have the following keys: |
418 | | // foo: v1 1 |
419 | | // foo: v2 2 |
420 | | // foo: v3 3 |
421 | | // foo: v4 4 |
422 | | // foo: v5 5 |
423 | | // |
424 | | // Then 1, 2, 3 will be visible, but 4 will be non-visible, so we return v3, |
425 | | // which is the last visible value. |
426 | | // |
427 | | // For unprepared transactions, if we have snap_seq = 3, but the current |
428 | | // transaction has unprep_seq 5, then returning the first non-visible value |
429 | | // would be incorrect, as we should return v5, and not v3. The problem is that |
430 | | // there are committed values at snapshot_seq < commit_seq < unprep_seq. |
431 | | // |
432 | | // Snapshot validation can prevent this problem by ensuring that no committed |
433 | | // values exist at snapshot_seq < commit_seq, and thus any value with a |
434 | | // sequence number greater than snapshot_seq must be an unprepared value. For |
435 | | // example, if the transaction had a snapshot at 3, then snapshot validation |
436 | | // would be performed during the Put(v5) call. It would find v4, and the Put |
437 | | // would fail with snapshot validation failure. |
438 | | // |
439 | | // TODO(lth): Improve Prev() logic to continue iterating until |
440 | | // max_visible_seq, and then return the last visible value, so that this |
441 | | // restriction can be lifted. |
442 | 0 | const Snapshot* snapshot = nullptr; |
443 | 0 | if (read_options.snapshot == nullptr) { |
444 | 0 | snapshot = GetSnapshot(); |
445 | 0 | own_snapshot = std::make_shared<ManagedSnapshot>(db_impl_, snapshot); |
446 | 0 | } else { |
447 | 0 | snapshot = read_options.snapshot; |
448 | 0 | } |
449 | |
450 | 0 | snapshot_seq = snapshot->GetSequenceNumber(); |
451 | 0 | assert(snapshot_seq != kMaxSequenceNumber); |
452 | | // Iteration is safe as long as largest_validated_seq <= snapshot_seq. We are |
453 | | // guaranteed that for keys that were modified by this transaction (and thus |
454 | | // might have unprepared values), no committed values exist at |
455 | | // largest_validated_seq < commit_seq (or the contrapositive: any committed |
456 | | // value must exist at commit_seq <= largest_validated_seq). This implies |
457 | | // that commit_seq <= largest_validated_seq <= snapshot_seq or commit_seq <= |
458 | | // snapshot_seq. As explained above, the problem with Prev() only happens when |
459 | | // snapshot_seq < commit_seq. |
460 | | // |
461 | | // For keys that were not modified by this transaction, largest_validated_seq_ |
462 | | // is meaningless, and Prev() should just work with the existing visibility |
463 | | // logic. |
464 | 0 | if (txn->largest_validated_seq_ > snapshot->GetSequenceNumber() && |
465 | 0 | !txn->unprep_seqs_.empty()) { |
466 | 0 | ROCKS_LOG_ERROR(info_log_, |
467 | 0 | "WriteUnprepared iterator creation failed since the " |
468 | 0 | "transaction has performed unvalidated writes"); |
469 | 0 | return nullptr; |
470 | 0 | } |
471 | 0 | min_uncommitted = |
472 | 0 | static_cast_with_check<const SnapshotImpl>(snapshot)->min_uncommitted_; |
473 | |
474 | 0 | auto* cfh = static_cast_with_check<ColumnFamilyHandleImpl>(column_family); |
475 | 0 | auto* cfd = cfh->cfd(); |
476 | 0 | auto* state = |
477 | 0 | new IteratorState(this, snapshot_seq, own_snapshot, min_uncommitted, txn); |
478 | 0 | SuperVersion* super_version = cfd->GetReferencedSuperVersion(db_impl_); |
479 | 0 | auto* db_iter = db_impl_->NewIteratorImpl( |
480 | 0 | read_options, cfh, super_version, state->MaxVisibleSeq(), |
481 | 0 | &state->callback, expose_blob_index, allow_refresh); |
482 | 0 | db_iter->RegisterCleanup(CleanupWriteUnpreparedTxnDBIterator, state, nullptr); |
483 | 0 | return db_iter; |
484 | 0 | } |
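
The long comment inside NewIterator describes the Prev() visibility rule that motivates the unvalidated-write check: walk a key's versions in ascending sequence order and return the last one the snapshot can see. Below is a self-contained sketch of that rule on plain data structures (not RocksDB internals), using the foo/v1..v5 example from the comment.

    // Hedged sketch of the Prev() visibility rule described in the comment above:
    // walk a key's versions in ascending sequence order and keep the last one the
    // snapshot can see. Plain data structures, not RocksDB internals.
    #include <cstdint>
    #include <optional>
    #include <string>
    #include <vector>

    struct Version {
      uint64_t seq;
      std::string value;
    };

    // `versions` is assumed sorted by ascending seq (v1@1 ... v5@5 in the example).
    std::optional<std::string> LastVisible(const std::vector<Version>& versions,
                                           uint64_t snapshot_seq) {
      std::optional<std::string> result;
      for (const auto& v : versions) {
        if (v.seq > snapshot_seq) {
          break;  // first non-visible version; stop, as the Prev() logic does
        }
        result = v.value;
      }
      // With snapshot_seq = 3, this returns "v3". If seq 5 were an unprepared
      // write of the current transaction, the correct answer would be "v5",
      // which is why iterators over unvalidated writes are rejected above.
      return result;
    }
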
485 | | |
486 | | } // namespace ROCKSDB_NAMESPACE |